Skip to main content

klauthed_data/outbox/
mod.rs

1//! Transactional outbox.
2//!
3//! The outbox pattern makes "change state **and** publish an event" atomic
4//! without a distributed transaction: the producer writes domain events into an
5//! outbox table *in the same transaction* as the state change, and a separate
6//! relay later reads unpublished entries and ships them to the broker, marking
7//! them published. This crate provides the backend-agnostic [`Outbox`] trait,
8//! the [`OutboxEntry`] row model, and an in-memory implementation for tests.
9//!
10//! A real Postgres-backed `Outbox` (selecting `FOR UPDATE SKIP LOCKED`) is a
11//! future pass; the trait is shaped so that backend can drop in unchanged.
12//!
13//! ```
14//! use klauthed_data::outbox::{InMemoryOutbox, Outbox};
15//!
16//! # async fn run() -> Result<(), klauthed_data::DataError> {
17//! let outbox = InMemoryOutbox::new();
18//! let unpublished = outbox.fetch_unpublished(10).await?;
19//! assert!(unpublished.is_empty());
20//! # Ok(())
21//! # }
22//! ```
23
24#[cfg(feature = "sql")]
25pub mod sql;
26
27#[cfg(feature = "mongodb")]
28pub mod mongo;
29
30#[cfg(feature = "sql")]
31pub use sql::SqlOutbox;
32
33#[cfg(feature = "mongodb")]
34pub use mongo::MongoOutbox;
35
36use async_trait::async_trait;
37use klauthed_core::domain::{DomainEvent, EventEnvelope};
38use klauthed_core::id::Id;
39use klauthed_core::time::Timestamp;
40use serde::{Deserialize, Serialize};
41use std::sync::Mutex;
42
43use crate::error::DataError;
44
45/// Marker tag for an [`OutboxEntry`]'s identity.
46pub struct OutboxTag;
47
48/// The id minted for each persisted outbox row.
49pub type OutboxId = Id<OutboxTag>;
50
51/// One row in the outbox: a serialized domain event awaiting publication.
52///
53/// Construct from an [`EventEnvelope`] via [`OutboxEntry::from_envelope`], which
54/// carries over the aggregate metadata and serializes the payload to JSON.
55#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
56pub struct OutboxEntry {
57    /// Unique id of this outbox row.
58    pub id: OutboxId,
59    /// The aggregate type the event belongs to, e.g. `account`.
60    pub aggregate_type: String,
61    /// The aggregate instance id, rendered as a string.
62    pub aggregate_id: String,
63    /// The stable, dotted event name, e.g. `account.opened`.
64    pub event_type: String,
65    /// The aggregate version this event produced (monotonic per aggregate).
66    pub sequence: u64,
67    /// The serialized event payload.
68    pub payload: serde_json::Value,
69    /// When the event occurred.
70    pub occurred_at: Timestamp,
71    /// Whether the relay has shipped this entry to the broker.
72    pub published: bool,
73    /// When the entry was marked published, if it has been.
74    pub published_at: Option<Timestamp>,
75}
76
77impl OutboxEntry {
78    /// Build an unpublished entry from an [`EventEnvelope`], serializing its
79    /// payload to JSON.
80    ///
81    /// # Errors
82    /// Returns [`DataError::Outbox`] if the payload cannot be serialized.
83    pub fn from_envelope<E>(envelope: &EventEnvelope<E>) -> Result<Self, DataError>
84    where
85        E: Serialize + DomainEvent,
86    {
87        let payload = serde_json::to_value(&envelope.payload)
88            .map_err(|e| DataError::Outbox(format!("failed to serialize event payload: {e}")))?;
89        Ok(Self {
90            id: OutboxId::new(),
91            aggregate_type: envelope.aggregate_type.to_string(),
92            aggregate_id: envelope.aggregate_id.clone(),
93            event_type: envelope.event_type.to_string(),
94            sequence: envelope.sequence,
95            payload,
96            occurred_at: envelope.occurred_at,
97            published: false,
98            published_at: None,
99        })
100    }
101}
102
103/// A durable buffer of domain events awaiting publication.
104///
105/// Implementations persist entries alongside aggregate state (ideally in the
106/// same transaction) and a relay drains them with
107/// [`fetch_unpublished`](Outbox::fetch_unpublished) /
108/// [`mark_published`](Outbox::mark_published).
109#[async_trait]
110pub trait Outbox: Send + Sync {
111    /// Persist a batch of entries (idempotent on `id` is the implementation's
112    /// responsibility; the in-memory impl simply appends).
113    async fn enqueue(&self, entries: Vec<OutboxEntry>) -> Result<(), DataError>;
114
115    /// Return up to `limit` unpublished entries, oldest first.
116    async fn fetch_unpublished(&self, limit: usize) -> Result<Vec<OutboxEntry>, DataError>;
117
118    /// Mark the given entries published (no-op for ids that are absent or
119    /// already published).
120    async fn mark_published(&self, ids: &[OutboxId]) -> Result<(), DataError>;
121}
122
123/// A thread-safe, in-memory [`Outbox`] for tests and single-process use.
124///
125/// Not durable: entries live only for the lifetime of the process.
126#[derive(Default)]
127pub struct InMemoryOutbox {
128    entries: Mutex<Vec<OutboxEntry>>,
129}
130
131impl InMemoryOutbox {
132    /// An empty outbox.
133    pub fn new() -> Self {
134        Self::default()
135    }
136
137    /// The total number of entries held (published or not).
138    pub fn len(&self) -> usize {
139        self.entries.lock().unwrap_or_else(std::sync::PoisonError::into_inner).len()
140    }
141
142    /// Whether the outbox holds no entries at all.
143    pub fn is_empty(&self) -> bool {
144        self.len() == 0
145    }
146}
147
148#[async_trait]
149impl Outbox for InMemoryOutbox {
150    async fn enqueue(&self, entries: Vec<OutboxEntry>) -> Result<(), DataError> {
151        self.entries.lock().unwrap_or_else(std::sync::PoisonError::into_inner).extend(entries);
152        Ok(())
153    }
154
155    async fn fetch_unpublished(&self, limit: usize) -> Result<Vec<OutboxEntry>, DataError> {
156        let guard = self.entries.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
157        let mut unpublished: Vec<OutboxEntry> =
158            guard.iter().filter(|e| !e.published).cloned().collect();
159        // Oldest first by id (UUID v7 is time-sortable), then truncate.
160        unpublished.sort_by_key(|e| e.id);
161        unpublished.truncate(limit);
162        Ok(unpublished)
163    }
164
165    async fn mark_published(&self, ids: &[OutboxId]) -> Result<(), DataError> {
166        let now = Timestamp::now();
167        let mut guard = self.entries.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
168        for entry in guard.iter_mut() {
169            if !entry.published && ids.contains(&entry.id) {
170                entry.published = true;
171                entry.published_at = Some(now);
172            }
173        }
174        Ok(())
175    }
176}
177
178#[cfg(test)]
179mod tests {
180    use super::*;
181    use std::borrow::Cow;
182
183    #[derive(Debug, Serialize)]
184    struct Opened {
185        owner: String,
186    }
187
188    impl DomainEvent for Opened {
189        fn event_type(&self) -> &'static str {
190            "account.opened"
191        }
192    }
193
194    fn envelope(seq: u64) -> EventEnvelope<Opened> {
195        EventEnvelope {
196            event_id: Id::new(),
197            event_type: Cow::Borrowed("account.opened"),
198            aggregate_id: "acct-1".to_owned(),
199            aggregate_type: Cow::Borrowed("account"),
200            sequence: seq,
201            occurred_at: Timestamp::from_unix_millis(1_000),
202            payload: Opened { owner: "alice".to_owned() },
203        }
204    }
205
206    #[test]
207    fn from_envelope_carries_metadata_and_serializes_payload() {
208        let entry = OutboxEntry::from_envelope(&envelope(7)).unwrap();
209        assert_eq!(entry.aggregate_type, "account");
210        assert_eq!(entry.aggregate_id, "acct-1");
211        assert_eq!(entry.event_type, "account.opened");
212        assert_eq!(entry.sequence, 7);
213        assert_eq!(entry.payload["owner"], "alice");
214        assert!(!entry.published);
215        assert!(entry.published_at.is_none());
216    }
217
218    #[tokio::test]
219    async fn enqueue_fetch_mark_published_round_trip() {
220        let outbox = InMemoryOutbox::new();
221        let e1 = OutboxEntry::from_envelope(&envelope(1)).unwrap();
222        let e2 = OutboxEntry::from_envelope(&envelope(2)).unwrap();
223        let (id1, id2) = (e1.id, e2.id);
224
225        outbox.enqueue(vec![e1, e2]).await.unwrap();
226        assert_eq!(outbox.len(), 2);
227
228        let unpublished = outbox.fetch_unpublished(10).await.unwrap();
229        assert_eq!(unpublished.len(), 2);
230
231        // Publish only the first; it should drop out of the next fetch.
232        outbox.mark_published(&[id1]).await.unwrap();
233        let remaining = outbox.fetch_unpublished(10).await.unwrap();
234        assert_eq!(remaining.len(), 1);
235        assert_eq!(remaining[0].id, id2);
236
237        outbox.mark_published(&[id2]).await.unwrap();
238        assert!(outbox.fetch_unpublished(10).await.unwrap().is_empty());
239    }
240
241    #[tokio::test]
242    async fn fetch_unpublished_honors_limit() {
243        let outbox = InMemoryOutbox::new();
244        let entries = (1..=5).map(|s| OutboxEntry::from_envelope(&envelope(s)).unwrap()).collect();
245        outbox.enqueue(entries).await.unwrap();
246
247        assert_eq!(outbox.fetch_unpublished(2).await.unwrap().len(), 2);
248        assert_eq!(outbox.fetch_unpublished(100).await.unwrap().len(), 5);
249    }
250}