Skip to main content

sockudo_queue/
memory_queue_manager.rs

1use crate::ArcJobProcessorFn;
2use async_trait::async_trait;
3use dashmap::DashMap;
4use sockudo_core::queue::QueueInterface;
5use sockudo_core::webhook_types::{JobData, JobProcessorFnAsync};
6use std::sync::Arc;
7use std::time::Duration;
8use tracing::{debug, info, warn};
9
/// Maximum number of jobs per queue to prevent unbounded memory growth.
///
/// When a queue already holds this many jobs, `add_to_queue` discards the
/// oldest entries (logging a warning) so the new job still fits.
const MAX_QUEUE_SIZE: usize = 100_000;
12
/// Memory-based queue manager for simple deployments
///
/// Jobs are kept only in process memory (plain `Vec`s inside a concurrent
/// map), so anything still queued is gone once the manager's maps are dropped
/// or cleared. Both maps sit behind `Arc` so the background task spawned by
/// `start_processing` can share them with the manager.
pub struct MemoryQueueManager {
    /// Pending jobs, keyed by queue name.
    queues: Arc<DashMap<String, Vec<JobData>, ahash::RandomState>>,
    /// Registered job processor callbacks, keyed by queue name.
    processors: Arc<DashMap<String, ArcJobProcessorFn, ahash::RandomState>>,
}
18
19impl Default for MemoryQueueManager {
20    fn default() -> Self {
21        Self::new()
22    }
23}
24
25impl MemoryQueueManager {
26    pub fn new() -> Self {
27        let queues = Arc::new(DashMap::with_hasher(ahash::RandomState::new()));
28        let processors = Arc::new(DashMap::with_hasher(ahash::RandomState::new()));
29
30        Self { queues, processors }
31    }
32
33    /// Starts the background processing loop. Should be called once after setup.
34    pub fn start_processing(&self) {
35        let queues = Arc::clone(&self.queues);
36        let processors = Arc::clone(&self.processors);
37
38        info!("{}", "Starting memory queue processing loop...".to_string());
39
40        tokio::spawn(async move {
41            let mut interval = tokio::time::interval(Duration::from_millis(500));
42
43            loop {
44                interval.tick().await;
45
46                let queue_names: Vec<String> = queues
47                    .iter()
48                    .map(|entry| entry.key().clone())
49                    .filter(|name| processors.contains_key(name))
50                    .collect();
51
52                for queue_name in queue_names {
53                    if let Some(processor) = processors.get(&queue_name)
54                        && let Some((key, mut jobs_vec)) = queues.remove(&queue_name)
55                    {
56                        if !jobs_vec.is_empty() {
57                            debug!(
58                                "Processing {} jobs from memory queue {}",
59                                jobs_vec.len(),
60                                key
61                            );
62
63                            queues.insert(key, Vec::new());
64
65                            for job in jobs_vec.drain(..) {
66                                let processor_clone = Arc::clone(&processor);
67                                tokio::spawn(async move {
68                                    if let Err(e) = processor_clone(job).await {
69                                        tracing::error!("Failed to process webhook job: {}", e);
70                                    }
71                                });
72                            }
73                        } else {
74                            queues.insert(key, jobs_vec);
75                        }
76                    }
77                }
78            }
79        });
80    }
81}
82
83#[async_trait]
84impl QueueInterface for MemoryQueueManager {
85    async fn add_to_queue(
86        &self,
87        queue_name: &str,
88        data: JobData,
89    ) -> sockudo_core::error::Result<()> {
90        let mut queue = self.queues.entry(queue_name.to_string()).or_default();
91
92        if queue.len() >= MAX_QUEUE_SIZE {
93            let to_remove = queue.len() - MAX_QUEUE_SIZE + 1;
94            warn!(
95                "Memory queue '{}' at capacity ({}), dropping {} oldest job(s)",
96                queue_name, MAX_QUEUE_SIZE, to_remove
97            );
98            queue.drain(0..to_remove);
99        }
100
101        queue.push(data);
102        Ok(())
103    }
104
105    async fn process_queue(
106        &self,
107        queue_name: &str,
108        callback: JobProcessorFnAsync,
109    ) -> sockudo_core::error::Result<()> {
110        self.processors
111            .insert(queue_name.to_string(), Arc::from(callback));
112        debug!("Registered processor for memory queue: {}", queue_name);
113
114        Ok(())
115    }
116
117    async fn disconnect(&self) -> sockudo_core::error::Result<()> {
118        self.queues.clear();
119        Ok(())
120    }
121
122    async fn check_health(&self) -> sockudo_core::error::Result<()> {
123        Ok(())
124    }
125}
126
#[cfg(test)]
mod tests {
    use super::*;
    use sockudo_core::webhook_types::{JobData, JobPayload};

    /// Builds a minimal job with placeholder credentials and no events.
    fn sample_job() -> JobData {
        JobData {
            app_key: "test_key".to_string(),
            app_id: "test_id".to_string(),
            app_secret: "test_secret".to_string(),
            payload: JobPayload {
                time_ms: chrono::Utc::now().timestamp_millis(),
                events: vec![],
            },
            original_signature: "test_signature".to_string(),
        }
    }

    /// A single enqueue creates the queue and stores exactly one job in it.
    #[tokio::test]
    async fn test_add_to_queue() {
        let manager = MemoryQueueManager::new();

        manager
            .add_to_queue("test_queue", sample_job())
            .await
            .unwrap();

        let queued = manager.queues.get("test_queue").unwrap().len();
        assert_eq!(queued, 1);
    }

    /// Disconnecting drops every queue that was previously populated.
    #[tokio::test]
    async fn test_disconnect() {
        let manager = MemoryQueueManager::new();

        manager
            .add_to_queue("test_queue", sample_job())
            .await
            .unwrap();
        assert!(!manager.queues.is_empty());

        manager.disconnect().await.unwrap();
        assert!(manager.queues.is_empty());
    }
}