sockudo_queue/
memory_queue_manager.rs1use crate::ArcJobProcessorFn;
2use async_trait::async_trait;
3use dashmap::DashMap;
4use sockudo_core::queue::QueueInterface;
5use sockudo_core::webhook_types::{JobData, JobProcessorFnAsync};
6use std::sync::Arc;
7use std::time::Duration;
8use tracing::{debug, info, warn};
9
/// Upper bound on the number of jobs buffered in a single named queue.
///
/// `add_to_queue` compares the queue length against this value before pushing
/// and, when the bound is reached, drops the oldest entries to make room.
const MAX_QUEUE_SIZE: usize = 100_000;
12
/// In-memory job queue backed by two concurrent maps keyed by queue name.
///
/// Both maps are wrapped in `Arc` so that the background task spawned by
/// `start_processing` can share them with this handle.
pub struct MemoryQueueManager {
    /// Pending jobs per queue name, in insertion order (oldest first).
    queues: Arc<DashMap<String, Vec<JobData>, ahash::RandomState>>,
    /// Processor callback registered for each queue name via `process_queue`.
    processors: Arc<DashMap<String, ArcJobProcessorFn, ahash::RandomState>>,
}
18
19impl Default for MemoryQueueManager {
20 fn default() -> Self {
21 Self::new()
22 }
23}
24
25impl MemoryQueueManager {
26 pub fn new() -> Self {
27 let queues = Arc::new(DashMap::with_hasher(ahash::RandomState::new()));
28 let processors = Arc::new(DashMap::with_hasher(ahash::RandomState::new()));
29
30 Self { queues, processors }
31 }
32
33 pub fn start_processing(&self) {
35 let queues = Arc::clone(&self.queues);
36 let processors = Arc::clone(&self.processors);
37
38 info!("{}", "Starting memory queue processing loop...".to_string());
39
40 tokio::spawn(async move {
41 let mut interval = tokio::time::interval(Duration::from_millis(500));
42
43 loop {
44 interval.tick().await;
45
46 let queue_names: Vec<String> = queues
47 .iter()
48 .map(|entry| entry.key().clone())
49 .filter(|name| processors.contains_key(name))
50 .collect();
51
52 for queue_name in queue_names {
53 if let Some(processor) = processors.get(&queue_name)
54 && let Some((key, mut jobs_vec)) = queues.remove(&queue_name)
55 {
56 if !jobs_vec.is_empty() {
57 debug!(
58 "Processing {} jobs from memory queue {}",
59 jobs_vec.len(),
60 key
61 );
62
63 queues.insert(key, Vec::new());
64
65 for job in jobs_vec.drain(..) {
66 let processor_clone = Arc::clone(&processor);
67 tokio::spawn(async move {
68 if let Err(e) = processor_clone(job).await {
69 tracing::error!("Failed to process webhook job: {}", e);
70 }
71 });
72 }
73 } else {
74 queues.insert(key, jobs_vec);
75 }
76 }
77 }
78 }
79 });
80 }
81}
82
83#[async_trait]
84impl QueueInterface for MemoryQueueManager {
85 async fn add_to_queue(
86 &self,
87 queue_name: &str,
88 data: JobData,
89 ) -> sockudo_core::error::Result<()> {
90 let mut queue = self.queues.entry(queue_name.to_string()).or_default();
91
92 if queue.len() >= MAX_QUEUE_SIZE {
93 let to_remove = queue.len() - MAX_QUEUE_SIZE + 1;
94 warn!(
95 "Memory queue '{}' at capacity ({}), dropping {} oldest job(s)",
96 queue_name, MAX_QUEUE_SIZE, to_remove
97 );
98 queue.drain(0..to_remove);
99 }
100
101 queue.push(data);
102 Ok(())
103 }
104
105 async fn process_queue(
106 &self,
107 queue_name: &str,
108 callback: JobProcessorFnAsync,
109 ) -> sockudo_core::error::Result<()> {
110 self.processors
111 .insert(queue_name.to_string(), Arc::from(callback));
112 debug!("Registered processor for memory queue: {}", queue_name);
113
114 Ok(())
115 }
116
117 async fn disconnect(&self) -> sockudo_core::error::Result<()> {
118 self.queues.clear();
119 Ok(())
120 }
121
122 async fn check_health(&self) -> sockudo_core::error::Result<()> {
123 Ok(())
124 }
125}
126
#[cfg(test)]
mod tests {
    use super::*;
    use sockudo_core::webhook_types::{JobData, JobPayload};

    /// Builds a minimal job with no events, timestamped with the current time.
    fn sample_job() -> JobData {
        JobData {
            app_key: "test_key".to_string(),
            app_id: "test_id".to_string(),
            app_secret: "test_secret".to_string(),
            payload: JobPayload {
                time_ms: chrono::Utc::now().timestamp_millis(),
                events: vec![],
            },
            original_signature: "test_signature".to_string(),
        }
    }

    /// A single enqueue creates the queue and leaves exactly one job in it.
    #[tokio::test]
    async fn test_add_to_queue() {
        let manager = MemoryQueueManager::new();
        let job = sample_job();

        manager
            .add_to_queue("test_queue", job.clone())
            .await
            .unwrap();

        let queued = manager.queues.get("test_queue").unwrap().len();
        assert_eq!(queued, 1);
    }

    /// `disconnect` removes every queue that was populated beforehand.
    #[tokio::test]
    async fn test_disconnect() {
        let manager = MemoryQueueManager::new();

        manager
            .add_to_queue("test_queue", sample_job())
            .await
            .unwrap();
        assert!(!manager.queues.is_empty());

        manager.disconnect().await.unwrap();
        assert!(manager.queues.is_empty());
    }
}