use crate::ConnectionManager;
#[cfg(feature = "google-pubsub")]
use crate::google_pubsub_adapter::GooglePubSubAdapter;
#[cfg(feature = "kafka")]
use crate::kafka_adapter::KafkaAdapter;
use crate::local_adapter::LocalAdapter;
#[cfg(feature = "nats")]
use crate::nats_adapter::NatsAdapter;
#[cfg(feature = "pulsar")]
use crate::pulsar_adapter::PulsarAdapter;
#[cfg(feature = "rabbitmq")]
use crate::rabbitmq_adapter::RabbitMqAdapter;
#[cfg(feature = "redis")]
use crate::redis_adapter::{RedisAdapter, RedisAdapterOptions};
#[cfg(feature = "redis-cluster")]
use crate::redis_cluster_adapter::RedisClusterAdapter;
use sockudo_core::error::{Error, Result};
use std::sync::Arc;
#[cfg(feature = "google-pubsub")]
use sockudo_core::options::GooglePubSubAdapterConfig;
#[cfg(feature = "kafka")]
use sockudo_core::options::KafkaAdapterConfig;
#[cfg(feature = "nats")]
use sockudo_core::options::NatsAdapterConfig;
#[cfg(feature = "pulsar")]
use sockudo_core::options::PulsarAdapterConfig;
#[cfg(feature = "rabbitmq")]
use sockudo_core::options::RabbitMqAdapterConfig;
#[cfg(feature = "redis-cluster")]
use sockudo_core::options::RedisClusterAdapterConfig;
use sockudo_core::options::{AdapterConfig, AdapterDriver, DatabaseConfig};
#[cfg(feature = "redis")]
use sonic_rs::prelude::*;
use tracing::{info, warn};
/// Concrete, strongly-typed handle to the adapter that was constructed.
///
/// There is one variant per backend. Every variant except `Local` is only
/// compiled in when the Cargo feature named in its `#[cfg]` attribute is
/// enabled. Each variant wraps its adapter in an `Arc`, so cloning a
/// `TypedAdapter` clones the `Arc` handle.
#[derive(Clone)]
pub enum TypedAdapter {
/// In-process adapter; always available.
Local(Arc<LocalAdapter>),
/// Redis-backed adapter (feature `redis`).
#[cfg(feature = "redis")]
Redis(Arc<RedisAdapter>),
/// Redis Cluster-backed adapter (feature `redis-cluster`).
#[cfg(feature = "redis-cluster")]
RedisCluster(Arc<RedisClusterAdapter>),
/// NATS-backed adapter (feature `nats`).
#[cfg(feature = "nats")]
Nats(Arc<NatsAdapter>),
/// Pulsar-backed adapter (feature `pulsar`).
#[cfg(feature = "pulsar")]
Pulsar(Arc<PulsarAdapter>),
/// Google Pub/Sub-backed adapter (feature `google-pubsub`).
#[cfg(feature = "google-pubsub")]
GooglePubSub(Arc<GooglePubSubAdapter>),
/// Kafka-backed adapter (feature `kafka`).
#[cfg(feature = "kafka")]
Kafka(Arc<KafkaAdapter>),
/// RabbitMQ-backed adapter (feature `rabbitmq`).
#[cfg(feature = "rabbitmq")]
RabbitMq(Arc<RabbitMqAdapter>),
}
impl TypedAdapter {
    /// Returns a shared handle to the [`LocalAdapter`] behind this adapter.
    ///
    /// For `Local` that is the wrapped adapter itself; every other variant
    /// hands out a clone of its `local_adapter` field.
    pub fn local_adapter(&self) -> Arc<LocalAdapter> {
        match self {
            Self::Local(local) => Arc::clone(local),
            #[cfg(feature = "redis")]
            Self::Redis(inner) => Arc::clone(&inner.local_adapter),
            #[cfg(feature = "redis-cluster")]
            Self::RedisCluster(inner) => Arc::clone(&inner.local_adapter),
            #[cfg(feature = "nats")]
            Self::Nats(inner) => Arc::clone(&inner.local_adapter),
            #[cfg(feature = "pulsar")]
            Self::Pulsar(inner) => Arc::clone(&inner.local_adapter),
            #[cfg(feature = "google-pubsub")]
            Self::GooglePubSub(inner) => Arc::clone(&inner.local_adapter),
            #[cfg(feature = "kafka")]
            Self::Kafka(inner) => Arc::clone(&inner.local_adapter),
            #[cfg(feature = "rabbitmq")]
            Self::RabbitMq(inner) => Arc::clone(&inner.local_adapter),
        }
    }

    /// Forwards the delta-compression manager and app manager to the
    /// underlying [`LocalAdapter`], whichever variant this is.
    #[cfg(feature = "delta")]
    pub async fn set_delta_compression(
        &self,
        delta_compression: Arc<sockudo_delta::DeltaCompressionManager>,
        app_manager: Arc<dyn sockudo_core::app::AppManager + Send + Sync>,
    ) {
        let local = self.local_adapter();
        local
            .set_delta_compression(delta_compression, app_manager)
            .await;
    }

    /// Forwards `enabled` to `set_tag_filtering_enabled` on the underlying
    /// [`LocalAdapter`].
    #[cfg(feature = "tag-filtering")]
    pub fn set_tag_filtering_enabled(&self, enabled: bool) {
        let local = self.local_adapter();
        local.set_tag_filtering_enabled(enabled);
    }

    /// Forwards `enabled` to `set_enable_tags_globally` on the underlying
    /// [`LocalAdapter`].
    #[cfg(feature = "tag-filtering")]
    pub fn set_enable_tags_globally(&self, enabled: bool) {
        let local = self.local_adapter();
        local.set_enable_tags_globally(enabled);
    }

    /// Hands the cache manager and idempotency TTL to the wrapped adapter.
    ///
    /// The `Local` variant does nothing with either argument.
    // Both parameters are unused when only the `Local` arm is compiled in.
    #[allow(unused_variables)]
    pub fn set_cache_manager(
        &self,
        cache_manager: Arc<dyn sockudo_core::cache::CacheManager + Send + Sync>,
        idempotency_ttl: u64,
    ) {
        match self {
            Self::Local(_) => {}
            #[cfg(feature = "redis")]
            Self::Redis(inner) => {
                inner.set_cache_manager(cache_manager, idempotency_ttl);
            }
            #[cfg(feature = "redis-cluster")]
            Self::RedisCluster(inner) => {
                inner.set_cache_manager(cache_manager, idempotency_ttl);
            }
            #[cfg(feature = "nats")]
            Self::Nats(inner) => {
                inner.set_cache_manager(cache_manager, idempotency_ttl);
            }
            #[cfg(feature = "pulsar")]
            Self::Pulsar(inner) => {
                inner.set_cache_manager(cache_manager, idempotency_ttl);
            }
            #[cfg(feature = "google-pubsub")]
            Self::GooglePubSub(inner) => {
                inner.set_cache_manager(cache_manager, idempotency_ttl);
            }
            #[cfg(feature = "kafka")]
            Self::Kafka(inner) => {
                inner.set_cache_manager(cache_manager, idempotency_ttl);
            }
            #[cfg(feature = "rabbitmq")]
            Self::RabbitMq(inner) => {
                inner.set_cache_manager(cache_manager, idempotency_ttl);
            }
        }
    }

    /// Hands the metrics sink to the wrapped adapter.
    ///
    /// The `Local` variant ignores `metrics` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns whatever the wrapped adapter's `set_metrics` returns.
    // `metrics` is unused when only the `Local` arm is compiled in.
    #[allow(unused_variables)]
    pub async fn set_metrics(
        &self,
        metrics: Arc<dyn sockudo_core::metrics::MetricsInterface + Send + Sync>,
    ) -> Result<()> {
        match self {
            Self::Local(_) => Ok(()),
            #[cfg(feature = "redis")]
            Self::Redis(inner) => inner.set_metrics(metrics).await,
            #[cfg(feature = "redis-cluster")]
            Self::RedisCluster(inner) => inner.set_metrics(metrics).await,
            #[cfg(feature = "nats")]
            Self::Nats(inner) => inner.set_metrics(metrics).await,
            #[cfg(feature = "pulsar")]
            Self::Pulsar(inner) => inner.set_metrics(metrics).await,
            #[cfg(feature = "google-pubsub")]
            Self::GooglePubSub(inner) => inner.set_metrics(metrics).await,
            #[cfg(feature = "kafka")]
            Self::Kafka(inner) => inner.set_metrics(metrics).await,
            #[cfg(feature = "rabbitmq")]
            Self::RabbitMq(inner) => inner.set_metrics(metrics).await,
        }
    }
}
pub struct AdapterFactory;
impl AdapterFactory {
#[allow(unused_variables)]
pub async fn create(
config: &AdapterConfig,
db_config: &DatabaseConfig,
) -> Result<Arc<dyn ConnectionManager + Send + Sync>> {
Self::create_with_typed(config, db_config)
.await
.map(|(adapter, _)| adapter)
}
#[allow(unused_variables)]
pub async fn create_with_typed(
config: &AdapterConfig,
db_config: &DatabaseConfig,
) -> Result<(Arc<dyn ConnectionManager + Send + Sync>, TypedAdapter)> {
info!(
"{}",
format!(
"Initializing ConnectionManager with driver: {:?}",
config.driver
)
);
match config.driver {
#[cfg(feature = "redis")]
AdapterDriver::Redis => {
let redis_url = config
.redis
.redis_pub_options
.get("url")
.and_then(|v| v.as_str())
.map(String::from)
.unwrap_or_else(|| db_config.redis.to_url());
let adapter_options = RedisAdapterOptions {
url: redis_url,
prefix: config.redis.prefix.clone(),
request_timeout_ms: config.redis.requests_timeout,
cluster_mode: config.redis.cluster_mode,
};
match RedisAdapter::new(adapter_options).await {
Ok(mut adapter) => {
adapter.set_cluster_health(&config.cluster_health).await?;
adapter.set_socket_counting(config.enable_socket_counting);
let adapter = Arc::new(adapter);
let typed = TypedAdapter::Redis(adapter.clone());
Ok((adapter, typed))
}
Err(e) => {
if !config.fallback_to_local {
tracing::error!("Failed to initialize Redis adapter: {}", e);
return Err(e);
}
warn!(
"Failed to initialize Redis adapter: {}, falling back to local adapter",
e
);
let local_adapter = Arc::new(LocalAdapter::new_with_buffer_multiplier(
config.buffer_multiplier_per_cpu,
));
let typed = TypedAdapter::Local(local_adapter.clone());
Ok((local_adapter, typed))
}
}
}
#[cfg(feature = "redis-cluster")]
AdapterDriver::RedisCluster => {
let nodes = if !config.cluster.nodes.is_empty() {
db_config
.redis
.normalize_cluster_seed_urls(&config.cluster.nodes)
} else {
db_config.redis.cluster_node_urls()
};
if nodes.is_empty() {
if !config.fallback_to_local {
return Err(Error::HorizontalAdapter(
"Redis Cluster adapter selected but no nodes configured".to_string(),
));
}
warn!(
"Redis Cluster adapter selected, but no nodes configured. Falling back to local adapter."
);
let local_adapter = Arc::new(LocalAdapter::new_with_buffer_multiplier(
config.buffer_multiplier_per_cpu,
));
let typed = TypedAdapter::Local(local_adapter.clone());
return Ok((local_adapter, typed));
}
let cluster_adapter_config = RedisClusterAdapterConfig {
nodes,
prefix: config.cluster.prefix.clone(),
request_timeout_ms: config.cluster.request_timeout_ms,
use_connection_manager: config.cluster.use_connection_manager,
use_sharded_pubsub: config.cluster.use_sharded_pubsub,
};
match RedisClusterAdapter::new(cluster_adapter_config).await {
Ok(mut adapter) => {
adapter.set_cluster_health(&config.cluster_health).await?;
adapter.set_socket_counting(config.enable_socket_counting);
let adapter = Arc::new(adapter);
let typed = TypedAdapter::RedisCluster(adapter.clone());
Ok((adapter, typed))
}
Err(e) => {
if !config.fallback_to_local {
tracing::error!("Failed to initialize Redis Cluster adapter: {}", e);
return Err(e);
}
warn!(
"Failed to initialize Redis Cluster adapter: {}, falling back to local adapter",
e
);
let local_adapter = Arc::new(LocalAdapter::new_with_buffer_multiplier(
config.buffer_multiplier_per_cpu,
));
let typed = TypedAdapter::Local(local_adapter.clone());
Ok((local_adapter, typed))
}
}
}
#[cfg(feature = "nats")]
AdapterDriver::Nats => {
let nats_cfg = NatsAdapterConfig {
servers: config.nats.servers.clone(),
prefix: config.nats.prefix.clone(),
request_timeout_ms: config.nats.request_timeout_ms,
username: config.nats.username.clone(),
password: config.nats.password.clone(),
token: config.nats.token.clone(),
connection_timeout_ms: config.nats.connection_timeout_ms,
nodes_number: config.nats.nodes_number,
discovery_max_wait_ms: config.nats.discovery_max_wait_ms,
discovery_idle_wait_ms: config.nats.discovery_idle_wait_ms,
};
match NatsAdapter::new(nats_cfg).await {
Ok(mut adapter) => {
adapter.set_cluster_health(&config.cluster_health).await?;
adapter.set_socket_counting(config.enable_socket_counting);
let adapter = Arc::new(adapter);
let typed = TypedAdapter::Nats(adapter.clone());
Ok((adapter, typed))
}
Err(e) => {
if !config.fallback_to_local {
tracing::error!("Failed to initialize NATS adapter: {}", e);
return Err(e);
}
warn!(
"Failed to initialize NATS adapter: {}, falling back to local adapter",
e
);
let local_adapter = Arc::new(LocalAdapter::new_with_buffer_multiplier(
config.buffer_multiplier_per_cpu,
));
let typed = TypedAdapter::Local(local_adapter.clone());
Ok((local_adapter, typed))
}
}
}
#[cfg(feature = "pulsar")]
AdapterDriver::Pulsar => {
let pulsar_cfg = PulsarAdapterConfig {
url: config.pulsar.url.clone(),
prefix: config.pulsar.prefix.clone(),
request_timeout_ms: config.pulsar.request_timeout_ms,
token: config.pulsar.token.clone(),
nodes_number: config.pulsar.nodes_number,
};
match PulsarAdapter::new(pulsar_cfg).await {
Ok(mut adapter) => {
adapter.set_cluster_health(&config.cluster_health).await?;
adapter.set_socket_counting(config.enable_socket_counting);
let adapter = Arc::new(adapter);
let typed = TypedAdapter::Pulsar(adapter.clone());
Ok((adapter, typed))
}
Err(e) => {
if !config.fallback_to_local {
tracing::error!("Failed to initialize Pulsar adapter: {}", e);
return Err(e);
}
warn!(
"Failed to initialize Pulsar adapter: {}, falling back to local adapter",
e
);
let local_adapter = Arc::new(LocalAdapter::new_with_buffer_multiplier(
config.buffer_multiplier_per_cpu,
));
let typed = TypedAdapter::Local(local_adapter.clone());
Ok((local_adapter, typed))
}
}
}
#[cfg(feature = "rabbitmq")]
AdapterDriver::RabbitMq => {
let rabbitmq_cfg = RabbitMqAdapterConfig {
url: config.rabbitmq.url.clone(),
prefix: config.rabbitmq.prefix.clone(),
request_timeout_ms: config.rabbitmq.request_timeout_ms,
connection_timeout_ms: config.rabbitmq.connection_timeout_ms,
nodes_number: config.rabbitmq.nodes_number,
};
match RabbitMqAdapter::new(rabbitmq_cfg).await {
Ok(mut adapter) => {
adapter.set_cluster_health(&config.cluster_health).await?;
adapter.set_socket_counting(config.enable_socket_counting);
let adapter = Arc::new(adapter);
let typed = TypedAdapter::RabbitMq(adapter.clone());
Ok((adapter, typed))
}
Err(e) => {
if !config.fallback_to_local {
tracing::error!("Failed to initialize RabbitMQ adapter: {}", e);
return Err(e);
}
warn!(
"Failed to initialize RabbitMQ adapter: {}, falling back to local adapter",
e
);
let local_adapter = Arc::new(LocalAdapter::new_with_buffer_multiplier(
config.buffer_multiplier_per_cpu,
));
let typed = TypedAdapter::Local(local_adapter.clone());
Ok((local_adapter, typed))
}
}
}
#[cfg(feature = "google-pubsub")]
AdapterDriver::GooglePubSub => {
let google_pubsub_cfg = GooglePubSubAdapterConfig {
project_id: config.google_pubsub.project_id.clone(),
prefix: config.google_pubsub.prefix.clone(),
request_timeout_ms: config.google_pubsub.request_timeout_ms,
emulator_host: config.google_pubsub.emulator_host.clone(),
nodes_number: config.google_pubsub.nodes_number,
};
match GooglePubSubAdapter::new(google_pubsub_cfg).await {
Ok(mut adapter) => {
adapter.set_cluster_health(&config.cluster_health).await?;
adapter.set_socket_counting(config.enable_socket_counting);
let adapter = Arc::new(adapter);
let typed = TypedAdapter::GooglePubSub(adapter.clone());
Ok((adapter, typed))
}
Err(e) => {
if !config.fallback_to_local {
tracing::error!("Failed to initialize Google Pub/Sub adapter: {}", e);
return Err(e);
}
warn!(
"Failed to initialize Google Pub/Sub adapter: {}, falling back to local adapter",
e
);
let local_adapter = Arc::new(LocalAdapter::new_with_buffer_multiplier(
config.buffer_multiplier_per_cpu,
));
let typed = TypedAdapter::Local(local_adapter.clone());
Ok((local_adapter, typed))
}
}
}
#[cfg(feature = "kafka")]
AdapterDriver::Kafka => {
let kafka_cfg = KafkaAdapterConfig {
brokers: config.kafka.brokers.clone(),
prefix: config.kafka.prefix.clone(),
request_timeout_ms: config.kafka.request_timeout_ms,
security_protocol: config.kafka.security_protocol.clone(),
sasl_mechanism: config.kafka.sasl_mechanism.clone(),
sasl_username: config.kafka.sasl_username.clone(),
sasl_password: config.kafka.sasl_password.clone(),
nodes_number: config.kafka.nodes_number,
};
match KafkaAdapter::new(kafka_cfg).await {
Ok(mut adapter) => {
adapter.set_cluster_health(&config.cluster_health).await?;
adapter.set_socket_counting(config.enable_socket_counting);
let adapter = Arc::new(adapter);
let typed = TypedAdapter::Kafka(adapter.clone());
Ok((adapter, typed))
}
Err(e) => {
if !config.fallback_to_local {
tracing::error!("Failed to initialize Kafka adapter: {}", e);
return Err(e);
}
warn!(
"Failed to initialize Kafka adapter: {}, falling back to local adapter",
e
);
let local_adapter = Arc::new(LocalAdapter::new_with_buffer_multiplier(
config.buffer_multiplier_per_cpu,
));
let typed = TypedAdapter::Local(local_adapter.clone());
Ok((local_adapter, typed))
}
}
}
AdapterDriver::Local => {
info!("{}", "Using local adapter.".to_string());
let local_adapter = Arc::new(LocalAdapter::new_with_buffer_multiplier(
config.buffer_multiplier_per_cpu,
));
let typed = TypedAdapter::Local(local_adapter.clone());
Ok((local_adapter, typed))
}
#[cfg(not(feature = "redis"))]
AdapterDriver::Redis => {
if !config.fallback_to_local {
return Err(Error::HorizontalAdapter(
"Redis adapter requested but not compiled in. Fallback to local adapter is disabled. Build with --features redis or set adapter.fallback_to_local = true".to_string()
));
}
warn!(
"Redis adapter requested but not compiled in. Falling back to local adapter."
);
let local_adapter = Arc::new(LocalAdapter::new_with_buffer_multiplier(
config.buffer_multiplier_per_cpu,
));
let typed = TypedAdapter::Local(local_adapter.clone());
Ok((local_adapter, typed))
}
#[cfg(not(feature = "redis-cluster"))]
AdapterDriver::RedisCluster => {
if !config.fallback_to_local {
return Err(Error::HorizontalAdapter(
"Redis Cluster adapter requested but not compiled in. Fallback to local adapter is disabled. Build with --features redis-cluster or set adapter.fallback_to_local = true".to_string()
));
}
warn!(
"Redis Cluster adapter requested but not compiled in. Falling back to local adapter."
);
let local_adapter = Arc::new(LocalAdapter::new_with_buffer_multiplier(
config.buffer_multiplier_per_cpu,
));
let typed = TypedAdapter::Local(local_adapter.clone());
Ok((local_adapter, typed))
}
#[cfg(not(feature = "nats"))]
AdapterDriver::Nats => {
if !config.fallback_to_local {
return Err(Error::HorizontalAdapter(
"NATS adapter requested but not compiled in. Fallback to local adapter is disabled. Build with --features nats or set adapter.fallback_to_local = true".to_string()
));
}
warn!("NATS adapter requested but not compiled in. Falling back to local adapter.");
let local_adapter = Arc::new(LocalAdapter::new_with_buffer_multiplier(
config.buffer_multiplier_per_cpu,
));
let typed = TypedAdapter::Local(local_adapter.clone());
Ok((local_adapter, typed))
}
#[cfg(not(feature = "pulsar"))]
AdapterDriver::Pulsar => {
if !config.fallback_to_local {
return Err(Error::HorizontalAdapter(
"Pulsar adapter requested but not compiled in. Fallback to local adapter is disabled. Build with --features pulsar or set adapter.fallback_to_local = true".to_string()
));
}
warn!(
"Pulsar adapter requested but not compiled in. Falling back to local adapter."
);
let local_adapter = Arc::new(LocalAdapter::new_with_buffer_multiplier(
config.buffer_multiplier_per_cpu,
));
let typed = TypedAdapter::Local(local_adapter.clone());
Ok((local_adapter, typed))
}
#[cfg(not(feature = "rabbitmq"))]
AdapterDriver::RabbitMq => {
if !config.fallback_to_local {
return Err(Error::HorizontalAdapter(
"RabbitMQ adapter requested but not compiled in. Fallback to local adapter is disabled. Build with --features rabbitmq or set adapter.fallback_to_local = true".to_string()
));
}
warn!(
"RabbitMQ adapter requested but not compiled in. Falling back to local adapter."
);
let local_adapter = Arc::new(LocalAdapter::new_with_buffer_multiplier(
config.buffer_multiplier_per_cpu,
));
let typed = TypedAdapter::Local(local_adapter.clone());
Ok((local_adapter, typed))
}
#[cfg(not(feature = "google-pubsub"))]
AdapterDriver::GooglePubSub => {
if !config.fallback_to_local {
return Err(Error::HorizontalAdapter(
"Google Pub/Sub adapter requested but not compiled in. Fallback to local adapter is disabled. Build with --features google-pubsub or set adapter.fallback_to_local = true".to_string()
));
}
warn!(
"Google Pub/Sub adapter requested but not compiled in. Falling back to local adapter."
);
let local_adapter = Arc::new(LocalAdapter::new_with_buffer_multiplier(
config.buffer_multiplier_per_cpu,
));
let typed = TypedAdapter::Local(local_adapter.clone());
Ok((local_adapter, typed))
}
#[cfg(not(feature = "kafka"))]
AdapterDriver::Kafka => {
if !config.fallback_to_local {
return Err(Error::HorizontalAdapter(
"Kafka adapter requested but not compiled in. Fallback to local adapter is disabled. Build with --features kafka or set adapter.fallback_to_local = true".to_string()
));
}
warn!(
"Kafka adapter requested but not compiled in. Falling back to local adapter."
);
let local_adapter = Arc::new(LocalAdapter::new_with_buffer_multiplier(
config.buffer_multiplier_per_cpu,
));
let typed = TypedAdapter::Local(local_adapter.clone());
Ok((local_adapter, typed))
}
}
}
}