Skip to main content

hexeract_outbox_sql/
mysql.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4use std::time::SystemTime;
5
6use async_trait::async_trait;
7use hexeract_outbox::ErasedHandler;
8use hexeract_outbox::Event;
9use hexeract_outbox::Handler;
10use hexeract_outbox::OutboxEnvelope;
11use hexeract_outbox::OutboxError;
12use hexeract_outbox::OutboxPublisher;
13use hexeract_outbox::OutboxStore;
14use hexeract_outbox::OutboxWorker;
15use hexeract_outbox::OutboxWorkerConfig;
16use hexeract_outbox::TypedHandler;
17use sqlx::Acquire;
18use sqlx::MySql;
19use sqlx::MySqlPool;
20use sqlx::Row;
21use sqlx::Transaction;
22use sqlx::pool::PoolConnection;
23use time::PrimitiveDateTime;
24use uuid::Uuid;
25
26use crate::DEFAULT_TABLE_NAME;
27use crate::dialect::Dialect;
28use crate::envelope::assemble_envelope;
29use crate::envelope::primitive_utc_to_system_time;
30use crate::envelope::to_primitive_utc;
31use crate::validate::validate_table_name;
32
33const DIALECT: Dialect = Dialect::MySql;
34
35fn database_error(error: impl std::error::Error + Send + Sync + 'static) -> OutboxError {
36    OutboxError::Database(Box::new(error))
37}
38
39/// Apply the canonical MySQL outbox schema to the target database.
40///
41/// **Intended for POCs, integration tests and local development.**
42/// Production deployments should run their own migration tooling against the
43/// SQL rendered by [`Dialect::schema_ddl`].
44///
45/// # Errors
46///
47/// - [`OutboxError::Internal`] if `table_name` is not a valid identifier.
48/// - [`OutboxError::Database`] if the connection or the DDL statement fails.
49pub async fn ensure_schema(pool: &MySqlPool, table_name: &str) -> Result<(), OutboxError> {
50    let ddl = DIALECT.schema_ddl(table_name)?;
51    sqlx::raw_sql(&ddl)
52        .execute(pool)
53        .await
54        .map_err(database_error)?;
55    Ok(())
56}
57
58/// MySQL implementation of [`OutboxStore`] backed by `sqlx::MySqlPool`.
59///
60/// Cheap to clone (the pool and the cached SQL strings are reference-counted).
61#[derive(Debug, Clone)]
62pub struct MySqlOutboxStore {
63    pool: MySqlPool,
64    table_name: Arc<str>,
65    poll_sql: Arc<str>,
66    mark_delivered_sql: Arc<str>,
67    mark_failed_sql: Arc<str>,
68}
69
70impl MySqlOutboxStore {
71    /// Build a store for the given pool and table.
72    ///
73    /// # Errors
74    ///
75    /// Returns [`OutboxError::Internal`] if `table_name` is not a valid
76    /// identifier matching `^[a-zA-Z_][a-zA-Z0-9_]*$`.
77    pub fn new(pool: MySqlPool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
78        let table_name = table_name.into();
79        validate_table_name(&table_name)?;
80        let poll_sql = DIALECT.poll_sql(&table_name);
81        let mark_delivered_sql = DIALECT.mark_delivered_sql(&table_name);
82        let mark_failed_sql = DIALECT.mark_failed_sql(&table_name);
83        Ok(Self {
84            pool,
85            table_name: Arc::from(table_name),
86            poll_sql: Arc::from(poll_sql),
87            mark_delivered_sql: Arc::from(mark_delivered_sql),
88            mark_failed_sql: Arc::from(mark_failed_sql),
89        })
90    }
91
92    /// Underlying pool.
93    #[must_use]
94    pub fn pool(&self) -> &MySqlPool {
95        &self.pool
96    }
97
98    /// Configured table name.
99    #[must_use]
100    pub fn table_name(&self) -> &str {
101        &self.table_name
102    }
103}
104
105#[async_trait]
106impl OutboxStore for MySqlOutboxStore {
107    type Client = PoolConnection<MySql>;
108    type Tx<'tx> = Transaction<'tx, MySql>;
109
110    async fn acquire(&self) -> Result<Self::Client, OutboxError> {
111        self.pool.acquire().await.map_err(database_error)
112    }
113
114    async fn begin<'a>(&self, client: &'a mut Self::Client) -> Result<Self::Tx<'a>, OutboxError> {
115        client.begin().await.map_err(database_error)
116    }
117
118    async fn poll<'a>(
119        &self,
120        tx: &mut Self::Tx<'a>,
121        batch_size: usize,
122        max_attempts: u32,
123    ) -> Result<Vec<OutboxEnvelope>, OutboxError> {
124        let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
125        let max = i32::try_from(max_attempts).unwrap_or(i32::MAX);
126        let rows = sqlx::query(&self.poll_sql)
127            .bind(max)
128            .bind(limit)
129            .fetch_all(&mut **tx)
130            .await
131            .map_err(database_error)?;
132
133        let mut envelopes = Vec::with_capacity(rows.len());
134        for row in rows {
135            let event_id: Uuid = row.try_get("event_id").map_err(database_error)?;
136            let event_type: String = row.try_get("event_type").map_err(database_error)?;
137            let payload: serde_json::Value = row.try_get("payload").map_err(database_error)?;
138            let subject_id: Option<Uuid> = row.try_get("subject_id").map_err(database_error)?;
139            let created_at: PrimitiveDateTime =
140                row.try_get("created_at").map_err(database_error)?;
141            let attempts: i32 = row.try_get("attempts").map_err(database_error)?;
142            let last_error: Option<String> = row.try_get("last_error").map_err(database_error)?;
143            let next_retry_at: Option<PrimitiveDateTime> =
144                row.try_get("next_retry_at").map_err(database_error)?;
145
146            let payload = serde_json::to_vec(&payload)?;
147
148            envelopes.push(assemble_envelope(
149                event_id,
150                event_type,
151                payload,
152                subject_id,
153                primitive_utc_to_system_time(created_at),
154                u32::try_from(attempts.max(0)).unwrap_or(u32::MAX),
155                last_error,
156                next_retry_at.map(primitive_utc_to_system_time),
157            ));
158        }
159        Ok(envelopes)
160    }
161
162    async fn mark_delivered<'a>(
163        &self,
164        tx: &mut Self::Tx<'a>,
165        event_id: Uuid,
166    ) -> Result<(), OutboxError> {
167        sqlx::query(&self.mark_delivered_sql)
168            .bind(event_id)
169            .execute(&mut **tx)
170            .await
171            .map_err(database_error)?;
172        Ok(())
173    }
174
175    async fn mark_failed<'a>(
176        &self,
177        tx: &mut Self::Tx<'a>,
178        event_id: Uuid,
179        error: &str,
180        next_retry_at: SystemTime,
181    ) -> Result<(), OutboxError> {
182        sqlx::query(&self.mark_failed_sql)
183            .bind(error)
184            .bind(to_primitive_utc(next_retry_at))
185            .bind(event_id)
186            .execute(&mut **tx)
187            .await
188            .map_err(database_error)?;
189        Ok(())
190    }
191
192    async fn commit<'a>(&self, tx: Self::Tx<'a>) -> Result<(), OutboxError> {
193        tx.commit().await.map_err(database_error)
194    }
195}
196
197/// MySQL implementation of [`OutboxPublisher`] backed by `sqlx::MySqlPool`.
198///
199/// Cheap to clone (the pool and the cached insert statement are reference-counted).
200#[derive(Debug, Clone)]
201pub struct MySqlOutboxPublisher {
202    pool: MySqlPool,
203    table_name: Arc<str>,
204    insert_sql: Arc<str>,
205}
206
207impl MySqlOutboxPublisher {
208    /// Create a new publisher for the given pool and table.
209    ///
210    /// # Errors
211    ///
212    /// Returns [`OutboxError::Internal`] if `table_name` is not a valid
213    /// identifier matching `^[a-zA-Z_][a-zA-Z0-9_]*$`.
214    pub fn new(pool: MySqlPool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
215        let table_name = table_name.into();
216        validate_table_name(&table_name)?;
217        let insert_sql = DIALECT.insert_sql(&table_name);
218        Ok(Self {
219            pool,
220            table_name: Arc::from(table_name),
221            insert_sql: Arc::from(insert_sql),
222        })
223    }
224
225    /// Underlying pool, exposed for callers that open their own transactions.
226    #[must_use]
227    pub fn pool(&self) -> &MySqlPool {
228        &self.pool
229    }
230
231    /// Configured table name.
232    #[must_use]
233    pub fn table_name(&self) -> &str {
234        &self.table_name
235    }
236}
237
238impl OutboxPublisher for MySqlOutboxPublisher {
239    type Tx<'tx> = Transaction<'tx, MySql>;
240
241    async fn publish_in_tx<E: Event>(
242        &self,
243        tx: &mut Self::Tx<'_>,
244        event: &E,
245    ) -> Result<Uuid, OutboxError> {
246        let event_id = Uuid::now_v7();
247        let payload = serde_json::to_value(event)?;
248        sqlx::query(&self.insert_sql)
249            .bind(event_id)
250            .bind(E::EVENT_TYPE)
251            .bind(payload)
252            .bind(Option::<Uuid>::None)
253            .execute(&mut **tx)
254            .await
255            .map_err(database_error)?;
256        Ok(event_id)
257    }
258
259    async fn publish_in_tx_with_subject<E: Event>(
260        &self,
261        tx: &mut Self::Tx<'_>,
262        subject_id: Uuid,
263        event: &E,
264    ) -> Result<Uuid, OutboxError> {
265        let event_id = Uuid::now_v7();
266        let payload = serde_json::to_value(event)?;
267        sqlx::query(&self.insert_sql)
268            .bind(event_id)
269            .bind(E::EVENT_TYPE)
270            .bind(payload)
271            .bind(Some(subject_id))
272            .execute(&mut **tx)
273            .await
274            .map_err(database_error)?;
275        Ok(event_id)
276    }
277
278    async fn publish<E: Event>(&self, event: &E) -> Result<Uuid, OutboxError> {
279        let mut tx = self.pool.begin().await.map_err(database_error)?;
280        let event_id = self.publish_in_tx(&mut tx, event).await?;
281        tx.commit().await.map_err(database_error)?;
282        Ok(event_id)
283    }
284}
285
286/// Fluent builder for an [`OutboxWorker`] backed by [`MySqlOutboxStore`].
287pub struct MySqlOutboxWorkerBuilder {
288    pool: MySqlPool,
289    table_name: String,
290    handlers: HashMap<&'static str, Arc<dyn ErasedHandler>>,
291    config: OutboxWorkerConfig,
292}
293
294impl MySqlOutboxWorkerBuilder {
295    /// Start a new builder for the given pool.
296    #[must_use]
297    pub fn new(pool: MySqlPool) -> Self {
298        Self {
299            pool,
300            table_name: DEFAULT_TABLE_NAME.to_owned(),
301            handlers: HashMap::new(),
302            config: OutboxWorkerConfig::default(),
303        }
304    }
305
306    /// Override the outbox table name (default `"audit_outbox"`).
307    #[must_use]
308    pub fn table_name(mut self, name: impl Into<String>) -> Self {
309        self.table_name = name.into();
310        self
311    }
312
313    /// Register a typed handler for the event type `E`.
314    ///
315    /// Registering twice for the same event type silently replaces the
316    /// previous handler.
317    #[must_use]
318    pub fn register_handler<E, H>(mut self, handler: H) -> Self
319    where
320        E: Event,
321        H: Handler<E>,
322    {
323        let typed = TypedHandler::<E, H>::new(handler);
324        let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
325        self.handlers.insert(E::EVENT_TYPE, erased);
326        self
327    }
328
329    /// Register a handler already shared behind an `Arc`.
330    #[must_use]
331    pub fn shared_handler<E, H>(mut self, handler: Arc<H>) -> Self
332    where
333        E: Event,
334        H: Handler<E>,
335    {
336        let typed = TypedHandler::<E, H>::shared(handler);
337        let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
338        self.handlers.insert(E::EVENT_TYPE, erased);
339        self
340    }
341
342    /// Override the poll interval (default 100 ms).
343    #[must_use]
344    pub fn poll_interval(mut self, d: Duration) -> Self {
345        self.config.poll_interval = d;
346        self
347    }
348
349    /// Override the batch size per poll (default 10).
350    #[must_use]
351    pub fn batch_size(mut self, n: usize) -> Self {
352        self.config.batch_size = n;
353        self
354    }
355
356    /// Override the maximum number of attempts per envelope (default 5).
357    #[must_use]
358    pub fn max_attempts(mut self, n: u32) -> Self {
359        self.config.max_attempts = n;
360        self
361    }
362
363    /// Override the constant retry delay between failed attempts (default 5 s).
364    #[must_use]
365    pub fn retry_delay(mut self, d: Duration) -> Self {
366        self.config.retry_delay = d;
367        self
368    }
369
370    /// Consume the builder and produce an [`OutboxWorker`] ready to spawn.
371    ///
372    /// # Errors
373    ///
374    /// Returns [`OutboxError::Internal`] if the configured `table_name`
375    /// is not a valid identifier.
376    pub fn build(self) -> Result<OutboxWorker<MySqlOutboxStore>, OutboxError> {
377        let store = MySqlOutboxStore::new(self.pool, self.table_name)?;
378        Ok(OutboxWorker::new(store, self.handlers, self.config))
379    }
380}
381
382#[cfg(test)]
383mod tests {
384    use super::*;
385    use hexeract_core::HandlerContext;
386    use serde::Deserialize;
387    use serde::Serialize;
388
389    fn lazy_pool() -> MySqlPool {
390        MySqlPool::connect_lazy("mysql://nobody:nobody@127.0.0.1:1/nobody")
391            .expect("lazy pool must build from a valid URL")
392    }
393
394    #[derive(Debug, Serialize, Deserialize)]
395    struct UserRegistered {
396        user_id: Uuid,
397    }
398
399    impl Event for UserRegistered {
400        const EVENT_TYPE: &'static str = "users.registered";
401    }
402
403    #[derive(Debug, Serialize, Deserialize)]
404    struct OrderPlaced {
405        order_id: Uuid,
406    }
407
408    impl Event for OrderPlaced {
409        const EVENT_TYPE: &'static str = "orders.placed";
410    }
411
412    struct NoopHandler;
413
414    impl Handler<UserRegistered> for NoopHandler {
415        type Error = OutboxError;
416        async fn handle(
417            &self,
418            _event: UserRegistered,
419            _ctx: &HandlerContext,
420        ) -> Result<(), Self::Error> {
421            Ok(())
422        }
423    }
424
425    impl Handler<OrderPlaced> for NoopHandler {
426        type Error = OutboxError;
427        async fn handle(
428            &self,
429            _event: OrderPlaced,
430            _ctx: &HandlerContext,
431        ) -> Result<(), Self::Error> {
432            Ok(())
433        }
434    }
435
436    #[tokio::test]
437    async fn store_new_rejects_invalid_table_name() {
438        let err = MySqlOutboxStore::new(lazy_pool(), "bad name; DROP").unwrap_err();
439        assert!(matches!(err, OutboxError::Internal(_)));
440    }
441
442    #[tokio::test]
443    async fn store_new_caches_mysql_sql_with_validated_table_name() {
444        let store = MySqlOutboxStore::new(lazy_pool(), "audit_outbox").unwrap();
445        assert_eq!(store.table_name(), "audit_outbox");
446        assert!(store.poll_sql.contains("FROM audit_outbox"));
447        assert!(store.poll_sql.contains("FOR UPDATE SKIP LOCKED"));
448        assert!(store.poll_sql.contains("UTC_TIMESTAMP()"));
449        assert!(store.mark_delivered_sql.contains("UPDATE audit_outbox"));
450        assert!(store.mark_failed_sql.contains("attempts = attempts + 1"));
451    }
452
453    #[tokio::test]
454    async fn publisher_new_caches_insert_sql_with_question_marks() {
455        let publisher = MySqlOutboxPublisher::new(lazy_pool(), "audit_outbox").unwrap();
456        assert_eq!(publisher.table_name(), "audit_outbox");
457        assert!(publisher.insert_sql.contains("INSERT INTO audit_outbox"));
458        assert!(publisher.insert_sql.contains("?, ?, ?, ?"));
459    }
460
461    #[tokio::test]
462    async fn builder_starts_with_default_table_and_empty_handlers() {
463        let builder = MySqlOutboxWorkerBuilder::new(lazy_pool());
464        assert_eq!(builder.table_name, DEFAULT_TABLE_NAME);
465        assert!(builder.handlers.is_empty());
466    }
467
468    #[tokio::test]
469    async fn builder_register_handler_records_event_types() {
470        let builder = MySqlOutboxWorkerBuilder::new(lazy_pool())
471            .register_handler::<UserRegistered, _>(NoopHandler)
472            .register_handler::<OrderPlaced, _>(NoopHandler);
473        assert_eq!(builder.handlers.len(), 2);
474        assert!(builder.handlers.contains_key("users.registered"));
475        assert!(builder.handlers.contains_key("orders.placed"));
476    }
477
478    #[tokio::test]
479    async fn builder_build_rejects_invalid_table_name() {
480        let result = MySqlOutboxWorkerBuilder::new(lazy_pool())
481            .table_name("bad name; DROP TABLE")
482            .build();
483        assert!(matches!(result, Err(OutboxError::Internal(_))));
484    }
485
486    #[tokio::test]
487    async fn builder_build_with_default_table_name_succeeds() {
488        let worker = MySqlOutboxWorkerBuilder::new(lazy_pool()).build();
489        assert!(worker.is_ok());
490    }
491}