sockudo_queue/
memory_queue_manager.rs1use crate::ArcJobProcessorFn;
2use ahash::AHashMap as HashMap;
3use async_trait::async_trait;
4use dashmap::DashMap;
5use sockudo_core::queue::QueueInterface;
6use sockudo_core::webhook_types::{JobData, JobProcessorFnAsync};
7use std::collections::VecDeque;
8use std::sync::Arc;
9use std::sync::RwLock;
10use std::time::Duration;
11use tracing::{debug, info, warn};
12
13const MAX_QUEUE_SIZE: usize = 100_000;
15
16pub struct MemoryQueueManager {
18 queues: Arc<DashMap<String, VecDeque<JobData>, ahash::RandomState>>,
19 processors: Arc<RwLock<HashMap<String, ArcJobProcessorFn>>>,
20}
21
22impl Default for MemoryQueueManager {
23 fn default() -> Self {
24 Self::new()
25 }
26}
27
28impl MemoryQueueManager {
29 pub fn new() -> Self {
30 let queues = Arc::new(DashMap::with_hasher(ahash::RandomState::new()));
31 let processors = Arc::new(RwLock::new(HashMap::new()));
32
33 Self { queues, processors }
34 }
35
36 pub fn start_processing(&self) {
38 let queues = Arc::clone(&self.queues);
39 let processors = Arc::clone(&self.processors);
40
41 info!("{}", "Starting memory queue processing loop...".to_string());
42
43 tokio::spawn(async move {
44 let mut interval = tokio::time::interval(Duration::from_millis(500));
45
46 loop {
47 interval.tick().await;
48
49 let queue_names: Vec<String> =
50 queues.iter().map(|entry| entry.key().clone()).collect();
51
52 for queue_name in queue_names {
53 let processor = {
54 let processors_guard = processors.read().unwrap();
55 processors_guard.get(&queue_name).cloned()
56 };
57
58 if let Some(processor) = processor
59 && let Some(mut jobs_queue) = queues.get_mut(&queue_name)
60 {
61 if jobs_queue.is_empty() {
62 continue;
63 }
64
65 let jobs: Vec<JobData> = jobs_queue.drain(..).collect();
66 debug!(
67 "Processing {} jobs from memory queue {}",
68 jobs.len(),
69 queue_name
70 );
71 drop(jobs_queue);
72
73 for job in jobs {
74 let processor_clone = processor.clone();
75 tokio::spawn(async move {
76 if let Err(e) = processor_clone(job).await {
77 tracing::error!("Failed to process webhook job: {}", e);
78 }
79 });
80 }
81 }
82 }
83 }
84 });
85 }
86}
87
88#[async_trait]
89impl QueueInterface for MemoryQueueManager {
90 async fn add_to_queue(
91 &self,
92 queue_name: &str,
93 data: JobData,
94 ) -> sockudo_core::error::Result<()> {
95 let mut queue = self.queues.entry(queue_name.to_string()).or_default();
96
97 if queue.len() >= MAX_QUEUE_SIZE {
98 let to_remove = queue.len() - MAX_QUEUE_SIZE + 1;
99 warn!(
100 "Memory queue '{}' at capacity ({}), dropping {} oldest job(s)",
101 queue_name, MAX_QUEUE_SIZE, to_remove
102 );
103 for _ in 0..to_remove {
104 queue.pop_front();
105 }
106 }
107
108 queue.push_back(data);
109 Ok(())
110 }
111
112 async fn process_queue(
113 &self,
114 queue_name: &str,
115 callback: JobProcessorFnAsync,
116 ) -> sockudo_core::error::Result<()> {
117 self.processors
118 .write()
119 .unwrap()
120 .insert(queue_name.to_string(), Arc::from(callback));
121 debug!("Registered processor for memory queue: {}", queue_name);
122
123 Ok(())
124 }
125
126 async fn disconnect(&self) -> sockudo_core::error::Result<()> {
127 self.queues.clear();
128 self.processors.write().unwrap().clear();
129 Ok(())
130 }
131
132 async fn check_health(&self) -> sockudo_core::error::Result<()> {
133 Ok(())
134 }
135}
136
137#[cfg(test)]
138mod tests {
139 use super::*;
140 use sockudo_core::webhook_types::{JobData, JobPayload};
141
142 #[tokio::test]
143 async fn test_add_to_queue() {
144 let manager = MemoryQueueManager::new();
145 let data = JobData {
146 app_key: "test_key".to_string(),
147 app_id: "test_id".to_string(),
148 app_secret: "test_secret".to_string(),
149 payload: JobPayload {
150 time_ms: chrono::Utc::now().timestamp_millis(),
151 events: vec![],
152 },
153 original_signature: "test_signature".to_string(),
154 };
155
156 manager
157 .add_to_queue("test_queue", data.clone())
158 .await
159 .unwrap();
160
161 assert_eq!(manager.queues.get("test_queue").unwrap().len(), 1);
162 }
163
164 #[tokio::test]
165 async fn test_disconnect() {
166 let manager = MemoryQueueManager::new();
167 let data = JobData {
168 app_key: "test_key".to_string(),
169 app_id: "test_id".to_string(),
170 app_secret: "test_secret".to_string(),
171 payload: JobPayload {
172 time_ms: chrono::Utc::now().timestamp_millis(),
173 events: vec![],
174 },
175 original_signature: "test_signature".to_string(),
176 };
177
178 manager.add_to_queue("test_queue", data).await.unwrap();
179 assert!(!manager.queues.is_empty());
180
181 manager.disconnect().await.unwrap();
182 assert!(manager.queues.is_empty());
183 }
184}