1use std::collections::VecDeque;
2use std::fs;
3use std::path::{Path, PathBuf};
4
5use serde::{Deserialize, Serialize};
6
7use crate::error::SyncError;
8use crate::event::SyncEvent;
9
10const DEFAULT_MAX_CAPACITY: usize = 10_000;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15struct OutboxSnapshot {
16 events: Vec<SyncEvent>,
17 next_sequence: u64,
18}
19
20#[derive(Debug)]
41pub struct Outbox {
42 events: VecDeque<SyncEvent>,
43 max_capacity: usize,
44 next_sequence: u64,
45 persistence_path: Option<PathBuf>,
46}
47
48impl Outbox {
49 #[must_use]
51 pub const fn new(max_capacity: usize) -> Self {
52 Self { events: VecDeque::new(), max_capacity, next_sequence: 1, persistence_path: None }
53 }
54
55 #[must_use]
57 pub const fn with_default_capacity() -> Self {
58 Self::new(DEFAULT_MAX_CAPACITY)
59 }
60
61 pub fn with_persistence(
69 max_capacity: usize,
70 path: impl AsRef<Path>,
71 ) -> Result<Self, SyncError> {
72 let path = path.as_ref().to_path_buf();
73 let mut outbox = Self {
74 events: VecDeque::new(),
75 max_capacity,
76 next_sequence: 1,
77 persistence_path: Some(path.clone()),
78 };
79
80 if path.exists() {
81 let contents = fs::read_to_string(&path)
82 .map_err(|e| SyncError::Storage(format!("read outbox snapshot failed: {e}")))?;
83 if !contents.trim().is_empty() {
84 let snapshot: OutboxSnapshot = serde_json::from_str(&contents)?;
85 outbox.events = snapshot.events.into();
86 outbox.next_sequence = snapshot.next_sequence.max(
87 outbox.events.back().map(|event| event.sequence.saturating_add(1)).unwrap_or(1),
88 );
89 while outbox.events.len() > outbox.max_capacity {
90 let _ = outbox.events.pop_front();
91 }
92 }
93 } else {
94 outbox.persist()?;
95 }
96
97 Ok(outbox)
98 }
99
100 pub fn append(&mut self, event: SyncEvent) -> Result<u64, SyncError> {
109 if self.events.len() >= self.max_capacity {
110 return Err(SyncError::OutboxFull {
111 capacity: self.max_capacity,
112 current: self.events.len(),
113 });
114 }
115
116 let seq = self.next_sequence;
117 self.next_sequence += 1;
118 let event = event.with_sequence(seq);
119 self.events.push_back(event);
120
121 if let Err(err) = self.persist() {
122 let _ = self.events.pop_back();
123 self.next_sequence = self.next_sequence.saturating_sub(1);
124 return Err(err);
125 }
126
127 Ok(seq)
128 }
129
130 pub fn drain(&mut self, count: usize) -> Result<Vec<SyncEvent>, SyncError> {
139 let n = count.min(self.events.len());
140 let drained: Vec<SyncEvent> = self.events.drain(..n).collect();
141
142 if let Err(err) = self.persist() {
143 for event in drained.iter().rev().cloned() {
144 self.events.push_front(event);
145 }
146 return Err(err);
147 }
148
149 Ok(drained)
150 }
151
152 #[must_use]
154 pub fn peek(&self, count: usize) -> Vec<&SyncEvent> {
155 self.events.iter().take(count).collect()
156 }
157
158 pub fn retain<F>(&mut self, predicate: F)
162 where
163 F: FnMut(&SyncEvent) -> bool,
164 {
165 let _ = self.try_retain(predicate);
166 }
167
168 pub fn try_retain<F>(&mut self, mut predicate: F) -> Result<(), SyncError>
175 where
176 F: FnMut(&SyncEvent) -> bool,
177 {
178 let before = self.events.clone();
179 self.events.retain(|event| predicate(event));
180 if let Err(err) = self.persist() {
181 self.events = before;
182 return Err(err);
183 }
184 Ok(())
185 }
186
187 #[must_use]
189 pub fn count(&self) -> usize {
190 self.events.len()
191 }
192
193 #[must_use]
195 pub fn is_empty(&self) -> bool {
196 self.events.is_empty()
197 }
198
199 #[must_use]
201 pub fn is_full(&self) -> bool {
202 self.events.len() >= self.max_capacity
203 }
204
205 pub fn clear(&mut self) {
207 let before = self.events.clone();
208 self.events.clear();
209 if self.persist().is_err() {
210 self.events = before;
211 }
212 }
213
214 #[must_use]
216 pub const fn max_capacity(&self) -> usize {
217 self.max_capacity
218 }
219
220 #[must_use]
222 pub const fn next_sequence(&self) -> u64 {
223 self.next_sequence
224 }
225
226 fn persist(&self) -> Result<(), SyncError> {
227 let Some(path) = &self.persistence_path else {
228 return Ok(());
229 };
230
231 let snapshot = OutboxSnapshot {
232 events: self.events.iter().cloned().collect(),
233 next_sequence: self.next_sequence,
234 };
235
236 let serialized = serde_json::to_string_pretty(&snapshot)?;
237 if let Some(parent) = path.parent() {
238 fs::create_dir_all(parent).map_err(|e| {
239 SyncError::Storage(format!("create outbox snapshot directory failed: {e}"))
240 })?;
241 }
242
243 let tmp_path = path.with_extension("tmp");
244 fs::write(&tmp_path, serialized)
245 .map_err(|e| SyncError::Storage(format!("write outbox snapshot failed: {e}")))?;
246 fs::rename(&tmp_path, path)
247 .map_err(|e| SyncError::Storage(format!("replace outbox snapshot failed: {e}")))?;
248
249 Ok(())
250 }
251}
252
253impl Default for Outbox {
254 fn default() -> Self {
255 Self::with_default_capacity()
256 }
257}
258
259#[cfg(test)]
260mod tests {
261 use super::*;
262 use serde_json::json;
263 use tempfile::tempdir;
264
265 fn make_event(event_type: &str) -> SyncEvent {
266 SyncEvent::new(event_type, "order", "ORD-1", json!({}))
267 }
268
269 #[test]
270 fn append_increments_sequence() {
271 let mut outbox = Outbox::new(100);
272 let s1 = outbox.append(make_event("a")).unwrap();
273 let s2 = outbox.append(make_event("b")).unwrap();
274 let s3 = outbox.append(make_event("c")).unwrap();
275 assert_eq!(s1, 1);
276 assert_eq!(s2, 2);
277 assert_eq!(s3, 3);
278 }
279
280 #[test]
281 fn append_updates_count() {
282 let mut outbox = Outbox::new(100);
283 assert_eq!(outbox.count(), 0);
284 outbox.append(make_event("a")).unwrap();
285 assert_eq!(outbox.count(), 1);
286 outbox.append(make_event("b")).unwrap();
287 assert_eq!(outbox.count(), 2);
288 }
289
290 #[test]
291 fn append_at_capacity_fails() {
292 let mut outbox = Outbox::new(2);
293 outbox.append(make_event("a")).unwrap();
294 outbox.append(make_event("b")).unwrap();
295 let result = outbox.append(make_event("c"));
296 assert!(result.is_err());
297 assert!(matches!(result.unwrap_err(), SyncError::OutboxFull { capacity: 2, current: 2 }));
298 }
299
300 #[test]
301 fn drain_partial() {
302 let mut outbox = Outbox::new(100);
303 outbox.append(make_event("a")).unwrap();
304 outbox.append(make_event("b")).unwrap();
305 outbox.append(make_event("c")).unwrap();
306
307 let drained = outbox.drain(2).unwrap();
308 assert_eq!(drained.len(), 2);
309 assert_eq!(drained[0].sequence, 1);
310 assert_eq!(drained[1].sequence, 2);
311 assert_eq!(outbox.count(), 1);
312 }
313
314 #[test]
315 fn drain_all() {
316 let mut outbox = Outbox::new(100);
317 outbox.append(make_event("a")).unwrap();
318 outbox.append(make_event("b")).unwrap();
319
320 let drained = outbox.drain(10).unwrap();
321 assert_eq!(drained.len(), 2);
322 assert!(outbox.is_empty());
323 }
324
325 #[test]
326 fn drain_empty() {
327 let mut outbox = Outbox::new(100);
328 let drained = outbox.drain(10).unwrap();
329 assert!(drained.is_empty());
330 }
331
332 #[test]
333 fn peek_without_consuming() {
334 let mut outbox = Outbox::new(100);
335 outbox.append(make_event("a")).unwrap();
336 outbox.append(make_event("b")).unwrap();
337
338 let peeked = outbox.peek(1);
339 assert_eq!(peeked.len(), 1);
340 assert_eq!(peeked[0].sequence, 1);
341 assert_eq!(outbox.count(), 2); }
343
344 #[test]
345 fn peek_more_than_available() {
346 let mut outbox = Outbox::new(100);
347 outbox.append(make_event("a")).unwrap();
348
349 let peeked = outbox.peek(10);
350 assert_eq!(peeked.len(), 1);
351 }
352
353 #[test]
354 fn retain_filters_events_and_preserves_order() {
355 let mut outbox = Outbox::new(10);
356 outbox.append(make_event("a")).unwrap();
357 outbox.append(make_event("b")).unwrap();
358 outbox.append(make_event("c")).unwrap();
359
360 outbox.retain(|event| event.event_type != "b");
361
362 let remaining = outbox.peek(10);
363 assert_eq!(remaining.len(), 2);
364 assert_eq!(remaining[0].event_type, "a");
365 assert_eq!(remaining[1].event_type, "c");
366 }
367
368 #[test]
369 fn clear_removes_all() {
370 let mut outbox = Outbox::new(100);
371 outbox.append(make_event("a")).unwrap();
372 outbox.append(make_event("b")).unwrap();
373 outbox.clear();
374 assert!(outbox.is_empty());
375 assert_eq!(outbox.count(), 0);
376 }
377
378 #[test]
379 fn clear_does_not_reset_sequence() {
380 let mut outbox = Outbox::new(100);
381 outbox.append(make_event("a")).unwrap();
382 outbox.append(make_event("b")).unwrap();
383 outbox.clear();
384
385 let seq = outbox.append(make_event("c")).unwrap();
386 assert_eq!(seq, 3); }
388
389 #[test]
390 fn is_empty_and_is_full() {
391 let mut outbox = Outbox::new(2);
392 assert!(outbox.is_empty());
393 assert!(!outbox.is_full());
394
395 outbox.append(make_event("a")).unwrap();
396 assert!(!outbox.is_empty());
397 assert!(!outbox.is_full());
398
399 outbox.append(make_event("b")).unwrap();
400 assert!(!outbox.is_empty());
401 assert!(outbox.is_full());
402 }
403
404 #[test]
405 fn fifo_ordering() {
406 let mut outbox = Outbox::new(100);
407 outbox.append(make_event("first")).unwrap();
408 outbox.append(make_event("second")).unwrap();
409 outbox.append(make_event("third")).unwrap();
410
411 let drained = outbox.drain(3).unwrap();
412 assert_eq!(drained[0].event_type, "first");
413 assert_eq!(drained[1].event_type, "second");
414 assert_eq!(drained[2].event_type, "third");
415 }
416
417 #[test]
418 fn drain_then_append_works() {
419 let mut outbox = Outbox::new(2);
420 outbox.append(make_event("a")).unwrap();
421 outbox.append(make_event("b")).unwrap();
422 assert!(outbox.is_full());
423
424 outbox.drain(1).unwrap();
425 assert!(!outbox.is_full());
426
427 let seq = outbox.append(make_event("c")).unwrap();
428 assert_eq!(seq, 3);
429 }
430
431 #[test]
432 fn default_capacity() {
433 let outbox = Outbox::with_default_capacity();
434 assert_eq!(outbox.max_capacity(), DEFAULT_MAX_CAPACITY);
435 }
436
437 #[test]
438 fn default_trait() {
439 let outbox = Outbox::default();
440 assert_eq!(outbox.max_capacity(), DEFAULT_MAX_CAPACITY);
441 assert!(outbox.is_empty());
442 }
443
444 #[test]
445 fn next_sequence_accessor() {
446 let mut outbox = Outbox::new(100);
447 assert_eq!(outbox.next_sequence(), 1);
448 outbox.append(make_event("a")).unwrap();
449 assert_eq!(outbox.next_sequence(), 2);
450 }
451
452 #[test]
453 fn persistent_outbox_roundtrip() {
454 let dir = tempdir().unwrap();
455 let path = dir.path().join("outbox.json");
456
457 {
458 let mut outbox = Outbox::with_persistence(10, &path).unwrap();
459 outbox.append(make_event("a")).unwrap();
460 outbox.append(make_event("b")).unwrap();
461 assert_eq!(outbox.count(), 2);
462 }
463
464 let outbox = Outbox::with_persistence(10, &path).unwrap();
465 assert_eq!(outbox.count(), 2);
466 assert_eq!(outbox.next_sequence(), 3);
467 }
468
469 #[test]
470 fn drain_restores_events_when_persist_fails() {
471 let dir = tempdir().unwrap();
472 let mut outbox = Outbox::new(10);
473 outbox.append(make_event("a")).unwrap();
474 outbox.append(make_event("b")).unwrap();
475 outbox.persistence_path = Some(dir.path().join("outbox.json"));
476 outbox.persist().unwrap();
477 std::fs::remove_file(dir.path().join("outbox.json")).unwrap();
478 std::fs::create_dir(dir.path().join("outbox.json")).unwrap();
479
480 let err = outbox.drain(1).unwrap_err();
481 assert!(matches!(err, SyncError::Storage(_)));
482 assert_eq!(outbox.count(), 2);
483 assert_eq!(outbox.peek(10)[0].event_type, "a");
484 assert_eq!(outbox.peek(10)[1].event_type, "b");
485 }
486}