sockudo_queue/
memory_queue_manager.rs1use crate::ArcJobProcessorFn;
2use ahash::AHashMap as HashMap;
3use async_trait::async_trait;
4use dashmap::DashMap;
5use futures_util::stream::{self, StreamExt};
6use sockudo_core::queue::QueueInterface;
7use sockudo_core::webhook_types::{JobData, JobProcessorFnAsync};
8use std::collections::VecDeque;
9use std::sync::Arc;
10use std::sync::RwLock;
11use std::time::Duration;
12use tracing::{debug, info, warn};
13
14const MAX_QUEUE_SIZE: usize = 100_000;
16const MAX_BATCH_SIZE: usize = 1_024;
17const MAX_CONCURRENT_JOBS_PER_QUEUE: usize = 64;
18
19pub struct MemoryQueueManager {
21 queues: Arc<DashMap<String, VecDeque<JobData>, ahash::RandomState>>,
22 processors: Arc<RwLock<HashMap<String, ArcJobProcessorFn>>>,
23}
24
25impl Default for MemoryQueueManager {
26 fn default() -> Self {
27 Self::new()
28 }
29}
30
31impl MemoryQueueManager {
32 pub fn new() -> Self {
33 let queues = Arc::new(DashMap::with_hasher(ahash::RandomState::new()));
34 let processors = Arc::new(RwLock::new(HashMap::new()));
35
36 Self { queues, processors }
37 }
38
39 pub fn start_processing(&self) {
41 let queues = Arc::clone(&self.queues);
42 let processors = Arc::clone(&self.processors);
43
44 info!("{}", "Starting memory queue processing loop...".to_string());
45
46 tokio::spawn(async move {
47 let mut interval = tokio::time::interval(Duration::from_millis(500));
48
49 loop {
50 interval.tick().await;
51
52 let queue_names: Vec<String> =
53 queues.iter().map(|entry| entry.key().clone()).collect();
54
55 for queue_name in queue_names {
56 let processor = {
57 let processors_guard = processors.read().unwrap();
58 processors_guard.get(&queue_name).cloned()
59 };
60
61 if let Some(processor) = processor
62 && let Some(mut jobs_queue) = queues.get_mut(&queue_name)
63 {
64 if jobs_queue.is_empty() {
65 continue;
66 }
67
68 let batch_len = MAX_BATCH_SIZE.min(jobs_queue.len());
69 let jobs: Vec<JobData> = jobs_queue.drain(..batch_len).collect();
70 debug!(
71 "Processing {} jobs from memory queue {}",
72 jobs.len(),
73 queue_name
74 );
75 drop(jobs_queue);
76
77 stream::iter(jobs)
78 .for_each_concurrent(MAX_CONCURRENT_JOBS_PER_QUEUE, |job| {
79 let processor_clone = processor.clone();
80 async move {
81 if let Err(e) = processor_clone(job).await {
82 tracing::error!("Failed to process webhook job: {}", e);
83 }
84 }
85 })
86 .await;
87 }
88 }
89 }
90 });
91 }
92}
93
94#[async_trait]
95impl QueueInterface for MemoryQueueManager {
96 async fn add_to_queue(
97 &self,
98 queue_name: &str,
99 data: JobData,
100 ) -> sockudo_core::error::Result<()> {
101 let mut queue = self.queues.entry(queue_name.to_string()).or_default();
102
103 if queue.len() >= MAX_QUEUE_SIZE {
104 let to_remove = queue.len() - MAX_QUEUE_SIZE + 1;
105 warn!(
106 "Memory queue '{}' at capacity ({}), dropping {} oldest job(s)",
107 queue_name, MAX_QUEUE_SIZE, to_remove
108 );
109 for _ in 0..to_remove {
110 queue.pop_front();
111 }
112 }
113
114 queue.push_back(data);
115 Ok(())
116 }
117
118 async fn process_queue(
119 &self,
120 queue_name: &str,
121 callback: JobProcessorFnAsync,
122 ) -> sockudo_core::error::Result<()> {
123 self.processors
124 .write()
125 .unwrap()
126 .insert(queue_name.to_string(), Arc::from(callback));
127 debug!("Registered processor for memory queue: {}", queue_name);
128
129 Ok(())
130 }
131
132 async fn disconnect(&self) -> sockudo_core::error::Result<()> {
133 self.queues.clear();
134 self.processors.write().unwrap().clear();
135 Ok(())
136 }
137
138 async fn check_health(&self) -> sockudo_core::error::Result<()> {
139 Ok(())
140 }
141}
142
143#[cfg(test)]
144mod tests {
145 use super::*;
146 use sockudo_core::webhook_types::{JobData, JobPayload};
147 use std::sync::atomic::{AtomicUsize, Ordering};
148 use tokio::sync::Notify;
149 use tokio::time::{Duration, timeout};
150
151 #[tokio::test]
152 async fn test_add_to_queue() {
153 let manager = MemoryQueueManager::new();
154 let data = JobData {
155 app_key: "test_key".to_string(),
156 app_id: "test_id".to_string(),
157 app_secret: "test_secret".to_string(),
158 payload: JobPayload {
159 time_ms: chrono::Utc::now().timestamp_millis(),
160 events: vec![],
161 },
162 original_signature: "test_signature".to_string(),
163 };
164
165 manager
166 .add_to_queue("test_queue", data.clone())
167 .await
168 .unwrap();
169
170 assert_eq!(manager.queues.get("test_queue").unwrap().len(), 1);
171 }
172
173 #[tokio::test]
174 async fn test_disconnect() {
175 let manager = MemoryQueueManager::new();
176 let data = JobData {
177 app_key: "test_key".to_string(),
178 app_id: "test_id".to_string(),
179 app_secret: "test_secret".to_string(),
180 payload: JobPayload {
181 time_ms: chrono::Utc::now().timestamp_millis(),
182 events: vec![],
183 },
184 original_signature: "test_signature".to_string(),
185 };
186
187 manager.add_to_queue("test_queue", data).await.unwrap();
188 assert!(!manager.queues.is_empty());
189
190 manager.disconnect().await.unwrap();
191 assert!(manager.queues.is_empty());
192 }
193
194 #[tokio::test]
195 async fn processing_limits_per_queue_concurrency() {
196 let manager = MemoryQueueManager::new();
197 manager.start_processing();
198
199 let active = Arc::new(AtomicUsize::new(0));
200 let max_active = Arc::new(AtomicUsize::new(0));
201 let completed = Arc::new(AtomicUsize::new(0));
202 let done = Arc::new(Notify::new());
203 let total_jobs = MAX_CONCURRENT_JOBS_PER_QUEUE * 2;
204
205 let active_clone = Arc::clone(&active);
206 let max_active_clone = Arc::clone(&max_active);
207 let completed_clone = Arc::clone(&completed);
208 let done_clone = Arc::clone(&done);
209
210 manager
211 .process_queue(
212 "bounded",
213 Box::new(move |_job| {
214 let active = Arc::clone(&active_clone);
215 let max_active = Arc::clone(&max_active_clone);
216 let completed = Arc::clone(&completed_clone);
217 let done = Arc::clone(&done_clone);
218 Box::pin(async move {
219 let current = active.fetch_add(1, Ordering::SeqCst) + 1;
220 max_active.fetch_max(current, Ordering::SeqCst);
221 tokio::time::sleep(Duration::from_millis(25)).await;
222 active.fetch_sub(1, Ordering::SeqCst);
223 if completed.fetch_add(1, Ordering::SeqCst) + 1 == total_jobs {
224 done.notify_waiters();
225 }
226 Ok(())
227 })
228 }),
229 )
230 .await
231 .unwrap();
232
233 for _ in 0..total_jobs {
234 manager
235 .add_to_queue(
236 "bounded",
237 JobData {
238 app_key: "test_key".to_string(),
239 app_id: "test_id".to_string(),
240 app_secret: "test_secret".to_string(),
241 payload: JobPayload {
242 time_ms: chrono::Utc::now().timestamp_millis(),
243 events: vec![],
244 },
245 original_signature: "test_signature".to_string(),
246 },
247 )
248 .await
249 .unwrap();
250 }
251
252 timeout(Duration::from_secs(3), done.notified())
253 .await
254 .expect("jobs should finish");
255
256 assert!(max_active.load(Ordering::SeqCst) <= MAX_CONCURRENT_JOBS_PER_QUEUE);
257 }
258}