1use sockudo_core::error::Result;
2
3#[cfg(feature = "google-pubsub")]
4use sockudo_core::options::GooglePubSubAdapterConfig;
5#[cfg(feature = "kafka")]
6use sockudo_core::options::KafkaAdapterConfig;
7#[cfg(feature = "nats")]
8use sockudo_core::options::NatsAdapterConfig;
9#[cfg(feature = "pulsar")]
10use sockudo_core::options::PulsarAdapterConfig;
11#[cfg(feature = "rabbitmq")]
12use sockudo_core::options::RabbitMqAdapterConfig;
13#[cfg(feature = "sns")]
14use sockudo_core::options::SnsQueueConfig;
15#[cfg(feature = "sqs")]
16use sockudo_core::options::SqsQueueConfig;
17use sockudo_core::queue::QueueInterface;
18use sockudo_core::webhook_types::{JobData, JobProcessorFnAsync};
19
20#[cfg(feature = "google-pubsub")]
21use crate::google_pubsub_queue_manager::GooglePubSubQueueManager;
22#[cfg(feature = "kafka")]
23use crate::kafka_queue_manager::KafkaQueueManager;
24use crate::memory_queue_manager::MemoryQueueManager;
25#[cfg(feature = "nats")]
26use crate::nats_queue_manager::NatsJetStreamQueueManager;
27#[cfg(feature = "pulsar")]
28use crate::pulsar_queue_manager::PulsarQueueManager;
29#[cfg(feature = "rabbitmq")]
30use crate::rabbitmq_queue_manager::RabbitMqQueueManager;
31#[cfg(feature = "redis-cluster")]
32use crate::redis_cluster_queue_manager::RedisClusterQueueManager;
33#[cfg(feature = "redis")]
34use crate::redis_queue_manager::RedisQueueManager;
35#[cfg(feature = "sns")]
36use crate::sns_queue_manager::SnsQueueManager;
37#[cfg(feature = "sqs")]
38use crate::sqs_queue_manager::SqsQueueManager;
39use tracing::*;
40
41pub struct QueueManagerFactory;
43
44impl QueueManagerFactory {
45 #[allow(unused_variables)]
47 pub async fn create(
48 driver: &str,
49 redis_url: Option<&str>,
50 prefix: Option<&str>,
51 concurrency: Option<usize>,
52 ) -> Result<Box<dyn QueueInterface>> {
53 match driver {
54 #[cfg(feature = "redis")]
55 "redis" => {
56 let url = redis_url.unwrap_or("redis://127.0.0.1:6379/");
57 let prefix_str = prefix.unwrap_or("sockudo");
58 let concurrency_val = concurrency.unwrap_or(5);
59 info!(
60 "Creating Redis queue manager (Prefix: {}, Concurrency: {})",
61 prefix_str, concurrency_val
62 );
63 debug!("Redis queue manager URL: {}", url);
64 let manager = RedisQueueManager::new(url, prefix_str, concurrency_val).await?;
65 Ok(Box::new(manager))
66 }
67 #[cfg(feature = "redis-cluster")]
68 "redis-cluster" => {
69 let nodes_str = redis_url.unwrap_or(
70 "redis://127.0.0.1:7000,redis://127.0.0.1:7001,redis://127.0.0.1:7002",
71 );
72 let cluster_nodes: Vec<String> =
73 nodes_str.split(',').map(|s| s.trim().to_string()).collect();
74 let prefix_str = prefix.unwrap_or("sockudo");
75 let concurrency_val = concurrency.unwrap_or(5);
76
77 info!(
78 "Creating Redis Cluster queue manager (Prefix: {}, Concurrency: {})",
79 prefix_str, concurrency_val
80 );
81 debug!("Redis Cluster queue manager nodes: {:?}", cluster_nodes);
82
83 let manager =
84 RedisClusterQueueManager::new(cluster_nodes, prefix_str, concurrency_val)
85 .await?;
86 Ok(Box::new(manager))
87 }
88 #[cfg(feature = "nats")]
89 "nats" => {
90 warn!(
91 "NATS queue manager should be created via create_nats(). Falling back to memory queue."
92 );
93 let manager = MemoryQueueManager::new();
94 manager.start_processing();
95 Ok(Box::new(manager))
96 }
97 "memory" => {
98 info!("{}", "Creating Memory queue manager".to_string());
99 let manager = MemoryQueueManager::new();
100 manager.start_processing();
101 Ok(Box::new(manager))
102 }
103 #[cfg(not(feature = "redis"))]
104 "redis" => {
105 warn!(
106 "Redis queue manager requested but not compiled in. Falling back to memory queue."
107 );
108 let manager = MemoryQueueManager::new();
109 manager.start_processing();
110 Ok(Box::new(manager))
111 }
112 #[cfg(not(feature = "redis-cluster"))]
113 "redis-cluster" => {
114 warn!(
115 "Redis Cluster queue manager requested but not compiled in. Falling back to memory queue."
116 );
117 let manager = MemoryQueueManager::new();
118 manager.start_processing();
119 Ok(Box::new(manager))
120 }
121 #[cfg(feature = "sqs")]
122 "sqs" => {
123 warn!(
124 "SQS queue manager should be created via create_sqs(). Falling back to memory queue."
125 );
126 let manager = MemoryQueueManager::new();
127 manager.start_processing();
128 Ok(Box::new(manager))
129 }
130 #[cfg(not(feature = "sqs"))]
131 "sqs" => {
132 warn!(
133 "SQS queue manager requested but not compiled in. Falling back to memory queue."
134 );
135 let manager = MemoryQueueManager::new();
136 manager.start_processing();
137 Ok(Box::new(manager))
138 }
139 #[cfg(feature = "sns")]
140 "sns" => {
141 warn!(
142 "SNS queue manager should be created via create_sns(). Falling back to memory queue."
143 );
144 let manager = MemoryQueueManager::new();
145 manager.start_processing();
146 Ok(Box::new(manager))
147 }
148 #[cfg(not(feature = "sns"))]
149 "sns" => {
150 warn!(
151 "SNS queue manager requested but not compiled in. Falling back to memory queue."
152 );
153 let manager = MemoryQueueManager::new();
154 manager.start_processing();
155 Ok(Box::new(manager))
156 }
157 other => Err(sockudo_core::error::Error::Queue(format!(
158 "Unsupported queue driver: {other}"
159 ))),
160 }
161 }
162
163 #[cfg(feature = "sqs")]
165 pub async fn create_sqs(config: SqsQueueConfig) -> Result<Box<dyn QueueInterface>> {
166 info!(
167 "Creating SQS queue manager (Region: {}, Concurrency: {}, FIFO: {})",
168 config.region, config.concurrency, config.fifo
169 );
170 if let Some(ref url_prefix) = config.queue_url_prefix {
171 debug!("SQS queue URL prefix: {}", url_prefix);
172 }
173 let manager = SqsQueueManager::new(config).await?;
174 Ok(Box::new(manager))
175 }
176
177 #[cfg(not(feature = "sqs"))]
178 #[allow(unused_variables)]
179 pub async fn create_sqs(
180 config: sockudo_core::options::SqsQueueConfig,
181 ) -> Result<Box<dyn QueueInterface>> {
182 warn!("SQS queue manager requested but not compiled in. Falling back to memory queue.");
183 let manager = MemoryQueueManager::new();
184 manager.start_processing();
185 Ok(Box::new(manager))
186 }
187
188 #[cfg(feature = "sns")]
189 pub async fn create_sns(config: SnsQueueConfig) -> Result<Box<dyn QueueInterface>> {
190 info!(
191 "Creating SNS queue manager (Region: {}, Topic: {})",
192 config.region, config.topic_arn
193 );
194 let manager = SnsQueueManager::new(config).await?;
195 Ok(Box::new(manager))
196 }
197
198 #[cfg(not(feature = "sns"))]
199 #[allow(unused_variables)]
200 pub async fn create_sns(
201 config: sockudo_core::options::SnsQueueConfig,
202 ) -> Result<Box<dyn QueueInterface>> {
203 warn!("SNS queue manager requested but not compiled in. Falling back to memory queue.");
204 let manager = MemoryQueueManager::new();
205 manager.start_processing();
206 Ok(Box::new(manager))
207 }
208
209 #[cfg(feature = "rabbitmq")]
210 pub async fn create_rabbitmq(config: RabbitMqAdapterConfig) -> Result<Box<dyn QueueInterface>> {
211 let manager = RabbitMqQueueManager::new(config).await?;
212 Ok(Box::new(manager))
213 }
214
215 #[cfg(not(feature = "rabbitmq"))]
216 #[allow(unused_variables)]
217 pub async fn create_rabbitmq(
218 config: sockudo_core::options::RabbitMqAdapterConfig,
219 ) -> Result<Box<dyn QueueInterface>> {
220 warn!(
221 "RabbitMQ queue manager requested but not compiled in. Falling back to memory queue."
222 );
223 let manager = MemoryQueueManager::new();
224 manager.start_processing();
225 Ok(Box::new(manager))
226 }
227
228 #[cfg(feature = "kafka")]
229 pub async fn create_kafka(config: KafkaAdapterConfig) -> Result<Box<dyn QueueInterface>> {
230 let manager = KafkaQueueManager::new(config).await?;
231 Ok(Box::new(manager))
232 }
233
234 #[cfg(not(feature = "kafka"))]
235 #[allow(unused_variables)]
236 pub async fn create_kafka(
237 config: sockudo_core::options::KafkaAdapterConfig,
238 ) -> Result<Box<dyn QueueInterface>> {
239 warn!("Kafka queue manager requested but not compiled in. Falling back to memory queue.");
240 let manager = MemoryQueueManager::new();
241 manager.start_processing();
242 Ok(Box::new(manager))
243 }
244
245 #[cfg(feature = "pulsar")]
246 pub async fn create_pulsar(config: PulsarAdapterConfig) -> Result<Box<dyn QueueInterface>> {
247 let manager = PulsarQueueManager::new(config).await?;
248 Ok(Box::new(manager))
249 }
250
251 #[cfg(not(feature = "pulsar"))]
252 #[allow(unused_variables)]
253 pub async fn create_pulsar(
254 config: sockudo_core::options::PulsarAdapterConfig,
255 ) -> Result<Box<dyn QueueInterface>> {
256 warn!("Pulsar queue manager requested but not compiled in. Falling back to memory queue.");
257 let manager = MemoryQueueManager::new();
258 manager.start_processing();
259 Ok(Box::new(manager))
260 }
261
262 #[cfg(feature = "google-pubsub")]
263 pub async fn create_google_pubsub(
264 config: GooglePubSubAdapterConfig,
265 ) -> Result<Box<dyn QueueInterface>> {
266 let manager = GooglePubSubQueueManager::new(config).await?;
267 Ok(Box::new(manager))
268 }
269
270 #[cfg(not(feature = "google-pubsub"))]
271 #[allow(unused_variables)]
272 pub async fn create_google_pubsub(
273 config: sockudo_core::options::GooglePubSubAdapterConfig,
274 ) -> Result<Box<dyn QueueInterface>> {
275 warn!(
276 "Google Pub/Sub queue manager requested but not compiled in. Falling back to memory queue."
277 );
278 let manager = MemoryQueueManager::new();
279 manager.start_processing();
280 Ok(Box::new(manager))
281 }
282
283 #[cfg(feature = "nats")]
284 pub async fn create_nats(config: NatsAdapterConfig) -> Result<Box<dyn QueueInterface>> {
285 let manager = NatsJetStreamQueueManager::new(config).await?;
286 Ok(Box::new(manager))
287 }
288
289 #[cfg(not(feature = "nats"))]
290 #[allow(unused_variables)]
291 pub async fn create_nats(
292 config: sockudo_core::options::NatsAdapterConfig,
293 ) -> Result<Box<dyn QueueInterface>> {
294 warn!("NATS queue manager requested but not compiled in. Falling back to memory queue.");
295 let manager = MemoryQueueManager::new();
296 manager.start_processing();
297 Ok(Box::new(manager))
298 }
299}
300
301pub struct QueueManager {
302 driver: Box<dyn QueueInterface>,
303}
304
305impl QueueManager {
306 pub fn new(driver: Box<dyn QueueInterface>) -> Self {
308 Self { driver }
309 }
310
311 pub async fn add_to_queue(&self, queue_name: &str, data: JobData) -> Result<()> {
313 self.driver.add_to_queue(queue_name, data).await
314 }
315
316 pub async fn process_queue(
318 &self,
319 queue_name: &str,
320 callback: JobProcessorFnAsync,
321 ) -> Result<()> {
322 self.driver.process_queue(queue_name, callback).await
323 }
324
325 pub async fn disconnect(&self) -> Result<()> {
327 self.driver.disconnect().await
328 }
329
330 pub async fn check_health(&self) -> Result<()> {
332 self.driver.check_health().await
333 }
334}