Skip to main content

foxtive_worker/backends/
memory.rs

1use async_trait::async_trait;
2use std::collections::VecDeque;
3use std::sync::{Arc, Mutex};
4use tokio::sync::Notify;
5
6use crate::MessageProperties;
7use crate::backends::ReceiveResult;
8use crate::backends::contract::MessageBackend;
9use crate::error::WorkerResult;
10use crate::message::{AckHandle, Message, MessageMetadata, ReceivedMessage};
11
12/// In-memory acknowledgment handle.
13#[derive(Debug)]
14pub struct MemoryAckHandle {
15    message_id: String,
16    backend: Arc<MemoryBackendInner>,
17}
18
19#[async_trait]
20impl AckHandle for MemoryAckHandle {
21    async fn ack(&self) -> WorkerResult<()> {
22        self.backend.ack(&self.message_id)
23    }
24
25    async fn nack(&self, requeue: bool) -> WorkerResult<()> {
26        self.backend.nack(&self.message_id, requeue)
27    }
28}
29
30/// Internal state for the memory backend.
31#[derive(Debug)]
32struct MemoryBackendInner {
33    queue: Mutex<VecDeque<Message<serde_json::Value>>>,
34    unacked: Mutex<std::collections::HashMap<String, Message<serde_json::Value>>>,
35    notify: Notify,
36    shutdown: Mutex<bool>,
37}
38
39impl MemoryBackendInner {
40    fn ack(&self, message_id: &str) -> WorkerResult<()> {
41        let mut unacked = self.unacked.lock().unwrap();
42        unacked.remove(message_id);
43        Ok(())
44    }
45
46    fn nack(&self, message_id: &str, requeue: bool) -> WorkerResult<()> {
47        let mut unacked = self.unacked.lock().unwrap();
48        if let Some(message) = unacked.remove(message_id)
49            && requeue
50        {
51            self.queue.lock().unwrap().push_back(message);
52            self.notify.notify_one();
53        }
54        Ok(())
55    }
56}
57
58/// In-memory message backend for testing and development.
59///
60/// This backend stores messages in memory and provides a simple queue
61/// for testing worker implementations without external dependencies.
62///
63/// # Example
64/// ```rust
65/// use foxtive_worker::backends::MemoryBackend;
66///
67/// let backend = MemoryBackend::new();
68/// backend.enqueue(serde_json::json!({"key": "value"}));
69/// ```
70pub struct MemoryBackend {
71    inner: Arc<MemoryBackendInner>,
72    source: String,
73}
74
75impl std::fmt::Debug for MemoryBackend {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.debug_struct("MemoryBackend")
78            .field("source", &self.source)
79            .finish()
80    }
81}
82
83impl MemoryBackend {
84    /// Create a new in-memory backend with the default source name.
85    pub fn new() -> Self {
86        Self::with_source("memory-queue")
87    }
88
89    /// Create a new in-memory backend with a custom source name.
90    pub fn with_source(source: impl Into<String>) -> Self {
91        Self {
92            inner: Arc::new(MemoryBackendInner {
93                queue: Mutex::new(VecDeque::new()),
94                unacked: Mutex::new(std::collections::HashMap::new()),
95                notify: Notify::new(),
96                shutdown: Mutex::new(false),
97            }),
98            source: source.into(),
99        }
100    }
101
102    /// Enqueue a message for processing.
103    ///
104    /// # Arguments
105    /// * `payload` - The message payload
106    ///
107    /// # Returns
108    /// The message ID
109    pub fn enqueue(&self, payload: serde_json::Value) -> String {
110        self.enqueue_with_properties(payload, None)
111    }
112
113    /// Enqueue a message with custom properties.
114    ///
115    /// # Arguments
116    /// * `payload` - The message payload
117    /// * `properties` - Optional message properties
118    ///
119    /// # Returns
120    /// The message ID
121    pub fn enqueue_with_properties(
122        &self,
123        payload: serde_json::Value,
124        properties: Option<MessageProperties>,
125    ) -> String {
126        let message_id = uuid::Uuid::new_v4().to_string();
127        let mut metadata = MessageMetadata::new(&self.source);
128        if let Some(props) = properties {
129            metadata = metadata.with_properties(props);
130        }
131
132        let message = Message {
133            id: message_id.clone(),
134            payload,
135            metadata,
136        };
137
138        let mut queue = self.inner.queue.lock().unwrap();
139        queue.push_back(message);
140
141        // Notify waiting receivers
142        self.inner.notify.notify_one();
143
144        message_id
145    }
146
147    /// Enqueue multiple messages.
148    pub fn enqueue_batch(&self, payloads: Vec<serde_json::Value>) -> Vec<String> {
149        payloads.into_iter().map(|p| self.enqueue(p)).collect()
150    }
151
152    /// Get the number of messages currently in the queue.
153    pub fn queue_len(&self) -> usize {
154        let queue = self.inner.queue.lock().unwrap();
155        queue.len()
156    }
157
158    /// Get the number of unacknowledged messages.
159    pub fn unacked_count(&self) -> usize {
160        let unacked = self.inner.unacked.lock().unwrap();
161        unacked.len()
162    }
163
164    /// Clear all messages from the queue.
165    pub fn clear(&self) {
166        let mut queue = self.inner.queue.lock().unwrap();
167        queue.clear();
168    }
169}
170
171impl Default for MemoryBackend {
172    fn default() -> Self {
173        Self::new()
174    }
175}
176
177#[async_trait]
178impl MessageBackend for MemoryBackend {
179    async fn receive(&self) -> WorkerResult<ReceiveResult<serde_json::Value>> {
180        // Check if shutdown
181        {
182            let shutdown = self.inner.shutdown.lock().unwrap();
183            if *shutdown {
184                return Ok(ReceiveResult::Shutdown);
185            }
186        }
187
188        // Try to get a message from the queue
189        loop {
190            // Check queue first
191            {
192                let mut queue = self.inner.queue.lock().unwrap();
193                if let Some(message) = queue.pop_front() {
194                    let message_id = message.id.clone();
195
196                    // Track as unacked
197                    {
198                        let mut unacked = self.inner.unacked.lock().unwrap();
199                        unacked.insert(message_id.clone(), message.clone());
200                    }
201
202                    let ack_handle = Arc::new(MemoryAckHandle {
203                        message_id,
204                        backend: self.inner.clone(),
205                    });
206
207                    return Ok(ReceiveResult::Message(Box::from(ReceivedMessage::new(
208                        message, ack_handle,
209                    ))));
210                }
211            }
212
213            // Check shutdown again
214            {
215                let shutdown = self.inner.shutdown.lock().unwrap();
216                if *shutdown {
217                    return Ok(ReceiveResult::Shutdown);
218                }
219            }
220
221            // Wait for notification
222            self.inner.notify.notified().await;
223        }
224    }
225
226    async fn ack(&self, message_id: &str) -> WorkerResult<()> {
227        self.inner.ack(message_id)
228    }
229
230    async fn nack(&self, message_id: &str, requeue: bool) -> WorkerResult<()> {
231        self.inner.nack(message_id, requeue)
232    }
233
234    async fn health_check(&self) -> WorkerResult<()> {
235        // Memory backend is always healthy
236        Ok(())
237    }
238
239    async fn shutdown(&self) -> WorkerResult<()> {
240        let mut shutdown = self.inner.shutdown.lock().unwrap();
241        *shutdown = true;
242        // Wake up any waiting receivers
243        self.inner.notify.notify_waiters();
244        Ok(())
245    }
246}
247
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backends::ReceiveResult;

    /// An enqueued payload comes back unchanged from `receive`.
    #[tokio::test]
    async fn test_enqueue_and_receive() {
        let backend = MemoryBackend::new();
        backend.enqueue(serde_json::json!({"test": "data"}));

        let result = backend.receive().await.unwrap();
        assert!(result.is_message());

        let ReceiveResult::Message(message) = result else {
            panic!("Expected Message variant");
        };
        assert_eq!(message.message.payload["test"], "data");
    }

    /// Acking a delivered message drops it from the unacked set.
    #[tokio::test]
    async fn test_ack_removes_from_unacked() {
        let backend = MemoryBackend::new();
        backend.enqueue(serde_json::json!({"test": "data"}));

        let ReceiveResult::Message(received) = backend.receive().await.unwrap() else {
            panic!("Expected Message variant");
        };

        assert_eq!(backend.unacked_count(), 1);
        received.ack().await.unwrap();
        assert_eq!(backend.unacked_count(), 0);
    }

    /// Nacking with `requeue = true` puts the message back on the queue.
    #[tokio::test]
    async fn test_nack_with_requeue() {
        let backend = MemoryBackend::new();
        backend.enqueue(serde_json::json!({"test": "data"}));

        let ReceiveResult::Message(received) = backend.receive().await.unwrap() else {
            panic!("Expected Message variant");
        };

        assert_eq!(backend.queue_len(), 0);
        received.nack(true).await.unwrap();
        assert_eq!(backend.queue_len(), 1); // Requeued
    }

    /// Nacking with `requeue = false` discards the message entirely.
    #[tokio::test]
    async fn test_nack_without_requeue() {
        let backend = MemoryBackend::new();
        backend.enqueue(serde_json::json!({"test": "data"}));

        let ReceiveResult::Message(received) = backend.receive().await.unwrap() else {
            panic!("Expected Message variant");
        };

        received.nack(false).await.unwrap();
        assert_eq!(backend.queue_len(), 0); // Not requeued
        assert_eq!(backend.unacked_count(), 0); // Removed from unacked
    }

    /// After shutdown, `receive` reports the shutdown instead of waiting.
    #[tokio::test]
    async fn test_shutdown() {
        let backend = MemoryBackend::new();
        backend.shutdown().await.unwrap();

        let outcome = backend.receive().await.unwrap();
        assert!(outcome.is_shutdown());
    }

    /// The health check of the memory backend succeeds.
    #[tokio::test]
    async fn test_health_check() {
        let backend = MemoryBackend::new();
        let health = backend.health_check().await;
        assert!(health.is_ok());
    }

    /// `queue_len` counts every enqueued message.
    #[tokio::test]
    async fn test_queue_len() {
        let backend = MemoryBackend::new();

        for n in 1..=3 {
            backend.enqueue(serde_json::json!({"msg": n}));
        }

        assert_eq!(backend.queue_len(), 3);
    }

    /// `clear` empties the queue.
    #[tokio::test]
    async fn test_clear() {
        let backend = MemoryBackend::new();

        for n in 1..=2 {
            backend.enqueue(serde_json::json!({"msg": n}));
        }

        backend.clear();
        assert_eq!(backend.queue_len(), 0);
    }
}