Skip to main content

sockudo_queue/
memory_queue_manager.rs

1use crate::ArcJobProcessorFn;
2use ahash::AHashMap as HashMap;
3use async_trait::async_trait;
4use dashmap::DashMap;
5use futures_util::stream::{self, StreamExt};
6use sockudo_core::queue::QueueInterface;
7use sockudo_core::webhook_types::{JobData, JobProcessorFnAsync};
8use std::collections::VecDeque;
9use std::sync::Arc;
10use std::sync::RwLock;
11use std::time::Duration;
12use tracing::{debug, info, warn};
13
14/// Maximum number of jobs per queue to prevent unbounded memory growth
15const MAX_QUEUE_SIZE: usize = 100_000;
16const MAX_BATCH_SIZE: usize = 1_024;
17const MAX_CONCURRENT_JOBS_PER_QUEUE: usize = 64;
18
19/// Memory-based queue manager for simple deployments
20pub struct MemoryQueueManager {
21    queues: Arc<DashMap<String, VecDeque<JobData>, ahash::RandomState>>,
22    processors: Arc<RwLock<HashMap<String, ArcJobProcessorFn>>>,
23}
24
25impl Default for MemoryQueueManager {
26    fn default() -> Self {
27        Self::new()
28    }
29}
30
31impl MemoryQueueManager {
32    pub fn new() -> Self {
33        let queues = Arc::new(DashMap::with_hasher(ahash::RandomState::new()));
34        let processors = Arc::new(RwLock::new(HashMap::new()));
35
36        Self { queues, processors }
37    }
38
39    /// Starts the background processing loop. Should be called once after setup.
40    pub fn start_processing(&self) {
41        let queues = Arc::clone(&self.queues);
42        let processors = Arc::clone(&self.processors);
43
44        info!("{}", "Starting memory queue processing loop...".to_string());
45
46        tokio::spawn(async move {
47            let mut interval = tokio::time::interval(Duration::from_millis(500));
48
49            loop {
50                interval.tick().await;
51
52                let queue_names: Vec<String> =
53                    queues.iter().map(|entry| entry.key().clone()).collect();
54
55                for queue_name in queue_names {
56                    let processor = {
57                        let processors_guard = processors.read().unwrap();
58                        processors_guard.get(&queue_name).cloned()
59                    };
60
61                    if let Some(processor) = processor
62                        && let Some(mut jobs_queue) = queues.get_mut(&queue_name)
63                    {
64                        if jobs_queue.is_empty() {
65                            continue;
66                        }
67
68                        let batch_len = MAX_BATCH_SIZE.min(jobs_queue.len());
69                        let jobs: Vec<JobData> = jobs_queue.drain(..batch_len).collect();
70                        debug!(
71                            "Processing {} jobs from memory queue {}",
72                            jobs.len(),
73                            queue_name
74                        );
75                        drop(jobs_queue);
76
77                        stream::iter(jobs)
78                            .for_each_concurrent(MAX_CONCURRENT_JOBS_PER_QUEUE, |job| {
79                                let processor_clone = processor.clone();
80                                async move {
81                                    if let Err(e) = processor_clone(job).await {
82                                        tracing::error!("Failed to process webhook job: {}", e);
83                                    }
84                                }
85                            })
86                            .await;
87                    }
88                }
89            }
90        });
91    }
92}
93
94#[async_trait]
95impl QueueInterface for MemoryQueueManager {
96    async fn add_to_queue(
97        &self,
98        queue_name: &str,
99        data: JobData,
100    ) -> sockudo_core::error::Result<()> {
101        let mut queue = self.queues.entry(queue_name.to_string()).or_default();
102
103        if queue.len() >= MAX_QUEUE_SIZE {
104            let to_remove = queue.len() - MAX_QUEUE_SIZE + 1;
105            warn!(
106                "Memory queue '{}' at capacity ({}), dropping {} oldest job(s)",
107                queue_name, MAX_QUEUE_SIZE, to_remove
108            );
109            for _ in 0..to_remove {
110                queue.pop_front();
111            }
112        }
113
114        queue.push_back(data);
115        Ok(())
116    }
117
118    async fn process_queue(
119        &self,
120        queue_name: &str,
121        callback: JobProcessorFnAsync,
122    ) -> sockudo_core::error::Result<()> {
123        self.processors
124            .write()
125            .unwrap()
126            .insert(queue_name.to_string(), Arc::from(callback));
127        debug!("Registered processor for memory queue: {}", queue_name);
128
129        Ok(())
130    }
131
132    async fn disconnect(&self) -> sockudo_core::error::Result<()> {
133        self.queues.clear();
134        self.processors.write().unwrap().clear();
135        Ok(())
136    }
137
138    async fn check_health(&self) -> sockudo_core::error::Result<()> {
139        Ok(())
140    }
141}
142
143#[cfg(test)]
144mod tests {
145    use super::*;
146    use sockudo_core::webhook_types::{JobData, JobPayload};
147    use std::sync::atomic::{AtomicUsize, Ordering};
148    use tokio::sync::Notify;
149    use tokio::time::{Duration, timeout};
150
151    #[tokio::test]
152    async fn test_add_to_queue() {
153        let manager = MemoryQueueManager::new();
154        let data = JobData {
155            app_key: "test_key".to_string(),
156            app_id: "test_id".to_string(),
157            app_secret: "test_secret".to_string(),
158            payload: JobPayload {
159                time_ms: chrono::Utc::now().timestamp_millis(),
160                events: vec![],
161            },
162            original_signature: "test_signature".to_string(),
163        };
164
165        manager
166            .add_to_queue("test_queue", data.clone())
167            .await
168            .unwrap();
169
170        assert_eq!(manager.queues.get("test_queue").unwrap().len(), 1);
171    }
172
173    #[tokio::test]
174    async fn test_disconnect() {
175        let manager = MemoryQueueManager::new();
176        let data = JobData {
177            app_key: "test_key".to_string(),
178            app_id: "test_id".to_string(),
179            app_secret: "test_secret".to_string(),
180            payload: JobPayload {
181                time_ms: chrono::Utc::now().timestamp_millis(),
182                events: vec![],
183            },
184            original_signature: "test_signature".to_string(),
185        };
186
187        manager.add_to_queue("test_queue", data).await.unwrap();
188        assert!(!manager.queues.is_empty());
189
190        manager.disconnect().await.unwrap();
191        assert!(manager.queues.is_empty());
192    }
193
194    #[tokio::test]
195    async fn processing_limits_per_queue_concurrency() {
196        let manager = MemoryQueueManager::new();
197        manager.start_processing();
198
199        let active = Arc::new(AtomicUsize::new(0));
200        let max_active = Arc::new(AtomicUsize::new(0));
201        let completed = Arc::new(AtomicUsize::new(0));
202        let done = Arc::new(Notify::new());
203        let total_jobs = MAX_CONCURRENT_JOBS_PER_QUEUE * 2;
204
205        let active_clone = Arc::clone(&active);
206        let max_active_clone = Arc::clone(&max_active);
207        let completed_clone = Arc::clone(&completed);
208        let done_clone = Arc::clone(&done);
209
210        manager
211            .process_queue(
212                "bounded",
213                Box::new(move |_job| {
214                    let active = Arc::clone(&active_clone);
215                    let max_active = Arc::clone(&max_active_clone);
216                    let completed = Arc::clone(&completed_clone);
217                    let done = Arc::clone(&done_clone);
218                    Box::pin(async move {
219                        let current = active.fetch_add(1, Ordering::SeqCst) + 1;
220                        max_active.fetch_max(current, Ordering::SeqCst);
221                        tokio::time::sleep(Duration::from_millis(25)).await;
222                        active.fetch_sub(1, Ordering::SeqCst);
223                        if completed.fetch_add(1, Ordering::SeqCst) + 1 == total_jobs {
224                            done.notify_waiters();
225                        }
226                        Ok(())
227                    })
228                }),
229            )
230            .await
231            .unwrap();
232
233        for _ in 0..total_jobs {
234            manager
235                .add_to_queue(
236                    "bounded",
237                    JobData {
238                        app_key: "test_key".to_string(),
239                        app_id: "test_id".to_string(),
240                        app_secret: "test_secret".to_string(),
241                        payload: JobPayload {
242                            time_ms: chrono::Utc::now().timestamp_millis(),
243                            events: vec![],
244                        },
245                        original_signature: "test_signature".to_string(),
246                    },
247                )
248                .await
249                .unwrap();
250        }
251
252        timeout(Duration::from_secs(3), done.notified())
253            .await
254            .expect("jobs should finish");
255
256        assert!(max_active.load(Ordering::SeqCst) <= MAX_CONCURRENT_JOBS_PER_QUEUE);
257    }
258}