1use crate::error::Result;
5use crate::events::ChangeEvent;
6use crate::storage::{BatchWriter, Storage};
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9use std::time::{SystemTime, UNIX_EPOCH};
10
/// Key prefix under which pending outbox entries are stored.
const OUTBOX_PREFIX: &[u8] = b"_outbox/";

/// Key prefix under which dead-lettered entries are stored.
const DEAD_LETTER_PREFIX: &[u8] = b"_dead_letter/";
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
15struct StoredOutboxEntry {
16 events: Vec<ChangeEvent>,
17 retry_count: u32,
18 created_at: u64,
19 #[serde(default)]
20 dispatched_count: usize,
21}
22
23pub struct Outbox {
24 storage: Arc<Storage>,
25}
26
27impl Outbox {
28 #[allow(clippy::must_use_candidate)]
29 pub fn new(storage: Arc<Storage>) -> Self {
30 Self { storage }
31 }
32
33 pub fn enqueue_event(&self, batch: &mut BatchWriter, operation_id: &str, event: &ChangeEvent) {
34 self.enqueue_events(batch, operation_id, std::slice::from_ref(event));
35 }
36
37 pub fn enqueue_events(
38 &self,
39 batch: &mut BatchWriter,
40 operation_id: &str,
41 events: &[ChangeEvent],
42 ) {
43 let key = format!("_outbox/{operation_id}");
44 let stored = StoredOutboxEntry {
45 events: events.to_vec(),
46 retry_count: 0,
47 created_at: SystemTime::now()
48 .duration_since(UNIX_EPOCH)
49 .map(|d| d.as_secs())
50 .unwrap_or(0),
51 dispatched_count: 0,
52 };
53 let value = serde_json::to_vec(&stored).unwrap_or_default();
54 batch.insert(key.into_bytes(), value);
55 }
56
57 pub fn pending_events(&self) -> Result<Vec<OutboxEntry>> {
60 self.scan_entries(OUTBOX_PREFIX, "_outbox/")
61 }
62
63 fn scan_entries(&self, prefix: &[u8], strip_prefix: &str) -> Result<Vec<OutboxEntry>> {
64 let items = self.storage.prefix_scan(prefix)?;
65 let mut entries = Vec::new();
66
67 for (key, value) in items {
68 let key_str = String::from_utf8_lossy(&key);
69 let operation_id = key_str
70 .strip_prefix(strip_prefix)
71 .unwrap_or(&key_str)
72 .to_string();
73
74 if let Ok(stored) = serde_json::from_slice::<StoredOutboxEntry>(&value) {
75 entries.push(OutboxEntry {
76 operation_id,
77 events: stored.events,
78 retry_count: stored.retry_count,
79 created_at: stored.created_at,
80 dispatched_count: stored.dispatched_count,
81 });
82 } else if let Ok(event) = serde_json::from_slice::<ChangeEvent>(&value) {
83 entries.push(OutboxEntry {
84 operation_id,
85 events: vec![event],
86 retry_count: 0,
87 created_at: 0,
88 dispatched_count: 0,
89 });
90 } else if let Ok(events) = serde_json::from_slice::<Vec<ChangeEvent>>(&value) {
91 entries.push(OutboxEntry {
92 operation_id,
93 events,
94 retry_count: 0,
95 created_at: 0,
96 dispatched_count: 0,
97 });
98 }
99 }
100
101 Ok(entries)
102 }
103
104 pub fn mark_delivered(&self, operation_id: &str) -> Result<()> {
107 let key = format!("_outbox/{operation_id}");
108 self.storage.remove(key.as_bytes())
109 }
110
111 pub fn pending_count(&self) -> Result<usize> {
114 let items = self.storage.prefix_scan(OUTBOX_PREFIX)?;
115 Ok(items.len())
116 }
117
118 pub fn increment_retry(&self, operation_id: &str) -> Result<()> {
121 let key = format!("_outbox/{operation_id}");
122 if let Some(value) = self.storage.get(key.as_bytes())?
123 && let Ok(mut stored) = serde_json::from_slice::<StoredOutboxEntry>(&value)
124 {
125 stored.retry_count += 1;
126 let new_value = serde_json::to_vec(&stored).unwrap_or_default();
127 self.storage.insert(key.as_bytes(), &new_value)?;
128 }
129 Ok(())
130 }
131
132 pub fn update_dispatched_count(&self, operation_id: &str, count: usize) -> Result<()> {
135 let key = format!("_outbox/{operation_id}");
136 if let Some(value) = self.storage.get(key.as_bytes())?
137 && let Ok(mut stored) = serde_json::from_slice::<StoredOutboxEntry>(&value)
138 {
139 stored.dispatched_count = count;
140 stored.retry_count += 1;
141 let new_value = serde_json::to_vec(&stored).unwrap_or_default();
142 self.storage.insert(key.as_bytes(), &new_value)?;
143 }
144 Ok(())
145 }
146
147 pub fn move_to_dead_letter(&self, operation_id: &str) -> Result<()> {
150 let outbox_key = format!("_outbox/{operation_id}");
151 if let Some(value) = self.storage.get(outbox_key.as_bytes())? {
152 let dead_letter_key = format!("_dead_letter/{operation_id}");
153 let mut batch = self.storage.batch();
154 batch.remove(outbox_key.into_bytes());
155 batch.insert(dead_letter_key.into_bytes(), value);
156 batch.commit()?;
157 }
158 Ok(())
159 }
160
161 pub fn dead_letter_entries(&self) -> Result<Vec<OutboxEntry>> {
164 self.scan_entries(DEAD_LETTER_PREFIX, "_dead_letter/")
165 }
166
167 pub fn dead_letter_count(&self) -> Result<usize> {
170 let items = self.storage.prefix_scan(DEAD_LETTER_PREFIX)?;
171 Ok(items.len())
172 }
173
174 pub fn remove_dead_letter(&self, operation_id: &str) -> Result<()> {
177 let key = format!("_dead_letter/{operation_id}");
178 self.storage.remove(key.as_bytes())
179 }
180}
181
182#[derive(Debug, Clone)]
183pub struct OutboxEntry {
184 pub operation_id: String,
185 pub events: Vec<ChangeEvent>,
186 pub retry_count: u32,
187 pub created_at: u64,
188 pub dispatched_count: usize,
189}
190
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::MemoryBackend;

    /// Builds a fresh in-memory storage together with an outbox that shares it.
    fn setup() -> (Arc<Storage>, Outbox) {
        let backend = Arc::new(MemoryBackend::new());
        let storage = Arc::new(Storage::with_backend(backend));
        let outbox = Outbox::new(Arc::clone(&storage));
        (storage, outbox)
    }

    /// The `users/1` create event used by most tests.
    fn alice_event() -> ChangeEvent {
        ChangeEvent::create(
            "users".to_string(),
            "1".to_string(),
            serde_json::json!({"name": "Alice"}),
        )
    }

    /// Enqueues one event via `enqueue_event` and commits the batch.
    fn enqueue_one(storage: &Arc<Storage>, outbox: &Outbox, operation_id: &str, event: &ChangeEvent) {
        let mut batch = storage.batch();
        outbox.enqueue_event(&mut batch, operation_id, event);
        batch.commit().unwrap();
    }

    /// Enqueues several events via `enqueue_events` and commits the batch.
    fn enqueue_many(
        storage: &Arc<Storage>,
        outbox: &Outbox,
        operation_id: &str,
        events: &[ChangeEvent],
    ) {
        let mut batch = storage.batch();
        outbox.enqueue_events(&mut batch, operation_id, events);
        batch.commit().unwrap();
    }

    #[test]
    fn test_outbox_enqueue_single_event() {
        let (storage, outbox) = setup();
        enqueue_one(&storage, &outbox, "op-123", &alice_event());

        let pending = outbox.pending_events().unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].operation_id, "op-123");
        assert_eq!(pending[0].events.len(), 1);
        assert_eq!(pending[0].events[0].entity, "users");
    }

    #[test]
    fn test_outbox_enqueue_multiple_events() {
        let (storage, outbox) = setup();

        let events = vec![
            ChangeEvent::delete("users".to_string(), "1".to_string(), serde_json::json!({})),
            ChangeEvent::delete("posts".to_string(), "10".to_string(), serde_json::json!({})),
            ChangeEvent::delete("posts".to_string(), "11".to_string(), serde_json::json!({})),
        ];
        enqueue_many(&storage, &outbox, "cascade-456", &events);

        let pending = outbox.pending_events().unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].events.len(), 3);
    }

    #[test]
    fn test_outbox_mark_delivered() {
        let (storage, outbox) = setup();
        enqueue_one(&storage, &outbox, "op-123", &alice_event());

        assert_eq!(outbox.pending_count().unwrap(), 1);

        outbox.mark_delivered("op-123").unwrap();

        assert_eq!(outbox.pending_count().unwrap(), 0);
    }

    #[test]
    fn test_outbox_atomic_with_data() {
        let (storage, outbox) = setup();
        let event = alice_event();

        // Data write and outbox entry go through the same batch.
        let mut batch = storage.batch();
        batch.insert(b"data/users/1".to_vec(), b"user data".to_vec());
        outbox.enqueue_event(&mut batch, "op-123", &event);
        batch.commit().unwrap();

        assert!(storage.get(b"data/users/1").unwrap().is_some());
        assert_eq!(outbox.pending_count().unwrap(), 1);
    }

    #[test]
    fn test_outbox_retry_count() {
        let (storage, outbox) = setup();
        enqueue_one(&storage, &outbox, "op-123", &alice_event());

        let before = outbox.pending_events().unwrap();
        assert_eq!(before[0].retry_count, 0);

        outbox.increment_retry("op-123").unwrap();
        outbox.increment_retry("op-123").unwrap();

        let after = outbox.pending_events().unwrap();
        assert_eq!(after[0].retry_count, 2);
    }

    #[test]
    fn test_outbox_dead_letter() {
        let (storage, outbox) = setup();
        enqueue_one(&storage, &outbox, "op-123", &alice_event());

        assert_eq!(outbox.pending_count().unwrap(), 1);
        assert_eq!(outbox.dead_letter_count().unwrap(), 0);

        outbox.move_to_dead_letter("op-123").unwrap();

        assert_eq!(outbox.pending_count().unwrap(), 0);
        assert_eq!(outbox.dead_letter_count().unwrap(), 1);

        let dead = outbox.dead_letter_entries().unwrap();
        assert_eq!(dead[0].operation_id, "op-123");
        assert_eq!(dead[0].events[0].entity, "users");
    }

    #[test]
    fn test_outbox_created_at() {
        let (storage, outbox) = setup();
        enqueue_one(&storage, &outbox, "op-123", &alice_event());

        let pending = outbox.pending_events().unwrap();
        assert!(pending[0].created_at > 0);
    }

    #[test]
    fn test_outbox_update_dispatched_count() {
        let (storage, outbox) = setup();

        let events = vec![
            ChangeEvent::create("users".to_string(), "1".to_string(), serde_json::json!({})),
            ChangeEvent::create("users".to_string(), "2".to_string(), serde_json::json!({})),
            ChangeEvent::create("users".to_string(), "3".to_string(), serde_json::json!({})),
        ];
        enqueue_many(&storage, &outbox, "op-partial", &events);

        let initial = outbox.pending_events().unwrap();
        assert_eq!(initial[0].dispatched_count, 0);
        assert_eq!(initial[0].retry_count, 0);

        outbox.update_dispatched_count("op-partial", 1).unwrap();

        let after_first = outbox.pending_events().unwrap();
        assert_eq!(after_first[0].dispatched_count, 1);
        assert_eq!(after_first[0].retry_count, 1);
        assert_eq!(after_first[0].events.len(), 3);

        outbox.update_dispatched_count("op-partial", 2).unwrap();

        let after_second = outbox.pending_events().unwrap();
        assert_eq!(after_second[0].dispatched_count, 2);
        assert_eq!(after_second[0].retry_count, 2);
    }

    #[test]
    fn test_outbox_backward_compat_missing_dispatched_count() {
        let (storage, outbox) = setup();

        // Stored entry written without the `dispatched_count` field.
        let legacy = serde_json::json!({
            "events": [{"sequence": 0, "entity": "users", "id": "1", "operation": "Create", "data": {}}],
            "retry_count": 3,
            "created_at": 1000
        });
        storage
            .insert(b"_outbox/legacy-op", &serde_json::to_vec(&legacy).unwrap())
            .unwrap();

        let pending = outbox.pending_events().unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].dispatched_count, 0);
        assert_eq!(pending[0].retry_count, 3);
    }
}