Skip to main content

sockudo_queue/
manager.rs

1use sockudo_core::error::Result;
2
3#[cfg(feature = "google-pubsub")]
4use sockudo_core::options::GooglePubSubAdapterConfig;
5#[cfg(feature = "kafka")]
6use sockudo_core::options::KafkaAdapterConfig;
7#[cfg(feature = "nats")]
8use sockudo_core::options::NatsAdapterConfig;
9#[cfg(feature = "pulsar")]
10use sockudo_core::options::PulsarAdapterConfig;
11#[cfg(feature = "rabbitmq")]
12use sockudo_core::options::RabbitMqAdapterConfig;
13#[cfg(feature = "sns")]
14use sockudo_core::options::SnsQueueConfig;
15#[cfg(feature = "sqs")]
16use sockudo_core::options::SqsQueueConfig;
17use sockudo_core::queue::QueueInterface;
18use sockudo_core::webhook_types::{JobData, JobProcessorFnAsync};
19
20#[cfg(feature = "google-pubsub")]
21use crate::google_pubsub_queue_manager::GooglePubSubQueueManager;
22#[cfg(feature = "kafka")]
23use crate::kafka_queue_manager::KafkaQueueManager;
24use crate::memory_queue_manager::MemoryQueueManager;
25#[cfg(feature = "nats")]
26use crate::nats_queue_manager::NatsJetStreamQueueManager;
27#[cfg(feature = "pulsar")]
28use crate::pulsar_queue_manager::PulsarQueueManager;
29#[cfg(feature = "rabbitmq")]
30use crate::rabbitmq_queue_manager::RabbitMqQueueManager;
31#[cfg(feature = "redis-cluster")]
32use crate::redis_cluster_queue_manager::RedisClusterQueueManager;
33#[cfg(feature = "redis")]
34use crate::redis_queue_manager::RedisQueueManager;
35#[cfg(feature = "sns")]
36use crate::sns_queue_manager::SnsQueueManager;
37#[cfg(feature = "sqs")]
38use crate::sqs_queue_manager::SqsQueueManager;
39use tracing::*;
40
41/// General Queue Manager interface wrapper
42pub struct QueueManagerFactory;
43
44impl QueueManagerFactory {
45    /// Creates a queue manager instance based on the specified driver.
46    #[allow(unused_variables)]
47    pub async fn create(
48        driver: &str,
49        redis_url: Option<&str>,
50        prefix: Option<&str>,
51        concurrency: Option<usize>,
52    ) -> Result<Box<dyn QueueInterface>> {
53        match driver {
54            #[cfg(feature = "redis")]
55            "redis" => {
56                let url = redis_url.unwrap_or("redis://127.0.0.1:6379/");
57                let prefix_str = prefix.unwrap_or("sockudo");
58                let concurrency_val = concurrency.unwrap_or(5);
59                info!(
60                    "Creating Redis queue manager (Prefix: {}, Concurrency: {})",
61                    prefix_str, concurrency_val
62                );
63                debug!("Redis queue manager URL: {}", url);
64                let manager = RedisQueueManager::new(url, prefix_str, concurrency_val).await?;
65                Ok(Box::new(manager))
66            }
67            #[cfg(feature = "redis-cluster")]
68            "redis-cluster" => {
69                let nodes_str = redis_url.unwrap_or(
70                    "redis://127.0.0.1:7000,redis://127.0.0.1:7001,redis://127.0.0.1:7002",
71                );
72                let cluster_nodes: Vec<String> =
73                    nodes_str.split(',').map(|s| s.trim().to_string()).collect();
74                let prefix_str = prefix.unwrap_or("sockudo");
75                let concurrency_val = concurrency.unwrap_or(5);
76
77                info!(
78                    "Creating Redis Cluster queue manager (Prefix: {}, Concurrency: {})",
79                    prefix_str, concurrency_val
80                );
81                debug!("Redis Cluster queue manager nodes: {:?}", cluster_nodes);
82
83                let manager =
84                    RedisClusterQueueManager::new(cluster_nodes, prefix_str, concurrency_val)
85                        .await?;
86                Ok(Box::new(manager))
87            }
88            #[cfg(feature = "nats")]
89            "nats" => {
90                warn!(
91                    "NATS queue manager should be created via create_nats(). Falling back to memory queue."
92                );
93                let manager = MemoryQueueManager::new();
94                manager.start_processing();
95                Ok(Box::new(manager))
96            }
97            "memory" => {
98                info!("{}", "Creating Memory queue manager".to_string());
99                let manager = MemoryQueueManager::new();
100                manager.start_processing();
101                Ok(Box::new(manager))
102            }
103            #[cfg(not(feature = "redis"))]
104            "redis" => {
105                warn!(
106                    "Redis queue manager requested but not compiled in. Falling back to memory queue."
107                );
108                let manager = MemoryQueueManager::new();
109                manager.start_processing();
110                Ok(Box::new(manager))
111            }
112            #[cfg(not(feature = "redis-cluster"))]
113            "redis-cluster" => {
114                warn!(
115                    "Redis Cluster queue manager requested but not compiled in. Falling back to memory queue."
116                );
117                let manager = MemoryQueueManager::new();
118                manager.start_processing();
119                Ok(Box::new(manager))
120            }
121            #[cfg(feature = "sqs")]
122            "sqs" => {
123                warn!(
124                    "SQS queue manager should be created via create_sqs(). Falling back to memory queue."
125                );
126                let manager = MemoryQueueManager::new();
127                manager.start_processing();
128                Ok(Box::new(manager))
129            }
130            #[cfg(not(feature = "sqs"))]
131            "sqs" => {
132                warn!(
133                    "SQS queue manager requested but not compiled in. Falling back to memory queue."
134                );
135                let manager = MemoryQueueManager::new();
136                manager.start_processing();
137                Ok(Box::new(manager))
138            }
139            #[cfg(feature = "sns")]
140            "sns" => {
141                warn!(
142                    "SNS queue manager should be created via create_sns(). Falling back to memory queue."
143                );
144                let manager = MemoryQueueManager::new();
145                manager.start_processing();
146                Ok(Box::new(manager))
147            }
148            #[cfg(not(feature = "sns"))]
149            "sns" => {
150                warn!(
151                    "SNS queue manager requested but not compiled in. Falling back to memory queue."
152                );
153                let manager = MemoryQueueManager::new();
154                manager.start_processing();
155                Ok(Box::new(manager))
156            }
157            other => Err(sockudo_core::error::Error::Queue(format!(
158                "Unsupported queue driver: {other}"
159            ))),
160        }
161    }
162
163    /// Creates an SQS queue manager instance with the given configuration.
164    #[cfg(feature = "sqs")]
165    pub async fn create_sqs(config: SqsQueueConfig) -> Result<Box<dyn QueueInterface>> {
166        info!(
167            "Creating SQS queue manager (Region: {}, Concurrency: {}, FIFO: {})",
168            config.region, config.concurrency, config.fifo
169        );
170        if let Some(ref url_prefix) = config.queue_url_prefix {
171            debug!("SQS queue URL prefix: {}", url_prefix);
172        }
173        let manager = SqsQueueManager::new(config).await?;
174        Ok(Box::new(manager))
175    }
176
177    #[cfg(not(feature = "sqs"))]
178    #[allow(unused_variables)]
179    pub async fn create_sqs(
180        config: sockudo_core::options::SqsQueueConfig,
181    ) -> Result<Box<dyn QueueInterface>> {
182        warn!("SQS queue manager requested but not compiled in. Falling back to memory queue.");
183        let manager = MemoryQueueManager::new();
184        manager.start_processing();
185        Ok(Box::new(manager))
186    }
187
188    #[cfg(feature = "sns")]
189    pub async fn create_sns(config: SnsQueueConfig) -> Result<Box<dyn QueueInterface>> {
190        info!(
191            "Creating SNS queue manager (Region: {}, Topic: {})",
192            config.region, config.topic_arn
193        );
194        let manager = SnsQueueManager::new(config).await?;
195        Ok(Box::new(manager))
196    }
197
198    #[cfg(not(feature = "sns"))]
199    #[allow(unused_variables)]
200    pub async fn create_sns(
201        config: sockudo_core::options::SnsQueueConfig,
202    ) -> Result<Box<dyn QueueInterface>> {
203        warn!("SNS queue manager requested but not compiled in. Falling back to memory queue.");
204        let manager = MemoryQueueManager::new();
205        manager.start_processing();
206        Ok(Box::new(manager))
207    }
208
209    #[cfg(feature = "rabbitmq")]
210    pub async fn create_rabbitmq(config: RabbitMqAdapterConfig) -> Result<Box<dyn QueueInterface>> {
211        let manager = RabbitMqQueueManager::new(config).await?;
212        Ok(Box::new(manager))
213    }
214
215    #[cfg(not(feature = "rabbitmq"))]
216    #[allow(unused_variables)]
217    pub async fn create_rabbitmq(
218        config: sockudo_core::options::RabbitMqAdapterConfig,
219    ) -> Result<Box<dyn QueueInterface>> {
220        warn!(
221            "RabbitMQ queue manager requested but not compiled in. Falling back to memory queue."
222        );
223        let manager = MemoryQueueManager::new();
224        manager.start_processing();
225        Ok(Box::new(manager))
226    }
227
228    #[cfg(feature = "kafka")]
229    pub async fn create_kafka(config: KafkaAdapterConfig) -> Result<Box<dyn QueueInterface>> {
230        let manager = KafkaQueueManager::new(config).await?;
231        Ok(Box::new(manager))
232    }
233
234    #[cfg(not(feature = "kafka"))]
235    #[allow(unused_variables)]
236    pub async fn create_kafka(
237        config: sockudo_core::options::KafkaAdapterConfig,
238    ) -> Result<Box<dyn QueueInterface>> {
239        warn!("Kafka queue manager requested but not compiled in. Falling back to memory queue.");
240        let manager = MemoryQueueManager::new();
241        manager.start_processing();
242        Ok(Box::new(manager))
243    }
244
245    #[cfg(feature = "pulsar")]
246    pub async fn create_pulsar(config: PulsarAdapterConfig) -> Result<Box<dyn QueueInterface>> {
247        let manager = PulsarQueueManager::new(config).await?;
248        Ok(Box::new(manager))
249    }
250
251    #[cfg(not(feature = "pulsar"))]
252    #[allow(unused_variables)]
253    pub async fn create_pulsar(
254        config: sockudo_core::options::PulsarAdapterConfig,
255    ) -> Result<Box<dyn QueueInterface>> {
256        warn!("Pulsar queue manager requested but not compiled in. Falling back to memory queue.");
257        let manager = MemoryQueueManager::new();
258        manager.start_processing();
259        Ok(Box::new(manager))
260    }
261
262    #[cfg(feature = "google-pubsub")]
263    pub async fn create_google_pubsub(
264        config: GooglePubSubAdapterConfig,
265    ) -> Result<Box<dyn QueueInterface>> {
266        let manager = GooglePubSubQueueManager::new(config).await?;
267        Ok(Box::new(manager))
268    }
269
270    #[cfg(not(feature = "google-pubsub"))]
271    #[allow(unused_variables)]
272    pub async fn create_google_pubsub(
273        config: sockudo_core::options::GooglePubSubAdapterConfig,
274    ) -> Result<Box<dyn QueueInterface>> {
275        warn!(
276            "Google Pub/Sub queue manager requested but not compiled in. Falling back to memory queue."
277        );
278        let manager = MemoryQueueManager::new();
279        manager.start_processing();
280        Ok(Box::new(manager))
281    }
282
283    #[cfg(feature = "nats")]
284    pub async fn create_nats(config: NatsAdapterConfig) -> Result<Box<dyn QueueInterface>> {
285        let manager = NatsJetStreamQueueManager::new(config).await?;
286        Ok(Box::new(manager))
287    }
288
289    #[cfg(not(feature = "nats"))]
290    #[allow(unused_variables)]
291    pub async fn create_nats(
292        config: sockudo_core::options::NatsAdapterConfig,
293    ) -> Result<Box<dyn QueueInterface>> {
294        warn!("NATS queue manager requested but not compiled in. Falling back to memory queue.");
295        let manager = MemoryQueueManager::new();
296        manager.start_processing();
297        Ok(Box::new(manager))
298    }
299}
300
301pub struct QueueManager {
302    driver: Box<dyn QueueInterface>,
303}
304
305impl QueueManager {
306    /// Creates a new QueueManager wrapping a specific driver implementation.
307    pub fn new(driver: Box<dyn QueueInterface>) -> Self {
308        Self { driver }
309    }
310
311    /// Adds data to the specified queue via the underlying driver.
312    pub async fn add_to_queue(&self, queue_name: &str, data: JobData) -> Result<()> {
313        self.driver.add_to_queue(queue_name, data).await
314    }
315
316    /// Registers a processor for the specified queue and starts processing (if applicable for the driver).
317    pub async fn process_queue(
318        &self,
319        queue_name: &str,
320        callback: JobProcessorFnAsync,
321    ) -> Result<()> {
322        self.driver.process_queue(queue_name, callback).await
323    }
324
325    /// Disconnects the underlying driver (if necessary).
326    pub async fn disconnect(&self) -> Result<()> {
327        self.driver.disconnect().await
328    }
329
330    /// Checks the health of the underlying queue driver.
331    pub async fn check_health(&self) -> Result<()> {
332        self.driver.check_health().await
333    }
334}