Skip to main content

foxtive_worker/backends/
memory.rs

1use async_trait::async_trait;
2use std::collections::VecDeque;
3use std::sync::{Arc, Mutex};
4use tokio::sync::Notify;
5
6use crate::backends::ReceiveResult;
7use crate::backends::contract::MessageBackend;
8use crate::error::WorkerResult;
9use crate::message::{AckHandle, Message, MessageMetadata, ReceivedMessage};
10
11/// In-memory acknowledgment handle.
12#[derive(Debug)]
13pub struct MemoryAckHandle {
14    message_id: String,
15    backend: Arc<MemoryBackendInner>,
16}
17
18#[async_trait]
19impl AckHandle for MemoryAckHandle {
20    async fn ack(&self) -> WorkerResult<()> {
21        self.backend.ack(&self.message_id)
22    }
23
24    async fn nack(&self, requeue: bool) -> WorkerResult<()> {
25        self.backend.nack(&self.message_id, requeue)
26    }
27}
28
29/// Internal state for the memory backend.
30#[derive(Debug)]
31struct MemoryBackendInner {
32    queue: Mutex<VecDeque<Message<serde_json::Value>>>,
33    unacked: Mutex<std::collections::HashMap<String, Message<serde_json::Value>>>,
34    notify: Notify,
35    shutdown: Mutex<bool>,
36}
37
38impl MemoryBackendInner {
39    fn ack(&self, message_id: &str) -> WorkerResult<()> {
40        let mut unacked = self.unacked.lock().unwrap();
41        unacked.remove(message_id);
42        Ok(())
43    }
44
45    fn nack(&self, message_id: &str, requeue: bool) -> WorkerResult<()> {
46        let mut unacked = self.unacked.lock().unwrap();
47        if let Some(message) = unacked.remove(message_id)
48            && requeue
49        {
50            self.queue.lock().unwrap().push_back(message);
51            self.notify.notify_one();
52        }
53        Ok(())
54    }
55}
56
57/// In-memory message backend for testing and development.
58///
59/// This backend stores messages in memory and provides a simple queue
60/// for testing worker implementations without external dependencies.
61///
62/// # Example
63/// ```rust
64/// use foxtive_worker::backends::MemoryBackend;
65///
66/// let backend = MemoryBackend::new();
67/// backend.enqueue(serde_json::json!({"key": "value"}));
68/// ```
69pub struct MemoryBackend {
70    inner: Arc<MemoryBackendInner>,
71    source: String,
72}
73
74impl std::fmt::Debug for MemoryBackend {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        f.debug_struct("MemoryBackend")
77            .field("source", &self.source)
78            .finish()
79    }
80}
81
82impl MemoryBackend {
83    /// Create a new in-memory backend with the default source name.
84    pub fn new() -> Self {
85        Self::with_source("memory-queue")
86    }
87
88    /// Create a new in-memory backend with a custom source name.
89    pub fn with_source(source: impl Into<String>) -> Self {
90        Self {
91            inner: Arc::new(MemoryBackendInner {
92                queue: Mutex::new(VecDeque::new()),
93                unacked: Mutex::new(std::collections::HashMap::new()),
94                notify: Notify::new(),
95                shutdown: Mutex::new(false),
96            }),
97            source: source.into(),
98        }
99    }
100
101    /// Enqueue a message for processing.
102    ///
103    /// # Arguments
104    /// * `payload` - The message payload
105    ///
106    /// # Returns
107    /// The message ID
108    pub fn enqueue(&self, payload: serde_json::Value) -> String {
109        let message_id = uuid::Uuid::new_v4().to_string();
110        let message = Message {
111            id: message_id.clone(),
112            payload,
113            metadata: MessageMetadata::new(&self.source),
114        };
115
116        let mut queue = self.inner.queue.lock().unwrap();
117        queue.push_back(message);
118
119        // Notify waiting receivers
120        self.inner.notify.notify_one();
121
122        message_id
123    }
124
125    /// Enqueue multiple messages.
126    pub fn enqueue_batch(&self, payloads: Vec<serde_json::Value>) -> Vec<String> {
127        payloads.into_iter().map(|p| self.enqueue(p)).collect()
128    }
129
130    /// Get the number of messages currently in the queue.
131    pub fn queue_len(&self) -> usize {
132        let queue = self.inner.queue.lock().unwrap();
133        queue.len()
134    }
135
136    /// Get the number of unacknowledged messages.
137    pub fn unacked_count(&self) -> usize {
138        let unacked = self.inner.unacked.lock().unwrap();
139        unacked.len()
140    }
141
142    /// Clear all messages from the queue.
143    pub fn clear(&self) {
144        let mut queue = self.inner.queue.lock().unwrap();
145        queue.clear();
146    }
147}
148
149impl Default for MemoryBackend {
150    fn default() -> Self {
151        Self::new()
152    }
153}
154
155#[async_trait]
156impl MessageBackend for MemoryBackend {
157    async fn receive(&self) -> WorkerResult<ReceiveResult<serde_json::Value>> {
158        // Check if shutdown
159        {
160            let shutdown = self.inner.shutdown.lock().unwrap();
161            if *shutdown {
162                return Ok(ReceiveResult::Shutdown);
163            }
164        }
165
166        // Try to get a message from the queue
167        loop {
168            // Check queue first
169            {
170                let mut queue = self.inner.queue.lock().unwrap();
171                if let Some(message) = queue.pop_front() {
172                    let message_id = message.id.clone();
173
174                    // Track as unacked
175                    {
176                        let mut unacked = self.inner.unacked.lock().unwrap();
177                        unacked.insert(message_id.clone(), message.clone());
178                    }
179
180                    let ack_handle = Arc::new(MemoryAckHandle {
181                        message_id,
182                        backend: self.inner.clone(),
183                    });
184
185                    return Ok(ReceiveResult::Message(ReceivedMessage::new(
186                        message, ack_handle,
187                    )));
188                }
189            }
190
191            // Check shutdown again
192            {
193                let shutdown = self.inner.shutdown.lock().unwrap();
194                if *shutdown {
195                    return Ok(ReceiveResult::Shutdown);
196                }
197            }
198
199            // Wait for notification
200            self.inner.notify.notified().await;
201        }
202    }
203
204    async fn ack(&self, message_id: &str) -> WorkerResult<()> {
205        self.inner.ack(message_id)
206    }
207
208    async fn nack(&self, message_id: &str, requeue: bool) -> WorkerResult<()> {
209        self.inner.nack(message_id, requeue)
210    }
211
212    async fn health_check(&self) -> WorkerResult<()> {
213        // Memory backend is always healthy
214        Ok(())
215    }
216
217    async fn shutdown(&self) -> WorkerResult<()> {
218        let mut shutdown = self.inner.shutdown.lock().unwrap();
219        *shutdown = true;
220        // Wake up any waiting receivers
221        self.inner.notify.notify_waiters();
222        Ok(())
223    }
224}
225
226#[cfg(test)]
227mod tests {
228    use super::*;
229    use crate::backends::ReceiveResult;
230
231    #[tokio::test]
232    async fn test_enqueue_and_receive() {
233        let backend = MemoryBackend::new();
234
235        backend.enqueue(serde_json::json!({"test": "data"}));
236
237        let result = backend.receive().await.unwrap();
238        assert!(result.is_message());
239
240        if let ReceiveResult::Message(message) = result {
241            assert_eq!(message.message.payload["test"], "data");
242        } else {
243            panic!("Expected Message variant");
244        }
245    }
246
247    #[tokio::test]
248    async fn test_ack_removes_from_unacked() {
249        let backend = MemoryBackend::new();
250
251        backend.enqueue(serde_json::json!({"test": "data"}));
252
253        let result = backend.receive().await.unwrap();
254        if let ReceiveResult::Message(received) = result {
255            assert_eq!(backend.unacked_count(), 1);
256            received.ack().await.unwrap();
257            assert_eq!(backend.unacked_count(), 0);
258        } else {
259            panic!("Expected Message variant");
260        }
261    }
262
263    #[tokio::test]
264    async fn test_nack_with_requeue() {
265        let backend = MemoryBackend::new();
266
267        backend.enqueue(serde_json::json!({"test": "data"}));
268
269        let result = backend.receive().await.unwrap();
270        if let ReceiveResult::Message(received) = result {
271            assert_eq!(backend.queue_len(), 0);
272            received.nack(true).await.unwrap();
273            assert_eq!(backend.queue_len(), 1); // Requeued
274        } else {
275            panic!("Expected Message variant");
276        }
277    }
278
279    #[tokio::test]
280    async fn test_nack_without_requeue() {
281        let backend = MemoryBackend::new();
282
283        backend.enqueue(serde_json::json!({"test": "data"}));
284
285        let result = backend.receive().await.unwrap();
286        if let ReceiveResult::Message(received) = result {
287            received.nack(false).await.unwrap();
288            assert_eq!(backend.queue_len(), 0); // Not requeued
289            assert_eq!(backend.unacked_count(), 0); // Removed from unacked
290        } else {
291            panic!("Expected Message variant");
292        }
293    }
294
295    #[tokio::test]
296    async fn test_shutdown() {
297        let backend = MemoryBackend::new();
298
299        backend.shutdown().await.unwrap();
300
301        let result = backend.receive().await.unwrap();
302        assert!(result.is_shutdown());
303    }
304
305    #[tokio::test]
306    async fn test_health_check() {
307        let backend = MemoryBackend::new();
308        assert!(backend.health_check().await.is_ok());
309    }
310
311    #[tokio::test]
312    async fn test_queue_len() {
313        let backend = MemoryBackend::new();
314
315        backend.enqueue(serde_json::json!({"msg": 1}));
316        backend.enqueue(serde_json::json!({"msg": 2}));
317        backend.enqueue(serde_json::json!({"msg": 3}));
318
319        assert_eq!(backend.queue_len(), 3);
320    }
321
322    #[tokio::test]
323    async fn test_clear() {
324        let backend = MemoryBackend::new();
325
326        backend.enqueue(serde_json::json!({"msg": 1}));
327        backend.enqueue(serde_json::json!({"msg": 2}));
328
329        backend.clear();
330        assert_eq!(backend.queue_len(), 0);
331    }
332}