Skip to main content

hexeract_outbox_sql/
mysql.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use async_trait::async_trait;
6use hexeract_outbox::ErasedHandler;
7use hexeract_outbox::Event;
8use hexeract_outbox::Handler;
9use hexeract_outbox::OutboxEnvelope;
10use hexeract_outbox::OutboxError;
11use hexeract_outbox::OutboxPublisher;
12use hexeract_outbox::OutboxStore;
13use hexeract_outbox::OutboxWorker;
14use hexeract_outbox::OutboxWorkerConfig;
15use hexeract_outbox::TypedHandler;
16use sqlx::Acquire;
17use sqlx::MySql;
18use sqlx::MySqlPool;
19use sqlx::Row;
20use sqlx::Transaction;
21use sqlx::pool::PoolConnection;
22use time::PrimitiveDateTime;
23use uuid::Uuid;
24
25use crate::DEFAULT_TABLE_NAME;
26use crate::dialect::Dialect;
27use crate::envelope::assemble_envelope;
28use crate::envelope::primitive_utc_to_system_time;
29use crate::validate::validate_event_type;
30use crate::validate::validate_table_name;
31
/// SQL dialect used to render every statement and DDL script in this module.
const DIALECT: Dialect = Dialect::MySql;
33
/// Express a backoff or lease [`Duration`] as a whole number of microseconds,
/// ready to be bound to a MySQL `INTERVAL ? MICROSECOND` placeholder.
///
/// Sub-microsecond precision is dropped. A duration whose microsecond count
/// does not fit in an `i64` is clamped to [`i64::MAX`].
fn duration_to_micros(d: Duration) -> i64 {
    let micros: u128 = d.as_micros();
    i64::try_from(micros).unwrap_or(i64::MAX)
}
39
40fn database_error(error: impl std::error::Error + Send + Sync + 'static) -> OutboxError {
41    OutboxError::Database(Box::new(error))
42}
43
44fn pool_error(error: sqlx::Error) -> OutboxError {
45    if matches!(error, sqlx::Error::PoolTimedOut) {
46        OutboxError::PoolTimeout
47    } else {
48        OutboxError::Database(Box::new(error))
49    }
50}
51
52/// Decode one polled row into an [`OutboxEnvelope`].
53///
54/// Kept separate from the poll loop so a decode failure can be isolated to the
55/// offending row (logged and skipped) instead of aborting the whole batch.
56fn decode_mysql_row(row: &sqlx::mysql::MySqlRow) -> Result<OutboxEnvelope, OutboxError> {
57    let event_id: Uuid = row.try_get("event_id").map_err(database_error)?;
58    let event_type: String = row.try_get("event_type").map_err(database_error)?;
59    let payload: serde_json::Value = row.try_get("payload").map_err(database_error)?;
60    let subject_id: Option<Uuid> = row.try_get("subject_id").map_err(database_error)?;
61    let created_at: PrimitiveDateTime = row.try_get("created_at").map_err(database_error)?;
62    let attempts: i32 = row.try_get("attempts").map_err(database_error)?;
63    let last_error: Option<String> = row.try_get("last_error").map_err(database_error)?;
64    let next_retry_at: Option<PrimitiveDateTime> =
65        row.try_get("next_retry_at").map_err(database_error)?;
66
67    let payload = serde_json::to_vec(&payload)?;
68
69    Ok(assemble_envelope(
70        event_id,
71        event_type,
72        payload,
73        subject_id,
74        primitive_utc_to_system_time(created_at),
75        u32::try_from(attempts.max(0)).unwrap_or(u32::MAX),
76        last_error,
77        next_retry_at.map(primitive_utc_to_system_time),
78    ))
79}
80
/// Pre-rendered statements used to move a poison message out of the main
/// outbox table and into a dead-letter table.
///
/// Both statements are executed with the event id as their only bound
/// parameter.
#[derive(Debug, Clone)]
struct DeadLetterSql {
    /// Statement rendered by `Dialect::insert_dead_letter_sql` for the main
    /// table and the dead-letter table.
    insert_sql: Arc<str>,
    /// Statement rendered by `Dialect::delete_from_main_sql` for the main
    /// table.
    delete_sql: Arc<str>,
}
86
87/// Apply the canonical MySQL outbox schema to the target database.
88///
89/// **Intended for POCs, integration tests and local development.**
90/// Production deployments should run their own migration tooling against the
91/// SQL rendered by [`Dialect::schema_ddl`].
92///
93/// # Errors
94///
95/// - [`OutboxError::Internal`] if `table_name` is not a valid identifier.
96/// - [`OutboxError::Database`] if the connection or the DDL statement fails.
97pub async fn ensure_schema(pool: &MySqlPool, table_name: &str) -> Result<(), OutboxError> {
98    let ddl = DIALECT.schema_ddl(table_name)?;
99    sqlx::raw_sql(&ddl)
100        .execute(pool)
101        .await
102        .map_err(database_error)?;
103    Ok(())
104}
105
/// MySQL implementation of [`OutboxStore`] backed by `sqlx::MySqlPool`.
///
/// Cheap to clone (the pool and the cached SQL strings are reference-counted).
#[derive(Debug, Clone)]
pub struct MySqlOutboxStore {
    /// Pool that connections are acquired from.
    pool: MySqlPool,
    /// Validated name of the outbox table.
    table_name: Arc<str>,
    /// Cached poll statement, rendered once in `new`.
    poll_sql: Arc<str>,
    /// Cached mark-delivered statement, rendered once in `new`.
    mark_delivered_sql: Arc<str>,
    /// Cached mark-failed statement, rendered once in `new`.
    mark_failed_sql: Arc<str>,
    /// Dead-letter statements; `None` until `with_dead_letter` is called.
    dead_letter: Option<Arc<DeadLetterSql>>,
}
118
119impl MySqlOutboxStore {
120    /// Build a store for the given pool and table.
121    ///
122    /// # Errors
123    ///
124    /// Returns [`OutboxError::Internal`] if `table_name` is not a valid
125    /// identifier matching `^[a-zA-Z_][a-zA-Z0-9_]*$`.
126    pub fn new(pool: MySqlPool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
127        let table_name = table_name.into();
128        validate_table_name(&table_name)?;
129        let poll_sql = DIALECT.poll_sql(&table_name);
130        let mark_delivered_sql = DIALECT.mark_delivered_sql(&table_name);
131        let mark_failed_sql = DIALECT.mark_failed_sql(&table_name);
132        Ok(Self {
133            pool,
134            table_name: Arc::from(table_name),
135            poll_sql: Arc::from(poll_sql),
136            mark_delivered_sql: Arc::from(mark_delivered_sql),
137            mark_failed_sql: Arc::from(mark_failed_sql),
138            dead_letter: None,
139        })
140    }
141
142    /// Underlying pool.
143    #[must_use]
144    pub fn pool(&self) -> &MySqlPool {
145        &self.pool
146    }
147
148    /// Configured table name.
149    #[must_use]
150    pub fn table_name(&self) -> &str {
151        &self.table_name
152    }
153
154    /// Activate dead-letter persistence for poison messages.
155    ///
156    /// # Errors
157    ///
158    /// Returns [`OutboxError::Internal`] if `dlq_table` is not a valid identifier.
159    pub fn with_dead_letter(mut self, dlq_table: impl Into<String>) -> Result<Self, OutboxError> {
160        let dlq = dlq_table.into();
161        validate_table_name(&dlq)?;
162        let insert_sql = DIALECT.insert_dead_letter_sql(&self.table_name, &dlq);
163        let delete_sql = DIALECT.delete_from_main_sql(&self.table_name);
164        self.dead_letter = Some(Arc::new(DeadLetterSql {
165            insert_sql: Arc::from(insert_sql),
166            delete_sql: Arc::from(delete_sql),
167        }));
168        Ok(self)
169    }
170}
171
172#[async_trait]
173impl OutboxStore for MySqlOutboxStore {
174    type Client = PoolConnection<MySql>;
175    type Tx<'tx> = Transaction<'tx, MySql>;
176
177    async fn acquire(&self) -> Result<Self::Client, OutboxError> {
178        self.pool.acquire().await.map_err(pool_error)
179    }
180
181    async fn begin<'a>(&self, client: &'a mut Self::Client) -> Result<Self::Tx<'a>, OutboxError> {
182        client.begin().await.map_err(database_error)
183    }
184
185    async fn poll<'a>(
186        &self,
187        tx: &mut Self::Tx<'a>,
188        batch_size: usize,
189        max_attempts: u32,
190    ) -> Result<Vec<OutboxEnvelope>, OutboxError> {
191        let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
192        let max = i32::try_from(max_attempts).unwrap_or(i32::MAX);
193        let rows = sqlx::query(&self.poll_sql)
194            .bind(max)
195            .bind(limit)
196            .fetch_all(&mut **tx)
197            .await
198            .map_err(database_error)?;
199
200        let mut envelopes = Vec::with_capacity(rows.len());
201        for row in rows {
202            // A single undecodable row (schema drift, corrupt payload) must not
203            // abort the whole poll: that head-of-line poisons the queue forever
204            // (#214). Log it and skip so the rest of the batch keeps draining.
205            match decode_mysql_row(&row) {
206                Ok(envelope) => envelopes.push(envelope),
207                Err(error) => {
208                    let event_id = row.try_get::<Uuid, _>("event_id").ok();
209                    tracing::error!(
210                        ?event_id,
211                        error = %error,
212                        "skipping undecodable outbox row; the rest of the batch continues"
213                    );
214                }
215            }
216        }
217        Ok(envelopes)
218    }
219
220    async fn mark_delivered<'a>(
221        &self,
222        tx: &mut Self::Tx<'a>,
223        event_id: Uuid,
224    ) -> Result<(), OutboxError> {
225        sqlx::query(&self.mark_delivered_sql)
226            .bind(event_id)
227            .execute(&mut **tx)
228            .await
229            .map_err(database_error)?;
230        Ok(())
231    }
232
233    async fn mark_failed<'a>(
234        &self,
235        tx: &mut Self::Tx<'a>,
236        event_id: Uuid,
237        error: &str,
238        retry_in: Duration,
239    ) -> Result<(), OutboxError> {
240        // The SQL adds this many microseconds to the DB clock (#230).
241        sqlx::query(&self.mark_failed_sql)
242            .bind(error)
243            .bind(duration_to_micros(retry_in))
244            .bind(event_id)
245            .execute(&mut **tx)
246            .await
247            .map_err(database_error)?;
248        Ok(())
249    }
250
251    async fn commit<'a>(&self, tx: Self::Tx<'a>) -> Result<(), OutboxError> {
252        tx.commit().await.map_err(database_error)
253    }
254
255    async fn mark_dead_lettered<'a>(
256        &self,
257        tx: &mut Self::Tx<'a>,
258        event_id: Uuid,
259        _error: &str,
260    ) -> Result<(), OutboxError> {
261        let Some(dlq) = &self.dead_letter else {
262            return Ok(());
263        };
264        sqlx::query(&dlq.insert_sql)
265            .bind(event_id)
266            .execute(&mut **tx)
267            .await
268            .map_err(database_error)?;
269        sqlx::query(&dlq.delete_sql)
270            .bind(event_id)
271            .execute(&mut **tx)
272            .await
273            .map_err(database_error)?;
274        Ok(())
275    }
276
277    async fn claim<'a>(
278        &self,
279        tx: &mut Self::Tx<'a>,
280        event_ids: &[Uuid],
281        lease_for: Duration,
282    ) -> Result<(), OutboxError> {
283        if event_ids.is_empty() {
284            return Ok(());
285        }
286        let sql = DIALECT.claim_sql(&self.table_name, event_ids.len());
287        // The SQL adds this many microseconds to the DB clock (#230).
288        let mut query = sqlx::query(&sql).bind(duration_to_micros(lease_for));
289        for id in event_ids {
290            query = query.bind(*id);
291        }
292        query.execute(&mut **tx).await.map_err(database_error)?;
293        Ok(())
294    }
295}
296
/// MySQL implementation of [`OutboxPublisher`] backed by `sqlx::MySqlPool`.
///
/// Cheap to clone (the pool and the cached insert statement are reference-counted).
#[derive(Debug, Clone)]
pub struct MySqlOutboxPublisher {
    /// Pool used by `publish` to open its own transaction.
    pool: MySqlPool,
    /// Validated name of the outbox table.
    table_name: Arc<str>,
    /// Cached insert statement, rendered once in `new`.
    insert_sql: Arc<str>,
}
306
307impl MySqlOutboxPublisher {
308    /// Create a new publisher for the given pool and table.
309    ///
310    /// # Errors
311    ///
312    /// Returns [`OutboxError::Internal`] if `table_name` is not a valid
313    /// identifier matching `^[a-zA-Z_][a-zA-Z0-9_]*$`.
314    pub fn new(pool: MySqlPool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
315        let table_name = table_name.into();
316        validate_table_name(&table_name)?;
317        let insert_sql = DIALECT.insert_sql(&table_name);
318        Ok(Self {
319            pool,
320            table_name: Arc::from(table_name),
321            insert_sql: Arc::from(insert_sql),
322        })
323    }
324
325    /// Underlying pool, exposed for callers that open their own transactions.
326    #[must_use]
327    pub fn pool(&self) -> &MySqlPool {
328        &self.pool
329    }
330
331    /// Configured table name.
332    #[must_use]
333    pub fn table_name(&self) -> &str {
334        &self.table_name
335    }
336}
337
338impl OutboxPublisher for MySqlOutboxPublisher {
339    type Tx<'tx> = Transaction<'tx, MySql>;
340
341    async fn publish_in_tx<E: Event>(
342        &self,
343        tx: &mut Self::Tx<'_>,
344        event: &E,
345    ) -> Result<Uuid, OutboxError> {
346        validate_event_type(E::EVENT_TYPE)?;
347        let event_id = Uuid::now_v7();
348        let payload = serde_json::to_value(event)?;
349        sqlx::query(&self.insert_sql)
350            .bind(event_id)
351            .bind(E::EVENT_TYPE)
352            .bind(payload)
353            .bind(Option::<Uuid>::None)
354            .execute(&mut **tx)
355            .await
356            .map_err(database_error)?;
357        Ok(event_id)
358    }
359
360    async fn publish_in_tx_with_subject<E: Event>(
361        &self,
362        tx: &mut Self::Tx<'_>,
363        subject_id: Uuid,
364        event: &E,
365    ) -> Result<Uuid, OutboxError> {
366        validate_event_type(E::EVENT_TYPE)?;
367        let event_id = Uuid::now_v7();
368        let payload = serde_json::to_value(event)?;
369        sqlx::query(&self.insert_sql)
370            .bind(event_id)
371            .bind(E::EVENT_TYPE)
372            .bind(payload)
373            .bind(Some(subject_id))
374            .execute(&mut **tx)
375            .await
376            .map_err(database_error)?;
377        Ok(event_id)
378    }
379
380    async fn publish<E: Event>(&self, event: &E) -> Result<Uuid, OutboxError> {
381        let mut tx = self.pool.begin().await.map_err(database_error)?;
382        let event_id = self.publish_in_tx(&mut tx, event).await?;
383        tx.commit().await.map_err(database_error)?;
384        Ok(event_id)
385    }
386}
387
/// Fluent builder for an [`OutboxWorker`] backed by [`MySqlOutboxStore`].
///
/// # Pool sizing and acquire timeout
///
/// The worker and every concurrent publisher draw connections from the same
/// pool. To avoid indefinite blocking under pressure, configure an acquire
/// timeout on the `MySqlPool` before passing it here:
///
/// ```rust,ignore
/// use sqlx::mysql::MySqlPoolOptions;
/// use std::time::Duration;
///
/// let pool = MySqlPoolOptions::new()
///     // 1 connection for the claim cycle + 1 per concurrent publisher + 2 headroom
///     .max_connections(batch_size + num_publishers + 2)
///     // surface PoolTimeout instead of blocking indefinitely
///     .acquire_timeout(Duration::from_secs(5))
///     .connect("mysql://...")
///     .await?;
///
/// let worker = MySqlOutboxWorkerBuilder::new(pool)
///     .batch_size(batch_size)
///     .build()?;
/// ```
///
/// When `acquire_timeout` expires, [`OutboxStore::acquire`] returns
/// [`OutboxError::PoolTimeout`] instead of hanging. The worker logs the
/// error and retries after [`OutboxWorkerConfig::poll_interval`].
///
/// [`OutboxError::PoolTimeout`]: hexeract_outbox::OutboxError::PoolTimeout
/// [`OutboxWorkerConfig::poll_interval`]: hexeract_outbox::OutboxWorkerConfig::poll_interval
// NOTE(review): the example's inline comment budgets 1 connection for the
// claim cycle, but the expression adds `batch_size` — confirm which is intended.
pub struct MySqlOutboxWorkerBuilder {
    /// Pool handed to the store when the worker is built.
    pool: MySqlPool,
    /// Outbox table name; validated only in `build`.
    table_name: String,
    /// Dead-letter table name, if dead-letter persistence was requested.
    dead_letter_table: Option<String>,
    /// Registered handlers keyed by event type; one handler per key.
    handlers: HashMap<&'static str, Arc<dyn ErasedHandler>>,
    /// Worker settings, starting from `OutboxWorkerConfig::default()`.
    config: OutboxWorkerConfig,
}
426
427impl MySqlOutboxWorkerBuilder {
428    /// Start a new builder for the given pool.
429    #[must_use]
430    pub fn new(pool: MySqlPool) -> Self {
431        Self {
432            pool,
433            table_name: DEFAULT_TABLE_NAME.to_owned(),
434            dead_letter_table: None,
435            handlers: HashMap::new(),
436            config: OutboxWorkerConfig::default(),
437        }
438    }
439
440    /// Override the outbox table name (default `"audit_outbox"`).
441    #[must_use]
442    pub fn table_name(mut self, name: impl Into<String>) -> Self {
443        self.table_name = name.into();
444        self
445    }
446
447    /// Enable dead-letter persistence for poison messages.
448    #[must_use]
449    pub fn dead_letter_table(mut self, name: impl Into<String>) -> Self {
450        self.dead_letter_table = Some(name.into());
451        self
452    }
453
454    /// Register a typed handler for the event type `E`.
455    ///
456    /// Registering twice for the same event type silently replaces the
457    /// previous handler.
458    #[must_use]
459    pub fn register_handler<E, H>(mut self, handler: H) -> Self
460    where
461        E: Event,
462        H: Handler<E>,
463    {
464        let typed = TypedHandler::<E, H>::new(handler);
465        let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
466        self.handlers.insert(E::EVENT_TYPE, erased);
467        self
468    }
469
470    /// Register a handler already shared behind an `Arc`.
471    #[must_use]
472    pub fn shared_handler<E, H>(mut self, handler: Arc<H>) -> Self
473    where
474        E: Event,
475        H: Handler<E>,
476    {
477        let typed = TypedHandler::<E, H>::shared(handler);
478        let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
479        self.handlers.insert(E::EVENT_TYPE, erased);
480        self
481    }
482
483    /// Override the poll interval (default 100 ms).
484    #[must_use]
485    pub fn poll_interval(mut self, d: Duration) -> Self {
486        self.config.poll_interval = d;
487        self
488    }
489
490    /// Override the batch size per poll (default 10).
491    #[must_use]
492    pub fn batch_size(mut self, n: usize) -> Self {
493        self.config.batch_size = n;
494        self
495    }
496
497    /// Override the maximum number of attempts per envelope (default 5).
498    #[must_use]
499    pub fn max_attempts(mut self, n: u32) -> Self {
500        self.config.max_attempts = n;
501        self
502    }
503
504    /// Override the base delay for exponential backoff (default 1 s).
505    #[must_use]
506    pub fn retry_base_delay(mut self, d: Duration) -> Self {
507        self.config.retry_base_delay = d;
508        self
509    }
510
511    /// Override the maximum backoff delay (default 5 min).
512    #[must_use]
513    pub fn retry_max_delay(mut self, d: Duration) -> Self {
514        self.config.retry_max_delay = d;
515        self
516    }
517
518    /// Enable or disable full jitter on the backoff delay (default `true`).
519    #[must_use]
520    pub fn jitter(mut self, enabled: bool) -> Self {
521        self.config.jitter = enabled;
522        self
523    }
524
525    /// Override the per-envelope handler deadline and soft-lease unit
526    /// (default 30 s).
527    ///
528    /// Each handler invocation is wrapped in a hard `tokio` timeout of this
529    /// duration; the batch lease is sized as `batch_size x dispatch_timeout`
530    /// internally. Set it to the worst-case duration of a single handler.
531    #[must_use]
532    pub fn dispatch_timeout(mut self, d: Duration) -> Self {
533        self.config.dispatch_timeout = d;
534        self
535    }
536
537    /// Consume the builder and produce an [`OutboxWorker`] ready to spawn.
538    ///
539    /// # Errors
540    ///
541    /// Returns [`OutboxError::Internal`] if the configured `table_name`
542    /// is not a valid identifier.
543    pub fn build(self) -> Result<OutboxWorker<MySqlOutboxStore>, OutboxError> {
544        let mut store = MySqlOutboxStore::new(self.pool, self.table_name)?;
545        if let Some(dlq) = self.dead_letter_table {
546            store = store.with_dead_letter(dlq)?;
547        }
548        Ok(OutboxWorker::new(store, self.handlers, self.config))
549    }
550}
551
552/// Apply the dead-letter schema to the target MySQL database.
553///
554/// **Intended for POCs, integration tests and local development.**
555///
556/// # Errors
557///
558/// - [`OutboxError::Internal`] if `table_name` is not a valid identifier.
559/// - [`OutboxError::Database`] if the connection or the DDL statement fails.
560pub async fn ensure_dead_letter_schema(
561    pool: &MySqlPool,
562    table_name: &str,
563) -> Result<(), OutboxError> {
564    let ddl = DIALECT.dead_letter_schema_ddl(table_name)?;
565    sqlx::raw_sql(&ddl)
566        .execute(pool)
567        .await
568        .map_err(database_error)?;
569    Ok(())
570}
571
#[cfg(test)]
mod tests {
    //! Unit tests that need no live database: they use a lazily-connecting
    //! pool pointed at an address that is never dialled.

    use super::*;
    use hexeract_core::HandlerContext;
    use serde::Deserialize;
    use serde::Serialize;

    /// Build a pool that only connects on first use, so constructors can be
    /// exercised without a MySQL server.
    fn lazy_pool() -> MySqlPool {
        MySqlPool::connect_lazy("mysql://nobody:nobody@127.0.0.1:1/nobody")
            .expect("lazy pool must build from a valid URL")
    }

    #[derive(Debug, Serialize, Deserialize)]
    struct UserRegistered {
        user_id: Uuid,
    }

    impl Event for UserRegistered {
        const EVENT_TYPE: &'static str = "users.registered";
    }

    #[derive(Debug, Serialize, Deserialize)]
    struct OrderPlaced {
        order_id: Uuid,
    }

    impl Event for OrderPlaced {
        const EVENT_TYPE: &'static str = "orders.placed";
    }

    /// Handler that accepts every event and does nothing.
    struct NoopHandler;

    impl Handler<UserRegistered> for NoopHandler {
        type Error = OutboxError;
        async fn handle(
            &self,
            _event: UserRegistered,
            _ctx: &HandlerContext,
        ) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    impl Handler<OrderPlaced> for NoopHandler {
        type Error = OutboxError;
        async fn handle(
            &self,
            _event: OrderPlaced,
            _ctx: &HandlerContext,
        ) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    #[test]
    fn duration_to_micros_converts_and_saturates() {
        assert_eq!(duration_to_micros(Duration::ZERO), 0);
        assert_eq!(duration_to_micros(Duration::from_millis(3)), 3_000);
        assert_eq!(duration_to_micros(Duration::MAX), i64::MAX);
    }

    #[test]
    fn pool_error_maps_pool_timed_out_to_pool_timeout_variant() {
        let err = pool_error(sqlx::Error::PoolTimedOut);
        assert!(
            matches!(err, OutboxError::PoolTimeout),
            "PoolTimedOut must map to OutboxError::PoolTimeout, got {err:?}"
        );
    }

    #[test]
    fn pool_error_wraps_other_errors_as_database_error() {
        let err = pool_error(sqlx::Error::RowNotFound);
        assert!(
            matches!(err, OutboxError::Database(_)),
            "non-timeout errors must map to OutboxError::Database, got {err:?}"
        );
    }

    #[tokio::test]
    async fn store_new_rejects_invalid_table_name() {
        let err = MySqlOutboxStore::new(lazy_pool(), "bad name; DROP").unwrap_err();
        assert!(matches!(err, OutboxError::Internal(_)));
    }

    #[tokio::test]
    async fn store_new_caches_mysql_sql_with_validated_table_name() {
        let store = MySqlOutboxStore::new(lazy_pool(), "audit_outbox").unwrap();
        assert_eq!(store.table_name(), "audit_outbox");
        assert!(store.poll_sql.contains("FROM `audit_outbox`"));
        assert!(store.poll_sql.contains("FOR UPDATE SKIP LOCKED"));
        assert!(store.poll_sql.contains("UTC_TIMESTAMP(6)"));
        assert!(store.mark_delivered_sql.contains("UPDATE `audit_outbox`"));
        // The attempt increment lives in claim_sql now (see #213), so
        // mark_failed must not increment again.
        assert!(!store.mark_failed_sql.contains("attempts = attempts + 1"));
    }

    #[tokio::test]
    async fn publisher_new_caches_insert_sql_with_question_marks() {
        let publisher = MySqlOutboxPublisher::new(lazy_pool(), "audit_outbox").unwrap();
        assert_eq!(publisher.table_name(), "audit_outbox");
        assert!(publisher.insert_sql.contains("INSERT INTO `audit_outbox`"));
        assert!(publisher.insert_sql.contains("?, ?, ?, ?"));
    }

    #[tokio::test]
    async fn builder_starts_with_default_table_and_empty_handlers() {
        let builder = MySqlOutboxWorkerBuilder::new(lazy_pool());
        assert_eq!(builder.table_name, DEFAULT_TABLE_NAME);
        assert!(builder.handlers.is_empty());
    }

    #[tokio::test]
    async fn builder_register_handler_records_event_types() {
        let builder = MySqlOutboxWorkerBuilder::new(lazy_pool())
            .register_handler::<UserRegistered, _>(NoopHandler)
            .register_handler::<OrderPlaced, _>(NoopHandler);
        assert_eq!(builder.handlers.len(), 2);
        assert!(builder.handlers.contains_key("users.registered"));
        assert!(builder.handlers.contains_key("orders.placed"));
    }

    #[tokio::test]
    async fn builder_build_rejects_invalid_table_name() {
        let result = MySqlOutboxWorkerBuilder::new(lazy_pool())
            .table_name("bad name; DROP TABLE")
            .build();
        assert!(matches!(result, Err(OutboxError::Internal(_))));
    }

    #[tokio::test]
    async fn builder_build_with_default_table_name_succeeds() {
        let worker = MySqlOutboxWorkerBuilder::new(lazy_pool()).build();
        assert!(worker.is_ok());
    }

    #[tokio::test]
    async fn builder_dispatch_timeout_overrides_default() {
        let builder =
            MySqlOutboxWorkerBuilder::new(lazy_pool()).dispatch_timeout(Duration::from_secs(60));
        // Check the stored value itself: building alone would also succeed if
        // the setter silently ignored its argument.
        assert_eq!(builder.config.dispatch_timeout, Duration::from_secs(60));
        assert!(builder.build().is_ok());
    }

    #[test]
    fn store_claim_sql_embeds_table_name_and_question_mark_placeholders() {
        let sql = DIALECT.claim_sql("audit_outbox", 2);
        assert!(sql.contains("UPDATE `audit_outbox`"));
        assert!(sql.contains("next_retry_at = (UTC_TIMESTAMP(6) + INTERVAL ? MICROSECOND)"));
        assert!(sql.contains("WHERE event_id IN (?, ?)"));
        assert!(sql.contains("attempts = attempts + 1"));
    }
}
719}