Skip to main content

foxtive_worker/backends/
memory.rs

1use async_trait::async_trait;
2use std::collections::VecDeque;
3use std::sync::{Arc, Mutex};
4use tokio::sync::Notify;
5
6use crate::backends::contract::MessageBackend;
7use crate::backends::ReceiveResult;
8use crate::error::WorkerResult;
9use crate::message::{AckHandle, Message, MessageMetadata, ReceivedMessage};
10
11/// In-memory acknowledgment handle.
12#[derive(Debug)]
13pub struct MemoryAckHandle {
14    message_id: String,
15    backend: Arc<MemoryBackendInner>,
16}
17
18#[async_trait]
19impl AckHandle for MemoryAckHandle {
20    async fn ack(&self) -> WorkerResult<()> {
21        self.backend.ack(&self.message_id)
22    }
23
24    async fn nack(&self, requeue: bool) -> WorkerResult<()> {
25        self.backend.nack(&self.message_id, requeue)
26    }
27}
28
29/// Internal state for the memory backend.
30#[derive(Debug)]
31struct MemoryBackendInner {
32    queue: Mutex<VecDeque<Message<serde_json::Value>>>,
33    unacked: Mutex<std::collections::HashMap<String, Message<serde_json::Value>>>,
34    notify: Notify,
35    shutdown: Mutex<bool>,
36}
37
38impl MemoryBackendInner {
39    fn ack(&self, message_id: &str) -> WorkerResult<()> {
40        let mut unacked = self.unacked.lock().unwrap();
41        unacked.remove(message_id);
42        Ok(())
43    }
44
45    fn nack(&self, message_id: &str, requeue: bool) -> WorkerResult<()> {
46        let mut unacked = self.unacked.lock().unwrap();
47        if let Some(message) = unacked.remove(message_id)
48            && requeue {
49                self.queue.lock().unwrap().push_back(message);
50                self.notify.notify_one();
51            }
52        Ok(())
53    }
54}
55
56/// In-memory message backend for testing and development.
57///
58/// This backend stores messages in memory and provides a simple queue
59/// for testing worker implementations without external dependencies.
60///
61/// # Example
62/// ```rust
63/// use foxtive_worker::backends::MemoryBackend;
64///
65/// let backend = MemoryBackend::new();
66/// backend.enqueue(serde_json::json!({"key": "value"}));
67/// ```
68pub struct MemoryBackend {
69    inner: Arc<MemoryBackendInner>,
70    source: String,
71}
72
73impl std::fmt::Debug for MemoryBackend {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        f.debug_struct("MemoryBackend")
76            .field("source", &self.source)
77            .finish()
78    }
79}
80
81impl MemoryBackend {
82    /// Create a new in-memory backend with the default source name.
83    pub fn new() -> Self {
84        Self::with_source("memory-queue")
85    }
86
87    /// Create a new in-memory backend with a custom source name.
88    pub fn with_source(source: impl Into<String>) -> Self {
89        Self {
90            inner: Arc::new(MemoryBackendInner {
91                queue: Mutex::new(VecDeque::new()),
92                unacked: Mutex::new(std::collections::HashMap::new()),
93                notify: Notify::new(),
94                shutdown: Mutex::new(false),
95            }),
96            source: source.into(),
97        }
98    }
99
100    /// Enqueue a message for processing.
101    ///
102    /// # Arguments
103    /// * `payload` - The message payload
104    ///
105    /// # Returns
106    /// The message ID
107    pub fn enqueue(&self, payload: serde_json::Value) -> String {
108        let message_id = uuid::Uuid::new_v4().to_string();
109        let message = Message {
110            id: message_id.clone(),
111            payload,
112            metadata: MessageMetadata::new(&self.source),
113        };
114
115        let mut queue = self.inner.queue.lock().unwrap();
116        queue.push_back(message);
117        
118        // Notify waiting receivers
119        self.inner.notify.notify_one();
120
121        message_id
122    }
123
124    /// Enqueue multiple messages.
125    pub fn enqueue_batch(&self, payloads: Vec<serde_json::Value>) -> Vec<String> {
126        payloads.into_iter().map(|p| self.enqueue(p)).collect()
127    }
128
129    /// Get the number of messages currently in the queue.
130    pub fn queue_len(&self) -> usize {
131        let queue = self.inner.queue.lock().unwrap();
132        queue.len()
133    }
134
135    /// Get the number of unacknowledged messages.
136    pub fn unacked_count(&self) -> usize {
137        let unacked = self.inner.unacked.lock().unwrap();
138        unacked.len()
139    }
140
141    /// Clear all messages from the queue.
142    pub fn clear(&self) {
143        let mut queue = self.inner.queue.lock().unwrap();
144        queue.clear();
145    }
146}
147
148impl Default for MemoryBackend {
149    fn default() -> Self {
150        Self::new()
151    }
152}
153
154#[async_trait]
155impl MessageBackend for MemoryBackend {
156    async fn receive(&self) -> WorkerResult<ReceiveResult<serde_json::Value>> {
157        // Check if shutdown
158        {
159            let shutdown = self.inner.shutdown.lock().unwrap();
160            if *shutdown {
161                return Ok(ReceiveResult::Shutdown);
162            }
163        }
164
165        // Try to get a message from the queue
166        loop {
167            // Check queue first
168            {
169                let mut queue = self.inner.queue.lock().unwrap();
170                if let Some(message) = queue.pop_front() {
171                    let message_id = message.id.clone();
172                    
173                    // Track as unacked
174                    {
175                        let mut unacked = self.inner.unacked.lock().unwrap();
176                        unacked.insert(message_id.clone(), message.clone());
177                    }
178
179                    let ack_handle = Arc::new(MemoryAckHandle {
180                        message_id,
181                        backend: self.inner.clone(),
182                    });
183
184                    return Ok(ReceiveResult::Message(ReceivedMessage::new(message, ack_handle)));
185                }
186            }
187
188            // Check shutdown again
189            {
190                let shutdown = self.inner.shutdown.lock().unwrap();
191                if *shutdown {
192                    return Ok(ReceiveResult::Shutdown);
193                }
194            }
195
196            // Wait for notification
197            self.inner.notify.notified().await;
198        }
199    }
200
201    async fn ack(&self, message_id: &str) -> WorkerResult<()> {
202        self.inner.ack(message_id)
203    }
204
205    async fn nack(&self, message_id: &str, requeue: bool) -> WorkerResult<()> {
206        self.inner.nack(message_id, requeue)
207    }
208
209    async fn health_check(&self) -> WorkerResult<()> {
210        // Memory backend is always healthy
211        Ok(())
212    }
213
214    async fn shutdown(&self) -> WorkerResult<()> {
215        let mut shutdown = self.inner.shutdown.lock().unwrap();
216        *shutdown = true;
217        // Wake up any waiting receivers
218        self.inner.notify.notify_waiters();
219        Ok(())
220    }
221}
222
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backends::ReceiveResult;

    /// Payload used by the single-message tests.
    fn sample_payload() -> serde_json::Value {
        serde_json::json!({"test": "data"})
    }

    /// An enqueued message is delivered with its payload intact.
    #[tokio::test]
    async fn test_enqueue_and_receive() {
        let backend = MemoryBackend::new();
        backend.enqueue(sample_payload());

        let outcome = backend.receive().await.unwrap();
        assert!(outcome.is_message());

        let ReceiveResult::Message(delivered) = outcome else {
            panic!("Expected Message variant");
        };
        assert_eq!(delivered.message.payload["test"], "data");
    }

    /// Acking a received message removes it from the unacked set.
    #[tokio::test]
    async fn test_ack_removes_from_unacked() {
        let backend = MemoryBackend::new();
        backend.enqueue(sample_payload());

        let ReceiveResult::Message(delivered) = backend.receive().await.unwrap() else {
            panic!("Expected Message variant");
        };

        assert_eq!(backend.unacked_count(), 1);
        delivered.ack().await.unwrap();
        assert_eq!(backend.unacked_count(), 0);
    }

    /// Nacking with `requeue = true` puts the message back on the queue.
    #[tokio::test]
    async fn test_nack_with_requeue() {
        let backend = MemoryBackend::new();
        backend.enqueue(sample_payload());

        let ReceiveResult::Message(delivered) = backend.receive().await.unwrap() else {
            panic!("Expected Message variant");
        };

        assert_eq!(backend.queue_len(), 0);
        delivered.nack(true).await.unwrap();
        assert_eq!(backend.queue_len(), 1); // Requeued
    }

    /// Nacking with `requeue = false` discards the message entirely.
    #[tokio::test]
    async fn test_nack_without_requeue() {
        let backend = MemoryBackend::new();
        backend.enqueue(sample_payload());

        let ReceiveResult::Message(delivered) = backend.receive().await.unwrap() else {
            panic!("Expected Message variant");
        };

        delivered.nack(false).await.unwrap();
        assert_eq!(backend.queue_len(), 0); // Not requeued
        assert_eq!(backend.unacked_count(), 0); // Removed from unacked
    }

    /// After shutdown, `receive` reports the shutdown variant.
    #[tokio::test]
    async fn test_shutdown() {
        let backend = MemoryBackend::new();
        backend.shutdown().await.unwrap();

        let outcome = backend.receive().await.unwrap();
        assert!(outcome.is_shutdown());
    }

    /// The health check succeeds on a fresh backend.
    #[tokio::test]
    async fn test_health_check() {
        let backend = MemoryBackend::new();
        let status = backend.health_check().await;
        assert!(status.is_ok());
    }

    /// `queue_len` counts every enqueued message.
    #[tokio::test]
    async fn test_queue_len() {
        let backend = MemoryBackend::new();

        for n in 1..=3 {
            backend.enqueue(serde_json::json!({"msg": n}));
        }

        assert_eq!(backend.queue_len(), 3);
    }

    /// `clear` empties the queue.
    #[tokio::test]
    async fn test_clear() {
        let backend = MemoryBackend::new();

        for n in 1..=2 {
            backend.enqueue(serde_json::json!({"msg": n}));
        }

        backend.clear();
        assert_eq!(backend.queue_len(), 0);
    }
}