Skip to main content

hexeract_outbox_sql/
sqlite.rs

1//! SQLite backend for the Hexeract outbox.
2//!
3//! # Concurrency
4//!
5//! SQLite has no `FOR UPDATE SKIP LOCKED`, so this backend assumes a
6//! **single [`OutboxWorker`] per database**. Running several workers against
7//! the same SQLite database can dispatch an envelope more than once, because
8//! concurrent pollers may read the same pending rows before either marks them
9//! delivered. For competing-consumers fan-out across many workers, use the
10//! PostgreSQL or MySQL backend instead. Configuring `busy_timeout` on the pool
11//! is recommended so writes wait rather than fail under contention.
12
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::Duration;
16
17use async_trait::async_trait;
18use hexeract_outbox::ErasedHandler;
19use hexeract_outbox::Event;
20use hexeract_outbox::Handler;
21use hexeract_outbox::OutboxEnvelope;
22use hexeract_outbox::OutboxError;
23use hexeract_outbox::OutboxPublisher;
24use hexeract_outbox::OutboxStore;
25use hexeract_outbox::OutboxWorker;
26use hexeract_outbox::OutboxWorkerConfig;
27use hexeract_outbox::TypedHandler;
28use sqlx::Acquire;
29use sqlx::Row;
30use sqlx::Sqlite;
31use sqlx::SqlitePool;
32use sqlx::Transaction;
33use sqlx::pool::PoolConnection;
34use uuid::Uuid;
35
36use crate::DEFAULT_TABLE_NAME;
37use crate::dialect::Dialect;
38use crate::envelope::assemble_envelope;
39use crate::envelope::parse_sqlite_utc;
40use crate::validate::validate_event_type;
41use crate::validate::validate_table_name;
42
/// Dialect through which this module renders all of its SQL (schema DDL,
/// poll, insert, claim, mark-delivered/failed and dead-letter statements).
const DIALECT: Dialect = Dialect::Sqlite;
44
/// Upper bound, in whole seconds, on any interval handed to SQLite's
/// `strftime` as a `"+N seconds"` modifier.
///
/// The value is `i64::MAX` nanoseconds expressed in seconds (roughly 292
/// years): far beyond any practical retry or lease interval, yet small enough
/// that `now + interval` stays well inside SQLite's datetime range. Without
/// the cap, a pathological duration such as [`Duration::MAX`] would render a
/// modifier SQLite cannot apply; SQLite ignores it silently, leaving
/// `next_retry_at` as `NULL`.
const MAX_SQLITE_INTERVAL_SECS: u64 = 9_223_372_036;

/// Format a backoff/lease [`Duration`] as a SQLite `strftime` modifier such as
/// `"+1.500 seconds"`, so that `next_retry_at` is derived from the database
/// clock rather than the application clock.
///
/// The duration is clamped to [`MAX_SQLITE_INTERVAL_SECS`] first and then
/// rendered with millisecond precision (three decimals).
fn sqlite_seconds_modifier(d: Duration) -> String {
    let ceiling = Duration::from_secs(MAX_SQLITE_INTERVAL_SECS);
    let bounded = if d > ceiling { ceiling } else { d };
    let seconds = bounded.as_secs_f64();
    format!("+{seconds:.3} seconds")
}
65
66fn database_error(error: impl std::error::Error + Send + Sync + 'static) -> OutboxError {
67    OutboxError::Database(Box::new(error))
68}
69
70fn pool_error(error: sqlx::Error) -> OutboxError {
71    if matches!(error, sqlx::Error::PoolTimedOut) {
72        OutboxError::PoolTimeout
73    } else {
74        OutboxError::Database(Box::new(error))
75    }
76}
77
78/// Decode one polled row into an [`OutboxEnvelope`].
79///
80/// Kept separate from the poll loop so a decode failure (notably a timestamp
81/// that does not match either accepted SQLite layout) can be isolated to the
82/// offending row (logged and skipped) instead of aborting the whole batch.
83fn decode_sqlite_row(row: &sqlx::sqlite::SqliteRow) -> Result<OutboxEnvelope, OutboxError> {
84    let event_id: Uuid = row.try_get("event_id").map_err(database_error)?;
85    let event_type: String = row.try_get("event_type").map_err(database_error)?;
86    let payload: serde_json::Value = row.try_get("payload").map_err(database_error)?;
87    let subject_id: Option<Uuid> = row.try_get("subject_id").map_err(database_error)?;
88    let created_at: String = row.try_get("created_at").map_err(database_error)?;
89    let attempts: i64 = row.try_get("attempts").map_err(database_error)?;
90    let last_error: Option<String> = row.try_get("last_error").map_err(database_error)?;
91    let next_retry_at: Option<String> = row.try_get("next_retry_at").map_err(database_error)?;
92
93    let payload = serde_json::to_vec(&payload)?;
94    let next_retry_at = next_retry_at.as_deref().map(parse_sqlite_utc).transpose()?;
95
96    Ok(assemble_envelope(
97        event_id,
98        event_type,
99        payload,
100        subject_id,
101        parse_sqlite_utc(&created_at)?,
102        u32::try_from(attempts.max(0)).unwrap_or(u32::MAX),
103        last_error,
104        next_retry_at,
105    ))
106}
107
/// Cached statements used once dead-letter persistence has been enabled.
#[derive(Clone, Debug)]
struct DeadLetterSql {
    /// Dead-letter insert statement rendered for the main/dead-letter table
    /// pair; executed with the event id as its only bind.
    insert_sql: Arc<str>,
    /// Statement that deletes from the main outbox table; executed with the
    /// event id as its only bind.
    delete_sql: Arc<str>,
}
113
114/// Apply the canonical SQLite outbox schema to the target database.
115///
116/// **Intended for POCs, integration tests and local development.**
117/// Production deployments should run their own migration tooling against the
118/// SQL rendered by [`Dialect::schema_ddl`].
119///
120/// # Errors
121///
122/// - [`OutboxError::Internal`] if `table_name` is not a valid identifier.
123/// - [`OutboxError::Database`] if the connection or the DDL statement fails.
124pub async fn ensure_schema(pool: &SqlitePool, table_name: &str) -> Result<(), OutboxError> {
125    let ddl = DIALECT.schema_ddl(table_name)?;
126    sqlx::raw_sql(&ddl)
127        .execute(pool)
128        .await
129        .map_err(database_error)?;
130    Ok(())
131}
132
133/// SQLite implementation of [`OutboxStore`] backed by `sqlx::SqlitePool`.
134///
135/// See the [module documentation](self) for the single-worker concurrency model.
136/// Cheap to clone (the pool and the cached SQL strings are reference-counted).
137#[derive(Debug, Clone)]
138pub struct SqliteOutboxStore {
139    pool: SqlitePool,
140    table_name: Arc<str>,
141    poll_sql: Arc<str>,
142    mark_delivered_sql: Arc<str>,
143    mark_failed_sql: Arc<str>,
144    dead_letter: Option<Arc<DeadLetterSql>>,
145}
146
147impl SqliteOutboxStore {
148    /// Build a store for the given pool and table.
149    ///
150    /// # Errors
151    ///
152    /// Returns [`OutboxError::Internal`] if `table_name` is not a valid
153    /// identifier matching `^[a-zA-Z_][a-zA-Z0-9_]*$`.
154    pub fn new(pool: SqlitePool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
155        let table_name = table_name.into();
156        validate_table_name(&table_name)?;
157        let poll_sql = DIALECT.poll_sql(&table_name);
158        let mark_delivered_sql = DIALECT.mark_delivered_sql(&table_name);
159        let mark_failed_sql = DIALECT.mark_failed_sql(&table_name);
160        Ok(Self {
161            pool,
162            table_name: Arc::from(table_name),
163            poll_sql: Arc::from(poll_sql),
164            mark_delivered_sql: Arc::from(mark_delivered_sql),
165            mark_failed_sql: Arc::from(mark_failed_sql),
166            dead_letter: None,
167        })
168    }
169
170    /// Underlying pool.
171    #[must_use]
172    pub fn pool(&self) -> &SqlitePool {
173        &self.pool
174    }
175
176    /// Configured table name.
177    #[must_use]
178    pub fn table_name(&self) -> &str {
179        &self.table_name
180    }
181
182    /// Activate dead-letter persistence for poison messages.
183    ///
184    /// # Errors
185    ///
186    /// Returns [`OutboxError::Internal`] if `dlq_table` is not a valid identifier.
187    pub fn with_dead_letter(mut self, dlq_table: impl Into<String>) -> Result<Self, OutboxError> {
188        let dlq = dlq_table.into();
189        validate_table_name(&dlq)?;
190        let insert_sql = DIALECT.insert_dead_letter_sql(&self.table_name, &dlq);
191        let delete_sql = DIALECT.delete_from_main_sql(&self.table_name);
192        self.dead_letter = Some(Arc::new(DeadLetterSql {
193            insert_sql: Arc::from(insert_sql),
194            delete_sql: Arc::from(delete_sql),
195        }));
196        Ok(self)
197    }
198}
199
200#[async_trait]
201impl OutboxStore for SqliteOutboxStore {
202    type Client = PoolConnection<Sqlite>;
203    type Tx<'tx> = Transaction<'tx, Sqlite>;
204
205    async fn acquire(&self) -> Result<Self::Client, OutboxError> {
206        self.pool.acquire().await.map_err(pool_error)
207    }
208
209    async fn begin<'a>(&self, client: &'a mut Self::Client) -> Result<Self::Tx<'a>, OutboxError> {
210        client.begin().await.map_err(database_error)
211    }
212
213    async fn poll<'a>(
214        &self,
215        tx: &mut Self::Tx<'a>,
216        batch_size: usize,
217        max_attempts: u32,
218    ) -> Result<Vec<OutboxEnvelope>, OutboxError> {
219        let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
220        let max = i64::from(max_attempts);
221        let rows = sqlx::query(&self.poll_sql)
222            .bind(max)
223            .bind(limit)
224            .fetch_all(&mut **tx)
225            .await
226            .map_err(database_error)?;
227
228        let mut envelopes = Vec::with_capacity(rows.len());
229        for row in rows {
230            // A single undecodable row (e.g. an externally written timestamp in
231            // an unexpected layout) must not abort the whole poll: that
232            // head-of-line poisons the queue forever (#214). Log it and skip so
233            // the rest of the batch keeps draining.
234            match decode_sqlite_row(&row) {
235                Ok(envelope) => envelopes.push(envelope),
236                Err(error) => {
237                    let event_id = row.try_get::<Uuid, _>("event_id").ok();
238                    tracing::error!(
239                        ?event_id,
240                        error = %error,
241                        "skipping undecodable outbox row; the rest of the batch continues"
242                    );
243                }
244            }
245        }
246        Ok(envelopes)
247    }
248
249    async fn mark_delivered<'a>(
250        &self,
251        tx: &mut Self::Tx<'a>,
252        event_id: Uuid,
253    ) -> Result<(), OutboxError> {
254        sqlx::query(&self.mark_delivered_sql)
255            .bind(event_id)
256            .execute(&mut **tx)
257            .await
258            .map_err(database_error)?;
259        Ok(())
260    }
261
262    async fn mark_failed<'a>(
263        &self,
264        tx: &mut Self::Tx<'a>,
265        event_id: Uuid,
266        error: &str,
267        retry_in: Duration,
268    ) -> Result<(), OutboxError> {
269        // next_retry_at is computed as strftime('now', ?modifier) against the
270        // DB clock (#230); bind the backoff as a strftime seconds modifier.
271        sqlx::query(&self.mark_failed_sql)
272            .bind(error)
273            .bind(sqlite_seconds_modifier(retry_in))
274            .bind(event_id)
275            .execute(&mut **tx)
276            .await
277            .map_err(database_error)?;
278        Ok(())
279    }
280
281    async fn commit<'a>(&self, tx: Self::Tx<'a>) -> Result<(), OutboxError> {
282        tx.commit().await.map_err(database_error)
283    }
284
285    async fn mark_dead_lettered<'a>(
286        &self,
287        tx: &mut Self::Tx<'a>,
288        event_id: Uuid,
289        _error: &str,
290    ) -> Result<(), OutboxError> {
291        let Some(dlq) = &self.dead_letter else {
292            return Ok(());
293        };
294        sqlx::query(&dlq.insert_sql)
295            .bind(event_id)
296            .execute(&mut **tx)
297            .await
298            .map_err(database_error)?;
299        sqlx::query(&dlq.delete_sql)
300            .bind(event_id)
301            .execute(&mut **tx)
302            .await
303            .map_err(database_error)?;
304        Ok(())
305    }
306
307    /// Set the soft lease and consume one retry slot on the claimed batch.
308    ///
309    /// SQLite has no `FOR UPDATE SKIP LOCKED`, so this does **not** provide a
310    /// competing-consumer claim: the store remains single-writer (see the
311    /// [module documentation](self)). The override exists so that `attempts`
312    /// is incremented at claim time on SQLite too. Without it, a worker that
313    /// crashed between claim and acknowledgement would never advance
314    /// `attempts` and would redeliver a poison row forever (#213).
315    async fn claim<'a>(
316        &self,
317        tx: &mut Self::Tx<'a>,
318        event_ids: &[Uuid],
319        lease_for: Duration,
320    ) -> Result<(), OutboxError> {
321        if event_ids.is_empty() {
322            return Ok(());
323        }
324        // Lease anchored to the DB clock via strftime('now', ?modifier) (#230).
325        let sql = DIALECT.claim_sql(&self.table_name, event_ids.len());
326        let mut query = sqlx::query(&sql).bind(sqlite_seconds_modifier(lease_for));
327        for id in event_ids {
328            query = query.bind(*id);
329        }
330        query.execute(&mut **tx).await.map_err(database_error)?;
331        Ok(())
332    }
333}
334
335/// SQLite implementation of [`OutboxPublisher`] backed by `sqlx::SqlitePool`.
336///
337/// Cheap to clone (the pool and the cached insert statement are reference-counted).
338#[derive(Debug, Clone)]
339pub struct SqliteOutboxPublisher {
340    pool: SqlitePool,
341    table_name: Arc<str>,
342    insert_sql: Arc<str>,
343}
344
345impl SqliteOutboxPublisher {
346    /// Create a new publisher for the given pool and table.
347    ///
348    /// # Errors
349    ///
350    /// Returns [`OutboxError::Internal`] if `table_name` is not a valid
351    /// identifier matching `^[a-zA-Z_][a-zA-Z0-9_]*$`.
352    pub fn new(pool: SqlitePool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
353        let table_name = table_name.into();
354        validate_table_name(&table_name)?;
355        let insert_sql = DIALECT.insert_sql(&table_name);
356        Ok(Self {
357            pool,
358            table_name: Arc::from(table_name),
359            insert_sql: Arc::from(insert_sql),
360        })
361    }
362
363    /// Underlying pool, exposed for callers that open their own transactions.
364    #[must_use]
365    pub fn pool(&self) -> &SqlitePool {
366        &self.pool
367    }
368
369    /// Configured table name.
370    #[must_use]
371    pub fn table_name(&self) -> &str {
372        &self.table_name
373    }
374}
375
376impl OutboxPublisher for SqliteOutboxPublisher {
377    type Tx<'tx> = Transaction<'tx, Sqlite>;
378
379    async fn publish_in_tx<E: Event>(
380        &self,
381        tx: &mut Self::Tx<'_>,
382        event: &E,
383    ) -> Result<Uuid, OutboxError> {
384        validate_event_type(E::EVENT_TYPE)?;
385        let event_id = Uuid::now_v7();
386        let payload = serde_json::to_value(event)?;
387        sqlx::query(&self.insert_sql)
388            .bind(event_id)
389            .bind(E::EVENT_TYPE)
390            .bind(payload)
391            .bind(Option::<Uuid>::None)
392            .execute(&mut **tx)
393            .await
394            .map_err(database_error)?;
395        Ok(event_id)
396    }
397
398    async fn publish_in_tx_with_subject<E: Event>(
399        &self,
400        tx: &mut Self::Tx<'_>,
401        subject_id: Uuid,
402        event: &E,
403    ) -> Result<Uuid, OutboxError> {
404        validate_event_type(E::EVENT_TYPE)?;
405        let event_id = Uuid::now_v7();
406        let payload = serde_json::to_value(event)?;
407        sqlx::query(&self.insert_sql)
408            .bind(event_id)
409            .bind(E::EVENT_TYPE)
410            .bind(payload)
411            .bind(Some(subject_id))
412            .execute(&mut **tx)
413            .await
414            .map_err(database_error)?;
415        Ok(event_id)
416    }
417
418    async fn publish<E: Event>(&self, event: &E) -> Result<Uuid, OutboxError> {
419        let mut tx = self.pool.begin().await.map_err(database_error)?;
420        let event_id = self.publish_in_tx(&mut tx, event).await?;
421        tx.commit().await.map_err(database_error)?;
422        Ok(event_id)
423    }
424}
425
426/// Fluent builder for an [`OutboxWorker`] backed by [`SqliteOutboxStore`].
427///
428/// See the [module documentation](self) for the single-worker concurrency model.
429///
430/// # Pool sizing and acquire timeout
431///
432/// SQLite uses a single-writer model. A pool size of 1–2 connections is
433/// typical: the worker holds one connection per poll cycle while publishers
434/// take the other. To prevent a stuck writer from blocking `acquire()`
435/// indefinitely, set an acquire timeout on the pool:
436///
437/// ```rust,ignore
438/// use sqlx::sqlite::SqlitePoolOptions;
439/// use std::time::Duration;
440///
441/// let pool = SqlitePoolOptions::new()
442///     .max_connections(2)
443///     // surface PoolTimeout instead of blocking indefinitely
444///     .acquire_timeout(Duration::from_secs(5))
445///     .connect("sqlite:outbox.db")
446///     .await?;
447///
448/// let worker = SqliteOutboxWorkerBuilder::new(pool).build()?;
449/// ```
450///
451/// When `acquire_timeout` expires, [`OutboxStore::acquire`] returns
452/// [`OutboxError::PoolTimeout`] instead of hanging. The worker logs the
453/// error and retries after [`OutboxWorkerConfig::poll_interval`].
454///
455/// [`OutboxError::PoolTimeout`]: hexeract_outbox::OutboxError::PoolTimeout
456/// [`OutboxWorkerConfig::poll_interval`]: hexeract_outbox::OutboxWorkerConfig::poll_interval
457pub struct SqliteOutboxWorkerBuilder {
458    pool: SqlitePool,
459    table_name: String,
460    dead_letter_table: Option<String>,
461    handlers: HashMap<&'static str, Arc<dyn ErasedHandler>>,
462    config: OutboxWorkerConfig,
463}
464
465impl SqliteOutboxWorkerBuilder {
466    /// Start a new builder for the given pool.
467    #[must_use]
468    pub fn new(pool: SqlitePool) -> Self {
469        Self {
470            pool,
471            table_name: DEFAULT_TABLE_NAME.to_owned(),
472            dead_letter_table: None,
473            handlers: HashMap::new(),
474            config: OutboxWorkerConfig::default(),
475        }
476    }
477
478    /// Override the outbox table name (default `"audit_outbox"`).
479    #[must_use]
480    pub fn table_name(mut self, name: impl Into<String>) -> Self {
481        self.table_name = name.into();
482        self
483    }
484
485    /// Enable dead-letter persistence for poison messages.
486    #[must_use]
487    pub fn dead_letter_table(mut self, name: impl Into<String>) -> Self {
488        self.dead_letter_table = Some(name.into());
489        self
490    }
491
492    /// Register a typed handler for the event type `E`.
493    ///
494    /// Registering twice for the same event type silently replaces the
495    /// previous handler.
496    #[must_use]
497    pub fn register_handler<E, H>(mut self, handler: H) -> Self
498    where
499        E: Event,
500        H: Handler<E>,
501    {
502        let typed = TypedHandler::<E, H>::new(handler);
503        let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
504        self.handlers.insert(E::EVENT_TYPE, erased);
505        self
506    }
507
508    /// Register a handler already shared behind an `Arc`.
509    #[must_use]
510    pub fn shared_handler<E, H>(mut self, handler: Arc<H>) -> Self
511    where
512        E: Event,
513        H: Handler<E>,
514    {
515        let typed = TypedHandler::<E, H>::shared(handler);
516        let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
517        self.handlers.insert(E::EVENT_TYPE, erased);
518        self
519    }
520
521    /// Override the poll interval (default 100 ms).
522    #[must_use]
523    pub fn poll_interval(mut self, d: Duration) -> Self {
524        self.config.poll_interval = d;
525        self
526    }
527
528    /// Override the batch size per poll (default 10).
529    #[must_use]
530    pub fn batch_size(mut self, n: usize) -> Self {
531        self.config.batch_size = n;
532        self
533    }
534
535    /// Override the maximum number of attempts per envelope (default 5).
536    #[must_use]
537    pub fn max_attempts(mut self, n: u32) -> Self {
538        self.config.max_attempts = n;
539        self
540    }
541
542    /// Override the base delay for exponential backoff (default 1 s).
543    #[must_use]
544    pub fn retry_base_delay(mut self, d: Duration) -> Self {
545        self.config.retry_base_delay = d;
546        self
547    }
548
549    /// Override the maximum backoff delay (default 5 min).
550    #[must_use]
551    pub fn retry_max_delay(mut self, d: Duration) -> Self {
552        self.config.retry_max_delay = d;
553        self
554    }
555
556    /// Enable or disable full jitter on the backoff delay (default `true`).
557    #[must_use]
558    pub fn jitter(mut self, enabled: bool) -> Self {
559        self.config.jitter = enabled;
560        self
561    }
562
563    /// Override the soft-lease duration for claimed envelopes (default 30 s).
564    ///
565    /// SQLite has no `FOR UPDATE SKIP LOCKED` and therefore no
566    /// competing-consumer claim, so this store is meant for a **single
567    /// worker** (see the [module documentation](self)); running several workers
568    /// against one database can still double-dispatch. The lease is recorded on
569    /// claim alongside the attempt increment, but with a single writer it only
570    /// affects when a crashed-and-restarted worker re-picks an in-flight row.
571    #[must_use]
572    pub fn dispatch_timeout(mut self, d: Duration) -> Self {
573        self.config.dispatch_timeout = d;
574        self
575    }
576
577    /// Consume the builder and produce an [`OutboxWorker`] ready to spawn.
578    ///
579    /// # Errors
580    ///
581    /// Returns [`OutboxError::Internal`] if the configured `table_name`
582    /// is not a valid identifier.
583    pub fn build(self) -> Result<OutboxWorker<SqliteOutboxStore>, OutboxError> {
584        let mut store = SqliteOutboxStore::new(self.pool, self.table_name)?;
585        if let Some(dlq) = self.dead_letter_table {
586            store = store.with_dead_letter(dlq)?;
587        }
588        Ok(OutboxWorker::new(store, self.handlers, self.config))
589    }
590}
591
592/// Apply the dead-letter schema to the target SQLite database.
593///
594/// **Intended for POCs, integration tests and local development.**
595///
596/// # Errors
597///
598/// - [`OutboxError::Internal`] if `table_name` is not a valid identifier.
599/// - [`OutboxError::Database`] if the connection or the DDL statement fails.
600pub async fn ensure_dead_letter_schema(
601    pool: &SqlitePool,
602    table_name: &str,
603) -> Result<(), OutboxError> {
604    let ddl = DIALECT.dead_letter_schema_ddl(table_name)?;
605    sqlx::raw_sql(&ddl)
606        .execute(pool)
607        .await
608        .map_err(database_error)?;
609    Ok(())
610}
611
#[cfg(test)]
mod tests {
    use super::*;
    use hexeract_core::HandlerContext;
    use serde::Deserialize;
    use serde::Serialize;

    /// In-memory pool that never connects unless a query is actually run.
    fn lazy_pool() -> SqlitePool {
        const URL: &str = "sqlite::memory:";
        SqlitePool::connect_lazy(URL).expect("lazy pool must build from a valid URL")
    }

    #[derive(Debug, Serialize, Deserialize)]
    struct UserRegistered {
        user_id: Uuid,
    }

    impl Event for UserRegistered {
        const EVENT_TYPE: &'static str = "users.registered";
    }

    #[derive(Debug, Serialize, Deserialize)]
    struct OrderPlaced {
        order_id: Uuid,
    }

    impl Event for OrderPlaced {
        const EVENT_TYPE: &'static str = "orders.placed";
    }

    /// Handler that accepts every event and does nothing.
    struct NoopHandler;

    impl Handler<UserRegistered> for NoopHandler {
        type Error = OutboxError;
        async fn handle(
            &self,
            _event: UserRegistered,
            _ctx: &HandlerContext,
        ) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    impl Handler<OrderPlaced> for NoopHandler {
        type Error = OutboxError;
        async fn handle(
            &self,
            _event: OrderPlaced,
            _ctx: &HandlerContext,
        ) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    #[test]
    fn pool_error_maps_pool_timed_out_to_pool_timeout_variant() {
        let err = pool_error(sqlx::Error::PoolTimedOut);
        let is_timeout = matches!(err, OutboxError::PoolTimeout);
        assert!(
            is_timeout,
            "PoolTimedOut must map to OutboxError::PoolTimeout, got {err:?}"
        );
    }

    #[test]
    fn pool_error_wraps_other_errors_as_database_error() {
        let err = pool_error(sqlx::Error::RowNotFound);
        let is_database = matches!(err, OutboxError::Database(_));
        assert!(
            is_database,
            "non-timeout errors must map to OutboxError::Database, got {err:?}"
        );
    }

    #[tokio::test]
    async fn store_new_rejects_invalid_table_name() {
        let rejected = SqliteOutboxStore::new(lazy_pool(), "bad name; DROP").unwrap_err();
        assert!(matches!(rejected, OutboxError::Internal(_)));
    }

    #[tokio::test]
    async fn store_new_caches_sqlite_sql_without_skip_locked() {
        let store = SqliteOutboxStore::new(lazy_pool(), "audit_outbox").unwrap();
        assert_eq!(store.table_name(), "audit_outbox");

        let poll: &str = &store.poll_sql;
        assert!(poll.contains("FROM \"audit_outbox\""));
        assert!(!poll.contains("FOR UPDATE SKIP LOCKED"));
        assert!(poll.contains("strftime"));

        // Since #213 the attempt increment belongs to claim_sql, so
        // mark_failed must no longer carry it.
        let mark_failed: &str = &store.mark_failed_sql;
        assert!(!mark_failed.contains("attempts = attempts + 1"));
    }

    #[tokio::test]
    async fn publisher_new_caches_insert_sql_with_question_marks() {
        let publisher = SqliteOutboxPublisher::new(lazy_pool(), "audit_outbox").unwrap();
        assert_eq!(publisher.table_name(), "audit_outbox");

        let insert: &str = &publisher.insert_sql;
        assert!(insert.contains("INSERT INTO \"audit_outbox\""));
        assert!(insert.contains("?, ?, ?, ?"));
    }

    #[test]
    fn sqlite_seconds_modifier_caps_huge_duration() {
        // An uncapped Duration::MAX would yield a modifier SQLite silently
        // ignores (#240); the rendered text must stay finite and well-formed.
        let modifier = sqlite_seconds_modifier(Duration::MAX);
        assert!(
            !modifier.contains("inf"),
            "Duration::MAX must not produce an inf modifier, got: {modifier}"
        );
        assert!(modifier.starts_with('+'), "modifier must start with '+'");
        assert!(
            modifier.ends_with(" seconds"),
            "modifier must end with ' seconds'"
        );
    }

    #[test]
    fn sqlite_seconds_modifier_preserves_ordinary_values() {
        let one_and_a_half = Duration::from_millis(1_500);
        assert_eq!(sqlite_seconds_modifier(one_and_a_half), "+1.500 seconds");
    }

    #[tokio::test]
    async fn builder_register_handler_records_event_types() {
        let builder = SqliteOutboxWorkerBuilder::new(lazy_pool())
            .register_handler::<UserRegistered, _>(NoopHandler)
            .register_handler::<OrderPlaced, _>(NoopHandler);
        assert_eq!(builder.handlers.len(), 2);
        for event_type in ["users.registered", "orders.placed"] {
            assert!(builder.handlers.contains_key(event_type));
        }
    }

    #[tokio::test]
    async fn builder_build_rejects_invalid_table_name() {
        let outcome = SqliteOutboxWorkerBuilder::new(lazy_pool())
            .table_name("bad name; DROP TABLE")
            .build();
        assert!(matches!(outcome, Err(OutboxError::Internal(_))));
    }

    #[tokio::test]
    async fn builder_build_with_default_table_name_succeeds() {
        let outcome = SqliteOutboxWorkerBuilder::new(lazy_pool()).build();
        assert!(outcome.is_ok());
    }
}