foxtive_worker/backends/
memory.rs1use async_trait::async_trait;
2use std::collections::VecDeque;
3use std::sync::{Arc, Mutex};
4use tokio::sync::Notify;
5
6use crate::backends::contract::MessageBackend;
7use crate::backends::ReceiveResult;
8use crate::error::WorkerResult;
9use crate::message::{AckHandle, Message, MessageMetadata, ReceivedMessage};
10
11#[derive(Debug)]
13pub struct MemoryAckHandle {
14 message_id: String,
15 backend: Arc<MemoryBackendInner>,
16}
17
18#[async_trait]
19impl AckHandle for MemoryAckHandle {
20 async fn ack(&self) -> WorkerResult<()> {
21 self.backend.ack(&self.message_id)
22 }
23
24 async fn nack(&self, requeue: bool) -> WorkerResult<()> {
25 self.backend.nack(&self.message_id, requeue)
26 }
27}
28
29#[derive(Debug)]
31struct MemoryBackendInner {
32 queue: Mutex<VecDeque<Message<serde_json::Value>>>,
33 unacked: Mutex<std::collections::HashMap<String, Message<serde_json::Value>>>,
34 notify: Notify,
35 shutdown: Mutex<bool>,
36}
37
38impl MemoryBackendInner {
39 fn ack(&self, message_id: &str) -> WorkerResult<()> {
40 let mut unacked = self.unacked.lock().unwrap();
41 unacked.remove(message_id);
42 Ok(())
43 }
44
45 fn nack(&self, message_id: &str, requeue: bool) -> WorkerResult<()> {
46 let mut unacked = self.unacked.lock().unwrap();
47 if let Some(message) = unacked.remove(message_id)
48 && requeue {
49 self.queue.lock().unwrap().push_back(message);
50 self.notify.notify_one();
51 }
52 Ok(())
53 }
54}
55
56pub struct MemoryBackend {
69 inner: Arc<MemoryBackendInner>,
70 source: String,
71}
72
73impl std::fmt::Debug for MemoryBackend {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 f.debug_struct("MemoryBackend")
76 .field("source", &self.source)
77 .finish()
78 }
79}
80
81impl MemoryBackend {
82 pub fn new() -> Self {
84 Self::with_source("memory-queue")
85 }
86
87 pub fn with_source(source: impl Into<String>) -> Self {
89 Self {
90 inner: Arc::new(MemoryBackendInner {
91 queue: Mutex::new(VecDeque::new()),
92 unacked: Mutex::new(std::collections::HashMap::new()),
93 notify: Notify::new(),
94 shutdown: Mutex::new(false),
95 }),
96 source: source.into(),
97 }
98 }
99
100 pub fn enqueue(&self, payload: serde_json::Value) -> String {
108 let message_id = uuid::Uuid::new_v4().to_string();
109 let message = Message {
110 id: message_id.clone(),
111 payload,
112 metadata: MessageMetadata::new(&self.source),
113 };
114
115 let mut queue = self.inner.queue.lock().unwrap();
116 queue.push_back(message);
117
118 self.inner.notify.notify_one();
120
121 message_id
122 }
123
124 pub fn enqueue_batch(&self, payloads: Vec<serde_json::Value>) -> Vec<String> {
126 payloads.into_iter().map(|p| self.enqueue(p)).collect()
127 }
128
129 pub fn queue_len(&self) -> usize {
131 let queue = self.inner.queue.lock().unwrap();
132 queue.len()
133 }
134
135 pub fn unacked_count(&self) -> usize {
137 let unacked = self.inner.unacked.lock().unwrap();
138 unacked.len()
139 }
140
141 pub fn clear(&self) {
143 let mut queue = self.inner.queue.lock().unwrap();
144 queue.clear();
145 }
146}
147
148impl Default for MemoryBackend {
149 fn default() -> Self {
150 Self::new()
151 }
152}
153
154#[async_trait]
155impl MessageBackend for MemoryBackend {
156 async fn receive(&self) -> WorkerResult<ReceiveResult<serde_json::Value>> {
157 {
159 let shutdown = self.inner.shutdown.lock().unwrap();
160 if *shutdown {
161 return Ok(ReceiveResult::Shutdown);
162 }
163 }
164
165 loop {
167 {
169 let mut queue = self.inner.queue.lock().unwrap();
170 if let Some(message) = queue.pop_front() {
171 let message_id = message.id.clone();
172
173 {
175 let mut unacked = self.inner.unacked.lock().unwrap();
176 unacked.insert(message_id.clone(), message.clone());
177 }
178
179 let ack_handle = Arc::new(MemoryAckHandle {
180 message_id,
181 backend: self.inner.clone(),
182 });
183
184 return Ok(ReceiveResult::Message(ReceivedMessage::new(message, ack_handle)));
185 }
186 }
187
188 {
190 let shutdown = self.inner.shutdown.lock().unwrap();
191 if *shutdown {
192 return Ok(ReceiveResult::Shutdown);
193 }
194 }
195
196 self.inner.notify.notified().await;
198 }
199 }
200
201 async fn ack(&self, message_id: &str) -> WorkerResult<()> {
202 self.inner.ack(message_id)
203 }
204
205 async fn nack(&self, message_id: &str, requeue: bool) -> WorkerResult<()> {
206 self.inner.nack(message_id, requeue)
207 }
208
209 async fn health_check(&self) -> WorkerResult<()> {
210 Ok(())
212 }
213
214 async fn shutdown(&self) -> WorkerResult<()> {
215 let mut shutdown = self.inner.shutdown.lock().unwrap();
216 *shutdown = true;
217 self.inner.notify.notify_waiters();
219 Ok(())
220 }
221}
222
223#[cfg(test)]
224mod tests {
225 use super::*;
226 use crate::backends::ReceiveResult;
227
228 #[tokio::test]
229 async fn test_enqueue_and_receive() {
230 let backend = MemoryBackend::new();
231
232 backend.enqueue(serde_json::json!({"test": "data"}));
233
234 let result = backend.receive().await.unwrap();
235 assert!(result.is_message());
236
237 if let ReceiveResult::Message(message) = result {
238 assert_eq!(message.message.payload["test"], "data");
239 } else {
240 panic!("Expected Message variant");
241 }
242 }
243
244 #[tokio::test]
245 async fn test_ack_removes_from_unacked() {
246 let backend = MemoryBackend::new();
247
248 backend.enqueue(serde_json::json!({"test": "data"}));
249
250 let result = backend.receive().await.unwrap();
251 if let ReceiveResult::Message(received) = result {
252 assert_eq!(backend.unacked_count(), 1);
253 received.ack().await.unwrap();
254 assert_eq!(backend.unacked_count(), 0);
255 } else {
256 panic!("Expected Message variant");
257 }
258 }
259
260 #[tokio::test]
261 async fn test_nack_with_requeue() {
262 let backend = MemoryBackend::new();
263
264 backend.enqueue(serde_json::json!({"test": "data"}));
265
266 let result = backend.receive().await.unwrap();
267 if let ReceiveResult::Message(received) = result {
268 assert_eq!(backend.queue_len(), 0);
269 received.nack(true).await.unwrap();
270 assert_eq!(backend.queue_len(), 1); } else {
272 panic!("Expected Message variant");
273 }
274 }
275
276 #[tokio::test]
277 async fn test_nack_without_requeue() {
278 let backend = MemoryBackend::new();
279
280 backend.enqueue(serde_json::json!({"test": "data"}));
281
282 let result = backend.receive().await.unwrap();
283 if let ReceiveResult::Message(received) = result {
284 received.nack(false).await.unwrap();
285 assert_eq!(backend.queue_len(), 0); assert_eq!(backend.unacked_count(), 0); } else {
288 panic!("Expected Message variant");
289 }
290 }
291
292 #[tokio::test]
293 async fn test_shutdown() {
294 let backend = MemoryBackend::new();
295
296 backend.shutdown().await.unwrap();
297
298 let result = backend.receive().await.unwrap();
299 assert!(result.is_shutdown());
300 }
301
302 #[tokio::test]
303 async fn test_health_check() {
304 let backend = MemoryBackend::new();
305 assert!(backend.health_check().await.is_ok());
306 }
307
308 #[tokio::test]
309 async fn test_queue_len() {
310 let backend = MemoryBackend::new();
311
312 backend.enqueue(serde_json::json!({"msg": 1}));
313 backend.enqueue(serde_json::json!({"msg": 2}));
314 backend.enqueue(serde_json::json!({"msg": 3}));
315
316 assert_eq!(backend.queue_len(), 3);
317 }
318
319 #[tokio::test]
320 async fn test_clear() {
321 let backend = MemoryBackend::new();
322
323 backend.enqueue(serde_json::json!({"msg": 1}));
324 backend.enqueue(serde_json::json!({"msg": 2}));
325
326 backend.clear();
327 assert_eq!(backend.queue_len(), 0);
328 }
329}