foxtive_worker/backends/
memory.rs1use async_trait::async_trait;
2use std::collections::VecDeque;
3use std::sync::{Arc, Mutex};
4use tokio::sync::Notify;
5
6use crate::backends::ReceiveResult;
7use crate::backends::contract::MessageBackend;
8use crate::error::WorkerResult;
9use crate::message::{AckHandle, Message, MessageMetadata, ReceivedMessage};
10
11#[derive(Debug)]
13pub struct MemoryAckHandle {
14 message_id: String,
15 backend: Arc<MemoryBackendInner>,
16}
17
18#[async_trait]
19impl AckHandle for MemoryAckHandle {
20 async fn ack(&self) -> WorkerResult<()> {
21 self.backend.ack(&self.message_id)
22 }
23
24 async fn nack(&self, requeue: bool) -> WorkerResult<()> {
25 self.backend.nack(&self.message_id, requeue)
26 }
27}
28
29#[derive(Debug)]
31struct MemoryBackendInner {
32 queue: Mutex<VecDeque<Message<serde_json::Value>>>,
33 unacked: Mutex<std::collections::HashMap<String, Message<serde_json::Value>>>,
34 notify: Notify,
35 shutdown: Mutex<bool>,
36}
37
38impl MemoryBackendInner {
39 fn ack(&self, message_id: &str) -> WorkerResult<()> {
40 let mut unacked = self.unacked.lock().unwrap();
41 unacked.remove(message_id);
42 Ok(())
43 }
44
45 fn nack(&self, message_id: &str, requeue: bool) -> WorkerResult<()> {
46 let mut unacked = self.unacked.lock().unwrap();
47 if let Some(message) = unacked.remove(message_id)
48 && requeue
49 {
50 self.queue.lock().unwrap().push_back(message);
51 self.notify.notify_one();
52 }
53 Ok(())
54 }
55}
56
57pub struct MemoryBackend {
70 inner: Arc<MemoryBackendInner>,
71 source: String,
72}
73
74impl std::fmt::Debug for MemoryBackend {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 f.debug_struct("MemoryBackend")
77 .field("source", &self.source)
78 .finish()
79 }
80}
81
82impl MemoryBackend {
83 pub fn new() -> Self {
85 Self::with_source("memory-queue")
86 }
87
88 pub fn with_source(source: impl Into<String>) -> Self {
90 Self {
91 inner: Arc::new(MemoryBackendInner {
92 queue: Mutex::new(VecDeque::new()),
93 unacked: Mutex::new(std::collections::HashMap::new()),
94 notify: Notify::new(),
95 shutdown: Mutex::new(false),
96 }),
97 source: source.into(),
98 }
99 }
100
101 pub fn enqueue(&self, payload: serde_json::Value) -> String {
109 let message_id = uuid::Uuid::new_v4().to_string();
110 let message = Message {
111 id: message_id.clone(),
112 payload,
113 metadata: MessageMetadata::new(&self.source),
114 };
115
116 let mut queue = self.inner.queue.lock().unwrap();
117 queue.push_back(message);
118
119 self.inner.notify.notify_one();
121
122 message_id
123 }
124
125 pub fn enqueue_batch(&self, payloads: Vec<serde_json::Value>) -> Vec<String> {
127 payloads.into_iter().map(|p| self.enqueue(p)).collect()
128 }
129
130 pub fn queue_len(&self) -> usize {
132 let queue = self.inner.queue.lock().unwrap();
133 queue.len()
134 }
135
136 pub fn unacked_count(&self) -> usize {
138 let unacked = self.inner.unacked.lock().unwrap();
139 unacked.len()
140 }
141
142 pub fn clear(&self) {
144 let mut queue = self.inner.queue.lock().unwrap();
145 queue.clear();
146 }
147}
148
149impl Default for MemoryBackend {
150 fn default() -> Self {
151 Self::new()
152 }
153}
154
155#[async_trait]
156impl MessageBackend for MemoryBackend {
157 async fn receive(&self) -> WorkerResult<ReceiveResult<serde_json::Value>> {
158 {
160 let shutdown = self.inner.shutdown.lock().unwrap();
161 if *shutdown {
162 return Ok(ReceiveResult::Shutdown);
163 }
164 }
165
166 loop {
168 {
170 let mut queue = self.inner.queue.lock().unwrap();
171 if let Some(message) = queue.pop_front() {
172 let message_id = message.id.clone();
173
174 {
176 let mut unacked = self.inner.unacked.lock().unwrap();
177 unacked.insert(message_id.clone(), message.clone());
178 }
179
180 let ack_handle = Arc::new(MemoryAckHandle {
181 message_id,
182 backend: self.inner.clone(),
183 });
184
185 return Ok(ReceiveResult::Message(ReceivedMessage::new(
186 message, ack_handle,
187 )));
188 }
189 }
190
191 {
193 let shutdown = self.inner.shutdown.lock().unwrap();
194 if *shutdown {
195 return Ok(ReceiveResult::Shutdown);
196 }
197 }
198
199 self.inner.notify.notified().await;
201 }
202 }
203
204 async fn ack(&self, message_id: &str) -> WorkerResult<()> {
205 self.inner.ack(message_id)
206 }
207
208 async fn nack(&self, message_id: &str, requeue: bool) -> WorkerResult<()> {
209 self.inner.nack(message_id, requeue)
210 }
211
212 async fn health_check(&self) -> WorkerResult<()> {
213 Ok(())
215 }
216
217 async fn shutdown(&self) -> WorkerResult<()> {
218 let mut shutdown = self.inner.shutdown.lock().unwrap();
219 *shutdown = true;
220 self.inner.notify.notify_waiters();
222 Ok(())
223 }
224}
225
226#[cfg(test)]
227mod tests {
228 use super::*;
229 use crate::backends::ReceiveResult;
230
231 #[tokio::test]
232 async fn test_enqueue_and_receive() {
233 let backend = MemoryBackend::new();
234
235 backend.enqueue(serde_json::json!({"test": "data"}));
236
237 let result = backend.receive().await.unwrap();
238 assert!(result.is_message());
239
240 if let ReceiveResult::Message(message) = result {
241 assert_eq!(message.message.payload["test"], "data");
242 } else {
243 panic!("Expected Message variant");
244 }
245 }
246
247 #[tokio::test]
248 async fn test_ack_removes_from_unacked() {
249 let backend = MemoryBackend::new();
250
251 backend.enqueue(serde_json::json!({"test": "data"}));
252
253 let result = backend.receive().await.unwrap();
254 if let ReceiveResult::Message(received) = result {
255 assert_eq!(backend.unacked_count(), 1);
256 received.ack().await.unwrap();
257 assert_eq!(backend.unacked_count(), 0);
258 } else {
259 panic!("Expected Message variant");
260 }
261 }
262
263 #[tokio::test]
264 async fn test_nack_with_requeue() {
265 let backend = MemoryBackend::new();
266
267 backend.enqueue(serde_json::json!({"test": "data"}));
268
269 let result = backend.receive().await.unwrap();
270 if let ReceiveResult::Message(received) = result {
271 assert_eq!(backend.queue_len(), 0);
272 received.nack(true).await.unwrap();
273 assert_eq!(backend.queue_len(), 1); } else {
275 panic!("Expected Message variant");
276 }
277 }
278
279 #[tokio::test]
280 async fn test_nack_without_requeue() {
281 let backend = MemoryBackend::new();
282
283 backend.enqueue(serde_json::json!({"test": "data"}));
284
285 let result = backend.receive().await.unwrap();
286 if let ReceiveResult::Message(received) = result {
287 received.nack(false).await.unwrap();
288 assert_eq!(backend.queue_len(), 0); assert_eq!(backend.unacked_count(), 0); } else {
291 panic!("Expected Message variant");
292 }
293 }
294
295 #[tokio::test]
296 async fn test_shutdown() {
297 let backend = MemoryBackend::new();
298
299 backend.shutdown().await.unwrap();
300
301 let result = backend.receive().await.unwrap();
302 assert!(result.is_shutdown());
303 }
304
305 #[tokio::test]
306 async fn test_health_check() {
307 let backend = MemoryBackend::new();
308 assert!(backend.health_check().await.is_ok());
309 }
310
311 #[tokio::test]
312 async fn test_queue_len() {
313 let backend = MemoryBackend::new();
314
315 backend.enqueue(serde_json::json!({"msg": 1}));
316 backend.enqueue(serde_json::json!({"msg": 2}));
317 backend.enqueue(serde_json::json!({"msg": 3}));
318
319 assert_eq!(backend.queue_len(), 3);
320 }
321
322 #[tokio::test]
323 async fn test_clear() {
324 let backend = MemoryBackend::new();
325
326 backend.enqueue(serde_json::json!({"msg": 1}));
327 backend.enqueue(serde_json::json!({"msg": 2}));
328
329 backend.clear();
330 assert_eq!(backend.queue_len(), 0);
331 }
332}