use sockudo_core::error::Result;
#[cfg(feature = "google-pubsub")]
use sockudo_core::options::GooglePubSubAdapterConfig;
#[cfg(feature = "iggy")]
use sockudo_core::options::IggyConfig;
#[cfg(feature = "kafka")]
use sockudo_core::options::KafkaAdapterConfig;
#[cfg(feature = "nats")]
use sockudo_core::options::NatsAdapterConfig;
#[cfg(feature = "pulsar")]
use sockudo_core::options::PulsarAdapterConfig;
#[cfg(feature = "rabbitmq")]
use sockudo_core::options::RabbitMqAdapterConfig;
#[cfg(feature = "sns")]
use sockudo_core::options::SnsQueueConfig;
#[cfg(feature = "sqs")]
use sockudo_core::options::SqsQueueConfig;
use sockudo_core::queue::QueueInterface;
use sockudo_core::webhook_types::{JobData, JobProcessorFnAsync};
#[cfg(feature = "google-pubsub")]
use crate::google_pubsub_queue_manager::GooglePubSubQueueManager;
#[cfg(feature = "iggy")]
use crate::iggy_queue_manager::IggyQueueManager;
#[cfg(feature = "kafka")]
use crate::kafka_queue_manager::KafkaQueueManager;
use crate::memory_queue_manager::MemoryQueueManager;
#[cfg(feature = "nats")]
use crate::nats_queue_manager::NatsJetStreamQueueManager;
#[cfg(feature = "pulsar")]
use crate::pulsar_queue_manager::PulsarQueueManager;
#[cfg(feature = "rabbitmq")]
use crate::rabbitmq_queue_manager::RabbitMqQueueManager;
#[cfg(feature = "redis-cluster")]
use crate::redis_cluster_queue_manager::RedisClusterQueueManager;
#[cfg(feature = "redis")]
use crate::redis_queue_manager::RedisQueueManager;
#[cfg(feature = "sns")]
use crate::sns_queue_manager::SnsQueueManager;
#[cfg(feature = "sqs")]
use crate::sqs_queue_manager::SqsQueueManager;
use tracing::*;
pub struct QueueManagerFactory;
impl QueueManagerFactory {
#[allow(unused_variables)]
pub async fn create(
driver: &str,
redis_url: Option<&str>,
prefix: Option<&str>,
concurrency: Option<usize>,
) -> Result<Box<dyn QueueInterface>> {
match driver {
#[cfg(feature = "redis")]
"redis" => {
let url = redis_url.unwrap_or("redis://127.0.0.1:6379/");
let prefix_str = prefix.unwrap_or("sockudo");
let concurrency_val = concurrency.unwrap_or(5);
info!(
"Creating Redis queue manager (Prefix: {}, Concurrency: {})",
prefix_str, concurrency_val
);
debug!("Redis queue manager URL: {}", url);
let manager = RedisQueueManager::new(url, prefix_str, concurrency_val).await?;
Ok(Box::new(manager))
}
#[cfg(feature = "redis-cluster")]
"redis-cluster" => {
let nodes_str = redis_url.unwrap_or(
"redis://127.0.0.1:7000,redis://127.0.0.1:7001,redis://127.0.0.1:7002",
);
let cluster_nodes: Vec<String> =
nodes_str.split(',').map(|s| s.trim().to_string()).collect();
let prefix_str = prefix.unwrap_or("sockudo");
let concurrency_val = concurrency.unwrap_or(5);
info!(
"Creating Redis Cluster queue manager (Prefix: {}, Concurrency: {})",
prefix_str, concurrency_val
);
debug!("Redis Cluster queue manager nodes: {:?}", cluster_nodes);
let manager =
RedisClusterQueueManager::new(cluster_nodes, prefix_str, concurrency_val)
.await?;
Ok(Box::new(manager))
}
#[cfg(feature = "nats")]
"nats" => {
warn!(
"NATS queue manager should be created via create_nats(). Falling back to memory queue."
);
let manager = MemoryQueueManager::new();
manager.start_processing();
Ok(Box::new(manager))
}
"memory" => {
info!("{}", "Creating Memory queue manager".to_string());
let manager = MemoryQueueManager::new();
manager.start_processing();
Ok(Box::new(manager))
}
#[cfg(not(feature = "redis"))]
"redis" => {
warn!(
"Redis queue manager requested but not compiled in. Falling back to memory queue."
);
let manager = MemoryQueueManager::new();
manager.start_processing();
Ok(Box::new(manager))
}
#[cfg(not(feature = "redis-cluster"))]
"redis-cluster" => {
warn!(
"Redis Cluster queue manager requested but not compiled in. Falling back to memory queue."
);
let manager = MemoryQueueManager::new();
manager.start_processing();
Ok(Box::new(manager))
}
#[cfg(feature = "sqs")]
"sqs" => {
warn!(
"SQS queue manager should be created via create_sqs(). Falling back to memory queue."
);
let manager = MemoryQueueManager::new();
manager.start_processing();
Ok(Box::new(manager))
}
#[cfg(not(feature = "sqs"))]
"sqs" => {
warn!(
"SQS queue manager requested but not compiled in. Falling back to memory queue."
);
let manager = MemoryQueueManager::new();
manager.start_processing();
Ok(Box::new(manager))
}
#[cfg(feature = "sns")]
"sns" => {
warn!(
"SNS queue manager should be created via create_sns(). Falling back to memory queue."
);
let manager = MemoryQueueManager::new();
manager.start_processing();
Ok(Box::new(manager))
}
#[cfg(not(feature = "sns"))]
"sns" => {
warn!(
"SNS queue manager requested but not compiled in. Falling back to memory queue."
);
let manager = MemoryQueueManager::new();
manager.start_processing();
Ok(Box::new(manager))
}
other => Err(sockudo_core::error::Error::Queue(format!(
"Unsupported queue driver: {other}"
))),
}
}
#[cfg(feature = "sqs")]
pub async fn create_sqs(config: SqsQueueConfig) -> Result<Box<dyn QueueInterface>> {
info!(
"Creating SQS queue manager (Region: {}, Concurrency: {}, FIFO: {})",
config.region, config.concurrency, config.fifo
);
if let Some(ref url_prefix) = config.queue_url_prefix {
debug!("SQS queue URL prefix: {}", url_prefix);
}
let manager = SqsQueueManager::new(config).await?;
Ok(Box::new(manager))
}
#[cfg(not(feature = "sqs"))]
#[allow(unused_variables)]
pub async fn create_sqs(
config: sockudo_core::options::SqsQueueConfig,
) -> Result<Box<dyn QueueInterface>> {
warn!("SQS queue manager requested but not compiled in. Falling back to memory queue.");
let manager = MemoryQueueManager::new();
manager.start_processing();
Ok(Box::new(manager))
}
#[cfg(feature = "sns")]
pub async fn create_sns(config: SnsQueueConfig) -> Result<Box<dyn QueueInterface>> {
info!(
"Creating SNS queue manager (Region: {}, Topic: {})",
config.region, config.topic_arn
);
let manager = SnsQueueManager::new(config).await?;
Ok(Box::new(manager))
}
#[cfg(not(feature = "sns"))]
#[allow(unused_variables)]
pub async fn create_sns(
config: sockudo_core::options::SnsQueueConfig,
) -> Result<Box<dyn QueueInterface>> {
warn!("SNS queue manager requested but not compiled in. Falling back to memory queue.");
let manager = MemoryQueueManager::new();
manager.start_processing();
Ok(Box::new(manager))
}
#[cfg(feature = "rabbitmq")]
pub async fn create_rabbitmq(config: RabbitMqAdapterConfig) -> Result<Box<dyn QueueInterface>> {
let manager = RabbitMqQueueManager::new(config).await?;
Ok(Box::new(manager))
}
#[cfg(not(feature = "rabbitmq"))]
#[allow(unused_variables)]
pub async fn create_rabbitmq(
config: sockudo_core::options::RabbitMqAdapterConfig,
) -> Result<Box<dyn QueueInterface>> {
warn!(
"RabbitMQ queue manager requested but not compiled in. Falling back to memory queue."
);
let manager = MemoryQueueManager::new();
manager.start_processing();
Ok(Box::new(manager))
}
#[cfg(feature = "kafka")]
pub async fn create_kafka(config: KafkaAdapterConfig) -> Result<Box<dyn QueueInterface>> {
let manager = KafkaQueueManager::new(config).await?;
Ok(Box::new(manager))
}
#[cfg(not(feature = "kafka"))]
#[allow(unused_variables)]
pub async fn create_kafka(
config: sockudo_core::options::KafkaAdapterConfig,
) -> Result<Box<dyn QueueInterface>> {
warn!("Kafka queue manager requested but not compiled in. Falling back to memory queue.");
let manager = MemoryQueueManager::new();
manager.start_processing();
Ok(Box::new(manager))
}
#[cfg(feature = "iggy")]
pub async fn create_iggy(config: IggyConfig) -> Result<Box<dyn QueueInterface>> {
let manager = IggyQueueManager::new(config).await?;
Ok(Box::new(manager))
}
#[cfg(not(feature = "iggy"))]
#[allow(unused_variables)]
pub async fn create_iggy(
config: sockudo_core::options::IggyConfig,
) -> Result<Box<dyn QueueInterface>> {
warn!(
"Apache Iggy queue manager requested but not compiled in. Falling back to memory queue."
);
let manager = MemoryQueueManager::new();
manager.start_processing();
Ok(Box::new(manager))
}
#[cfg(feature = "pulsar")]
pub async fn create_pulsar(config: PulsarAdapterConfig) -> Result<Box<dyn QueueInterface>> {
let manager = PulsarQueueManager::new(config).await?;
Ok(Box::new(manager))
}
#[cfg(not(feature = "pulsar"))]
#[allow(unused_variables)]
pub async fn create_pulsar(
config: sockudo_core::options::PulsarAdapterConfig,
) -> Result<Box<dyn QueueInterface>> {
warn!("Pulsar queue manager requested but not compiled in. Falling back to memory queue.");
let manager = MemoryQueueManager::new();
manager.start_processing();
Ok(Box::new(manager))
}
#[cfg(feature = "google-pubsub")]
pub async fn create_google_pubsub(
config: GooglePubSubAdapterConfig,
) -> Result<Box<dyn QueueInterface>> {
let manager = GooglePubSubQueueManager::new(config).await?;
Ok(Box::new(manager))
}
#[cfg(not(feature = "google-pubsub"))]
#[allow(unused_variables)]
pub async fn create_google_pubsub(
config: sockudo_core::options::GooglePubSubAdapterConfig,
) -> Result<Box<dyn QueueInterface>> {
warn!(
"Google Pub/Sub queue manager requested but not compiled in. Falling back to memory queue."
);
let manager = MemoryQueueManager::new();
manager.start_processing();
Ok(Box::new(manager))
}
#[cfg(feature = "nats")]
pub async fn create_nats(config: NatsAdapterConfig) -> Result<Box<dyn QueueInterface>> {
let manager = NatsJetStreamQueueManager::new(config).await?;
Ok(Box::new(manager))
}
#[cfg(not(feature = "nats"))]
#[allow(unused_variables)]
pub async fn create_nats(
config: sockudo_core::options::NatsAdapterConfig,
) -> Result<Box<dyn QueueInterface>> {
warn!("NATS queue manager requested but not compiled in. Falling back to memory queue.");
let manager = MemoryQueueManager::new();
manager.start_processing();
Ok(Box::new(manager))
}
}
pub struct QueueManager {
driver: Box<dyn QueueInterface>,
}
impl QueueManager {
pub fn new(driver: Box<dyn QueueInterface>) -> Self {
Self { driver }
}
pub async fn add_to_queue(&self, queue_name: &str, data: JobData) -> Result<()> {
self.driver.add_to_queue(queue_name, data).await
}
pub async fn process_queue(
&self,
queue_name: &str,
callback: JobProcessorFnAsync,
) -> Result<()> {
self.driver.process_queue(queue_name, callback).await
}
pub async fn disconnect(&self) -> Result<()> {
self.driver.disconnect().await
}
pub async fn check_health(&self) -> Result<()> {
self.driver.check_health().await
}
}