Skip to main content

stateset_sync/
outbox.rs

1use std::collections::VecDeque;
2use std::fs;
3use std::path::{Path, PathBuf};
4
5use serde::{Deserialize, Serialize};
6
7use crate::error::SyncError;
8use crate::event::SyncEvent;
9
10/// Default maximum capacity for the outbox.
11const DEFAULT_MAX_CAPACITY: usize = 10_000;
12
13/// Snapshot schema for durable outbox persistence.
14#[derive(Debug, Clone, Serialize, Deserialize)]
15struct OutboxSnapshot {
16    events: Vec<SyncEvent>,
17    next_sequence: u64,
18}
19
20/// An event outbox.
21///
22/// Events are appended and assigned monotonically increasing sequence numbers.
23/// The outbox can be drained (consumed) in order for push operations, or
24/// peeked without consuming.
25///
26/// By default this is in-memory, but it can be backed by a durable JSON
27/// snapshot via [`Outbox::with_persistence`].
28///
29/// # Examples
30///
31/// ```
32/// use stateset_sync::{Outbox, SyncEvent};
33/// use serde_json::json;
34///
35/// let mut outbox = Outbox::new(100);
36/// let seq = outbox.append(SyncEvent::new("order.created", "order", "ORD-1", json!({}))).unwrap();
37/// assert_eq!(seq, 1);
38/// assert_eq!(outbox.count(), 1);
39/// ```
40#[derive(Debug)]
41pub struct Outbox {
42    events: VecDeque<SyncEvent>,
43    max_capacity: usize,
44    next_sequence: u64,
45    persistence_path: Option<PathBuf>,
46}
47
48impl Outbox {
49    /// Create a new in-memory `Outbox` with the given maximum capacity.
50    #[must_use]
51    pub const fn new(max_capacity: usize) -> Self {
52        Self { events: VecDeque::new(), max_capacity, next_sequence: 1, persistence_path: None }
53    }
54
55    /// Create a new `Outbox` with the default maximum capacity (10,000).
56    #[must_use]
57    pub const fn with_default_capacity() -> Self {
58        Self::new(DEFAULT_MAX_CAPACITY)
59    }
60
61    /// Create a durable outbox persisted to `path`.
62    ///
63    /// If the snapshot already exists, it is loaded and reused.
64    ///
65    /// # Errors
66    ///
67    /// Returns [`SyncError::Storage`] if snapshot I/O fails.
68    pub fn with_persistence(
69        max_capacity: usize,
70        path: impl AsRef<Path>,
71    ) -> Result<Self, SyncError> {
72        let path = path.as_ref().to_path_buf();
73        let mut outbox = Self {
74            events: VecDeque::new(),
75            max_capacity,
76            next_sequence: 1,
77            persistence_path: Some(path.clone()),
78        };
79
80        if path.exists() {
81            let contents = fs::read_to_string(&path)
82                .map_err(|e| SyncError::Storage(format!("read outbox snapshot failed: {e}")))?;
83            if !contents.trim().is_empty() {
84                let snapshot: OutboxSnapshot = serde_json::from_str(&contents)?;
85                outbox.events = snapshot.events.into();
86                outbox.next_sequence = snapshot.next_sequence.max(
87                    outbox.events.back().map(|event| event.sequence.saturating_add(1)).unwrap_or(1),
88                );
89                while outbox.events.len() > outbox.max_capacity {
90                    let _ = outbox.events.pop_front();
91                }
92            }
93        } else {
94            outbox.persist()?;
95        }
96
97        Ok(outbox)
98    }
99
100    /// Append an event to the outbox, assigning it the next sequence number.
101    ///
102    /// Returns the assigned sequence number.
103    ///
104    /// # Errors
105    ///
106    /// Returns [`SyncError::OutboxFull`] if the outbox is at capacity or
107    /// [`SyncError::Storage`] if durable persistence fails.
108    pub fn append(&mut self, event: SyncEvent) -> Result<u64, SyncError> {
109        if self.events.len() >= self.max_capacity {
110            return Err(SyncError::OutboxFull {
111                capacity: self.max_capacity,
112                current: self.events.len(),
113            });
114        }
115
116        let seq = self.next_sequence;
117        self.next_sequence += 1;
118        let event = event.with_sequence(seq);
119        self.events.push_back(event);
120
121        if let Err(err) = self.persist() {
122            let _ = self.events.pop_back();
123            self.next_sequence = self.next_sequence.saturating_sub(1);
124            return Err(err);
125        }
126
127        Ok(seq)
128    }
129
130    /// Drain up to `count` events from the front of the outbox (FIFO order).
131    ///
132    /// Drained events are removed from the outbox.
133    ///
134    /// # Errors
135    ///
136    /// Returns [`SyncError::Storage`] if durable persistence cannot be updated.
137    /// In that case, the original in-memory ordering is restored.
138    pub fn drain(&mut self, count: usize) -> Result<Vec<SyncEvent>, SyncError> {
139        let n = count.min(self.events.len());
140        let drained: Vec<SyncEvent> = self.events.drain(..n).collect();
141
142        if let Err(err) = self.persist() {
143            for event in drained.iter().rev().cloned() {
144                self.events.push_front(event);
145            }
146            return Err(err);
147        }
148
149        Ok(drained)
150    }
151
152    /// Peek at up to `count` events from the front of the outbox without consuming them.
153    #[must_use]
154    pub fn peek(&self, count: usize) -> Vec<&SyncEvent> {
155        self.events.iter().take(count).collect()
156    }
157
158    /// Retain only events matching the given predicate.
159    ///
160    /// Preserves FIFO order and does not modify sequence allocation.
161    pub fn retain<F>(&mut self, predicate: F)
162    where
163        F: FnMut(&SyncEvent) -> bool,
164    {
165        let _ = self.try_retain(predicate);
166    }
167
168    /// Retain only events matching the given predicate and persist the result.
169    ///
170    /// # Errors
171    ///
172    /// Returns [`SyncError::Storage`] if durable persistence fails. In that
173    /// case, the original in-memory ordering is restored.
174    pub fn try_retain<F>(&mut self, mut predicate: F) -> Result<(), SyncError>
175    where
176        F: FnMut(&SyncEvent) -> bool,
177    {
178        let before = self.events.clone();
179        self.events.retain(|event| predicate(event));
180        if let Err(err) = self.persist() {
181            self.events = before;
182            return Err(err);
183        }
184        Ok(())
185    }
186
187    /// Return the number of events currently in the outbox.
188    #[must_use]
189    pub fn count(&self) -> usize {
190        self.events.len()
191    }
192
193    /// Whether the outbox is empty.
194    #[must_use]
195    pub fn is_empty(&self) -> bool {
196        self.events.is_empty()
197    }
198
199    /// Whether the outbox is at maximum capacity.
200    #[must_use]
201    pub fn is_full(&self) -> bool {
202        self.events.len() >= self.max_capacity
203    }
204
205    /// Remove all events from the outbox.
206    pub fn clear(&mut self) {
207        let before = self.events.clone();
208        self.events.clear();
209        if self.persist().is_err() {
210            self.events = before;
211        }
212    }
213
214    /// Return the maximum capacity of the outbox.
215    #[must_use]
216    pub const fn max_capacity(&self) -> usize {
217        self.max_capacity
218    }
219
220    /// Return the next sequence number that will be assigned.
221    #[must_use]
222    pub const fn next_sequence(&self) -> u64 {
223        self.next_sequence
224    }
225
226    fn persist(&self) -> Result<(), SyncError> {
227        let Some(path) = &self.persistence_path else {
228            return Ok(());
229        };
230
231        let snapshot = OutboxSnapshot {
232            events: self.events.iter().cloned().collect(),
233            next_sequence: self.next_sequence,
234        };
235
236        let serialized = serde_json::to_string_pretty(&snapshot)?;
237        if let Some(parent) = path.parent() {
238            fs::create_dir_all(parent).map_err(|e| {
239                SyncError::Storage(format!("create outbox snapshot directory failed: {e}"))
240            })?;
241        }
242
243        let tmp_path = path.with_extension("tmp");
244        fs::write(&tmp_path, serialized)
245            .map_err(|e| SyncError::Storage(format!("write outbox snapshot failed: {e}")))?;
246        fs::rename(&tmp_path, path)
247            .map_err(|e| SyncError::Storage(format!("replace outbox snapshot failed: {e}")))?;
248
249        Ok(())
250    }
251}
252
253impl Default for Outbox {
254    fn default() -> Self {
255        Self::with_default_capacity()
256    }
257}
258
259#[cfg(test)]
260mod tests {
261    use super::*;
262    use serde_json::json;
263    use tempfile::tempdir;
264
265    fn make_event(event_type: &str) -> SyncEvent {
266        SyncEvent::new(event_type, "order", "ORD-1", json!({}))
267    }
268
269    #[test]
270    fn append_increments_sequence() {
271        let mut outbox = Outbox::new(100);
272        let s1 = outbox.append(make_event("a")).unwrap();
273        let s2 = outbox.append(make_event("b")).unwrap();
274        let s3 = outbox.append(make_event("c")).unwrap();
275        assert_eq!(s1, 1);
276        assert_eq!(s2, 2);
277        assert_eq!(s3, 3);
278    }
279
280    #[test]
281    fn append_updates_count() {
282        let mut outbox = Outbox::new(100);
283        assert_eq!(outbox.count(), 0);
284        outbox.append(make_event("a")).unwrap();
285        assert_eq!(outbox.count(), 1);
286        outbox.append(make_event("b")).unwrap();
287        assert_eq!(outbox.count(), 2);
288    }
289
290    #[test]
291    fn append_at_capacity_fails() {
292        let mut outbox = Outbox::new(2);
293        outbox.append(make_event("a")).unwrap();
294        outbox.append(make_event("b")).unwrap();
295        let result = outbox.append(make_event("c"));
296        assert!(result.is_err());
297        assert!(matches!(result.unwrap_err(), SyncError::OutboxFull { capacity: 2, current: 2 }));
298    }
299
300    #[test]
301    fn drain_partial() {
302        let mut outbox = Outbox::new(100);
303        outbox.append(make_event("a")).unwrap();
304        outbox.append(make_event("b")).unwrap();
305        outbox.append(make_event("c")).unwrap();
306
307        let drained = outbox.drain(2).unwrap();
308        assert_eq!(drained.len(), 2);
309        assert_eq!(drained[0].sequence, 1);
310        assert_eq!(drained[1].sequence, 2);
311        assert_eq!(outbox.count(), 1);
312    }
313
314    #[test]
315    fn drain_all() {
316        let mut outbox = Outbox::new(100);
317        outbox.append(make_event("a")).unwrap();
318        outbox.append(make_event("b")).unwrap();
319
320        let drained = outbox.drain(10).unwrap();
321        assert_eq!(drained.len(), 2);
322        assert!(outbox.is_empty());
323    }
324
325    #[test]
326    fn drain_empty() {
327        let mut outbox = Outbox::new(100);
328        let drained = outbox.drain(10).unwrap();
329        assert!(drained.is_empty());
330    }
331
332    #[test]
333    fn peek_without_consuming() {
334        let mut outbox = Outbox::new(100);
335        outbox.append(make_event("a")).unwrap();
336        outbox.append(make_event("b")).unwrap();
337
338        let peeked = outbox.peek(1);
339        assert_eq!(peeked.len(), 1);
340        assert_eq!(peeked[0].sequence, 1);
341        assert_eq!(outbox.count(), 2); // still there
342    }
343
344    #[test]
345    fn peek_more_than_available() {
346        let mut outbox = Outbox::new(100);
347        outbox.append(make_event("a")).unwrap();
348
349        let peeked = outbox.peek(10);
350        assert_eq!(peeked.len(), 1);
351    }
352
353    #[test]
354    fn retain_filters_events_and_preserves_order() {
355        let mut outbox = Outbox::new(10);
356        outbox.append(make_event("a")).unwrap();
357        outbox.append(make_event("b")).unwrap();
358        outbox.append(make_event("c")).unwrap();
359
360        outbox.retain(|event| event.event_type != "b");
361
362        let remaining = outbox.peek(10);
363        assert_eq!(remaining.len(), 2);
364        assert_eq!(remaining[0].event_type, "a");
365        assert_eq!(remaining[1].event_type, "c");
366    }
367
368    #[test]
369    fn clear_removes_all() {
370        let mut outbox = Outbox::new(100);
371        outbox.append(make_event("a")).unwrap();
372        outbox.append(make_event("b")).unwrap();
373        outbox.clear();
374        assert!(outbox.is_empty());
375        assert_eq!(outbox.count(), 0);
376    }
377
378    #[test]
379    fn clear_does_not_reset_sequence() {
380        let mut outbox = Outbox::new(100);
381        outbox.append(make_event("a")).unwrap();
382        outbox.append(make_event("b")).unwrap();
383        outbox.clear();
384
385        let seq = outbox.append(make_event("c")).unwrap();
386        assert_eq!(seq, 3); // sequence continues from where it left off
387    }
388
389    #[test]
390    fn is_empty_and_is_full() {
391        let mut outbox = Outbox::new(2);
392        assert!(outbox.is_empty());
393        assert!(!outbox.is_full());
394
395        outbox.append(make_event("a")).unwrap();
396        assert!(!outbox.is_empty());
397        assert!(!outbox.is_full());
398
399        outbox.append(make_event("b")).unwrap();
400        assert!(!outbox.is_empty());
401        assert!(outbox.is_full());
402    }
403
404    #[test]
405    fn fifo_ordering() {
406        let mut outbox = Outbox::new(100);
407        outbox.append(make_event("first")).unwrap();
408        outbox.append(make_event("second")).unwrap();
409        outbox.append(make_event("third")).unwrap();
410
411        let drained = outbox.drain(3).unwrap();
412        assert_eq!(drained[0].event_type, "first");
413        assert_eq!(drained[1].event_type, "second");
414        assert_eq!(drained[2].event_type, "third");
415    }
416
417    #[test]
418    fn drain_then_append_works() {
419        let mut outbox = Outbox::new(2);
420        outbox.append(make_event("a")).unwrap();
421        outbox.append(make_event("b")).unwrap();
422        assert!(outbox.is_full());
423
424        outbox.drain(1).unwrap();
425        assert!(!outbox.is_full());
426
427        let seq = outbox.append(make_event("c")).unwrap();
428        assert_eq!(seq, 3);
429    }
430
431    #[test]
432    fn default_capacity() {
433        let outbox = Outbox::with_default_capacity();
434        assert_eq!(outbox.max_capacity(), DEFAULT_MAX_CAPACITY);
435    }
436
437    #[test]
438    fn default_trait() {
439        let outbox = Outbox::default();
440        assert_eq!(outbox.max_capacity(), DEFAULT_MAX_CAPACITY);
441        assert!(outbox.is_empty());
442    }
443
444    #[test]
445    fn next_sequence_accessor() {
446        let mut outbox = Outbox::new(100);
447        assert_eq!(outbox.next_sequence(), 1);
448        outbox.append(make_event("a")).unwrap();
449        assert_eq!(outbox.next_sequence(), 2);
450    }
451
452    #[test]
453    fn persistent_outbox_roundtrip() {
454        let dir = tempdir().unwrap();
455        let path = dir.path().join("outbox.json");
456
457        {
458            let mut outbox = Outbox::with_persistence(10, &path).unwrap();
459            outbox.append(make_event("a")).unwrap();
460            outbox.append(make_event("b")).unwrap();
461            assert_eq!(outbox.count(), 2);
462        }
463
464        let outbox = Outbox::with_persistence(10, &path).unwrap();
465        assert_eq!(outbox.count(), 2);
466        assert_eq!(outbox.next_sequence(), 3);
467    }
468
469    #[test]
470    fn drain_restores_events_when_persist_fails() {
471        let dir = tempdir().unwrap();
472        let mut outbox = Outbox::new(10);
473        outbox.append(make_event("a")).unwrap();
474        outbox.append(make_event("b")).unwrap();
475        outbox.persistence_path = Some(dir.path().join("outbox.json"));
476        outbox.persist().unwrap();
477        std::fs::remove_file(dir.path().join("outbox.json")).unwrap();
478        std::fs::create_dir(dir.path().join("outbox.json")).unwrap();
479
480        let err = outbox.drain(1).unwrap_err();
481        assert!(matches!(err, SyncError::Storage(_)));
482        assert_eq!(outbox.count(), 2);
483        assert_eq!(outbox.peek(10)[0].event_type, "a");
484        assert_eq!(outbox.peek(10)[1].event_type, "b");
485    }
486}