Skip to main content

sockudo_queue/
memory_queue_manager.rs

1use crate::ArcJobProcessorFn;
2use ahash::AHashMap as HashMap;
3use async_trait::async_trait;
4use dashmap::DashMap;
5use sockudo_core::queue::QueueInterface;
6use sockudo_core::webhook_types::{JobData, JobProcessorFnAsync};
7use std::collections::VecDeque;
8use std::sync::Arc;
9use std::sync::RwLock;
10use std::time::Duration;
11use tracing::{debug, info, warn};
12
/// Upper bound on the number of pending jobs held in any single queue.
///
/// When a queue has reached this size, `add_to_queue` discards the oldest
/// entries before accepting a new one, so memory use cannot grow without limit.
const MAX_QUEUE_SIZE: usize = 100_000;
15
16/// Memory-based queue manager for simple deployments
17pub struct MemoryQueueManager {
18    queues: Arc<DashMap<String, VecDeque<JobData>, ahash::RandomState>>,
19    processors: Arc<RwLock<HashMap<String, ArcJobProcessorFn>>>,
20}
21
22impl Default for MemoryQueueManager {
23    fn default() -> Self {
24        Self::new()
25    }
26}
27
28impl MemoryQueueManager {
29    pub fn new() -> Self {
30        let queues = Arc::new(DashMap::with_hasher(ahash::RandomState::new()));
31        let processors = Arc::new(RwLock::new(HashMap::new()));
32
33        Self { queues, processors }
34    }
35
36    /// Starts the background processing loop. Should be called once after setup.
37    pub fn start_processing(&self) {
38        let queues = Arc::clone(&self.queues);
39        let processors = Arc::clone(&self.processors);
40
41        info!("{}", "Starting memory queue processing loop...".to_string());
42
43        tokio::spawn(async move {
44            let mut interval = tokio::time::interval(Duration::from_millis(500));
45
46            loop {
47                interval.tick().await;
48
49                let queue_names: Vec<String> =
50                    queues.iter().map(|entry| entry.key().clone()).collect();
51
52                for queue_name in queue_names {
53                    let processor = {
54                        let processors_guard = processors.read().unwrap();
55                        processors_guard.get(&queue_name).cloned()
56                    };
57
58                    if let Some(processor) = processor
59                        && let Some(mut jobs_queue) = queues.get_mut(&queue_name)
60                    {
61                        if jobs_queue.is_empty() {
62                            continue;
63                        }
64
65                        let jobs: Vec<JobData> = jobs_queue.drain(..).collect();
66                        debug!(
67                            "Processing {} jobs from memory queue {}",
68                            jobs.len(),
69                            queue_name
70                        );
71                        drop(jobs_queue);
72
73                        for job in jobs {
74                            let processor_clone = processor.clone();
75                            tokio::spawn(async move {
76                                if let Err(e) = processor_clone(job).await {
77                                    tracing::error!("Failed to process webhook job: {}", e);
78                                }
79                            });
80                        }
81                    }
82                }
83            }
84        });
85    }
86}
87
88#[async_trait]
89impl QueueInterface for MemoryQueueManager {
90    async fn add_to_queue(
91        &self,
92        queue_name: &str,
93        data: JobData,
94    ) -> sockudo_core::error::Result<()> {
95        let mut queue = self.queues.entry(queue_name.to_string()).or_default();
96
97        if queue.len() >= MAX_QUEUE_SIZE {
98            let to_remove = queue.len() - MAX_QUEUE_SIZE + 1;
99            warn!(
100                "Memory queue '{}' at capacity ({}), dropping {} oldest job(s)",
101                queue_name, MAX_QUEUE_SIZE, to_remove
102            );
103            for _ in 0..to_remove {
104                queue.pop_front();
105            }
106        }
107
108        queue.push_back(data);
109        Ok(())
110    }
111
112    async fn process_queue(
113        &self,
114        queue_name: &str,
115        callback: JobProcessorFnAsync,
116    ) -> sockudo_core::error::Result<()> {
117        self.processors
118            .write()
119            .unwrap()
120            .insert(queue_name.to_string(), Arc::from(callback));
121        debug!("Registered processor for memory queue: {}", queue_name);
122
123        Ok(())
124    }
125
126    async fn disconnect(&self) -> sockudo_core::error::Result<()> {
127        self.queues.clear();
128        self.processors.write().unwrap().clear();
129        Ok(())
130    }
131
132    async fn check_health(&self) -> sockudo_core::error::Result<()> {
133        Ok(())
134    }
135}
136
#[cfg(test)]
mod tests {
    use super::*;
    use sockudo_core::webhook_types::{JobData, JobPayload};

    /// Builds a minimal job with an empty event list and the given app id.
    fn sample_job(app_id: &str) -> JobData {
        JobData {
            app_key: "test_key".to_string(),
            app_id: app_id.to_string(),
            app_secret: "test_secret".to_string(),
            payload: JobPayload {
                time_ms: chrono::Utc::now().timestamp_millis(),
                events: vec![],
            },
            original_signature: "test_signature".to_string(),
        }
    }

    #[tokio::test]
    async fn test_add_to_queue() {
        let manager = MemoryQueueManager::new();

        manager
            .add_to_queue("test_queue", sample_job("test_id"))
            .await
            .unwrap();

        assert_eq!(manager.queues.get("test_queue").unwrap().len(), 1);
    }

    #[tokio::test]
    async fn test_add_to_queue_evicts_oldest_when_full() {
        let manager = MemoryQueueManager::new();

        // Pre-fill the queue directly so the test does not need
        // `MAX_QUEUE_SIZE` separate `add_to_queue` calls.
        let full: std::collections::VecDeque<JobData> =
            vec![sample_job("old"); MAX_QUEUE_SIZE].into();
        manager.queues.insert("test_queue".to_string(), full);

        manager
            .add_to_queue("test_queue", sample_job("new"))
            .await
            .unwrap();

        let queue = manager.queues.get("test_queue").unwrap();
        assert_eq!(queue.len(), MAX_QUEUE_SIZE);
        assert_eq!(queue.front().unwrap().app_id, "old");
        assert_eq!(queue.back().unwrap().app_id, "new");
    }

    #[tokio::test]
    async fn test_disconnect() {
        let manager = MemoryQueueManager::new();

        manager
            .add_to_queue("test_queue", sample_job("test_id"))
            .await
            .unwrap();
        assert!(!manager.queues.is_empty());

        manager.disconnect().await.unwrap();
        assert!(manager.queues.is_empty());
    }
}