Skip to main content

hexeract_outbox_sql/
postgres.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use async_trait::async_trait;
6use hexeract_outbox::ErasedHandler;
7use hexeract_outbox::Event;
8use hexeract_outbox::Handler;
9use hexeract_outbox::OutboxEnvelope;
10use hexeract_outbox::OutboxError;
11use hexeract_outbox::OutboxPublisher;
12use hexeract_outbox::OutboxStore;
13use hexeract_outbox::OutboxWorker;
14use hexeract_outbox::OutboxWorkerConfig;
15use hexeract_outbox::TypedHandler;
16use sqlx::Acquire;
17use sqlx::PgPool;
18use sqlx::Postgres;
19use sqlx::Row;
20use sqlx::Transaction;
21use sqlx::pool::PoolConnection;
22use time::OffsetDateTime;
23use uuid::Uuid;
24
25use crate::DEFAULT_TABLE_NAME;
26use crate::dialect::Dialect;
27use crate::envelope::assemble_envelope;
28use crate::envelope::to_system_time;
29use crate::validate::validate_event_type;
30use crate::validate::validate_table_name;
31
32const DIALECT: Dialect = Dialect::Postgres;
33
34/// Maximum interval in seconds that can be safely cast to `DOUBLE PRECISION`
35/// and added to a PostgreSQL `TIMESTAMPTZ` or `INTERVAL`.
36///
37/// PostgreSQL's `TIMESTAMPTZ` spans roughly from 4713 BC to 294276 AD.
38/// A `DOUBLE PRECISION` representation of a duration near [`Duration::MAX`]
39/// overflows to `inf`, which PostgreSQL rejects with `ERROR: interval out of
40/// range`. Capping at this value (roughly 292 years) prevents that error while
41/// being far beyond any practical retry interval.
42const MAX_PG_INTERVAL_SECS: f64 = 9_223_372_036.0; // i64::MAX seconds
43
44/// Convert a backoff/lease [`Duration`] to seconds for binding as
45/// `DOUBLE PRECISION` in `(NOW() + CAST($n AS DOUBLE PRECISION) * INTERVAL '1 second')`.
46///
47/// Caps at [`MAX_PG_INTERVAL_SECS`] so that pathologically large durations
48/// do not produce `inf`, which PostgreSQL would reject.
49fn duration_to_pg_secs(d: Duration) -> f64 {
50    d.as_secs_f64().min(MAX_PG_INTERVAL_SECS)
51}
52
53fn database_error(error: impl std::error::Error + Send + Sync + 'static) -> OutboxError {
54    OutboxError::Database(Box::new(error))
55}
56
57fn pool_error(error: sqlx::Error) -> OutboxError {
58    if matches!(error, sqlx::Error::PoolTimedOut) {
59        OutboxError::PoolTimeout
60    } else {
61        OutboxError::Database(Box::new(error))
62    }
63}
64
65/// Decode one polled row into an [`OutboxEnvelope`].
66///
67/// Kept separate from the poll loop so a decode failure can be isolated to the
68/// offending row (logged and skipped) instead of aborting the whole batch.
69fn decode_pg_row(row: &sqlx::postgres::PgRow) -> Result<OutboxEnvelope, OutboxError> {
70    let event_id: Uuid = row.try_get("event_id").map_err(database_error)?;
71    let event_type: String = row.try_get("event_type").map_err(database_error)?;
72    let payload: serde_json::Value = row.try_get("payload").map_err(database_error)?;
73    let subject_id: Option<Uuid> = row.try_get("subject_id").map_err(database_error)?;
74    let created_at: OffsetDateTime = row.try_get("created_at").map_err(database_error)?;
75    let attempts: i32 = row.try_get("attempts").map_err(database_error)?;
76    let last_error: Option<String> = row.try_get("last_error").map_err(database_error)?;
77    let next_retry_at: Option<OffsetDateTime> =
78        row.try_get("next_retry_at").map_err(database_error)?;
79
80    let payload = serde_json::to_vec(&payload)?;
81
82    Ok(assemble_envelope(
83        event_id,
84        event_type,
85        payload,
86        subject_id,
87        to_system_time(created_at),
88        u32::try_from(attempts.max(0)).unwrap_or(u32::MAX),
89        last_error,
90        next_retry_at.map(to_system_time),
91    ))
92}
93
94#[derive(Debug, Clone)]
95struct DeadLetterSql {
96    insert_sql: Arc<str>,
97    delete_sql: Arc<str>,
98}
99
100/// Apply the canonical Postgres outbox schema to the target database.
101///
102/// **Intended for POCs, integration tests and local development.**
103/// Production deployments should run their own migration tooling against the
104/// SQL rendered by [`Dialect::schema_ddl`]. Applying DDL from the running
105/// application typically requires elevated privileges that the runtime role
106/// should not own, and clashes with versioned migration workflows.
107///
108/// # Errors
109///
110/// - [`OutboxError::Internal`] if `table_name` is not a valid identifier.
111/// - [`OutboxError::Database`] if the connection or the DDL statement fails.
112pub async fn ensure_schema(pool: &PgPool, table_name: &str) -> Result<(), OutboxError> {
113    let ddl = DIALECT.schema_ddl(table_name)?;
114    sqlx::raw_sql(&ddl)
115        .execute(pool)
116        .await
117        .map_err(database_error)?;
118    Ok(())
119}
120
121/// PostgreSQL implementation of [`OutboxStore`] backed by `sqlx::PgPool`.
122///
123/// Cheap to clone (the pool and the cached SQL strings are reference-counted).
124#[derive(Debug, Clone)]
125pub struct PgOutboxStore {
126    pool: PgPool,
127    table_name: Arc<str>,
128    poll_sql: Arc<str>,
129    mark_delivered_sql: Arc<str>,
130    mark_failed_sql: Arc<str>,
131    dead_letter: Option<Arc<DeadLetterSql>>,
132}
133
134impl PgOutboxStore {
135    /// Build a store for the given pool and table.
136    ///
137    /// SQL statements are templated and cached at construction so each poll
138    /// cycle re-uses the same strings.
139    ///
140    /// # Errors
141    ///
142    /// Returns [`OutboxError::Internal`] if `table_name` is not a valid
143    /// identifier matching `^[a-zA-Z_][a-zA-Z0-9_]*$`.
144    pub fn new(pool: PgPool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
145        let table_name = table_name.into();
146        validate_table_name(&table_name)?;
147        let poll_sql = DIALECT.poll_sql(&table_name);
148        let mark_delivered_sql = DIALECT.mark_delivered_sql(&table_name);
149        let mark_failed_sql = DIALECT.mark_failed_sql(&table_name);
150        Ok(Self {
151            pool,
152            table_name: Arc::from(table_name),
153            poll_sql: Arc::from(poll_sql),
154            mark_delivered_sql: Arc::from(mark_delivered_sql),
155            mark_failed_sql: Arc::from(mark_failed_sql),
156            dead_letter: None,
157        })
158    }
159
160    /// Underlying pool.
161    #[must_use]
162    pub fn pool(&self) -> &PgPool {
163        &self.pool
164    }
165
166    /// Configured table name.
167    #[must_use]
168    pub fn table_name(&self) -> &str {
169        &self.table_name
170    }
171
172    /// Activate dead-letter persistence for this store.
173    ///
174    /// When enabled, envelopes that exhaust `max_attempts` are atomically
175    /// moved to `dlq_table` by [`OutboxStore::mark_dead_lettered`].
176    ///
177    /// # Errors
178    ///
179    /// Returns [`OutboxError::Internal`] if `dlq_table` is not a valid
180    /// identifier.
181    pub fn with_dead_letter(mut self, dlq_table: impl Into<String>) -> Result<Self, OutboxError> {
182        let dlq = dlq_table.into();
183        validate_table_name(&dlq)?;
184        let insert_sql = DIALECT.insert_dead_letter_sql(&self.table_name, &dlq);
185        let delete_sql = DIALECT.delete_from_main_sql(&self.table_name);
186        self.dead_letter = Some(Arc::new(DeadLetterSql {
187            insert_sql: Arc::from(insert_sql),
188            delete_sql: Arc::from(delete_sql),
189        }));
190        Ok(self)
191    }
192}
193
194#[async_trait]
195impl OutboxStore for PgOutboxStore {
196    type Client = PoolConnection<Postgres>;
197    type Tx<'tx> = Transaction<'tx, Postgres>;
198
199    async fn acquire(&self) -> Result<Self::Client, OutboxError> {
200        self.pool.acquire().await.map_err(pool_error)
201    }
202
203    async fn begin<'a>(&self, client: &'a mut Self::Client) -> Result<Self::Tx<'a>, OutboxError> {
204        client.begin().await.map_err(database_error)
205    }
206
207    async fn poll<'a>(
208        &self,
209        tx: &mut Self::Tx<'a>,
210        batch_size: usize,
211        max_attempts: u32,
212    ) -> Result<Vec<OutboxEnvelope>, OutboxError> {
213        let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
214        let max = i32::try_from(max_attempts).unwrap_or(i32::MAX);
215        let rows = sqlx::query(&self.poll_sql)
216            .bind(max)
217            .bind(limit)
218            .fetch_all(&mut **tx)
219            .await
220            .map_err(database_error)?;
221
222        let mut envelopes = Vec::with_capacity(rows.len());
223        for row in rows {
224            // A single undecodable row (schema drift, corrupt payload) must not
225            // abort the whole poll: that head-of-line poisons the queue forever
226            // (#214). Log it and skip so the rest of the batch keeps draining.
227            match decode_pg_row(&row) {
228                Ok(envelope) => envelopes.push(envelope),
229                Err(error) => {
230                    let event_id = row.try_get::<Uuid, _>("event_id").ok();
231                    tracing::error!(
232                        ?event_id,
233                        error = %error,
234                        "skipping undecodable outbox row; the rest of the batch continues"
235                    );
236                }
237            }
238        }
239        Ok(envelopes)
240    }
241
242    async fn mark_delivered<'a>(
243        &self,
244        tx: &mut Self::Tx<'a>,
245        event_id: Uuid,
246    ) -> Result<(), OutboxError> {
247        sqlx::query(&self.mark_delivered_sql)
248            .bind(event_id)
249            .execute(&mut **tx)
250            .await
251            .map_err(database_error)?;
252        Ok(())
253    }
254
255    async fn mark_failed<'a>(
256        &self,
257        tx: &mut Self::Tx<'a>,
258        event_id: Uuid,
259        error: &str,
260        retry_in: Duration,
261    ) -> Result<(), OutboxError> {
262        // The SQL adds this many seconds to the DB clock (#230); bind as f64.
263        // Capped so a Duration::MAX does not produce inf and cause Postgres to
264        // reject the interval (#240).
265        sqlx::query(&self.mark_failed_sql)
266            .bind(error)
267            .bind(duration_to_pg_secs(retry_in))
268            .bind(event_id)
269            .execute(&mut **tx)
270            .await
271            .map_err(database_error)?;
272        Ok(())
273    }
274
275    async fn commit<'a>(&self, tx: Self::Tx<'a>) -> Result<(), OutboxError> {
276        tx.commit().await.map_err(database_error)
277    }
278
279    async fn mark_dead_lettered<'a>(
280        &self,
281        tx: &mut Self::Tx<'a>,
282        event_id: Uuid,
283        _error: &str,
284    ) -> Result<(), OutboxError> {
285        let Some(dlq) = &self.dead_letter else {
286            return Ok(());
287        };
288        sqlx::query(&dlq.insert_sql)
289            .bind(event_id)
290            .execute(&mut **tx)
291            .await
292            .map_err(database_error)?;
293        sqlx::query(&dlq.delete_sql)
294            .bind(event_id)
295            .execute(&mut **tx)
296            .await
297            .map_err(database_error)?;
298        Ok(())
299    }
300
301    async fn claim<'a>(
302        &self,
303        tx: &mut Self::Tx<'a>,
304        event_ids: &[Uuid],
305        lease_for: Duration,
306    ) -> Result<(), OutboxError> {
307        if event_ids.is_empty() {
308            return Ok(());
309        }
310        // claim_sql for Postgres generates `= ANY($2)` so the UUID slice is
311        // bound as a single array parameter. This avoids the 65,535
312        // bind-parameter limit that a per-row IN-list would hit at large
313        // batch sizes (#240). n is passed for API consistency but is unused
314        // for Postgres (the SQL always has exactly two bind parameters).
315        let sql = DIALECT.claim_sql(&self.table_name, event_ids.len());
316        // $1: lease interval in seconds added to the DB clock (#230).
317        //     Capped so Duration::MAX does not produce inf (#240).
318        // $2: UUID array for ANY($2).
319        sqlx::query(&sql)
320            .bind(duration_to_pg_secs(lease_for))
321            .bind(event_ids)
322            .execute(&mut **tx)
323            .await
324            .map_err(database_error)?;
325        Ok(())
326    }
327}
328
329/// PostgreSQL implementation of [`OutboxPublisher`] backed by `sqlx::PgPool`.
330///
331/// Cheap to clone (the pool and the cached insert statement are reference-counted).
332#[derive(Debug, Clone)]
333pub struct PgOutboxPublisher {
334    pool: PgPool,
335    table_name: Arc<str>,
336    insert_sql: Arc<str>,
337}
338
339impl PgOutboxPublisher {
340    /// Create a new publisher for the given pool and table.
341    ///
342    /// # Errors
343    ///
344    /// Returns [`OutboxError::Internal`] if `table_name` is not a valid
345    /// identifier matching `^[a-zA-Z_][a-zA-Z0-9_]*$`.
346    pub fn new(pool: PgPool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
347        let table_name = table_name.into();
348        validate_table_name(&table_name)?;
349        let insert_sql = DIALECT.insert_sql(&table_name);
350        Ok(Self {
351            pool,
352            table_name: Arc::from(table_name),
353            insert_sql: Arc::from(insert_sql),
354        })
355    }
356
357    /// Underlying pool, exposed for callers that open their own transactions.
358    #[must_use]
359    pub fn pool(&self) -> &PgPool {
360        &self.pool
361    }
362
363    /// Configured table name.
364    #[must_use]
365    pub fn table_name(&self) -> &str {
366        &self.table_name
367    }
368}
369
370impl OutboxPublisher for PgOutboxPublisher {
371    type Tx<'tx> = Transaction<'tx, Postgres>;
372
373    async fn publish_in_tx<E: Event>(
374        &self,
375        tx: &mut Self::Tx<'_>,
376        event: &E,
377    ) -> Result<Uuid, OutboxError> {
378        // Validate event_type length before the INSERT so a misconfigured
379        // implementation produces a clear OutboxError::Internal rather than an
380        // opaque database truncation/rejection error (#240).
381        validate_event_type(E::EVENT_TYPE)?;
382        let event_id = Uuid::now_v7();
383        let payload = serde_json::to_value(event)?;
384        sqlx::query(&self.insert_sql)
385            .bind(event_id)
386            .bind(E::EVENT_TYPE)
387            .bind(payload)
388            .bind(Option::<Uuid>::None)
389            .execute(&mut **tx)
390            .await
391            .map_err(database_error)?;
392        Ok(event_id)
393    }
394
395    async fn publish_in_tx_with_subject<E: Event>(
396        &self,
397        tx: &mut Self::Tx<'_>,
398        subject_id: Uuid,
399        event: &E,
400    ) -> Result<Uuid, OutboxError> {
401        validate_event_type(E::EVENT_TYPE)?;
402        let event_id = Uuid::now_v7();
403        let payload = serde_json::to_value(event)?;
404        sqlx::query(&self.insert_sql)
405            .bind(event_id)
406            .bind(E::EVENT_TYPE)
407            .bind(payload)
408            .bind(Some(subject_id))
409            .execute(&mut **tx)
410            .await
411            .map_err(database_error)?;
412        Ok(event_id)
413    }
414
415    async fn publish<E: Event>(&self, event: &E) -> Result<Uuid, OutboxError> {
416        let mut tx = self.pool.begin().await.map_err(database_error)?;
417        let event_id = self.publish_in_tx(&mut tx, event).await?;
418        tx.commit().await.map_err(database_error)?;
419        Ok(event_id)
420    }
421}
422
423/// Fluent builder for an [`OutboxWorker`] backed by [`PgOutboxStore`].
424///
425/// # Pool sizing and acquire timeout
426///
427/// The worker and every concurrent publisher draw connections from the same
428/// pool. To avoid indefinite blocking under pressure, configure an acquire
429/// timeout on the `PgPool` before passing it here:
430///
431/// ```rust,ignore
432/// use sqlx::postgres::PgPoolOptions;
433/// use std::time::Duration;
434///
435/// let pool = PgPoolOptions::new()
436///     // 1 connection for the claim cycle + 1 per concurrent publisher + 2 headroom
437///     .max_connections(batch_size + num_publishers + 2)
438///     // surface PoolTimeout instead of blocking indefinitely
439///     .acquire_timeout(Duration::from_secs(5))
440///     .connect("postgres://...")
441///     .await?;
442///
443/// let worker = PgOutboxWorkerBuilder::new(pool)
444///     .batch_size(batch_size)
445///     .build()?;
446/// ```
447///
448/// When `acquire_timeout` expires, [`OutboxStore::acquire`] returns
449/// [`OutboxError::PoolTimeout`] instead of hanging. The worker logs the
450/// error and retries after [`OutboxWorkerConfig::poll_interval`].
451///
452/// [`OutboxError::PoolTimeout`]: hexeract_outbox::OutboxError::PoolTimeout
453/// [`OutboxWorkerConfig::poll_interval`]: hexeract_outbox::OutboxWorkerConfig::poll_interval
454pub struct PgOutboxWorkerBuilder {
455    pool: PgPool,
456    table_name: String,
457    dead_letter_table: Option<String>,
458    handlers: HashMap<&'static str, Arc<dyn ErasedHandler>>,
459    config: OutboxWorkerConfig,
460}
461
462impl PgOutboxWorkerBuilder {
463    /// Start a new builder for the given pool.
464    #[must_use]
465    pub fn new(pool: PgPool) -> Self {
466        Self {
467            pool,
468            table_name: DEFAULT_TABLE_NAME.to_owned(),
469            dead_letter_table: None,
470            handlers: HashMap::new(),
471            config: OutboxWorkerConfig::default(),
472        }
473    }
474
475    /// Override the outbox table name (default `"audit_outbox"`).
476    #[must_use]
477    pub fn table_name(mut self, name: impl Into<String>) -> Self {
478        self.table_name = name.into();
479        self
480    }
481
482    /// Enable dead-letter persistence for poison messages.
483    ///
484    /// Envelopes that exhaust their retry budget are atomically moved to
485    /// `dlq_table` (INSERT + DELETE in the same transaction). When not set,
486    /// exhausted envelopes are logged via `tracing::error!` but not moved.
487    #[must_use]
488    pub fn dead_letter_table(mut self, name: impl Into<String>) -> Self {
489        self.dead_letter_table = Some(name.into());
490        self
491    }
492
493    /// Register a typed handler for the event type `E`.
494    ///
495    /// Registering twice for the same event type silently replaces the
496    /// previous handler.
497    #[must_use]
498    pub fn register_handler<E, H>(mut self, handler: H) -> Self
499    where
500        E: Event,
501        H: Handler<E>,
502    {
503        let typed = TypedHandler::<E, H>::new(handler);
504        let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
505        self.handlers.insert(E::EVENT_TYPE, erased);
506        self
507    }
508
509    /// Register a handler already shared behind an `Arc`.
510    #[must_use]
511    pub fn shared_handler<E, H>(mut self, handler: Arc<H>) -> Self
512    where
513        E: Event,
514        H: Handler<E>,
515    {
516        let typed = TypedHandler::<E, H>::shared(handler);
517        let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
518        self.handlers.insert(E::EVENT_TYPE, erased);
519        self
520    }
521
522    /// Override the poll interval (default 100 ms).
523    #[must_use]
524    pub fn poll_interval(mut self, d: Duration) -> Self {
525        self.config.poll_interval = d;
526        self
527    }
528
529    /// Override the batch size per poll (default 10).
530    #[must_use]
531    pub fn batch_size(mut self, n: usize) -> Self {
532        self.config.batch_size = n;
533        self
534    }
535
536    /// Override the maximum number of attempts per envelope (default 5).
537    #[must_use]
538    pub fn max_attempts(mut self, n: u32) -> Self {
539        self.config.max_attempts = n;
540        self
541    }
542
543    /// Override the base delay for exponential backoff (default 1 s).
544    ///
545    /// The actual delay before attempt `n` is `min(retry_max_delay, base × 2^n)`,
546    /// optionally jittered. See [`OutboxWorkerConfig::retry_base_delay`].
547    #[must_use]
548    pub fn retry_base_delay(mut self, d: Duration) -> Self {
549        self.config.retry_base_delay = d;
550        self
551    }
552
553    /// Override the maximum backoff delay (default 5 min).
554    ///
555    /// Caps `retry_base_delay × 2^n` regardless of the attempt count.
556    #[must_use]
557    pub fn retry_max_delay(mut self, d: Duration) -> Self {
558        self.config.retry_max_delay = d;
559        self
560    }
561
562    /// Enable or disable full jitter on the backoff delay (default `true`).
563    ///
564    /// When `true` the worker draws a uniform random value in `[0, computed_delay]`
565    /// instead of using the deterministic exponential.
566    #[must_use]
567    pub fn jitter(mut self, enabled: bool) -> Self {
568        self.config.jitter = enabled;
569        self
570    }
571
572    /// Override the per-envelope handler deadline and soft-lease unit
573    /// (default 30 s).
574    ///
575    /// Each handler invocation is wrapped in a hard `tokio` timeout of this
576    /// duration; the batch lease is sized as `batch_size x dispatch_timeout`
577    /// internally. Set it to the worst-case duration of a single handler.
578    #[must_use]
579    pub fn dispatch_timeout(mut self, d: Duration) -> Self {
580        self.config.dispatch_timeout = d;
581        self
582    }
583
584    /// Consume the builder and produce an [`OutboxWorker`] ready to spawn.
585    ///
586    /// # Errors
587    ///
588    /// Returns [`OutboxError::Internal`] if the configured `table_name`
589    /// is not a valid identifier.
590    pub fn build(self) -> Result<OutboxWorker<PgOutboxStore>, OutboxError> {
591        let mut store = PgOutboxStore::new(self.pool, self.table_name)?;
592        if let Some(dlq) = self.dead_letter_table {
593            store = store.with_dead_letter(dlq)?;
594        }
595        Ok(OutboxWorker::new(store, self.handlers, self.config))
596    }
597}
598
599/// Apply the dead-letter schema to the target Postgres database.
600///
601/// **Intended for POCs, integration tests and local development.**
602///
603/// # Errors
604///
605/// - [`OutboxError::Internal`] if `table_name` is not a valid identifier.
606/// - [`OutboxError::Database`] if the connection or the DDL statement fails.
607pub async fn ensure_dead_letter_schema(pool: &PgPool, table_name: &str) -> Result<(), OutboxError> {
608    let ddl = DIALECT.dead_letter_schema_ddl(table_name)?;
609    sqlx::raw_sql(&ddl)
610        .execute(pool)
611        .await
612        .map_err(database_error)?;
613    Ok(())
614}
615
616#[cfg(test)]
617mod tests {
618    use super::*;
619    use hexeract_core::HandlerContext;
620    use serde::Deserialize;
621    use serde::Serialize;
622
623    fn lazy_pool() -> PgPool {
624        PgPool::connect_lazy("postgres://nobody:nobody@127.0.0.1:1/nobody")
625            .expect("lazy pool must build from a valid URL")
626    }
627
628    #[derive(Debug, Serialize, Deserialize)]
629    struct UserRegistered {
630        user_id: Uuid,
631    }
632
633    impl Event for UserRegistered {
634        const EVENT_TYPE: &'static str = "users.registered";
635    }
636
637    #[derive(Debug, Serialize, Deserialize)]
638    struct OrderPlaced {
639        order_id: Uuid,
640    }
641
642    impl Event for OrderPlaced {
643        const EVENT_TYPE: &'static str = "orders.placed";
644    }
645
646    struct NoopHandler;
647
648    impl Handler<UserRegistered> for NoopHandler {
649        type Error = OutboxError;
650        async fn handle(
651            &self,
652            _event: UserRegistered,
653            _ctx: &HandlerContext,
654        ) -> Result<(), Self::Error> {
655            Ok(())
656        }
657    }
658
659    impl Handler<OrderPlaced> for NoopHandler {
660        type Error = OutboxError;
661        async fn handle(
662            &self,
663            _event: OrderPlaced,
664            _ctx: &HandlerContext,
665        ) -> Result<(), Self::Error> {
666            Ok(())
667        }
668    }
669
670    #[test]
671    fn pool_error_maps_pool_timed_out_to_pool_timeout_variant() {
672        let err = pool_error(sqlx::Error::PoolTimedOut);
673        assert!(
674            matches!(err, OutboxError::PoolTimeout),
675            "PoolTimedOut must map to OutboxError::PoolTimeout, got {err:?}"
676        );
677    }
678
679    #[test]
680    fn pool_error_wraps_other_errors_as_database_error() {
681        let err = pool_error(sqlx::Error::RowNotFound);
682        assert!(
683            matches!(err, OutboxError::Database(_)),
684            "non-timeout errors must map to OutboxError::Database, got {err:?}"
685        );
686    }
687
688    #[tokio::test]
689    async fn store_new_rejects_invalid_table_name() {
690        let err = PgOutboxStore::new(lazy_pool(), "bad name; DROP").unwrap_err();
691        assert!(matches!(err, OutboxError::Internal(_)));
692    }
693
694    #[tokio::test]
695    async fn store_new_caches_postgres_sql_with_validated_table_name() {
696        let store = PgOutboxStore::new(lazy_pool(), "audit_outbox").unwrap();
697        assert_eq!(store.table_name(), "audit_outbox");
698        assert!(store.poll_sql.contains("FROM \"audit_outbox\""));
699        assert!(store.poll_sql.contains("FOR UPDATE SKIP LOCKED"));
700        assert!(store.mark_delivered_sql.contains("UPDATE \"audit_outbox\""));
701        // The attempt increment lives in claim_sql now (see #213), so
702        // mark_failed must not increment again.
703        assert!(!store.mark_failed_sql.contains("attempts = attempts + 1"));
704    }
705
706    #[tokio::test]
707    async fn publisher_new_rejects_invalid_table_name() {
708        let err = PgOutboxPublisher::new(lazy_pool(), "bad name; DROP").unwrap_err();
709        assert!(matches!(err, OutboxError::Internal(_)));
710    }
711
712    #[tokio::test]
713    async fn publisher_new_caches_insert_sql_with_validated_table_name() {
714        let publisher = PgOutboxPublisher::new(lazy_pool(), "audit_outbox").unwrap();
715        assert_eq!(publisher.table_name(), "audit_outbox");
716        assert!(
717            publisher
718                .insert_sql
719                .contains("INSERT INTO \"audit_outbox\"")
720        );
721        assert!(publisher.insert_sql.contains("$1, $2, $3, $4"));
722    }
723
724    #[tokio::test]
725    async fn builder_starts_with_default_table_and_empty_handlers() {
726        let builder = PgOutboxWorkerBuilder::new(lazy_pool());
727        assert_eq!(builder.table_name, DEFAULT_TABLE_NAME);
728        assert!(builder.handlers.is_empty());
729        let default_cfg = OutboxWorkerConfig::default();
730        assert_eq!(builder.config.batch_size, default_cfg.batch_size);
731        assert_eq!(builder.config.max_attempts, default_cfg.max_attempts);
732    }
733
734    #[tokio::test]
735    async fn builder_table_name_can_be_customized() {
736        let builder = PgOutboxWorkerBuilder::new(lazy_pool()).table_name("my_outbox");
737        assert_eq!(builder.table_name, "my_outbox");
738    }
739
740    #[tokio::test]
741    async fn builder_register_handler_records_event_types() {
742        let builder = PgOutboxWorkerBuilder::new(lazy_pool())
743            .register_handler::<UserRegistered, _>(NoopHandler)
744            .register_handler::<OrderPlaced, _>(NoopHandler);
745        assert_eq!(builder.handlers.len(), 2);
746        assert!(builder.handlers.contains_key("users.registered"));
747        assert!(builder.handlers.contains_key("orders.placed"));
748    }
749
750    #[tokio::test]
751    async fn builder_register_handler_twice_replaces_silently() {
752        let builder = PgOutboxWorkerBuilder::new(lazy_pool())
753            .register_handler::<UserRegistered, _>(NoopHandler)
754            .register_handler::<UserRegistered, _>(NoopHandler);
755        assert_eq!(builder.handlers.len(), 1);
756    }
757
758    #[tokio::test]
759    async fn builder_build_rejects_invalid_table_name() {
760        let result = PgOutboxWorkerBuilder::new(lazy_pool())
761            .table_name("bad name; DROP TABLE")
762            .build();
763        assert!(matches!(result, Err(OutboxError::Internal(_))));
764    }
765
766    #[tokio::test]
767    async fn builder_build_with_default_table_name_succeeds() {
768        let worker = PgOutboxWorkerBuilder::new(lazy_pool()).build();
769        assert!(worker.is_ok());
770    }
771
772    #[tokio::test]
773    async fn store_with_dead_letter_caches_sql_for_dlq() {
774        let store = PgOutboxStore::new(lazy_pool(), "audit_outbox")
775            .unwrap()
776            .with_dead_letter("audit_outbox_dead_letter")
777            .unwrap();
778        let dlq = store.dead_letter.as_ref().unwrap();
779        assert!(
780            dlq.insert_sql
781                .contains("INSERT INTO \"audit_outbox_dead_letter\"")
782        );
783        assert!(dlq.insert_sql.contains("FROM \"audit_outbox\""));
784        assert!(dlq.insert_sql.contains("$1"));
785        assert!(dlq.delete_sql.contains("DELETE FROM \"audit_outbox\""));
786        assert!(dlq.delete_sql.contains("$1"));
787    }
788
789    #[tokio::test]
790    async fn store_with_dead_letter_rejects_invalid_dlq_name() {
791        let err = PgOutboxStore::new(lazy_pool(), "audit_outbox")
792            .unwrap()
793            .with_dead_letter("bad name; DROP")
794            .unwrap_err();
795        assert!(matches!(err, OutboxError::Internal(_)));
796    }
797
798    #[tokio::test]
799    async fn builder_dead_letter_table_propagates_to_store() {
800        let worker = PgOutboxWorkerBuilder::new(lazy_pool())
801            .dead_letter_table("audit_outbox_dead_letter")
802            .build()
803            .unwrap();
804        drop(worker);
805    }
806
807    #[tokio::test]
808    async fn builder_dispatch_timeout_overrides_default() {
809        let worker = PgOutboxWorkerBuilder::new(lazy_pool())
810            .dispatch_timeout(Duration::from_secs(60))
811            .build()
812            .unwrap();
813        drop(worker);
814    }
815
816    #[test]
817    fn store_claim_sql_uses_any_array_bind() {
818        // Postgres claim uses = ANY($2) so the bind count is always 2,
819        // regardless of batch size (#240).
820        let sql = DIALECT.claim_sql("audit_outbox", 3);
821        assert!(sql.contains("UPDATE \"audit_outbox\""));
822        assert!(sql.contains("next_retry_at = (NOW() +"));
823        assert!(sql.contains("$1"));
824        assert!(sql.contains("WHERE event_id = ANY($2)"));
825        assert!(!sql.contains("$3"));
826        assert!(sql.contains("attempts = attempts + 1"));
827    }
828
829    #[test]
830    fn duration_to_pg_secs_caps_at_max_interval() {
831        // Duration::MAX produces inf via as_secs_f64(), which Postgres rejects
832        // with "interval out of range". The helper must cap the output.
833        let capped = duration_to_pg_secs(Duration::MAX);
834        assert!(
835            capped.is_finite(),
836            "Duration::MAX must produce a finite f64, got {capped}"
837        );
838        // Use approximate comparison for f64 (float_cmp lint).
839        assert!(
840            (capped - MAX_PG_INTERVAL_SECS).abs() < 1.0,
841            "capped value must equal MAX_PG_INTERVAL_SECS, got {capped}"
842        );
843
844        // Ordinary values must pass through unchanged.
845        let ordinary = Duration::from_secs(300);
846        assert!(
847            (duration_to_pg_secs(ordinary) - 300.0_f64).abs() < f64::EPSILON,
848            "ordinary duration must not be capped"
849        );
850    }
851
852    #[tokio::test]
853    async fn publisher_rejects_event_type_exceeding_64_bytes() {
854        // publish_in_tx must validate E::EVENT_TYPE before touching the DB so
855        // the caller gets OutboxError::Internal instead of an opaque DB error.
856        // 65 bytes: exceeds VARCHAR(64)
857        const OVERLENGTH_EVENT_TYPE: &str =
858            "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa1";
859
860        let publisher = PgOutboxPublisher::new(lazy_pool(), "audit_outbox").unwrap();
861        // We cannot call publish_in_tx without a real transaction, so verify
862        // the validation function directly with the same constant used at publish
863        // time.
864        let result = validate_event_type(OVERLENGTH_EVENT_TYPE);
865        assert!(
866            matches!(result, Err(OutboxError::Internal(_))),
867            "overlength EVENT_TYPE must be rejected before the DB insert"
868        );
869        drop(publisher);
870    }
871}