use crate::adapter::Adapter;
use crate::adapter::local_adapter::LocalAdapter;
use crate::adapter::nats_adapter::{NatsAdapter, NatsAdapterConfig};
use crate::adapter::redis_adapter::{RedisAdapter, RedisAdapterConfig as RedisAdapterOptions};
use crate::adapter::redis_cluster_adapter::{RedisClusterAdapter, RedisClusterAdapterConfig};
use crate::error::Result;
use crate::options::{AdapterConfig, AdapterDriver, DatabaseConfig}; use tracing::{info, warn};
pub struct AdapterFactory;
impl AdapterFactory {
pub async fn create(
config: &AdapterConfig,
db_config: &DatabaseConfig,
debug_enabled: bool,
) -> Result<Box<dyn Adapter + Send + Sync>> {
info!(
"{}",
format!("Initializing Adapter with driver: {:?}", config.driver)
);
match config.driver {
AdapterDriver::Redis => {
let redis_url = config
.redis
.redis_pub_options
.get("url")
.and_then(|v| v.as_str())
.map(String::from)
.unwrap_or_else(|| {
format!("redis://{}:{}", db_config.redis.host, db_config.redis.port)
});
let adapter_options = RedisAdapterOptions {
url: redis_url,
prefix: config.redis.prefix.clone(),
request_timeout_ms: config.redis.requests_timeout,
use_connection_manager: true,
cluster_mode: config.redis.cluster_mode,
};
match RedisAdapter::new(adapter_options).await {
Ok(adapter) => Ok(Box::new(adapter)),
Err(e) => {
warn!(
"{}",
format!(
"Failed to initialize Redis adapter: {}, falling back to local adapter",
e
)
);
Ok(Box::new(LocalAdapter::new()))
}
}
}
AdapterDriver::RedisCluster => {
let nodes = if !config.cluster.nodes.is_empty() {
config.cluster.nodes.clone()
} else {
db_config
.redis
.cluster_nodes
.iter()
.map(|node| format!("redis://{}:{}", node.host, node.port))
.collect()
};
if nodes.is_empty() {
warn!("{}", "Redis Cluster Adapter selected, but no nodes configured. Falling back to local adapter.".to_string());
return Ok(Box::new(LocalAdapter::new()));
}
let cluster_adapter_config = RedisClusterAdapterConfig {
nodes,
prefix: config.cluster.prefix.clone(),
request_timeout_ms: config.cluster.request_timeout_ms,
use_connection_manager: config.cluster.use_connection_manager,
};
match RedisClusterAdapter::new(cluster_adapter_config).await {
Ok(adapter) => Ok(Box::new(adapter)),
Err(e) => {
warn!(
"{}",
format!(
"Failed to initialize Redis Cluster adapter: {}, falling back to local adapter",
e
)
);
Ok(Box::new(LocalAdapter::new()))
}
}
}
AdapterDriver::Nats => {
let nats_cfg = NatsAdapterConfig {
servers: config.nats.servers.clone(),
prefix: config.nats.prefix.clone(),
request_timeout_ms: config.nats.request_timeout_ms,
username: config.nats.username.clone(),
password: config.nats.password.clone(),
token: config.nats.token.clone(),
connection_timeout_ms: config.nats.connection_timeout_ms,
nodes_number: config.nats.nodes_number,
};
match NatsAdapter::new(nats_cfg).await {
Ok(adapter) => Ok(Box::new(adapter)),
Err(e) => {
warn!(
"{}",
format!(
"Failed to initialize NATS adapter: {}, falling back to local adapter",
e
)
);
Ok(Box::new(LocalAdapter::new()))
}
}
}
AdapterDriver::Local | _ => {
info!("{}", "Using local adapter.".to_string());
Ok(Box::new(LocalAdapter::new()))
}
}
}
}