1use sockudo_core::error::Result;
2
3#[cfg(feature = "sns")]
4use sockudo_core::options::SnsQueueConfig;
5#[cfg(feature = "sqs")]
6use sockudo_core::options::SqsQueueConfig;
7use sockudo_core::queue::QueueInterface;
8use sockudo_core::webhook_types::{JobData, JobProcessorFnAsync};
9
10use crate::memory_queue_manager::MemoryQueueManager;
11#[cfg(feature = "redis-cluster")]
12use crate::redis_cluster_queue_manager::RedisClusterQueueManager;
13#[cfg(feature = "redis")]
14use crate::redis_queue_manager::RedisQueueManager;
15#[cfg(feature = "sns")]
16use crate::sns_queue_manager::SnsQueueManager;
17#[cfg(feature = "sqs")]
18use crate::sqs_queue_manager::SqsQueueManager;
19use tracing::*;
20
21pub struct QueueManagerFactory;
23
24impl QueueManagerFactory {
25 #[allow(unused_variables)]
27 pub async fn create(
28 driver: &str,
29 redis_url: Option<&str>,
30 prefix: Option<&str>,
31 concurrency: Option<usize>,
32 ) -> Result<Box<dyn QueueInterface>> {
33 match driver {
34 #[cfg(feature = "redis")]
35 "redis" => {
36 let url = redis_url.unwrap_or("redis://127.0.0.1:6379/");
37 let prefix_str = prefix.unwrap_or("sockudo");
38 let concurrency_val = concurrency.unwrap_or(5);
39 info!(
40 "Creating Redis queue manager (Prefix: {}, Concurrency: {})",
41 prefix_str, concurrency_val
42 );
43 debug!("Redis queue manager URL: {}", url);
44 let manager = RedisQueueManager::new(url, prefix_str, concurrency_val).await?;
45 Ok(Box::new(manager))
46 }
47 #[cfg(feature = "redis-cluster")]
48 "redis-cluster" => {
49 let nodes_str = redis_url.unwrap_or(
50 "redis://127.0.0.1:7000,redis://127.0.0.1:7001,redis://127.0.0.1:7002",
51 );
52 let cluster_nodes: Vec<String> =
53 nodes_str.split(',').map(|s| s.trim().to_string()).collect();
54 let prefix_str = prefix.unwrap_or("sockudo");
55 let concurrency_val = concurrency.unwrap_or(5);
56
57 info!(
58 "Creating Redis Cluster queue manager (Prefix: {}, Concurrency: {})",
59 prefix_str, concurrency_val
60 );
61 debug!("Redis Cluster queue manager nodes: {:?}", cluster_nodes);
62
63 let manager =
64 RedisClusterQueueManager::new(cluster_nodes, prefix_str, concurrency_val)
65 .await?;
66 Ok(Box::new(manager))
67 }
68 "memory" => {
69 info!("{}", "Creating Memory queue manager".to_string());
70 let manager = MemoryQueueManager::new();
71 manager.start_processing();
72 Ok(Box::new(manager))
73 }
74 #[cfg(not(feature = "redis"))]
75 "redis" => {
76 warn!(
77 "Redis queue manager requested but not compiled in. Falling back to memory queue."
78 );
79 let manager = MemoryQueueManager::new();
80 manager.start_processing();
81 Ok(Box::new(manager))
82 }
83 #[cfg(not(feature = "redis-cluster"))]
84 "redis-cluster" => {
85 warn!(
86 "Redis Cluster queue manager requested but not compiled in. Falling back to memory queue."
87 );
88 let manager = MemoryQueueManager::new();
89 manager.start_processing();
90 Ok(Box::new(manager))
91 }
92 #[cfg(feature = "sqs")]
93 "sqs" => {
94 warn!(
95 "SQS queue manager should be created via create_sqs(). Falling back to memory queue."
96 );
97 let manager = MemoryQueueManager::new();
98 manager.start_processing();
99 Ok(Box::new(manager))
100 }
101 #[cfg(not(feature = "sqs"))]
102 "sqs" => {
103 warn!(
104 "SQS queue manager requested but not compiled in. Falling back to memory queue."
105 );
106 let manager = MemoryQueueManager::new();
107 manager.start_processing();
108 Ok(Box::new(manager))
109 }
110 #[cfg(feature = "sns")]
111 "sns" => {
112 warn!(
113 "SNS queue manager should be created via create_sns(). Falling back to memory queue."
114 );
115 let manager = MemoryQueueManager::new();
116 manager.start_processing();
117 Ok(Box::new(manager))
118 }
119 #[cfg(not(feature = "sns"))]
120 "sns" => {
121 warn!(
122 "SNS queue manager requested but not compiled in. Falling back to memory queue."
123 );
124 let manager = MemoryQueueManager::new();
125 manager.start_processing();
126 Ok(Box::new(manager))
127 }
128 other => Err(sockudo_core::error::Error::Queue(format!(
129 "Unsupported queue driver: {other}"
130 ))),
131 }
132 }
133
134 #[cfg(feature = "sqs")]
136 pub async fn create_sqs(config: SqsQueueConfig) -> Result<Box<dyn QueueInterface>> {
137 info!(
138 "Creating SQS queue manager (Region: {}, Concurrency: {}, FIFO: {})",
139 config.region, config.concurrency, config.fifo
140 );
141 if let Some(ref url_prefix) = config.queue_url_prefix {
142 debug!("SQS queue URL prefix: {}", url_prefix);
143 }
144 let manager = SqsQueueManager::new(config).await?;
145 Ok(Box::new(manager))
146 }
147
148 #[cfg(not(feature = "sqs"))]
149 #[allow(unused_variables)]
150 pub async fn create_sqs(
151 config: sockudo_core::options::SqsQueueConfig,
152 ) -> Result<Box<dyn QueueInterface>> {
153 warn!("SQS queue manager requested but not compiled in. Falling back to memory queue.");
154 let manager = MemoryQueueManager::new();
155 manager.start_processing();
156 Ok(Box::new(manager))
157 }
158
159 #[cfg(feature = "sns")]
160 pub async fn create_sns(config: SnsQueueConfig) -> Result<Box<dyn QueueInterface>> {
161 info!(
162 "Creating SNS queue manager (Region: {}, Topic: {})",
163 config.region, config.topic_arn
164 );
165 let manager = SnsQueueManager::new(config).await?;
166 Ok(Box::new(manager))
167 }
168
169 #[cfg(not(feature = "sns"))]
170 #[allow(unused_variables)]
171 pub async fn create_sns(
172 config: sockudo_core::options::SnsQueueConfig,
173 ) -> Result<Box<dyn QueueInterface>> {
174 warn!("SNS queue manager requested but not compiled in. Falling back to memory queue.");
175 let manager = MemoryQueueManager::new();
176 manager.start_processing();
177 Ok(Box::new(manager))
178 }
179}
180
181pub struct QueueManager {
182 driver: Box<dyn QueueInterface>,
183}
184
185impl QueueManager {
186 pub fn new(driver: Box<dyn QueueInterface>) -> Self {
188 Self { driver }
189 }
190
191 pub async fn add_to_queue(&self, queue_name: &str, data: JobData) -> Result<()> {
193 self.driver.add_to_queue(queue_name, data).await
194 }
195
196 pub async fn process_queue(
198 &self,
199 queue_name: &str,
200 callback: JobProcessorFnAsync,
201 ) -> Result<()> {
202 self.driver.process_queue(queue_name, callback).await
203 }
204
205 pub async fn disconnect(&self) -> Result<()> {
207 self.driver.disconnect().await
208 }
209
210 pub async fn check_health(&self) -> Result<()> {
212 self.driver.check_health().await
213 }
214}