Skip to main content

sockudo_queue/
manager.rs

1use sockudo_core::error::Result;
2
3#[cfg(feature = "google-pubsub")]
4use sockudo_core::options::GooglePubSubAdapterConfig;
5#[cfg(feature = "iggy")]
6use sockudo_core::options::IggyConfig;
7#[cfg(feature = "kafka")]
8use sockudo_core::options::KafkaAdapterConfig;
9#[cfg(feature = "nats")]
10use sockudo_core::options::NatsAdapterConfig;
11#[cfg(feature = "pulsar")]
12use sockudo_core::options::PulsarAdapterConfig;
13#[cfg(feature = "rabbitmq")]
14use sockudo_core::options::RabbitMqAdapterConfig;
15#[cfg(feature = "sns")]
16use sockudo_core::options::SnsQueueConfig;
17#[cfg(feature = "sqs")]
18use sockudo_core::options::SqsQueueConfig;
19use sockudo_core::queue::QueueInterface;
20use sockudo_core::webhook_types::{JobData, JobProcessorFnAsync};
21
22#[cfg(feature = "google-pubsub")]
23use crate::google_pubsub_queue_manager::GooglePubSubQueueManager;
24#[cfg(feature = "iggy")]
25use crate::iggy_queue_manager::IggyQueueManager;
26#[cfg(feature = "kafka")]
27use crate::kafka_queue_manager::KafkaQueueManager;
28use crate::memory_queue_manager::MemoryQueueManager;
29#[cfg(feature = "nats")]
30use crate::nats_queue_manager::NatsJetStreamQueueManager;
31#[cfg(feature = "pulsar")]
32use crate::pulsar_queue_manager::PulsarQueueManager;
33#[cfg(feature = "rabbitmq")]
34use crate::rabbitmq_queue_manager::RabbitMqQueueManager;
35#[cfg(feature = "redis-cluster")]
36use crate::redis_cluster_queue_manager::RedisClusterQueueManager;
37#[cfg(feature = "redis")]
38use crate::redis_queue_manager::RedisQueueManager;
39#[cfg(feature = "sns")]
40use crate::sns_queue_manager::SnsQueueManager;
41#[cfg(feature = "sqs")]
42use crate::sqs_queue_manager::SqsQueueManager;
43use tracing::*;
44
45/// General Queue Manager interface wrapper
46pub struct QueueManagerFactory;
47
48impl QueueManagerFactory {
49    /// Creates a queue manager instance based on the specified driver.
50    #[allow(unused_variables)]
51    pub async fn create(
52        driver: &str,
53        redis_url: Option<&str>,
54        prefix: Option<&str>,
55        concurrency: Option<usize>,
56    ) -> Result<Box<dyn QueueInterface>> {
57        match driver {
58            #[cfg(feature = "redis")]
59            "redis" => {
60                let url = redis_url.unwrap_or("redis://127.0.0.1:6379/");
61                let prefix_str = prefix.unwrap_or("sockudo");
62                let concurrency_val = concurrency.unwrap_or(5);
63                info!(
64                    "Creating Redis queue manager (Prefix: {}, Concurrency: {})",
65                    prefix_str, concurrency_val
66                );
67                debug!("Redis queue manager URL: {}", url);
68                let manager = RedisQueueManager::new(url, prefix_str, concurrency_val).await?;
69                Ok(Box::new(manager))
70            }
71            #[cfg(feature = "redis-cluster")]
72            "redis-cluster" => {
73                let nodes_str = redis_url.unwrap_or(
74                    "redis://127.0.0.1:7000,redis://127.0.0.1:7001,redis://127.0.0.1:7002",
75                );
76                let cluster_nodes: Vec<String> =
77                    nodes_str.split(',').map(|s| s.trim().to_string()).collect();
78                let prefix_str = prefix.unwrap_or("sockudo");
79                let concurrency_val = concurrency.unwrap_or(5);
80
81                info!(
82                    "Creating Redis Cluster queue manager (Prefix: {}, Concurrency: {})",
83                    prefix_str, concurrency_val
84                );
85                debug!("Redis Cluster queue manager nodes: {:?}", cluster_nodes);
86
87                let manager =
88                    RedisClusterQueueManager::new(cluster_nodes, prefix_str, concurrency_val)
89                        .await?;
90                Ok(Box::new(manager))
91            }
92            #[cfg(feature = "nats")]
93            "nats" => {
94                warn!(
95                    "NATS queue manager should be created via create_nats(). Falling back to memory queue."
96                );
97                let manager = MemoryQueueManager::new();
98                manager.start_processing();
99                Ok(Box::new(manager))
100            }
101            "memory" => {
102                info!("{}", "Creating Memory queue manager".to_string());
103                let manager = MemoryQueueManager::new();
104                manager.start_processing();
105                Ok(Box::new(manager))
106            }
107            #[cfg(not(feature = "redis"))]
108            "redis" => {
109                warn!(
110                    "Redis queue manager requested but not compiled in. Falling back to memory queue."
111                );
112                let manager = MemoryQueueManager::new();
113                manager.start_processing();
114                Ok(Box::new(manager))
115            }
116            #[cfg(not(feature = "redis-cluster"))]
117            "redis-cluster" => {
118                warn!(
119                    "Redis Cluster queue manager requested but not compiled in. Falling back to memory queue."
120                );
121                let manager = MemoryQueueManager::new();
122                manager.start_processing();
123                Ok(Box::new(manager))
124            }
125            #[cfg(feature = "sqs")]
126            "sqs" => {
127                warn!(
128                    "SQS queue manager should be created via create_sqs(). Falling back to memory queue."
129                );
130                let manager = MemoryQueueManager::new();
131                manager.start_processing();
132                Ok(Box::new(manager))
133            }
134            #[cfg(not(feature = "sqs"))]
135            "sqs" => {
136                warn!(
137                    "SQS queue manager requested but not compiled in. Falling back to memory queue."
138                );
139                let manager = MemoryQueueManager::new();
140                manager.start_processing();
141                Ok(Box::new(manager))
142            }
143            #[cfg(feature = "sns")]
144            "sns" => {
145                warn!(
146                    "SNS queue manager should be created via create_sns(). Falling back to memory queue."
147                );
148                let manager = MemoryQueueManager::new();
149                manager.start_processing();
150                Ok(Box::new(manager))
151            }
152            #[cfg(not(feature = "sns"))]
153            "sns" => {
154                warn!(
155                    "SNS queue manager requested but not compiled in. Falling back to memory queue."
156                );
157                let manager = MemoryQueueManager::new();
158                manager.start_processing();
159                Ok(Box::new(manager))
160            }
161            other => Err(sockudo_core::error::Error::Queue(format!(
162                "Unsupported queue driver: {other}"
163            ))),
164        }
165    }
166
167    /// Creates an SQS queue manager instance with the given configuration.
168    #[cfg(feature = "sqs")]
169    pub async fn create_sqs(config: SqsQueueConfig) -> Result<Box<dyn QueueInterface>> {
170        info!(
171            "Creating SQS queue manager (Region: {}, Concurrency: {}, FIFO: {})",
172            config.region, config.concurrency, config.fifo
173        );
174        if let Some(ref url_prefix) = config.queue_url_prefix {
175            debug!("SQS queue URL prefix: {}", url_prefix);
176        }
177        let manager = SqsQueueManager::new(config).await?;
178        Ok(Box::new(manager))
179    }
180
181    #[cfg(not(feature = "sqs"))]
182    #[allow(unused_variables)]
183    pub async fn create_sqs(
184        config: sockudo_core::options::SqsQueueConfig,
185    ) -> Result<Box<dyn QueueInterface>> {
186        warn!("SQS queue manager requested but not compiled in. Falling back to memory queue.");
187        let manager = MemoryQueueManager::new();
188        manager.start_processing();
189        Ok(Box::new(manager))
190    }
191
192    #[cfg(feature = "sns")]
193    pub async fn create_sns(config: SnsQueueConfig) -> Result<Box<dyn QueueInterface>> {
194        info!(
195            "Creating SNS queue manager (Region: {}, Topic: {})",
196            config.region, config.topic_arn
197        );
198        let manager = SnsQueueManager::new(config).await?;
199        Ok(Box::new(manager))
200    }
201
202    #[cfg(not(feature = "sns"))]
203    #[allow(unused_variables)]
204    pub async fn create_sns(
205        config: sockudo_core::options::SnsQueueConfig,
206    ) -> Result<Box<dyn QueueInterface>> {
207        warn!("SNS queue manager requested but not compiled in. Falling back to memory queue.");
208        let manager = MemoryQueueManager::new();
209        manager.start_processing();
210        Ok(Box::new(manager))
211    }
212
213    #[cfg(feature = "rabbitmq")]
214    pub async fn create_rabbitmq(config: RabbitMqAdapterConfig) -> Result<Box<dyn QueueInterface>> {
215        let manager = RabbitMqQueueManager::new(config).await?;
216        Ok(Box::new(manager))
217    }
218
219    #[cfg(not(feature = "rabbitmq"))]
220    #[allow(unused_variables)]
221    pub async fn create_rabbitmq(
222        config: sockudo_core::options::RabbitMqAdapterConfig,
223    ) -> Result<Box<dyn QueueInterface>> {
224        warn!(
225            "RabbitMQ queue manager requested but not compiled in. Falling back to memory queue."
226        );
227        let manager = MemoryQueueManager::new();
228        manager.start_processing();
229        Ok(Box::new(manager))
230    }
231
232    #[cfg(feature = "kafka")]
233    pub async fn create_kafka(config: KafkaAdapterConfig) -> Result<Box<dyn QueueInterface>> {
234        let manager = KafkaQueueManager::new(config).await?;
235        Ok(Box::new(manager))
236    }
237
238    #[cfg(not(feature = "kafka"))]
239    #[allow(unused_variables)]
240    pub async fn create_kafka(
241        config: sockudo_core::options::KafkaAdapterConfig,
242    ) -> Result<Box<dyn QueueInterface>> {
243        warn!("Kafka queue manager requested but not compiled in. Falling back to memory queue.");
244        let manager = MemoryQueueManager::new();
245        manager.start_processing();
246        Ok(Box::new(manager))
247    }
248
249    #[cfg(feature = "iggy")]
250    pub async fn create_iggy(config: IggyConfig) -> Result<Box<dyn QueueInterface>> {
251        let manager = IggyQueueManager::new(config).await?;
252        Ok(Box::new(manager))
253    }
254
255    #[cfg(not(feature = "iggy"))]
256    #[allow(unused_variables)]
257    pub async fn create_iggy(
258        config: sockudo_core::options::IggyConfig,
259    ) -> Result<Box<dyn QueueInterface>> {
260        warn!(
261            "Apache Iggy queue manager requested but not compiled in. Falling back to memory queue."
262        );
263        let manager = MemoryQueueManager::new();
264        manager.start_processing();
265        Ok(Box::new(manager))
266    }
267
268    #[cfg(feature = "pulsar")]
269    pub async fn create_pulsar(config: PulsarAdapterConfig) -> Result<Box<dyn QueueInterface>> {
270        let manager = PulsarQueueManager::new(config).await?;
271        Ok(Box::new(manager))
272    }
273
274    #[cfg(not(feature = "pulsar"))]
275    #[allow(unused_variables)]
276    pub async fn create_pulsar(
277        config: sockudo_core::options::PulsarAdapterConfig,
278    ) -> Result<Box<dyn QueueInterface>> {
279        warn!("Pulsar queue manager requested but not compiled in. Falling back to memory queue.");
280        let manager = MemoryQueueManager::new();
281        manager.start_processing();
282        Ok(Box::new(manager))
283    }
284
285    #[cfg(feature = "google-pubsub")]
286    pub async fn create_google_pubsub(
287        config: GooglePubSubAdapterConfig,
288    ) -> Result<Box<dyn QueueInterface>> {
289        let manager = GooglePubSubQueueManager::new(config).await?;
290        Ok(Box::new(manager))
291    }
292
293    #[cfg(not(feature = "google-pubsub"))]
294    #[allow(unused_variables)]
295    pub async fn create_google_pubsub(
296        config: sockudo_core::options::GooglePubSubAdapterConfig,
297    ) -> Result<Box<dyn QueueInterface>> {
298        warn!(
299            "Google Pub/Sub queue manager requested but not compiled in. Falling back to memory queue."
300        );
301        let manager = MemoryQueueManager::new();
302        manager.start_processing();
303        Ok(Box::new(manager))
304    }
305
306    #[cfg(feature = "nats")]
307    pub async fn create_nats(config: NatsAdapterConfig) -> Result<Box<dyn QueueInterface>> {
308        let manager = NatsJetStreamQueueManager::new(config).await?;
309        Ok(Box::new(manager))
310    }
311
312    #[cfg(not(feature = "nats"))]
313    #[allow(unused_variables)]
314    pub async fn create_nats(
315        config: sockudo_core::options::NatsAdapterConfig,
316    ) -> Result<Box<dyn QueueInterface>> {
317        warn!("NATS queue manager requested but not compiled in. Falling back to memory queue.");
318        let manager = MemoryQueueManager::new();
319        manager.start_processing();
320        Ok(Box::new(manager))
321    }
322}
323
324pub struct QueueManager {
325    driver: Box<dyn QueueInterface>,
326}
327
328impl QueueManager {
329    /// Creates a new QueueManager wrapping a specific driver implementation.
330    pub fn new(driver: Box<dyn QueueInterface>) -> Self {
331        Self { driver }
332    }
333
334    /// Adds data to the specified queue via the underlying driver.
335    pub async fn add_to_queue(&self, queue_name: &str, data: JobData) -> Result<()> {
336        self.driver.add_to_queue(queue_name, data).await
337    }
338
339    /// Registers a processor for the specified queue and starts processing (if applicable for the driver).
340    pub async fn process_queue(
341        &self,
342        queue_name: &str,
343        callback: JobProcessorFnAsync,
344    ) -> Result<()> {
345        self.driver.process_queue(queue_name, callback).await
346    }
347
348    /// Disconnects the underlying driver (if necessary).
349    pub async fn disconnect(&self) -> Result<()> {
350        self.driver.disconnect().await
351    }
352
353    /// Checks the health of the underlying queue driver.
354    pub async fn check_health(&self) -> Result<()> {
355        self.driver.check_health().await
356    }
357}