1use sockudo_core::error::Result;
2
3#[cfg(feature = "google-pubsub")]
4use sockudo_core::options::GooglePubSubAdapterConfig;
5#[cfg(feature = "iggy")]
6use sockudo_core::options::IggyConfig;
7#[cfg(feature = "kafka")]
8use sockudo_core::options::KafkaAdapterConfig;
9#[cfg(feature = "nats")]
10use sockudo_core::options::NatsAdapterConfig;
11#[cfg(feature = "pulsar")]
12use sockudo_core::options::PulsarAdapterConfig;
13#[cfg(feature = "rabbitmq")]
14use sockudo_core::options::RabbitMqAdapterConfig;
15#[cfg(feature = "sns")]
16use sockudo_core::options::SnsQueueConfig;
17#[cfg(feature = "sqs")]
18use sockudo_core::options::SqsQueueConfig;
19use sockudo_core::queue::QueueInterface;
20use sockudo_core::webhook_types::{JobData, JobProcessorFnAsync};
21
22#[cfg(feature = "google-pubsub")]
23use crate::google_pubsub_queue_manager::GooglePubSubQueueManager;
24#[cfg(feature = "iggy")]
25use crate::iggy_queue_manager::IggyQueueManager;
26#[cfg(feature = "kafka")]
27use crate::kafka_queue_manager::KafkaQueueManager;
28use crate::memory_queue_manager::MemoryQueueManager;
29#[cfg(feature = "nats")]
30use crate::nats_queue_manager::NatsJetStreamQueueManager;
31#[cfg(feature = "pulsar")]
32use crate::pulsar_queue_manager::PulsarQueueManager;
33#[cfg(feature = "rabbitmq")]
34use crate::rabbitmq_queue_manager::RabbitMqQueueManager;
35#[cfg(feature = "redis-cluster")]
36use crate::redis_cluster_queue_manager::RedisClusterQueueManager;
37#[cfg(feature = "redis")]
38use crate::redis_queue_manager::RedisQueueManager;
39#[cfg(feature = "sns")]
40use crate::sns_queue_manager::SnsQueueManager;
41#[cfg(feature = "sqs")]
42use crate::sqs_queue_manager::SqsQueueManager;
43use tracing::*;
44
45pub struct QueueManagerFactory;
47
48impl QueueManagerFactory {
49 #[allow(unused_variables)]
51 pub async fn create(
52 driver: &str,
53 redis_url: Option<&str>,
54 prefix: Option<&str>,
55 concurrency: Option<usize>,
56 ) -> Result<Box<dyn QueueInterface>> {
57 match driver {
58 #[cfg(feature = "redis")]
59 "redis" => {
60 let url = redis_url.unwrap_or("redis://127.0.0.1:6379/");
61 let prefix_str = prefix.unwrap_or("sockudo");
62 let concurrency_val = concurrency.unwrap_or(5);
63 info!(
64 "Creating Redis queue manager (Prefix: {}, Concurrency: {})",
65 prefix_str, concurrency_val
66 );
67 debug!("Redis queue manager URL: {}", url);
68 let manager = RedisQueueManager::new(url, prefix_str, concurrency_val).await?;
69 Ok(Box::new(manager))
70 }
71 #[cfg(feature = "redis-cluster")]
72 "redis-cluster" => {
73 let nodes_str = redis_url.unwrap_or(
74 "redis://127.0.0.1:7000,redis://127.0.0.1:7001,redis://127.0.0.1:7002",
75 );
76 let cluster_nodes: Vec<String> =
77 nodes_str.split(',').map(|s| s.trim().to_string()).collect();
78 let prefix_str = prefix.unwrap_or("sockudo");
79 let concurrency_val = concurrency.unwrap_or(5);
80
81 info!(
82 "Creating Redis Cluster queue manager (Prefix: {}, Concurrency: {})",
83 prefix_str, concurrency_val
84 );
85 debug!("Redis Cluster queue manager nodes: {:?}", cluster_nodes);
86
87 let manager =
88 RedisClusterQueueManager::new(cluster_nodes, prefix_str, concurrency_val)
89 .await?;
90 Ok(Box::new(manager))
91 }
92 #[cfg(feature = "nats")]
93 "nats" => {
94 warn!(
95 "NATS queue manager should be created via create_nats(). Falling back to memory queue."
96 );
97 let manager = MemoryQueueManager::new();
98 manager.start_processing();
99 Ok(Box::new(manager))
100 }
101 "memory" => {
102 info!("{}", "Creating Memory queue manager".to_string());
103 let manager = MemoryQueueManager::new();
104 manager.start_processing();
105 Ok(Box::new(manager))
106 }
107 #[cfg(not(feature = "redis"))]
108 "redis" => {
109 warn!(
110 "Redis queue manager requested but not compiled in. Falling back to memory queue."
111 );
112 let manager = MemoryQueueManager::new();
113 manager.start_processing();
114 Ok(Box::new(manager))
115 }
116 #[cfg(not(feature = "redis-cluster"))]
117 "redis-cluster" => {
118 warn!(
119 "Redis Cluster queue manager requested but not compiled in. Falling back to memory queue."
120 );
121 let manager = MemoryQueueManager::new();
122 manager.start_processing();
123 Ok(Box::new(manager))
124 }
125 #[cfg(feature = "sqs")]
126 "sqs" => {
127 warn!(
128 "SQS queue manager should be created via create_sqs(). Falling back to memory queue."
129 );
130 let manager = MemoryQueueManager::new();
131 manager.start_processing();
132 Ok(Box::new(manager))
133 }
134 #[cfg(not(feature = "sqs"))]
135 "sqs" => {
136 warn!(
137 "SQS queue manager requested but not compiled in. Falling back to memory queue."
138 );
139 let manager = MemoryQueueManager::new();
140 manager.start_processing();
141 Ok(Box::new(manager))
142 }
143 #[cfg(feature = "sns")]
144 "sns" => {
145 warn!(
146 "SNS queue manager should be created via create_sns(). Falling back to memory queue."
147 );
148 let manager = MemoryQueueManager::new();
149 manager.start_processing();
150 Ok(Box::new(manager))
151 }
152 #[cfg(not(feature = "sns"))]
153 "sns" => {
154 warn!(
155 "SNS queue manager requested but not compiled in. Falling back to memory queue."
156 );
157 let manager = MemoryQueueManager::new();
158 manager.start_processing();
159 Ok(Box::new(manager))
160 }
161 other => Err(sockudo_core::error::Error::Queue(format!(
162 "Unsupported queue driver: {other}"
163 ))),
164 }
165 }
166
167 #[cfg(feature = "sqs")]
169 pub async fn create_sqs(config: SqsQueueConfig) -> Result<Box<dyn QueueInterface>> {
170 info!(
171 "Creating SQS queue manager (Region: {}, Concurrency: {}, FIFO: {})",
172 config.region, config.concurrency, config.fifo
173 );
174 if let Some(ref url_prefix) = config.queue_url_prefix {
175 debug!("SQS queue URL prefix: {}", url_prefix);
176 }
177 let manager = SqsQueueManager::new(config).await?;
178 Ok(Box::new(manager))
179 }
180
181 #[cfg(not(feature = "sqs"))]
182 #[allow(unused_variables)]
183 pub async fn create_sqs(
184 config: sockudo_core::options::SqsQueueConfig,
185 ) -> Result<Box<dyn QueueInterface>> {
186 warn!("SQS queue manager requested but not compiled in. Falling back to memory queue.");
187 let manager = MemoryQueueManager::new();
188 manager.start_processing();
189 Ok(Box::new(manager))
190 }
191
192 #[cfg(feature = "sns")]
193 pub async fn create_sns(config: SnsQueueConfig) -> Result<Box<dyn QueueInterface>> {
194 info!(
195 "Creating SNS queue manager (Region: {}, Topic: {})",
196 config.region, config.topic_arn
197 );
198 let manager = SnsQueueManager::new(config).await?;
199 Ok(Box::new(manager))
200 }
201
202 #[cfg(not(feature = "sns"))]
203 #[allow(unused_variables)]
204 pub async fn create_sns(
205 config: sockudo_core::options::SnsQueueConfig,
206 ) -> Result<Box<dyn QueueInterface>> {
207 warn!("SNS queue manager requested but not compiled in. Falling back to memory queue.");
208 let manager = MemoryQueueManager::new();
209 manager.start_processing();
210 Ok(Box::new(manager))
211 }
212
213 #[cfg(feature = "rabbitmq")]
214 pub async fn create_rabbitmq(config: RabbitMqAdapterConfig) -> Result<Box<dyn QueueInterface>> {
215 let manager = RabbitMqQueueManager::new(config).await?;
216 Ok(Box::new(manager))
217 }
218
219 #[cfg(not(feature = "rabbitmq"))]
220 #[allow(unused_variables)]
221 pub async fn create_rabbitmq(
222 config: sockudo_core::options::RabbitMqAdapterConfig,
223 ) -> Result<Box<dyn QueueInterface>> {
224 warn!(
225 "RabbitMQ queue manager requested but not compiled in. Falling back to memory queue."
226 );
227 let manager = MemoryQueueManager::new();
228 manager.start_processing();
229 Ok(Box::new(manager))
230 }
231
232 #[cfg(feature = "kafka")]
233 pub async fn create_kafka(config: KafkaAdapterConfig) -> Result<Box<dyn QueueInterface>> {
234 let manager = KafkaQueueManager::new(config).await?;
235 Ok(Box::new(manager))
236 }
237
238 #[cfg(not(feature = "kafka"))]
239 #[allow(unused_variables)]
240 pub async fn create_kafka(
241 config: sockudo_core::options::KafkaAdapterConfig,
242 ) -> Result<Box<dyn QueueInterface>> {
243 warn!("Kafka queue manager requested but not compiled in. Falling back to memory queue.");
244 let manager = MemoryQueueManager::new();
245 manager.start_processing();
246 Ok(Box::new(manager))
247 }
248
249 #[cfg(feature = "iggy")]
250 pub async fn create_iggy(config: IggyConfig) -> Result<Box<dyn QueueInterface>> {
251 let manager = IggyQueueManager::new(config).await?;
252 Ok(Box::new(manager))
253 }
254
255 #[cfg(not(feature = "iggy"))]
256 #[allow(unused_variables)]
257 pub async fn create_iggy(
258 config: sockudo_core::options::IggyConfig,
259 ) -> Result<Box<dyn QueueInterface>> {
260 warn!(
261 "Apache Iggy queue manager requested but not compiled in. Falling back to memory queue."
262 );
263 let manager = MemoryQueueManager::new();
264 manager.start_processing();
265 Ok(Box::new(manager))
266 }
267
268 #[cfg(feature = "pulsar")]
269 pub async fn create_pulsar(config: PulsarAdapterConfig) -> Result<Box<dyn QueueInterface>> {
270 let manager = PulsarQueueManager::new(config).await?;
271 Ok(Box::new(manager))
272 }
273
274 #[cfg(not(feature = "pulsar"))]
275 #[allow(unused_variables)]
276 pub async fn create_pulsar(
277 config: sockudo_core::options::PulsarAdapterConfig,
278 ) -> Result<Box<dyn QueueInterface>> {
279 warn!("Pulsar queue manager requested but not compiled in. Falling back to memory queue.");
280 let manager = MemoryQueueManager::new();
281 manager.start_processing();
282 Ok(Box::new(manager))
283 }
284
285 #[cfg(feature = "google-pubsub")]
286 pub async fn create_google_pubsub(
287 config: GooglePubSubAdapterConfig,
288 ) -> Result<Box<dyn QueueInterface>> {
289 let manager = GooglePubSubQueueManager::new(config).await?;
290 Ok(Box::new(manager))
291 }
292
293 #[cfg(not(feature = "google-pubsub"))]
294 #[allow(unused_variables)]
295 pub async fn create_google_pubsub(
296 config: sockudo_core::options::GooglePubSubAdapterConfig,
297 ) -> Result<Box<dyn QueueInterface>> {
298 warn!(
299 "Google Pub/Sub queue manager requested but not compiled in. Falling back to memory queue."
300 );
301 let manager = MemoryQueueManager::new();
302 manager.start_processing();
303 Ok(Box::new(manager))
304 }
305
306 #[cfg(feature = "nats")]
307 pub async fn create_nats(config: NatsAdapterConfig) -> Result<Box<dyn QueueInterface>> {
308 let manager = NatsJetStreamQueueManager::new(config).await?;
309 Ok(Box::new(manager))
310 }
311
312 #[cfg(not(feature = "nats"))]
313 #[allow(unused_variables)]
314 pub async fn create_nats(
315 config: sockudo_core::options::NatsAdapterConfig,
316 ) -> Result<Box<dyn QueueInterface>> {
317 warn!("NATS queue manager requested but not compiled in. Falling back to memory queue.");
318 let manager = MemoryQueueManager::new();
319 manager.start_processing();
320 Ok(Box::new(manager))
321 }
322}
323
324pub struct QueueManager {
325 driver: Box<dyn QueueInterface>,
326}
327
328impl QueueManager {
329 pub fn new(driver: Box<dyn QueueInterface>) -> Self {
331 Self { driver }
332 }
333
334 pub async fn add_to_queue(&self, queue_name: &str, data: JobData) -> Result<()> {
336 self.driver.add_to_queue(queue_name, data).await
337 }
338
339 pub async fn process_queue(
341 &self,
342 queue_name: &str,
343 callback: JobProcessorFnAsync,
344 ) -> Result<()> {
345 self.driver.process_queue(queue_name, callback).await
346 }
347
348 pub async fn disconnect(&self) -> Result<()> {
350 self.driver.disconnect().await
351 }
352
353 pub async fn check_health(&self) -> Result<()> {
355 self.driver.check_health().await
356 }
357}