Skip to main content

mqdb_core/
outbox.rs

1// Copyright 2025-2026 LabOverWire. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::error::Result;
5use crate::events::ChangeEvent;
6use crate::storage::{BatchWriter, Storage};
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9use std::time::{SystemTime, UNIX_EPOCH};
10
11const OUTBOX_PREFIX: &[u8] = b"_outbox/";
12const DEAD_LETTER_PREFIX: &[u8] = b"_dead_letter/";
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
15struct StoredOutboxEntry {
16    events: Vec<ChangeEvent>,
17    retry_count: u32,
18    created_at: u64,
19    #[serde(default)]
20    dispatched_count: usize,
21}
22
23pub struct Outbox {
24    storage: Arc<Storage>,
25}
26
27impl Outbox {
28    #[allow(clippy::must_use_candidate)]
29    pub fn new(storage: Arc<Storage>) -> Self {
30        Self { storage }
31    }
32
33    pub fn enqueue_event(&self, batch: &mut BatchWriter, operation_id: &str, event: &ChangeEvent) {
34        self.enqueue_events(batch, operation_id, std::slice::from_ref(event));
35    }
36
37    pub fn enqueue_events(
38        &self,
39        batch: &mut BatchWriter,
40        operation_id: &str,
41        events: &[ChangeEvent],
42    ) {
43        let key = format!("_outbox/{operation_id}");
44        let stored = StoredOutboxEntry {
45            events: events.to_vec(),
46            retry_count: 0,
47            created_at: SystemTime::now()
48                .duration_since(UNIX_EPOCH)
49                .map(|d| d.as_secs())
50                .unwrap_or(0),
51            dispatched_count: 0,
52        };
53        let value = serde_json::to_vec(&stored).unwrap_or_default();
54        batch.insert(key.into_bytes(), value);
55    }
56
57    /// # Errors
58    /// Returns an error if reading from storage fails.
59    pub fn pending_events(&self) -> Result<Vec<OutboxEntry>> {
60        self.scan_entries(OUTBOX_PREFIX, "_outbox/")
61    }
62
63    fn scan_entries(&self, prefix: &[u8], strip_prefix: &str) -> Result<Vec<OutboxEntry>> {
64        let items = self.storage.prefix_scan(prefix)?;
65        let mut entries = Vec::new();
66
67        for (key, value) in items {
68            let key_str = String::from_utf8_lossy(&key);
69            let operation_id = key_str
70                .strip_prefix(strip_prefix)
71                .unwrap_or(&key_str)
72                .to_string();
73
74            if let Ok(stored) = serde_json::from_slice::<StoredOutboxEntry>(&value) {
75                entries.push(OutboxEntry {
76                    operation_id,
77                    events: stored.events,
78                    retry_count: stored.retry_count,
79                    created_at: stored.created_at,
80                    dispatched_count: stored.dispatched_count,
81                });
82            } else if let Ok(event) = serde_json::from_slice::<ChangeEvent>(&value) {
83                entries.push(OutboxEntry {
84                    operation_id,
85                    events: vec![event],
86                    retry_count: 0,
87                    created_at: 0,
88                    dispatched_count: 0,
89                });
90            } else if let Ok(events) = serde_json::from_slice::<Vec<ChangeEvent>>(&value) {
91                entries.push(OutboxEntry {
92                    operation_id,
93                    events,
94                    retry_count: 0,
95                    created_at: 0,
96                    dispatched_count: 0,
97                });
98            }
99        }
100
101        Ok(entries)
102    }
103
104    /// # Errors
105    /// Returns an error if removing from storage fails.
106    pub fn mark_delivered(&self, operation_id: &str) -> Result<()> {
107        let key = format!("_outbox/{operation_id}");
108        self.storage.remove(key.as_bytes())
109    }
110
111    /// # Errors
112    /// Returns an error if reading from storage fails.
113    pub fn pending_count(&self) -> Result<usize> {
114        let items = self.storage.prefix_scan(OUTBOX_PREFIX)?;
115        Ok(items.len())
116    }
117
118    /// # Errors
119    /// Returns an error if reading or writing to storage fails.
120    pub fn increment_retry(&self, operation_id: &str) -> Result<()> {
121        let key = format!("_outbox/{operation_id}");
122        if let Some(value) = self.storage.get(key.as_bytes())?
123            && let Ok(mut stored) = serde_json::from_slice::<StoredOutboxEntry>(&value)
124        {
125            stored.retry_count += 1;
126            let new_value = serde_json::to_vec(&stored).unwrap_or_default();
127            self.storage.insert(key.as_bytes(), &new_value)?;
128        }
129        Ok(())
130    }
131
132    /// # Errors
133    /// Returns an error if reading or writing to storage fails.
134    pub fn update_dispatched_count(&self, operation_id: &str, count: usize) -> Result<()> {
135        let key = format!("_outbox/{operation_id}");
136        if let Some(value) = self.storage.get(key.as_bytes())?
137            && let Ok(mut stored) = serde_json::from_slice::<StoredOutboxEntry>(&value)
138        {
139            stored.dispatched_count = count;
140            stored.retry_count += 1;
141            let new_value = serde_json::to_vec(&stored).unwrap_or_default();
142            self.storage.insert(key.as_bytes(), &new_value)?;
143        }
144        Ok(())
145    }
146
147    /// # Errors
148    /// Returns an error if reading or writing to storage fails.
149    pub fn move_to_dead_letter(&self, operation_id: &str) -> Result<()> {
150        let outbox_key = format!("_outbox/{operation_id}");
151        if let Some(value) = self.storage.get(outbox_key.as_bytes())? {
152            let dead_letter_key = format!("_dead_letter/{operation_id}");
153            let mut batch = self.storage.batch();
154            batch.remove(outbox_key.into_bytes());
155            batch.insert(dead_letter_key.into_bytes(), value);
156            batch.commit()?;
157        }
158        Ok(())
159    }
160
161    /// # Errors
162    /// Returns an error if reading from storage fails.
163    pub fn dead_letter_entries(&self) -> Result<Vec<OutboxEntry>> {
164        self.scan_entries(DEAD_LETTER_PREFIX, "_dead_letter/")
165    }
166
167    /// # Errors
168    /// Returns an error if reading from storage fails.
169    pub fn dead_letter_count(&self) -> Result<usize> {
170        let items = self.storage.prefix_scan(DEAD_LETTER_PREFIX)?;
171        Ok(items.len())
172    }
173
174    /// # Errors
175    /// Returns an error if removing from storage fails.
176    pub fn remove_dead_letter(&self, operation_id: &str) -> Result<()> {
177        let key = format!("_dead_letter/{operation_id}");
178        self.storage.remove(key.as_bytes())
179    }
180}
181
182#[derive(Debug, Clone)]
183pub struct OutboxEntry {
184    pub operation_id: String,
185    pub events: Vec<ChangeEvent>,
186    pub retry_count: u32,
187    pub created_at: u64,
188    pub dispatched_count: usize,
189}
190
191#[cfg(test)]
192mod tests {
193    use super::*;
194    use crate::storage::MemoryBackend;
195
196    #[test]
197    fn test_outbox_enqueue_single_event() {
198        let backend = Arc::new(MemoryBackend::new());
199        let storage = Arc::new(Storage::with_backend(backend));
200        let outbox = Outbox::new(Arc::clone(&storage));
201
202        let event = ChangeEvent::create(
203            "users".to_string(),
204            "1".to_string(),
205            serde_json::json!({"name": "Alice"}),
206        );
207
208        let mut batch = storage.batch();
209        outbox.enqueue_event(&mut batch, "op-123", &event);
210        batch.commit().unwrap();
211
212        let pending = outbox.pending_events().unwrap();
213        assert_eq!(pending.len(), 1);
214        assert_eq!(pending[0].operation_id, "op-123");
215        assert_eq!(pending[0].events.len(), 1);
216        assert_eq!(pending[0].events[0].entity, "users");
217    }
218
219    #[test]
220    fn test_outbox_enqueue_multiple_events() {
221        let backend = Arc::new(MemoryBackend::new());
222        let storage = Arc::new(Storage::with_backend(backend));
223        let outbox = Outbox::new(Arc::clone(&storage));
224
225        let events = vec![
226            ChangeEvent::delete("users".to_string(), "1".to_string(), serde_json::json!({})),
227            ChangeEvent::delete("posts".to_string(), "10".to_string(), serde_json::json!({})),
228            ChangeEvent::delete("posts".to_string(), "11".to_string(), serde_json::json!({})),
229        ];
230
231        let mut batch = storage.batch();
232        outbox.enqueue_events(&mut batch, "cascade-456", &events);
233        batch.commit().unwrap();
234
235        let pending = outbox.pending_events().unwrap();
236        assert_eq!(pending.len(), 1);
237        assert_eq!(pending[0].events.len(), 3);
238    }
239
240    #[test]
241    fn test_outbox_mark_delivered() {
242        let backend = Arc::new(MemoryBackend::new());
243        let storage = Arc::new(Storage::with_backend(backend));
244        let outbox = Outbox::new(Arc::clone(&storage));
245
246        let event = ChangeEvent::create(
247            "users".to_string(),
248            "1".to_string(),
249            serde_json::json!({"name": "Alice"}),
250        );
251
252        let mut batch = storage.batch();
253        outbox.enqueue_event(&mut batch, "op-123", &event);
254        batch.commit().unwrap();
255
256        assert_eq!(outbox.pending_count().unwrap(), 1);
257
258        outbox.mark_delivered("op-123").unwrap();
259
260        assert_eq!(outbox.pending_count().unwrap(), 0);
261    }
262
263    #[test]
264    fn test_outbox_atomic_with_data() {
265        let backend = Arc::new(MemoryBackend::new());
266        let storage = Arc::new(Storage::with_backend(backend));
267        let outbox = Outbox::new(Arc::clone(&storage));
268
269        let event = ChangeEvent::create(
270            "users".to_string(),
271            "1".to_string(),
272            serde_json::json!({"name": "Alice"}),
273        );
274
275        let mut batch = storage.batch();
276        batch.insert(b"data/users/1".to_vec(), b"user data".to_vec());
277        outbox.enqueue_event(&mut batch, "op-123", &event);
278        batch.commit().unwrap();
279
280        assert!(storage.get(b"data/users/1").unwrap().is_some());
281        assert_eq!(outbox.pending_count().unwrap(), 1);
282    }
283
284    #[test]
285    fn test_outbox_retry_count() {
286        let backend = Arc::new(MemoryBackend::new());
287        let storage = Arc::new(Storage::with_backend(backend));
288        let outbox = Outbox::new(Arc::clone(&storage));
289
290        let event = ChangeEvent::create(
291            "users".to_string(),
292            "1".to_string(),
293            serde_json::json!({"name": "Alice"}),
294        );
295
296        let mut batch = storage.batch();
297        outbox.enqueue_event(&mut batch, "op-123", &event);
298        batch.commit().unwrap();
299
300        let pending = outbox.pending_events().unwrap();
301        assert_eq!(pending[0].retry_count, 0);
302
303        outbox.increment_retry("op-123").unwrap();
304        outbox.increment_retry("op-123").unwrap();
305
306        let pending = outbox.pending_events().unwrap();
307        assert_eq!(pending[0].retry_count, 2);
308    }
309
310    #[test]
311    fn test_outbox_dead_letter() {
312        let backend = Arc::new(MemoryBackend::new());
313        let storage = Arc::new(Storage::with_backend(backend));
314        let outbox = Outbox::new(Arc::clone(&storage));
315
316        let event = ChangeEvent::create(
317            "users".to_string(),
318            "1".to_string(),
319            serde_json::json!({"name": "Alice"}),
320        );
321
322        let mut batch = storage.batch();
323        outbox.enqueue_event(&mut batch, "op-123", &event);
324        batch.commit().unwrap();
325
326        assert_eq!(outbox.pending_count().unwrap(), 1);
327        assert_eq!(outbox.dead_letter_count().unwrap(), 0);
328
329        outbox.move_to_dead_letter("op-123").unwrap();
330
331        assert_eq!(outbox.pending_count().unwrap(), 0);
332        assert_eq!(outbox.dead_letter_count().unwrap(), 1);
333
334        let dead = outbox.dead_letter_entries().unwrap();
335        assert_eq!(dead[0].operation_id, "op-123");
336        assert_eq!(dead[0].events[0].entity, "users");
337    }
338
339    #[test]
340    fn test_outbox_created_at() {
341        let backend = Arc::new(MemoryBackend::new());
342        let storage = Arc::new(Storage::with_backend(backend));
343        let outbox = Outbox::new(Arc::clone(&storage));
344
345        let event = ChangeEvent::create(
346            "users".to_string(),
347            "1".to_string(),
348            serde_json::json!({"name": "Alice"}),
349        );
350
351        let mut batch = storage.batch();
352        outbox.enqueue_event(&mut batch, "op-123", &event);
353        batch.commit().unwrap();
354
355        let pending = outbox.pending_events().unwrap();
356        assert!(pending[0].created_at > 0);
357    }
358
359    #[test]
360    fn test_outbox_update_dispatched_count() {
361        let backend = Arc::new(MemoryBackend::new());
362        let storage = Arc::new(Storage::with_backend(backend));
363        let outbox = Outbox::new(Arc::clone(&storage));
364
365        let events = vec![
366            ChangeEvent::create("users".to_string(), "1".to_string(), serde_json::json!({})),
367            ChangeEvent::create("users".to_string(), "2".to_string(), serde_json::json!({})),
368            ChangeEvent::create("users".to_string(), "3".to_string(), serde_json::json!({})),
369        ];
370
371        let mut batch = storage.batch();
372        outbox.enqueue_events(&mut batch, "op-partial", &events);
373        batch.commit().unwrap();
374
375        let pending = outbox.pending_events().unwrap();
376        assert_eq!(pending[0].dispatched_count, 0);
377        assert_eq!(pending[0].retry_count, 0);
378
379        outbox.update_dispatched_count("op-partial", 1).unwrap();
380
381        let pending = outbox.pending_events().unwrap();
382        assert_eq!(pending[0].dispatched_count, 1);
383        assert_eq!(pending[0].retry_count, 1);
384        assert_eq!(pending[0].events.len(), 3);
385
386        outbox.update_dispatched_count("op-partial", 2).unwrap();
387
388        let pending = outbox.pending_events().unwrap();
389        assert_eq!(pending[0].dispatched_count, 2);
390        assert_eq!(pending[0].retry_count, 2);
391    }
392
393    #[test]
394    fn test_outbox_backward_compat_missing_dispatched_count() {
395        let backend = Arc::new(MemoryBackend::new());
396        let storage = Arc::new(Storage::with_backend(backend));
397        let outbox = Outbox::new(Arc::clone(&storage));
398
399        let legacy = serde_json::json!({
400            "events": [{"sequence": 0, "entity": "users", "id": "1", "operation": "Create", "data": {}}],
401            "retry_count": 3,
402            "created_at": 1000
403        });
404        storage
405            .insert(b"_outbox/legacy-op", &serde_json::to_vec(&legacy).unwrap())
406            .unwrap();
407
408        let pending = outbox.pending_events().unwrap();
409        assert_eq!(pending.len(), 1);
410        assert_eq!(pending[0].dispatched_count, 0);
411        assert_eq!(pending[0].retry_count, 3);
412    }
413}