klauthed_data/outbox/
mod.rs1#[cfg(feature = "sql")]
25pub mod sql;
26
27#[cfg(feature = "mongodb")]
28pub mod mongo;
29
30#[cfg(feature = "sql")]
31pub use sql::SqlOutbox;
32
33#[cfg(feature = "mongodb")]
34pub use mongo::MongoOutbox;
35
36use async_trait::async_trait;
37use klauthed_core::domain::{DomainEvent, EventEnvelope};
38use klauthed_core::id::Id;
39use klauthed_core::time::Timestamp;
40use serde::{Deserialize, Serialize};
41use std::sync::Mutex;
42
43use crate::error::DataError;
44
45pub struct OutboxTag;
47
48pub type OutboxId = Id<OutboxTag>;
50
51#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
56pub struct OutboxEntry {
57 pub id: OutboxId,
59 pub aggregate_type: String,
61 pub aggregate_id: String,
63 pub event_type: String,
65 pub sequence: u64,
67 pub payload: serde_json::Value,
69 pub occurred_at: Timestamp,
71 pub published: bool,
73 pub published_at: Option<Timestamp>,
75}
76
77impl OutboxEntry {
78 pub fn from_envelope<E>(envelope: &EventEnvelope<E>) -> Result<Self, DataError>
84 where
85 E: Serialize + DomainEvent,
86 {
87 let payload = serde_json::to_value(&envelope.payload)
88 .map_err(|e| DataError::Outbox(format!("failed to serialize event payload: {e}")))?;
89 Ok(Self {
90 id: OutboxId::new(),
91 aggregate_type: envelope.aggregate_type.to_string(),
92 aggregate_id: envelope.aggregate_id.clone(),
93 event_type: envelope.event_type.to_string(),
94 sequence: envelope.sequence,
95 payload,
96 occurred_at: envelope.occurred_at,
97 published: false,
98 published_at: None,
99 })
100 }
101}
102
103#[async_trait]
110pub trait Outbox: Send + Sync {
111 async fn enqueue(&self, entries: Vec<OutboxEntry>) -> Result<(), DataError>;
114
115 async fn fetch_unpublished(&self, limit: usize) -> Result<Vec<OutboxEntry>, DataError>;
117
118 async fn mark_published(&self, ids: &[OutboxId]) -> Result<(), DataError>;
121}
122
123#[derive(Default)]
127pub struct InMemoryOutbox {
128 entries: Mutex<Vec<OutboxEntry>>,
129}
130
131impl InMemoryOutbox {
132 pub fn new() -> Self {
134 Self::default()
135 }
136
137 pub fn len(&self) -> usize {
139 self.entries.lock().unwrap_or_else(std::sync::PoisonError::into_inner).len()
140 }
141
142 pub fn is_empty(&self) -> bool {
144 self.len() == 0
145 }
146}
147
148#[async_trait]
149impl Outbox for InMemoryOutbox {
150 async fn enqueue(&self, entries: Vec<OutboxEntry>) -> Result<(), DataError> {
151 self.entries.lock().unwrap_or_else(std::sync::PoisonError::into_inner).extend(entries);
152 Ok(())
153 }
154
155 async fn fetch_unpublished(&self, limit: usize) -> Result<Vec<OutboxEntry>, DataError> {
156 let guard = self.entries.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
157 let mut unpublished: Vec<OutboxEntry> =
158 guard.iter().filter(|e| !e.published).cloned().collect();
159 unpublished.sort_by_key(|e| e.id);
161 unpublished.truncate(limit);
162 Ok(unpublished)
163 }
164
165 async fn mark_published(&self, ids: &[OutboxId]) -> Result<(), DataError> {
166 let now = Timestamp::now();
167 let mut guard = self.entries.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
168 for entry in guard.iter_mut() {
169 if !entry.published && ids.contains(&entry.id) {
170 entry.published = true;
171 entry.published_at = Some(now);
172 }
173 }
174 Ok(())
175 }
176}
177
178#[cfg(test)]
179mod tests {
180 use super::*;
181 use std::borrow::Cow;
182
183 #[derive(Debug, Serialize)]
184 struct Opened {
185 owner: String,
186 }
187
188 impl DomainEvent for Opened {
189 fn event_type(&self) -> &'static str {
190 "account.opened"
191 }
192 }
193
194 fn envelope(seq: u64) -> EventEnvelope<Opened> {
195 EventEnvelope {
196 event_id: Id::new(),
197 event_type: Cow::Borrowed("account.opened"),
198 aggregate_id: "acct-1".to_owned(),
199 aggregate_type: Cow::Borrowed("account"),
200 sequence: seq,
201 occurred_at: Timestamp::from_unix_millis(1_000),
202 payload: Opened { owner: "alice".to_owned() },
203 }
204 }
205
206 #[test]
207 fn from_envelope_carries_metadata_and_serializes_payload() {
208 let entry = OutboxEntry::from_envelope(&envelope(7)).unwrap();
209 assert_eq!(entry.aggregate_type, "account");
210 assert_eq!(entry.aggregate_id, "acct-1");
211 assert_eq!(entry.event_type, "account.opened");
212 assert_eq!(entry.sequence, 7);
213 assert_eq!(entry.payload["owner"], "alice");
214 assert!(!entry.published);
215 assert!(entry.published_at.is_none());
216 }
217
218 #[tokio::test]
219 async fn enqueue_fetch_mark_published_round_trip() {
220 let outbox = InMemoryOutbox::new();
221 let e1 = OutboxEntry::from_envelope(&envelope(1)).unwrap();
222 let e2 = OutboxEntry::from_envelope(&envelope(2)).unwrap();
223 let (id1, id2) = (e1.id, e2.id);
224
225 outbox.enqueue(vec![e1, e2]).await.unwrap();
226 assert_eq!(outbox.len(), 2);
227
228 let unpublished = outbox.fetch_unpublished(10).await.unwrap();
229 assert_eq!(unpublished.len(), 2);
230
231 outbox.mark_published(&[id1]).await.unwrap();
233 let remaining = outbox.fetch_unpublished(10).await.unwrap();
234 assert_eq!(remaining.len(), 1);
235 assert_eq!(remaining[0].id, id2);
236
237 outbox.mark_published(&[id2]).await.unwrap();
238 assert!(outbox.fetch_unpublished(10).await.unwrap().is_empty());
239 }
240
241 #[tokio::test]
242 async fn fetch_unpublished_honors_limit() {
243 let outbox = InMemoryOutbox::new();
244 let entries = (1..=5).map(|s| OutboxEntry::from_envelope(&envelope(s)).unwrap()).collect();
245 outbox.enqueue(entries).await.unwrap();
246
247 assert_eq!(outbox.fetch_unpublished(2).await.unwrap().len(), 2);
248 assert_eq!(outbox.fetch_unpublished(100).await.unwrap().len(), 5);
249 }
250}