foxtive_worker/backends/
memory.rs1use async_trait::async_trait;
2use std::collections::VecDeque;
3use std::sync::{Arc, Mutex};
4use tokio::sync::Notify;
5
6use crate::MessageProperties;
7use crate::backends::ReceiveResult;
8use crate::backends::contract::MessageBackend;
9use crate::error::WorkerResult;
10use crate::message::{AckHandle, Message, MessageMetadata, ReceivedMessage};
11
12#[derive(Debug)]
14pub struct MemoryAckHandle {
15 message_id: String,
16 backend: Arc<MemoryBackendInner>,
17}
18
19#[async_trait]
20impl AckHandle for MemoryAckHandle {
21 async fn ack(&self) -> WorkerResult<()> {
22 self.backend.ack(&self.message_id)
23 }
24
25 async fn nack(&self, requeue: bool) -> WorkerResult<()> {
26 self.backend.nack(&self.message_id, requeue)
27 }
28}
29
30#[derive(Debug)]
32struct MemoryBackendInner {
33 queue: Mutex<VecDeque<Message<serde_json::Value>>>,
34 unacked: Mutex<std::collections::HashMap<String, Message<serde_json::Value>>>,
35 notify: Notify,
36 shutdown: Mutex<bool>,
37}
38
39impl MemoryBackendInner {
40 fn ack(&self, message_id: &str) -> WorkerResult<()> {
41 let mut unacked = self.unacked.lock().unwrap();
42 unacked.remove(message_id);
43 Ok(())
44 }
45
46 fn nack(&self, message_id: &str, requeue: bool) -> WorkerResult<()> {
47 let mut unacked = self.unacked.lock().unwrap();
48 if let Some(message) = unacked.remove(message_id)
49 && requeue
50 {
51 self.queue.lock().unwrap().push_back(message);
52 self.notify.notify_one();
53 }
54 Ok(())
55 }
56}
57
58pub struct MemoryBackend {
71 inner: Arc<MemoryBackendInner>,
72 source: String,
73}
74
75impl std::fmt::Debug for MemoryBackend {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.debug_struct("MemoryBackend")
78 .field("source", &self.source)
79 .finish()
80 }
81}
82
83impl MemoryBackend {
84 pub fn new() -> Self {
86 Self::with_source("memory-queue")
87 }
88
89 pub fn with_source(source: impl Into<String>) -> Self {
91 Self {
92 inner: Arc::new(MemoryBackendInner {
93 queue: Mutex::new(VecDeque::new()),
94 unacked: Mutex::new(std::collections::HashMap::new()),
95 notify: Notify::new(),
96 shutdown: Mutex::new(false),
97 }),
98 source: source.into(),
99 }
100 }
101
102 pub fn enqueue(&self, payload: serde_json::Value) -> String {
110 self.enqueue_with_properties(payload, None)
111 }
112
113 pub fn enqueue_with_properties(
122 &self,
123 payload: serde_json::Value,
124 properties: Option<MessageProperties>,
125 ) -> String {
126 let message_id = uuid::Uuid::new_v4().to_string();
127 let mut metadata = MessageMetadata::new(&self.source);
128 if let Some(props) = properties {
129 metadata = metadata.with_properties(props);
130 }
131
132 let message = Message {
133 id: message_id.clone(),
134 payload,
135 metadata,
136 };
137
138 let mut queue = self.inner.queue.lock().unwrap();
139 queue.push_back(message);
140
141 self.inner.notify.notify_one();
143
144 message_id
145 }
146
147 pub fn enqueue_batch(&self, payloads: Vec<serde_json::Value>) -> Vec<String> {
149 payloads.into_iter().map(|p| self.enqueue(p)).collect()
150 }
151
152 pub fn queue_len(&self) -> usize {
154 let queue = self.inner.queue.lock().unwrap();
155 queue.len()
156 }
157
158 pub fn unacked_count(&self) -> usize {
160 let unacked = self.inner.unacked.lock().unwrap();
161 unacked.len()
162 }
163
164 pub fn clear(&self) {
166 let mut queue = self.inner.queue.lock().unwrap();
167 queue.clear();
168 }
169}
170
171impl Default for MemoryBackend {
172 fn default() -> Self {
173 Self::new()
174 }
175}
176
177#[async_trait]
178impl MessageBackend for MemoryBackend {
179 async fn receive(&self) -> WorkerResult<ReceiveResult<serde_json::Value>> {
180 {
182 let shutdown = self.inner.shutdown.lock().unwrap();
183 if *shutdown {
184 return Ok(ReceiveResult::Shutdown);
185 }
186 }
187
188 loop {
190 {
192 let mut queue = self.inner.queue.lock().unwrap();
193 if let Some(message) = queue.pop_front() {
194 let message_id = message.id.clone();
195
196 {
198 let mut unacked = self.inner.unacked.lock().unwrap();
199 unacked.insert(message_id.clone(), message.clone());
200 }
201
202 let ack_handle = Arc::new(MemoryAckHandle {
203 message_id,
204 backend: self.inner.clone(),
205 });
206
207 return Ok(ReceiveResult::Message(Box::from(ReceivedMessage::new(
208 message, ack_handle,
209 ))));
210 }
211 }
212
213 {
215 let shutdown = self.inner.shutdown.lock().unwrap();
216 if *shutdown {
217 return Ok(ReceiveResult::Shutdown);
218 }
219 }
220
221 self.inner.notify.notified().await;
223 }
224 }
225
226 async fn ack(&self, message_id: &str) -> WorkerResult<()> {
227 self.inner.ack(message_id)
228 }
229
230 async fn nack(&self, message_id: &str, requeue: bool) -> WorkerResult<()> {
231 self.inner.nack(message_id, requeue)
232 }
233
234 async fn health_check(&self) -> WorkerResult<()> {
235 Ok(())
237 }
238
239 async fn shutdown(&self) -> WorkerResult<()> {
240 let mut shutdown = self.inner.shutdown.lock().unwrap();
241 *shutdown = true;
242 self.inner.notify.notify_waiters();
244 Ok(())
245 }
246}
247
248#[cfg(test)]
249mod tests {
250 use super::*;
251 use crate::backends::ReceiveResult;
252
253 #[tokio::test]
254 async fn test_enqueue_and_receive() {
255 let backend = MemoryBackend::new();
256
257 backend.enqueue(serde_json::json!({"test": "data"}));
258
259 let result = backend.receive().await.unwrap();
260 assert!(result.is_message());
261
262 if let ReceiveResult::Message(message) = result {
263 assert_eq!(message.message.payload["test"], "data");
264 } else {
265 panic!("Expected Message variant");
266 }
267 }
268
269 #[tokio::test]
270 async fn test_ack_removes_from_unacked() {
271 let backend = MemoryBackend::new();
272
273 backend.enqueue(serde_json::json!({"test": "data"}));
274
275 let result = backend.receive().await.unwrap();
276 if let ReceiveResult::Message(received) = result {
277 assert_eq!(backend.unacked_count(), 1);
278 received.ack().await.unwrap();
279 assert_eq!(backend.unacked_count(), 0);
280 } else {
281 panic!("Expected Message variant");
282 }
283 }
284
285 #[tokio::test]
286 async fn test_nack_with_requeue() {
287 let backend = MemoryBackend::new();
288
289 backend.enqueue(serde_json::json!({"test": "data"}));
290
291 let result = backend.receive().await.unwrap();
292 if let ReceiveResult::Message(received) = result {
293 assert_eq!(backend.queue_len(), 0);
294 received.nack(true).await.unwrap();
295 assert_eq!(backend.queue_len(), 1); } else {
297 panic!("Expected Message variant");
298 }
299 }
300
301 #[tokio::test]
302 async fn test_nack_without_requeue() {
303 let backend = MemoryBackend::new();
304
305 backend.enqueue(serde_json::json!({"test": "data"}));
306
307 let result = backend.receive().await.unwrap();
308 if let ReceiveResult::Message(received) = result {
309 received.nack(false).await.unwrap();
310 assert_eq!(backend.queue_len(), 0); assert_eq!(backend.unacked_count(), 0); } else {
313 panic!("Expected Message variant");
314 }
315 }
316
317 #[tokio::test]
318 async fn test_shutdown() {
319 let backend = MemoryBackend::new();
320
321 backend.shutdown().await.unwrap();
322
323 let result = backend.receive().await.unwrap();
324 assert!(result.is_shutdown());
325 }
326
327 #[tokio::test]
328 async fn test_health_check() {
329 let backend = MemoryBackend::new();
330 assert!(backend.health_check().await.is_ok());
331 }
332
333 #[tokio::test]
334 async fn test_queue_len() {
335 let backend = MemoryBackend::new();
336
337 backend.enqueue(serde_json::json!({"msg": 1}));
338 backend.enqueue(serde_json::json!({"msg": 2}));
339 backend.enqueue(serde_json::json!({"msg": 3}));
340
341 assert_eq!(backend.queue_len(), 3);
342 }
343
344 #[tokio::test]
345 async fn test_clear() {
346 let backend = MemoryBackend::new();
347
348 backend.enqueue(serde_json::json!({"msg": 1}));
349 backend.enqueue(serde_json::json!({"msg": 2}));
350
351 backend.clear();
352 assert_eq!(backend.queue_len(), 0);
353 }
354}