Skip to main content

sockudo_queue/
manager.rs

1use sockudo_core::error::Result;
2
3#[cfg(feature = "sns")]
4use sockudo_core::options::SnsQueueConfig;
5#[cfg(feature = "sqs")]
6use sockudo_core::options::SqsQueueConfig;
7use sockudo_core::queue::QueueInterface;
8use sockudo_core::webhook_types::{JobData, JobProcessorFnAsync};
9
10use crate::memory_queue_manager::MemoryQueueManager;
11#[cfg(feature = "redis-cluster")]
12use crate::redis_cluster_queue_manager::RedisClusterQueueManager;
13#[cfg(feature = "redis")]
14use crate::redis_queue_manager::RedisQueueManager;
15#[cfg(feature = "sns")]
16use crate::sns_queue_manager::SnsQueueManager;
17#[cfg(feature = "sqs")]
18use crate::sqs_queue_manager::SqsQueueManager;
19use tracing::*;
20
21/// General Queue Manager interface wrapper
22pub struct QueueManagerFactory;
23
24impl QueueManagerFactory {
25    /// Creates a queue manager instance based on the specified driver.
26    #[allow(unused_variables)]
27    pub async fn create(
28        driver: &str,
29        redis_url: Option<&str>,
30        prefix: Option<&str>,
31        concurrency: Option<usize>,
32    ) -> Result<Box<dyn QueueInterface>> {
33        match driver {
34            #[cfg(feature = "redis")]
35            "redis" => {
36                let url = redis_url.unwrap_or("redis://127.0.0.1:6379/");
37                let prefix_str = prefix.unwrap_or("sockudo");
38                let concurrency_val = concurrency.unwrap_or(5);
39                info!(
40                    "Creating Redis queue manager (Prefix: {}, Concurrency: {})",
41                    prefix_str, concurrency_val
42                );
43                debug!("Redis queue manager URL: {}", url);
44                let manager = RedisQueueManager::new(url, prefix_str, concurrency_val).await?;
45                Ok(Box::new(manager))
46            }
47            #[cfg(feature = "redis-cluster")]
48            "redis-cluster" => {
49                let nodes_str = redis_url.unwrap_or(
50                    "redis://127.0.0.1:7000,redis://127.0.0.1:7001,redis://127.0.0.1:7002",
51                );
52                let cluster_nodes: Vec<String> =
53                    nodes_str.split(',').map(|s| s.trim().to_string()).collect();
54                let prefix_str = prefix.unwrap_or("sockudo");
55                let concurrency_val = concurrency.unwrap_or(5);
56
57                info!(
58                    "Creating Redis Cluster queue manager (Prefix: {}, Concurrency: {})",
59                    prefix_str, concurrency_val
60                );
61                debug!("Redis Cluster queue manager nodes: {:?}", cluster_nodes);
62
63                let manager =
64                    RedisClusterQueueManager::new(cluster_nodes, prefix_str, concurrency_val)
65                        .await?;
66                Ok(Box::new(manager))
67            }
68            "memory" => {
69                info!("{}", "Creating Memory queue manager".to_string());
70                let manager = MemoryQueueManager::new();
71                manager.start_processing();
72                Ok(Box::new(manager))
73            }
74            #[cfg(not(feature = "redis"))]
75            "redis" => {
76                warn!(
77                    "Redis queue manager requested but not compiled in. Falling back to memory queue."
78                );
79                let manager = MemoryQueueManager::new();
80                manager.start_processing();
81                Ok(Box::new(manager))
82            }
83            #[cfg(not(feature = "redis-cluster"))]
84            "redis-cluster" => {
85                warn!(
86                    "Redis Cluster queue manager requested but not compiled in. Falling back to memory queue."
87                );
88                let manager = MemoryQueueManager::new();
89                manager.start_processing();
90                Ok(Box::new(manager))
91            }
92            #[cfg(feature = "sqs")]
93            "sqs" => {
94                warn!(
95                    "SQS queue manager should be created via create_sqs(). Falling back to memory queue."
96                );
97                let manager = MemoryQueueManager::new();
98                manager.start_processing();
99                Ok(Box::new(manager))
100            }
101            #[cfg(not(feature = "sqs"))]
102            "sqs" => {
103                warn!(
104                    "SQS queue manager requested but not compiled in. Falling back to memory queue."
105                );
106                let manager = MemoryQueueManager::new();
107                manager.start_processing();
108                Ok(Box::new(manager))
109            }
110            #[cfg(feature = "sns")]
111            "sns" => {
112                warn!(
113                    "SNS queue manager should be created via create_sns(). Falling back to memory queue."
114                );
115                let manager = MemoryQueueManager::new();
116                manager.start_processing();
117                Ok(Box::new(manager))
118            }
119            #[cfg(not(feature = "sns"))]
120            "sns" => {
121                warn!(
122                    "SNS queue manager requested but not compiled in. Falling back to memory queue."
123                );
124                let manager = MemoryQueueManager::new();
125                manager.start_processing();
126                Ok(Box::new(manager))
127            }
128            other => Err(sockudo_core::error::Error::Queue(format!(
129                "Unsupported queue driver: {other}"
130            ))),
131        }
132    }
133
134    /// Creates an SQS queue manager instance with the given configuration.
135    #[cfg(feature = "sqs")]
136    pub async fn create_sqs(config: SqsQueueConfig) -> Result<Box<dyn QueueInterface>> {
137        info!(
138            "Creating SQS queue manager (Region: {}, Concurrency: {}, FIFO: {})",
139            config.region, config.concurrency, config.fifo
140        );
141        if let Some(ref url_prefix) = config.queue_url_prefix {
142            debug!("SQS queue URL prefix: {}", url_prefix);
143        }
144        let manager = SqsQueueManager::new(config).await?;
145        Ok(Box::new(manager))
146    }
147
148    #[cfg(not(feature = "sqs"))]
149    #[allow(unused_variables)]
150    pub async fn create_sqs(
151        config: sockudo_core::options::SqsQueueConfig,
152    ) -> Result<Box<dyn QueueInterface>> {
153        warn!("SQS queue manager requested but not compiled in. Falling back to memory queue.");
154        let manager = MemoryQueueManager::new();
155        manager.start_processing();
156        Ok(Box::new(manager))
157    }
158
159    #[cfg(feature = "sns")]
160    pub async fn create_sns(config: SnsQueueConfig) -> Result<Box<dyn QueueInterface>> {
161        info!(
162            "Creating SNS queue manager (Region: {}, Topic: {})",
163            config.region, config.topic_arn
164        );
165        let manager = SnsQueueManager::new(config).await?;
166        Ok(Box::new(manager))
167    }
168
169    #[cfg(not(feature = "sns"))]
170    #[allow(unused_variables)]
171    pub async fn create_sns(
172        config: sockudo_core::options::SnsQueueConfig,
173    ) -> Result<Box<dyn QueueInterface>> {
174        warn!("SNS queue manager requested but not compiled in. Falling back to memory queue.");
175        let manager = MemoryQueueManager::new();
176        manager.start_processing();
177        Ok(Box::new(manager))
178    }
179}
180
181pub struct QueueManager {
182    driver: Box<dyn QueueInterface>,
183}
184
185impl QueueManager {
186    /// Creates a new QueueManager wrapping a specific driver implementation.
187    pub fn new(driver: Box<dyn QueueInterface>) -> Self {
188        Self { driver }
189    }
190
191    /// Adds data to the specified queue via the underlying driver.
192    pub async fn add_to_queue(&self, queue_name: &str, data: JobData) -> Result<()> {
193        self.driver.add_to_queue(queue_name, data).await
194    }
195
196    /// Registers a processor for the specified queue and starts processing (if applicable for the driver).
197    pub async fn process_queue(
198        &self,
199        queue_name: &str,
200        callback: JobProcessorFnAsync,
201    ) -> Result<()> {
202        self.driver.process_queue(queue_name, callback).await
203    }
204
205    /// Disconnects the underlying driver (if necessary).
206    pub async fn disconnect(&self) -> Result<()> {
207        self.driver.disconnect().await
208    }
209
210    /// Checks the health of the underlying queue driver.
211    pub async fn check_health(&self) -> Result<()> {
212        self.driver.check_health().await
213    }
214}