Skip to main content

hexeract_outbox_sql/
sqlite.rs

1//! SQLite backend for the Hexeract outbox.
2//!
3//! # Concurrency
4//!
5//! SQLite has no `FOR UPDATE SKIP LOCKED`, so this backend assumes a
6//! **single [`OutboxWorker`] per database**. Running several workers against
7//! the same SQLite database can dispatch an envelope more than once, because
8//! concurrent pollers may read the same pending rows before either marks them
9//! delivered. For competing-consumers fan-out across many workers, use the
10//! PostgreSQL or MySQL backend instead. Configuring `busy_timeout` on the pool
11//! is recommended so writes wait rather than fail under contention.
12
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::Duration;
16use std::time::SystemTime;
17
18use async_trait::async_trait;
19use hexeract_outbox::ErasedHandler;
20use hexeract_outbox::Event;
21use hexeract_outbox::Handler;
22use hexeract_outbox::OutboxEnvelope;
23use hexeract_outbox::OutboxError;
24use hexeract_outbox::OutboxPublisher;
25use hexeract_outbox::OutboxStore;
26use hexeract_outbox::OutboxWorker;
27use hexeract_outbox::OutboxWorkerConfig;
28use hexeract_outbox::TypedHandler;
29use sqlx::Acquire;
30use sqlx::Row;
31use sqlx::Sqlite;
32use sqlx::SqlitePool;
33use sqlx::Transaction;
34use sqlx::pool::PoolConnection;
35use uuid::Uuid;
36
37use crate::DEFAULT_TABLE_NAME;
38use crate::dialect::Dialect;
39use crate::envelope::assemble_envelope;
40use crate::envelope::format_sqlite_utc;
41use crate::envelope::parse_sqlite_utc;
42use crate::validate::validate_table_name;
43
44const DIALECT: Dialect = Dialect::Sqlite;
45
46fn database_error(error: impl std::error::Error + Send + Sync + 'static) -> OutboxError {
47    OutboxError::Database(Box::new(error))
48}
49
50/// Apply the canonical SQLite outbox schema to the target database.
51///
52/// **Intended for POCs, integration tests and local development.**
53/// Production deployments should run their own migration tooling against the
54/// SQL rendered by [`Dialect::schema_ddl`].
55///
56/// # Errors
57///
58/// - [`OutboxError::Internal`] if `table_name` is not a valid identifier.
59/// - [`OutboxError::Database`] if the connection or the DDL statement fails.
60pub async fn ensure_schema(pool: &SqlitePool, table_name: &str) -> Result<(), OutboxError> {
61    let ddl = DIALECT.schema_ddl(table_name)?;
62    sqlx::raw_sql(&ddl)
63        .execute(pool)
64        .await
65        .map_err(database_error)?;
66    Ok(())
67}
68
69/// SQLite implementation of [`OutboxStore`] backed by `sqlx::SqlitePool`.
70///
71/// See the [module documentation](self) for the single-worker concurrency model.
72/// Cheap to clone (the pool and the cached SQL strings are reference-counted).
73#[derive(Debug, Clone)]
74pub struct SqliteOutboxStore {
75    pool: SqlitePool,
76    table_name: Arc<str>,
77    poll_sql: Arc<str>,
78    mark_delivered_sql: Arc<str>,
79    mark_failed_sql: Arc<str>,
80}
81
82impl SqliteOutboxStore {
83    /// Build a store for the given pool and table.
84    ///
85    /// # Errors
86    ///
87    /// Returns [`OutboxError::Internal`] if `table_name` is not a valid
88    /// identifier matching `^[a-zA-Z_][a-zA-Z0-9_]*$`.
89    pub fn new(pool: SqlitePool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
90        let table_name = table_name.into();
91        validate_table_name(&table_name)?;
92        let poll_sql = DIALECT.poll_sql(&table_name);
93        let mark_delivered_sql = DIALECT.mark_delivered_sql(&table_name);
94        let mark_failed_sql = DIALECT.mark_failed_sql(&table_name);
95        Ok(Self {
96            pool,
97            table_name: Arc::from(table_name),
98            poll_sql: Arc::from(poll_sql),
99            mark_delivered_sql: Arc::from(mark_delivered_sql),
100            mark_failed_sql: Arc::from(mark_failed_sql),
101        })
102    }
103
104    /// Underlying pool.
105    #[must_use]
106    pub fn pool(&self) -> &SqlitePool {
107        &self.pool
108    }
109
110    /// Configured table name.
111    #[must_use]
112    pub fn table_name(&self) -> &str {
113        &self.table_name
114    }
115}
116
117#[async_trait]
118impl OutboxStore for SqliteOutboxStore {
119    type Client = PoolConnection<Sqlite>;
120    type Tx<'tx> = Transaction<'tx, Sqlite>;
121
122    async fn acquire(&self) -> Result<Self::Client, OutboxError> {
123        self.pool.acquire().await.map_err(database_error)
124    }
125
126    async fn begin<'a>(&self, client: &'a mut Self::Client) -> Result<Self::Tx<'a>, OutboxError> {
127        client.begin().await.map_err(database_error)
128    }
129
130    async fn poll<'a>(
131        &self,
132        tx: &mut Self::Tx<'a>,
133        batch_size: usize,
134        max_attempts: u32,
135    ) -> Result<Vec<OutboxEnvelope>, OutboxError> {
136        let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
137        let max = i64::from(max_attempts);
138        let rows = sqlx::query(&self.poll_sql)
139            .bind(max)
140            .bind(limit)
141            .fetch_all(&mut **tx)
142            .await
143            .map_err(database_error)?;
144
145        let mut envelopes = Vec::with_capacity(rows.len());
146        for row in rows {
147            let event_id: Uuid = row.try_get("event_id").map_err(database_error)?;
148            let event_type: String = row.try_get("event_type").map_err(database_error)?;
149            let payload: serde_json::Value = row.try_get("payload").map_err(database_error)?;
150            let subject_id: Option<Uuid> = row.try_get("subject_id").map_err(database_error)?;
151            let created_at: String = row.try_get("created_at").map_err(database_error)?;
152            let attempts: i64 = row.try_get("attempts").map_err(database_error)?;
153            let last_error: Option<String> = row.try_get("last_error").map_err(database_error)?;
154            let next_retry_at: Option<String> =
155                row.try_get("next_retry_at").map_err(database_error)?;
156
157            let payload = serde_json::to_vec(&payload)?;
158            let next_retry_at = next_retry_at.as_deref().map(parse_sqlite_utc).transpose()?;
159
160            envelopes.push(assemble_envelope(
161                event_id,
162                event_type,
163                payload,
164                subject_id,
165                parse_sqlite_utc(&created_at)?,
166                u32::try_from(attempts.max(0)).unwrap_or(u32::MAX),
167                last_error,
168                next_retry_at,
169            ));
170        }
171        Ok(envelopes)
172    }
173
174    async fn mark_delivered<'a>(
175        &self,
176        tx: &mut Self::Tx<'a>,
177        event_id: Uuid,
178    ) -> Result<(), OutboxError> {
179        sqlx::query(&self.mark_delivered_sql)
180            .bind(event_id)
181            .execute(&mut **tx)
182            .await
183            .map_err(database_error)?;
184        Ok(())
185    }
186
187    async fn mark_failed<'a>(
188        &self,
189        tx: &mut Self::Tx<'a>,
190        event_id: Uuid,
191        error: &str,
192        next_retry_at: SystemTime,
193    ) -> Result<(), OutboxError> {
194        let next_retry_at = format_sqlite_utc(next_retry_at)?;
195        sqlx::query(&self.mark_failed_sql)
196            .bind(error)
197            .bind(next_retry_at)
198            .bind(event_id)
199            .execute(&mut **tx)
200            .await
201            .map_err(database_error)?;
202        Ok(())
203    }
204
205    async fn commit<'a>(&self, tx: Self::Tx<'a>) -> Result<(), OutboxError> {
206        tx.commit().await.map_err(database_error)
207    }
208}
209
210/// SQLite implementation of [`OutboxPublisher`] backed by `sqlx::SqlitePool`.
211///
212/// Cheap to clone (the pool and the cached insert statement are reference-counted).
213#[derive(Debug, Clone)]
214pub struct SqliteOutboxPublisher {
215    pool: SqlitePool,
216    table_name: Arc<str>,
217    insert_sql: Arc<str>,
218}
219
220impl SqliteOutboxPublisher {
221    /// Create a new publisher for the given pool and table.
222    ///
223    /// # Errors
224    ///
225    /// Returns [`OutboxError::Internal`] if `table_name` is not a valid
226    /// identifier matching `^[a-zA-Z_][a-zA-Z0-9_]*$`.
227    pub fn new(pool: SqlitePool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
228        let table_name = table_name.into();
229        validate_table_name(&table_name)?;
230        let insert_sql = DIALECT.insert_sql(&table_name);
231        Ok(Self {
232            pool,
233            table_name: Arc::from(table_name),
234            insert_sql: Arc::from(insert_sql),
235        })
236    }
237
238    /// Underlying pool, exposed for callers that open their own transactions.
239    #[must_use]
240    pub fn pool(&self) -> &SqlitePool {
241        &self.pool
242    }
243
244    /// Configured table name.
245    #[must_use]
246    pub fn table_name(&self) -> &str {
247        &self.table_name
248    }
249}
250
251impl OutboxPublisher for SqliteOutboxPublisher {
252    type Tx<'tx> = Transaction<'tx, Sqlite>;
253
254    async fn publish_in_tx<E: Event>(
255        &self,
256        tx: &mut Self::Tx<'_>,
257        event: &E,
258    ) -> Result<Uuid, OutboxError> {
259        let event_id = Uuid::now_v7();
260        let payload = serde_json::to_value(event)?;
261        sqlx::query(&self.insert_sql)
262            .bind(event_id)
263            .bind(E::EVENT_TYPE)
264            .bind(payload)
265            .bind(Option::<Uuid>::None)
266            .execute(&mut **tx)
267            .await
268            .map_err(database_error)?;
269        Ok(event_id)
270    }
271
272    async fn publish_in_tx_with_subject<E: Event>(
273        &self,
274        tx: &mut Self::Tx<'_>,
275        subject_id: Uuid,
276        event: &E,
277    ) -> Result<Uuid, OutboxError> {
278        let event_id = Uuid::now_v7();
279        let payload = serde_json::to_value(event)?;
280        sqlx::query(&self.insert_sql)
281            .bind(event_id)
282            .bind(E::EVENT_TYPE)
283            .bind(payload)
284            .bind(Some(subject_id))
285            .execute(&mut **tx)
286            .await
287            .map_err(database_error)?;
288        Ok(event_id)
289    }
290
291    async fn publish<E: Event>(&self, event: &E) -> Result<Uuid, OutboxError> {
292        let mut tx = self.pool.begin().await.map_err(database_error)?;
293        let event_id = self.publish_in_tx(&mut tx, event).await?;
294        tx.commit().await.map_err(database_error)?;
295        Ok(event_id)
296    }
297}
298
299/// Fluent builder for an [`OutboxWorker`] backed by [`SqliteOutboxStore`].
300///
301/// See the [module documentation](self) for the single-worker concurrency model.
302pub struct SqliteOutboxWorkerBuilder {
303    pool: SqlitePool,
304    table_name: String,
305    handlers: HashMap<&'static str, Arc<dyn ErasedHandler>>,
306    config: OutboxWorkerConfig,
307}
308
309impl SqliteOutboxWorkerBuilder {
310    /// Start a new builder for the given pool.
311    #[must_use]
312    pub fn new(pool: SqlitePool) -> Self {
313        Self {
314            pool,
315            table_name: DEFAULT_TABLE_NAME.to_owned(),
316            handlers: HashMap::new(),
317            config: OutboxWorkerConfig::default(),
318        }
319    }
320
321    /// Override the outbox table name (default `"audit_outbox"`).
322    #[must_use]
323    pub fn table_name(mut self, name: impl Into<String>) -> Self {
324        self.table_name = name.into();
325        self
326    }
327
328    /// Register a typed handler for the event type `E`.
329    ///
330    /// Registering twice for the same event type silently replaces the
331    /// previous handler.
332    #[must_use]
333    pub fn register_handler<E, H>(mut self, handler: H) -> Self
334    where
335        E: Event,
336        H: Handler<E>,
337    {
338        let typed = TypedHandler::<E, H>::new(handler);
339        let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
340        self.handlers.insert(E::EVENT_TYPE, erased);
341        self
342    }
343
344    /// Register a handler already shared behind an `Arc`.
345    #[must_use]
346    pub fn shared_handler<E, H>(mut self, handler: Arc<H>) -> Self
347    where
348        E: Event,
349        H: Handler<E>,
350    {
351        let typed = TypedHandler::<E, H>::shared(handler);
352        let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
353        self.handlers.insert(E::EVENT_TYPE, erased);
354        self
355    }
356
357    /// Override the poll interval (default 100 ms).
358    #[must_use]
359    pub fn poll_interval(mut self, d: Duration) -> Self {
360        self.config.poll_interval = d;
361        self
362    }
363
364    /// Override the batch size per poll (default 10).
365    #[must_use]
366    pub fn batch_size(mut self, n: usize) -> Self {
367        self.config.batch_size = n;
368        self
369    }
370
371    /// Override the maximum number of attempts per envelope (default 5).
372    #[must_use]
373    pub fn max_attempts(mut self, n: u32) -> Self {
374        self.config.max_attempts = n;
375        self
376    }
377
378    /// Override the constant retry delay between failed attempts (default 5 s).
379    #[must_use]
380    pub fn retry_delay(mut self, d: Duration) -> Self {
381        self.config.retry_delay = d;
382        self
383    }
384
385    /// Consume the builder and produce an [`OutboxWorker`] ready to spawn.
386    ///
387    /// # Errors
388    ///
389    /// Returns [`OutboxError::Internal`] if the configured `table_name`
390    /// is not a valid identifier.
391    pub fn build(self) -> Result<OutboxWorker<SqliteOutboxStore>, OutboxError> {
392        let store = SqliteOutboxStore::new(self.pool, self.table_name)?;
393        Ok(OutboxWorker::new(store, self.handlers, self.config))
394    }
395}
396
397#[cfg(test)]
398mod tests {
399    use super::*;
400    use hexeract_core::HandlerContext;
401    use serde::Deserialize;
402    use serde::Serialize;
403
404    fn lazy_pool() -> SqlitePool {
405        SqlitePool::connect_lazy("sqlite::memory:").expect("lazy pool must build from a valid URL")
406    }
407
408    #[derive(Debug, Serialize, Deserialize)]
409    struct UserRegistered {
410        user_id: Uuid,
411    }
412
413    impl Event for UserRegistered {
414        const EVENT_TYPE: &'static str = "users.registered";
415    }
416
417    #[derive(Debug, Serialize, Deserialize)]
418    struct OrderPlaced {
419        order_id: Uuid,
420    }
421
422    impl Event for OrderPlaced {
423        const EVENT_TYPE: &'static str = "orders.placed";
424    }
425
426    struct NoopHandler;
427
428    impl Handler<UserRegistered> for NoopHandler {
429        type Error = OutboxError;
430        async fn handle(
431            &self,
432            _event: UserRegistered,
433            _ctx: &HandlerContext,
434        ) -> Result<(), Self::Error> {
435            Ok(())
436        }
437    }
438
439    impl Handler<OrderPlaced> for NoopHandler {
440        type Error = OutboxError;
441        async fn handle(
442            &self,
443            _event: OrderPlaced,
444            _ctx: &HandlerContext,
445        ) -> Result<(), Self::Error> {
446            Ok(())
447        }
448    }
449
450    #[tokio::test]
451    async fn store_new_rejects_invalid_table_name() {
452        let err = SqliteOutboxStore::new(lazy_pool(), "bad name; DROP").unwrap_err();
453        assert!(matches!(err, OutboxError::Internal(_)));
454    }
455
456    #[tokio::test]
457    async fn store_new_caches_sqlite_sql_without_skip_locked() {
458        let store = SqliteOutboxStore::new(lazy_pool(), "audit_outbox").unwrap();
459        assert_eq!(store.table_name(), "audit_outbox");
460        assert!(store.poll_sql.contains("FROM audit_outbox"));
461        assert!(!store.poll_sql.contains("FOR UPDATE SKIP LOCKED"));
462        assert!(store.poll_sql.contains("strftime"));
463        assert!(store.mark_failed_sql.contains("attempts = attempts + 1"));
464    }
465
466    #[tokio::test]
467    async fn publisher_new_caches_insert_sql_with_question_marks() {
468        let publisher = SqliteOutboxPublisher::new(lazy_pool(), "audit_outbox").unwrap();
469        assert_eq!(publisher.table_name(), "audit_outbox");
470        assert!(publisher.insert_sql.contains("INSERT INTO audit_outbox"));
471        assert!(publisher.insert_sql.contains("?, ?, ?, ?"));
472    }
473
474    #[tokio::test]
475    async fn builder_register_handler_records_event_types() {
476        let builder = SqliteOutboxWorkerBuilder::new(lazy_pool())
477            .register_handler::<UserRegistered, _>(NoopHandler)
478            .register_handler::<OrderPlaced, _>(NoopHandler);
479        assert_eq!(builder.handlers.len(), 2);
480        assert!(builder.handlers.contains_key("users.registered"));
481        assert!(builder.handlers.contains_key("orders.placed"));
482    }
483
484    #[tokio::test]
485    async fn builder_build_rejects_invalid_table_name() {
486        let result = SqliteOutboxWorkerBuilder::new(lazy_pool())
487            .table_name("bad name; DROP TABLE")
488            .build();
489        assert!(matches!(result, Err(OutboxError::Internal(_))));
490    }
491
492    #[tokio::test]
493    async fn builder_build_with_default_table_name_succeeds() {
494        let worker = SqliteOutboxWorkerBuilder::new(lazy_pool()).build();
495        assert!(worker.is_ok());
496    }
497}