Skip to main content

klauthed_data/outbox/
sql.rs

1//! SQL-backed [`Outbox`] over sqlx's driver-agnostic `AnyPool`.
2//!
3//! [`SqlOutbox`] persists outbox rows in a single portable table so the same
4//! code drives PostgreSQL, MySQL/MariaDB, or SQLite — whichever driver feature
5//! is compiled in. The schema uses only portable column types (`TEXT`,
6//! `BIGINT`, `INTEGER` boolean) so the DDL in [`SqlOutbox::CREATE_TABLE_SQL`]
7//! applies unchanged across backends.
8//!
9//! Timestamps are stored as RFC3339 `TEXT` (lexicographically sortable) and
10//! booleans as `0`/`1` integers, both for maximum portability through the `Any`
11//! driver layer.
12//!
13//! # Concurrent pollers
14//!
//! The portable [`fetch_unpublished`](Outbox::fetch_unpublished) does a plain
//! ordered `SELECT`; if several relay processes poll at once they can fetch the
//! same rows and double-publish (the broker should dedupe on event id). On
//! PostgreSQL prefer [`SqlOutbox::fetch_unpublished_skip_locked`] (gated behind
//! the `postgres` feature), which claims rows with `FOR UPDATE SKIP LOCKED` so
//! pollers whose claims overlap in time get disjoint batches. The row locks
//! last only as long as the transaction that takes them, so read that method's
//! documentation before relying on it to prevent double-publishing.
21//!
22//! ```no_run
23//! # async fn run() -> Result<(), klauthed_data::DataError> {
24//! use klauthed_data::outbox::Outbox;
25//! use klauthed_data::outbox::SqlOutbox;
26//!
27//! sqlx::any::install_default_drivers();
28//! let pool = sqlx::AnyPool::connect("sqlite::memory:").await?;
29//! let outbox = SqlOutbox::new(pool);
30//! outbox.ensure_schema().await?;
31//! let pending = outbox.fetch_unpublished(100).await?;
32//! # let _ = pending;
33//! # Ok(())
34//! # }
35//! ```
36
37use async_trait::async_trait;
38use klauthed_core::time::Timestamp;
39use sqlx::AnyPool;
40use sqlx::Row;
41
42use crate::error::DataError;
43use crate::outbox::{Outbox, OutboxEntry, OutboxId};
44
45/// A durable [`Outbox`] backed by a relational table on an [`AnyPool`].
46///
47/// Clone-cheap: holds only the pool handle (itself an `Arc` internally).
48#[derive(Clone)]
49pub struct SqlOutbox {
50    pool: AnyPool,
51    table: String,
52}
53
54impl SqlOutbox {
55    /// Default table name used when constructed with [`SqlOutbox::new`].
56    pub const DEFAULT_TABLE: &'static str = "outbox";
57
58    /// Portable DDL for the outbox table (default table name), created only if
59    /// absent. Run once at startup via [`SqlOutbox::ensure_schema`], or apply it
60    /// through your migration tooling.
61    pub const CREATE_TABLE_SQL: &'static str = "\
62CREATE TABLE IF NOT EXISTS outbox (
63    id             TEXT    NOT NULL PRIMARY KEY,
64    aggregate_type TEXT    NOT NULL,
65    aggregate_id   TEXT    NOT NULL,
66    event_type     TEXT    NOT NULL,
67    sequence       BIGINT  NOT NULL,
68    payload        TEXT    NOT NULL,
69    occurred_at    TEXT    NOT NULL,
70    published      INTEGER NOT NULL DEFAULT 0,
71    published_at   TEXT
72)";
73
74    /// Wrap an existing pool, using the [`DEFAULT_TABLE`](Self::DEFAULT_TABLE)
75    /// table name.
76    pub fn new(pool: AnyPool) -> Self {
77        Self { pool, table: Self::DEFAULT_TABLE.to_owned() }
78    }
79
80    /// Borrow the underlying pool.
81    pub fn pool(&self) -> &AnyPool {
82        &self.pool
83    }
84
85    /// Create the outbox table if it does not exist.
86    ///
87    /// Uses the bundled [`CREATE_TABLE_SQL`](Self::CREATE_TABLE_SQL); safe to call
88    /// repeatedly. For non-default table names, run equivalent DDL yourself.
89    pub async fn ensure_schema(&self) -> Result<(), DataError> {
90        sqlx::query(Self::CREATE_TABLE_SQL).execute(&self.pool).await?;
91        Ok(())
92    }
93
94    /// Build the column list shared by every `SELECT`.
95    fn select_prefix(&self) -> String {
96        format!(
97            "SELECT id, aggregate_type, aggregate_id, event_type, sequence, \
98             payload, occurred_at, published, published_at FROM {}",
99            self.table
100        )
101    }
102
103    /// Claim up to `limit` unpublished rows on PostgreSQL using
104    /// `FOR UPDATE SKIP LOCKED`, so concurrent relay pollers receive disjoint
105    /// batches.
106    ///
107    /// This must run inside the caller's transaction for the row locks to hold
108    /// until commit; here it locks within an implicit single-statement
109    /// transaction, which is enough to demonstrate the claim semantics. Pair it
110    /// with [`mark_published`](Outbox::mark_published) before committing.
111    ///
112    /// Available only under the `postgres` feature, since `SKIP LOCKED` is not
113    /// portable across all backends.
114    #[cfg(feature = "postgres")]
115    pub async fn fetch_unpublished_skip_locked(
116        &self,
117        limit: usize,
118    ) -> Result<Vec<OutboxEntry>, DataError> {
119        let sql = format!(
120            "{prefix} WHERE published = 0 ORDER BY sequence ASC LIMIT {limit} FOR UPDATE SKIP LOCKED",
121            prefix = self.select_prefix(),
122            limit = limit as i64,
123        );
124        let rows = sqlx::query(sqlx::AssertSqlSafe(&*sql)).fetch_all(&self.pool).await?;
125        rows.iter().map(row_to_entry).collect()
126    }
127}
128
129/// Decode one `AnyRow` into an [`OutboxEntry`].
130fn row_to_entry(row: &sqlx::any::AnyRow) -> Result<OutboxEntry, DataError> {
131    let id_str: String = row.try_get("id")?;
132    let id: OutboxId = id_str
133        .parse()
134        .map_err(|e| DataError::Outbox(format!("invalid outbox id '{id_str}': {e}")))?;
135
136    let payload_str: String = row.try_get("payload")?;
137    let payload: serde_json::Value = serde_json::from_str(&payload_str)
138        .map_err(|e| DataError::Outbox(format!("invalid stored payload json: {e}")))?;
139
140    let occurred_at_str: String = row.try_get("occurred_at")?;
141    let occurred_at = parse_timestamp(&occurred_at_str)?;
142
143    let published_at_str: Option<String> = row.try_get("published_at")?;
144    let published_at = match published_at_str {
145        Some(s) => Some(parse_timestamp(&s)?),
146        None => None,
147    };
148
149    let sequence: i64 = row.try_get("sequence")?;
150    let published: i64 = row.try_get("published")?;
151
152    Ok(OutboxEntry {
153        id,
154        aggregate_type: row.try_get("aggregate_type")?,
155        aggregate_id: row.try_get("aggregate_id")?,
156        event_type: row.try_get("event_type")?,
157        sequence: sequence as u64,
158        payload,
159        occurred_at,
160        published: published != 0,
161        published_at,
162    })
163}
164
165/// Parse an RFC3339 string back into a [`Timestamp`] via its serde representation.
166fn parse_timestamp(s: &str) -> Result<Timestamp, DataError> {
167    serde_json::from_value(serde_json::Value::String(s.to_owned()))
168        .map_err(|e| DataError::Outbox(format!("invalid stored timestamp '{s}': {e}")))
169}
170
171#[async_trait]
172impl Outbox for SqlOutbox {
173    async fn enqueue(&self, entries: Vec<OutboxEntry>) -> Result<(), DataError> {
174        if entries.is_empty() {
175            return Ok(());
176        }
177
178        let sql = format!(
179            "INSERT INTO {} \
180             (id, aggregate_type, aggregate_id, event_type, sequence, payload, occurred_at, published, published_at) \
181             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
182            self.table
183        );
184
185        // One transaction so a partial batch never lands.
186        let mut tx = self.pool.begin().await?;
187        for entry in entries {
188            let payload = serde_json::to_string(&entry.payload).map_err(|e| {
189                DataError::Outbox(format!("failed to serialize outbox payload: {e}"))
190            })?;
191            let published_at = entry.published_at.map(|t| t.to_rfc3339());
192            sqlx::query(sqlx::AssertSqlSafe(&*sql))
193                .bind(entry.id.to_string())
194                .bind(entry.aggregate_type)
195                .bind(entry.aggregate_id)
196                .bind(entry.event_type)
197                .bind(entry.sequence as i64)
198                .bind(payload)
199                .bind(entry.occurred_at.to_rfc3339())
200                .bind(i64::from(entry.published))
201                .bind(published_at)
202                .execute(&mut *tx)
203                .await?;
204        }
205        tx.commit().await?;
206        Ok(())
207    }
208
209    async fn fetch_unpublished(&self, limit: usize) -> Result<Vec<OutboxEntry>, DataError> {
210        let sql = format!(
211            "{prefix} WHERE published = 0 ORDER BY sequence ASC LIMIT {limit}",
212            prefix = self.select_prefix(),
213            limit = limit as i64,
214        );
215        let rows = sqlx::query(sqlx::AssertSqlSafe(&*sql)).fetch_all(&self.pool).await?;
216        rows.iter().map(row_to_entry).collect()
217    }
218
219    async fn mark_published(&self, ids: &[OutboxId]) -> Result<(), DataError> {
220        if ids.is_empty() {
221            return Ok(());
222        }
223        let now = Timestamp::now().to_rfc3339();
224        let sql = format!(
225            "UPDATE {} SET published = 1, published_at = ? WHERE id = ? AND published = 0",
226            self.table
227        );
228        let mut tx = self.pool.begin().await?;
229        for id in ids {
230            sqlx::query(sqlx::AssertSqlSafe(&*sql))
231                .bind(now.clone())
232                .bind(id.to_string())
233                .execute(&mut *tx)
234                .await?;
235        }
236        tx.commit().await?;
237        Ok(())
238    }
239}
240
#[cfg(all(test, feature = "sqlite"))]
mod tests {
    use super::*;
    use klauthed_core::domain::{DomainEvent, EventEnvelope};
    use klauthed_core::id::Id;
    use serde::Serialize;
    use std::borrow::Cow;

    #[derive(Debug, Serialize)]
    struct Opened {
        owner: String,
    }

    impl DomainEvent for Opened {
        fn event_type(&self) -> &'static str {
            "account.opened"
        }
    }

    /// Build a deterministic entry whose sequence and timestamp derive from `seq`.
    fn entry(seq: u64) -> OutboxEntry {
        let envelope = EventEnvelope {
            event_id: Id::new(),
            event_type: Cow::Borrowed("account.opened"),
            aggregate_id: "acct-1".to_owned(),
            aggregate_type: Cow::Borrowed("account"),
            sequence: seq,
            occurred_at: Timestamp::from_unix_millis(1_000 + seq as i64),
            payload: Opened { owner: format!("owner-{seq}") },
        };
        OutboxEntry::from_envelope(&envelope).unwrap()
    }

    async fn memory_outbox() -> SqlOutbox {
        sqlx::any::install_default_drivers();
        // SQLite in-memory databases are connection-local: every new connection
        // sees an empty database. Force max_connections(1) so all operations in
        // the test share the same connection and therefore the same DB.
        let pool = sqlx::pool::PoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .expect("connect in-memory sqlite");
        let outbox = SqlOutbox::new(pool);
        outbox.ensure_schema().await.expect("ensure schema");
        outbox
    }

    /// Read the raw `published` flag and `published_at` text stored for `id`.
    async fn published_state(outbox: &SqlOutbox, id: OutboxId) -> (bool, Option<String>) {
        let row = sqlx::query("SELECT published, published_at FROM outbox WHERE id = ?")
            .bind(id.to_string())
            .fetch_one(outbox.pool())
            .await
            .expect("outbox row exists");
        let published: i64 = row.try_get("published").expect("decode published");
        let published_at: Option<String> =
            row.try_get("published_at").expect("decode published_at");
        (published != 0, published_at)
    }

    #[tokio::test]
    async fn ensure_schema_is_idempotent() {
        let outbox = memory_outbox().await;
        // Second call must not error.
        outbox.ensure_schema().await.unwrap();
        assert!(outbox.fetch_unpublished(10).await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn enqueue_fetch_mark_round_trip_over_any_sqlite() {
        let outbox = memory_outbox().await;
        let e1 = entry(1);
        let e2 = entry(2);
        let (id1, id2) = (e1.id, e2.id);

        outbox.enqueue(vec![e1.clone(), e2.clone()]).await.unwrap();

        let unpublished = outbox.fetch_unpublished(10).await.unwrap();
        assert_eq!(unpublished.len(), 2);
        // Ordered by sequence ascending; full fidelity round-trip on the first row.
        assert_eq!(unpublished[0], e1);
        assert_eq!(unpublished[1].id, id2);
        assert_eq!(unpublished[0].payload["owner"], "owner-1");
        assert!(!unpublished[0].published);

        // Publish the first; it drops out of the next fetch.
        outbox.mark_published(&[id1]).await.unwrap();
        let remaining = outbox.fetch_unpublished(10).await.unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].id, id2);

        outbox.mark_published(&[id2]).await.unwrap();
        assert!(outbox.fetch_unpublished(10).await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn fetch_honors_limit_and_sequence_order() {
        let outbox = memory_outbox().await;
        let entries: Vec<_> = (1..=5).map(entry).collect();
        outbox.enqueue(entries).await.unwrap();

        let two = outbox.fetch_unpublished(2).await.unwrap();
        assert_eq!(two.len(), 2);
        assert_eq!(two[0].sequence, 1);
        assert_eq!(two[1].sequence, 2);

        assert_eq!(outbox.fetch_unpublished(100).await.unwrap().len(), 5);
    }

    #[tokio::test]
    async fn marking_published_stores_published_at() {
        let outbox = memory_outbox().await;
        let e = entry(1);
        let id = e.id;
        outbox.enqueue(vec![e]).await.unwrap();

        let (published, stamp) = published_state(&outbox, id).await;
        assert!(!published);
        assert!(stamp.is_none(), "fresh rows have no published_at");

        outbox.mark_published(&[id]).await.unwrap();
        let (published, stamp) = published_state(&outbox, id).await;
        assert!(published);
        let first_stamp = stamp.expect("published_at set by mark_published");
        parse_timestamp(&first_stamp).expect("published_at is a valid timestamp");

        // Re-marking a published row is a no-op (WHERE published = 0): the
        // original timestamp must survive.
        outbox.mark_published(&[id]).await.unwrap();
        let (_, second_stamp) = published_state(&outbox, id).await;
        assert_eq!(second_stamp.as_deref(), Some(first_stamp.as_str()));
        assert!(outbox.fetch_unpublished(10).await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn empty_batches_are_noops() {
        let outbox = memory_outbox().await;
        outbox.enqueue(vec![]).await.unwrap();
        outbox.mark_published(&[]).await.unwrap();
        assert!(outbox.fetch_unpublished(10).await.unwrap().is_empty());
    }
}