1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use async_trait::async_trait;
6use hexeract_outbox::ErasedHandler;
7use hexeract_outbox::Event;
8use hexeract_outbox::Handler;
9use hexeract_outbox::OutboxEnvelope;
10use hexeract_outbox::OutboxError;
11use hexeract_outbox::OutboxPublisher;
12use hexeract_outbox::OutboxStore;
13use hexeract_outbox::OutboxWorker;
14use hexeract_outbox::OutboxWorkerConfig;
15use hexeract_outbox::TypedHandler;
16use sqlx::Acquire;
17use sqlx::MySql;
18use sqlx::MySqlPool;
19use sqlx::Row;
20use sqlx::Transaction;
21use sqlx::pool::PoolConnection;
22use time::PrimitiveDateTime;
23use uuid::Uuid;
24
25use crate::DEFAULT_TABLE_NAME;
26use crate::dialect::Dialect;
27use crate::envelope::assemble_envelope;
28use crate::envelope::primitive_utc_to_system_time;
29use crate::validate::validate_event_type;
30use crate::validate::validate_table_name;
31
/// SQL dialect used to render every statement built in this file.
const DIALECT: Dialect = Dialect::MySql;
33
/// Converts a [`Duration`] to whole microseconds as an `i64`.
///
/// Sub-microsecond remainders are truncated. Durations whose microsecond
/// count does not fit in an `i64` saturate to `i64::MAX` instead of failing.
fn duration_to_micros(d: Duration) -> i64 {
    match i64::try_from(d.as_micros()) {
        Ok(micros) => micros,
        Err(_) => i64::MAX,
    }
}
39
40fn database_error(error: impl std::error::Error + Send + Sync + 'static) -> OutboxError {
41 OutboxError::Database(Box::new(error))
42}
43
44fn pool_error(error: sqlx::Error) -> OutboxError {
45 if matches!(error, sqlx::Error::PoolTimedOut) {
46 OutboxError::PoolTimeout
47 } else {
48 OutboxError::Database(Box::new(error))
49 }
50}
51
52fn decode_mysql_row(row: &sqlx::mysql::MySqlRow) -> Result<OutboxEnvelope, OutboxError> {
57 let event_id: Uuid = row.try_get("event_id").map_err(database_error)?;
58 let event_type: String = row.try_get("event_type").map_err(database_error)?;
59 let payload: serde_json::Value = row.try_get("payload").map_err(database_error)?;
60 let subject_id: Option<Uuid> = row.try_get("subject_id").map_err(database_error)?;
61 let created_at: PrimitiveDateTime = row.try_get("created_at").map_err(database_error)?;
62 let attempts: i32 = row.try_get("attempts").map_err(database_error)?;
63 let last_error: Option<String> = row.try_get("last_error").map_err(database_error)?;
64 let next_retry_at: Option<PrimitiveDateTime> =
65 row.try_get("next_retry_at").map_err(database_error)?;
66
67 let payload = serde_json::to_vec(&payload)?;
68
69 Ok(assemble_envelope(
70 event_id,
71 event_type,
72 payload,
73 subject_id,
74 primitive_utc_to_system_time(created_at),
75 u32::try_from(attempts.max(0)).unwrap_or(u32::MAX),
76 last_error,
77 next_retry_at.map(primitive_utc_to_system_time),
78 ))
79}
80
/// Pre-rendered statements used when an event is moved to a dead-letter table.
#[derive(Debug, Clone)]
struct DeadLetterSql {
    /// Statement produced by `Dialect::insert_dead_letter_sql`.
    insert_sql: Arc<str>,
    /// Statement produced by `Dialect::delete_from_main_sql`.
    delete_sql: Arc<str>,
}
86
87pub async fn ensure_schema(pool: &MySqlPool, table_name: &str) -> Result<(), OutboxError> {
98 let ddl = DIALECT.schema_ddl(table_name)?;
99 sqlx::raw_sql(&ddl)
100 .execute(pool)
101 .await
102 .map_err(database_error)?;
103 Ok(())
104}
105
/// MySQL-backed outbox store.
///
/// Holds the connection pool together with the SQL statements rendered once
/// for the configured table, so they are not rebuilt on every call.
#[derive(Debug, Clone)]
pub struct MySqlOutboxStore {
    /// Pool that connections are acquired from.
    pool: MySqlPool,
    /// Validated name of the outbox table.
    table_name: Arc<str>,
    /// Cached statement used by `poll`.
    poll_sql: Arc<str>,
    /// Cached statement used by `mark_delivered`.
    mark_delivered_sql: Arc<str>,
    /// Cached statement used by `mark_failed`.
    mark_failed_sql: Arc<str>,
    /// Dead-letter statements; `None` until `with_dead_letter` is called.
    dead_letter: Option<Arc<DeadLetterSql>>,
}
118
119impl MySqlOutboxStore {
120 pub fn new(pool: MySqlPool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
127 let table_name = table_name.into();
128 validate_table_name(&table_name)?;
129 let poll_sql = DIALECT.poll_sql(&table_name);
130 let mark_delivered_sql = DIALECT.mark_delivered_sql(&table_name);
131 let mark_failed_sql = DIALECT.mark_failed_sql(&table_name);
132 Ok(Self {
133 pool,
134 table_name: Arc::from(table_name),
135 poll_sql: Arc::from(poll_sql),
136 mark_delivered_sql: Arc::from(mark_delivered_sql),
137 mark_failed_sql: Arc::from(mark_failed_sql),
138 dead_letter: None,
139 })
140 }
141
142 #[must_use]
144 pub fn pool(&self) -> &MySqlPool {
145 &self.pool
146 }
147
148 #[must_use]
150 pub fn table_name(&self) -> &str {
151 &self.table_name
152 }
153
154 pub fn with_dead_letter(mut self, dlq_table: impl Into<String>) -> Result<Self, OutboxError> {
160 let dlq = dlq_table.into();
161 validate_table_name(&dlq)?;
162 let insert_sql = DIALECT.insert_dead_letter_sql(&self.table_name, &dlq);
163 let delete_sql = DIALECT.delete_from_main_sql(&self.table_name);
164 self.dead_letter = Some(Arc::new(DeadLetterSql {
165 insert_sql: Arc::from(insert_sql),
166 delete_sql: Arc::from(delete_sql),
167 }));
168 Ok(self)
169 }
170}
171
172#[async_trait]
173impl OutboxStore for MySqlOutboxStore {
174 type Client = PoolConnection<MySql>;
175 type Tx<'tx> = Transaction<'tx, MySql>;
176
177 async fn acquire(&self) -> Result<Self::Client, OutboxError> {
178 self.pool.acquire().await.map_err(pool_error)
179 }
180
181 async fn begin<'a>(&self, client: &'a mut Self::Client) -> Result<Self::Tx<'a>, OutboxError> {
182 client.begin().await.map_err(database_error)
183 }
184
185 async fn poll<'a>(
186 &self,
187 tx: &mut Self::Tx<'a>,
188 batch_size: usize,
189 max_attempts: u32,
190 ) -> Result<Vec<OutboxEnvelope>, OutboxError> {
191 let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
192 let max = i32::try_from(max_attempts).unwrap_or(i32::MAX);
193 let rows = sqlx::query(&self.poll_sql)
194 .bind(max)
195 .bind(limit)
196 .fetch_all(&mut **tx)
197 .await
198 .map_err(database_error)?;
199
200 let mut envelopes = Vec::with_capacity(rows.len());
201 for row in rows {
202 match decode_mysql_row(&row) {
206 Ok(envelope) => envelopes.push(envelope),
207 Err(error) => {
208 let event_id = row.try_get::<Uuid, _>("event_id").ok();
209 tracing::error!(
210 ?event_id,
211 error = %error,
212 "skipping undecodable outbox row; the rest of the batch continues"
213 );
214 }
215 }
216 }
217 Ok(envelopes)
218 }
219
220 async fn mark_delivered<'a>(
221 &self,
222 tx: &mut Self::Tx<'a>,
223 event_id: Uuid,
224 ) -> Result<(), OutboxError> {
225 sqlx::query(&self.mark_delivered_sql)
226 .bind(event_id)
227 .execute(&mut **tx)
228 .await
229 .map_err(database_error)?;
230 Ok(())
231 }
232
233 async fn mark_failed<'a>(
234 &self,
235 tx: &mut Self::Tx<'a>,
236 event_id: Uuid,
237 error: &str,
238 retry_in: Duration,
239 ) -> Result<(), OutboxError> {
240 sqlx::query(&self.mark_failed_sql)
242 .bind(error)
243 .bind(duration_to_micros(retry_in))
244 .bind(event_id)
245 .execute(&mut **tx)
246 .await
247 .map_err(database_error)?;
248 Ok(())
249 }
250
251 async fn commit<'a>(&self, tx: Self::Tx<'a>) -> Result<(), OutboxError> {
252 tx.commit().await.map_err(database_error)
253 }
254
255 async fn mark_dead_lettered<'a>(
256 &self,
257 tx: &mut Self::Tx<'a>,
258 event_id: Uuid,
259 _error: &str,
260 ) -> Result<(), OutboxError> {
261 let Some(dlq) = &self.dead_letter else {
262 return Ok(());
263 };
264 sqlx::query(&dlq.insert_sql)
265 .bind(event_id)
266 .execute(&mut **tx)
267 .await
268 .map_err(database_error)?;
269 sqlx::query(&dlq.delete_sql)
270 .bind(event_id)
271 .execute(&mut **tx)
272 .await
273 .map_err(database_error)?;
274 Ok(())
275 }
276
277 async fn claim<'a>(
278 &self,
279 tx: &mut Self::Tx<'a>,
280 event_ids: &[Uuid],
281 lease_for: Duration,
282 ) -> Result<(), OutboxError> {
283 if event_ids.is_empty() {
284 return Ok(());
285 }
286 let sql = DIALECT.claim_sql(&self.table_name, event_ids.len());
287 let mut query = sqlx::query(&sql).bind(duration_to_micros(lease_for));
289 for id in event_ids {
290 query = query.bind(*id);
291 }
292 query.execute(&mut **tx).await.map_err(database_error)?;
293 Ok(())
294 }
295}
296
/// MySQL-backed outbox publisher.
///
/// Holds the pool and the insert statement rendered once for the configured
/// table.
#[derive(Debug, Clone)]
pub struct MySqlOutboxPublisher {
    /// Pool used by `publish` to open its own transaction.
    pool: MySqlPool,
    /// Validated name of the outbox table.
    table_name: Arc<str>,
    /// Cached statement produced by `Dialect::insert_sql`.
    insert_sql: Arc<str>,
}
306
307impl MySqlOutboxPublisher {
308 pub fn new(pool: MySqlPool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
315 let table_name = table_name.into();
316 validate_table_name(&table_name)?;
317 let insert_sql = DIALECT.insert_sql(&table_name);
318 Ok(Self {
319 pool,
320 table_name: Arc::from(table_name),
321 insert_sql: Arc::from(insert_sql),
322 })
323 }
324
325 #[must_use]
327 pub fn pool(&self) -> &MySqlPool {
328 &self.pool
329 }
330
331 #[must_use]
333 pub fn table_name(&self) -> &str {
334 &self.table_name
335 }
336}
337
338impl OutboxPublisher for MySqlOutboxPublisher {
339 type Tx<'tx> = Transaction<'tx, MySql>;
340
341 async fn publish_in_tx<E: Event>(
342 &self,
343 tx: &mut Self::Tx<'_>,
344 event: &E,
345 ) -> Result<Uuid, OutboxError> {
346 validate_event_type(E::EVENT_TYPE)?;
347 let event_id = Uuid::now_v7();
348 let payload = serde_json::to_value(event)?;
349 sqlx::query(&self.insert_sql)
350 .bind(event_id)
351 .bind(E::EVENT_TYPE)
352 .bind(payload)
353 .bind(Option::<Uuid>::None)
354 .execute(&mut **tx)
355 .await
356 .map_err(database_error)?;
357 Ok(event_id)
358 }
359
360 async fn publish_in_tx_with_subject<E: Event>(
361 &self,
362 tx: &mut Self::Tx<'_>,
363 subject_id: Uuid,
364 event: &E,
365 ) -> Result<Uuid, OutboxError> {
366 validate_event_type(E::EVENT_TYPE)?;
367 let event_id = Uuid::now_v7();
368 let payload = serde_json::to_value(event)?;
369 sqlx::query(&self.insert_sql)
370 .bind(event_id)
371 .bind(E::EVENT_TYPE)
372 .bind(payload)
373 .bind(Some(subject_id))
374 .execute(&mut **tx)
375 .await
376 .map_err(database_error)?;
377 Ok(event_id)
378 }
379
380 async fn publish<E: Event>(&self, event: &E) -> Result<Uuid, OutboxError> {
381 let mut tx = self.pool.begin().await.map_err(database_error)?;
382 let event_id = self.publish_in_tx(&mut tx, event).await?;
383 tx.commit().await.map_err(database_error)?;
384 Ok(event_id)
385 }
386}
387
/// Builder for an `OutboxWorker` backed by [`MySqlOutboxStore`].
pub struct MySqlOutboxWorkerBuilder {
    /// Pool handed to the store at build time.
    pool: MySqlPool,
    /// Outbox table name; validated only when `build` is called.
    table_name: String,
    /// Optional dead-letter table name; validated only when `build` is called.
    dead_letter_table: Option<String>,
    /// Registered handlers keyed by `Event::EVENT_TYPE`.
    handlers: HashMap<&'static str, Arc<dyn ErasedHandler>>,
    /// Worker configuration, starting from `OutboxWorkerConfig::default()`.
    config: OutboxWorkerConfig,
}
426
427impl MySqlOutboxWorkerBuilder {
428 #[must_use]
430 pub fn new(pool: MySqlPool) -> Self {
431 Self {
432 pool,
433 table_name: DEFAULT_TABLE_NAME.to_owned(),
434 dead_letter_table: None,
435 handlers: HashMap::new(),
436 config: OutboxWorkerConfig::default(),
437 }
438 }
439
440 #[must_use]
442 pub fn table_name(mut self, name: impl Into<String>) -> Self {
443 self.table_name = name.into();
444 self
445 }
446
447 #[must_use]
449 pub fn dead_letter_table(mut self, name: impl Into<String>) -> Self {
450 self.dead_letter_table = Some(name.into());
451 self
452 }
453
454 #[must_use]
459 pub fn register_handler<E, H>(mut self, handler: H) -> Self
460 where
461 E: Event,
462 H: Handler<E>,
463 {
464 let typed = TypedHandler::<E, H>::new(handler);
465 let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
466 self.handlers.insert(E::EVENT_TYPE, erased);
467 self
468 }
469
470 #[must_use]
472 pub fn shared_handler<E, H>(mut self, handler: Arc<H>) -> Self
473 where
474 E: Event,
475 H: Handler<E>,
476 {
477 let typed = TypedHandler::<E, H>::shared(handler);
478 let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
479 self.handlers.insert(E::EVENT_TYPE, erased);
480 self
481 }
482
483 #[must_use]
485 pub fn poll_interval(mut self, d: Duration) -> Self {
486 self.config.poll_interval = d;
487 self
488 }
489
490 #[must_use]
492 pub fn batch_size(mut self, n: usize) -> Self {
493 self.config.batch_size = n;
494 self
495 }
496
497 #[must_use]
499 pub fn max_attempts(mut self, n: u32) -> Self {
500 self.config.max_attempts = n;
501 self
502 }
503
504 #[must_use]
506 pub fn retry_base_delay(mut self, d: Duration) -> Self {
507 self.config.retry_base_delay = d;
508 self
509 }
510
511 #[must_use]
513 pub fn retry_max_delay(mut self, d: Duration) -> Self {
514 self.config.retry_max_delay = d;
515 self
516 }
517
518 #[must_use]
520 pub fn jitter(mut self, enabled: bool) -> Self {
521 self.config.jitter = enabled;
522 self
523 }
524
525 #[must_use]
532 pub fn dispatch_timeout(mut self, d: Duration) -> Self {
533 self.config.dispatch_timeout = d;
534 self
535 }
536
537 pub fn build(self) -> Result<OutboxWorker<MySqlOutboxStore>, OutboxError> {
544 let mut store = MySqlOutboxStore::new(self.pool, self.table_name)?;
545 if let Some(dlq) = self.dead_letter_table {
546 store = store.with_dead_letter(dlq)?;
547 }
548 Ok(OutboxWorker::new(store, self.handlers, self.config))
549 }
550}
551
552pub async fn ensure_dead_letter_schema(
561 pool: &MySqlPool,
562 table_name: &str,
563) -> Result<(), OutboxError> {
564 let ddl = DIALECT.dead_letter_schema_ddl(table_name)?;
565 sqlx::raw_sql(&ddl)
566 .execute(pool)
567 .await
568 .map_err(database_error)?;
569 Ok(())
570}
571
#[cfg(test)]
mod tests {
    use super::*;
    use hexeract_core::HandlerContext;
    use serde::Deserialize;
    use serde::Serialize;

    /// Builds a pool that never connects; enough to exercise constructors.
    fn lazy_pool() -> MySqlPool {
        MySqlPool::connect_lazy("mysql://nobody:nobody@127.0.0.1:1/nobody")
            .expect("lazy pool must build from a valid URL")
    }

    #[derive(Debug, Serialize, Deserialize)]
    struct UserRegistered {
        user_id: Uuid,
    }

    impl Event for UserRegistered {
        const EVENT_TYPE: &'static str = "users.registered";
    }

    #[derive(Debug, Serialize, Deserialize)]
    struct OrderPlaced {
        order_id: Uuid,
    }

    impl Event for OrderPlaced {
        const EVENT_TYPE: &'static str = "orders.placed";
    }

    /// Handler that accepts every event and does nothing.
    struct NoopHandler;

    impl Handler<UserRegistered> for NoopHandler {
        type Error = OutboxError;
        async fn handle(
            &self,
            _event: UserRegistered,
            _ctx: &HandlerContext,
        ) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    impl Handler<OrderPlaced> for NoopHandler {
        type Error = OutboxError;
        async fn handle(
            &self,
            _event: OrderPlaced,
            _ctx: &HandlerContext,
        ) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    #[test]
    fn pool_error_maps_pool_timed_out_to_pool_timeout_variant() {
        let err = pool_error(sqlx::Error::PoolTimedOut);
        assert!(
            matches!(err, OutboxError::PoolTimeout),
            "PoolTimedOut must map to OutboxError::PoolTimeout, got {err:?}"
        );
    }

    #[test]
    fn pool_error_wraps_other_errors_as_database_error() {
        let err = pool_error(sqlx::Error::RowNotFound);
        assert!(
            matches!(err, OutboxError::Database(_)),
            "non-timeout errors must map to OutboxError::Database, got {err:?}"
        );
    }

    #[tokio::test]
    async fn store_new_rejects_invalid_table_name() {
        let err = MySqlOutboxStore::new(lazy_pool(), "bad name; DROP").unwrap_err();
        assert!(matches!(err, OutboxError::Internal(_)));
    }

    #[tokio::test]
    async fn store_new_caches_mysql_sql_with_validated_table_name() {
        let store = MySqlOutboxStore::new(lazy_pool(), "audit_outbox").unwrap();
        assert_eq!(store.table_name(), "audit_outbox");
        assert!(store.poll_sql.contains("FROM `audit_outbox`"));
        assert!(store.poll_sql.contains("FOR UPDATE SKIP LOCKED"));
        assert!(store.poll_sql.contains("UTC_TIMESTAMP(6)"));
        assert!(store.mark_delivered_sql.contains("UPDATE `audit_outbox`"));
        // The attempts counter is bumped by the claim statement (see the
        // claim SQL test below), so mark-failed must not bump it again.
        assert!(!store.mark_failed_sql.contains("attempts = attempts + 1"));
    }

    #[tokio::test]
    async fn publisher_new_caches_insert_sql_with_question_marks() {
        let publisher = MySqlOutboxPublisher::new(lazy_pool(), "audit_outbox").unwrap();
        assert_eq!(publisher.table_name(), "audit_outbox");
        assert!(publisher.insert_sql.contains("INSERT INTO `audit_outbox`"));
        assert!(publisher.insert_sql.contains("?, ?, ?, ?"));
    }

    #[tokio::test]
    async fn builder_starts_with_default_table_and_empty_handlers() {
        let builder = MySqlOutboxWorkerBuilder::new(lazy_pool());
        assert_eq!(builder.table_name, DEFAULT_TABLE_NAME);
        assert!(builder.handlers.is_empty());
    }

    #[tokio::test]
    async fn builder_register_handler_records_event_types() {
        let builder = MySqlOutboxWorkerBuilder::new(lazy_pool())
            .register_handler::<UserRegistered, _>(NoopHandler)
            .register_handler::<OrderPlaced, _>(NoopHandler);
        assert_eq!(builder.handlers.len(), 2);
        assert!(builder.handlers.contains_key("users.registered"));
        assert!(builder.handlers.contains_key("orders.placed"));
    }

    #[tokio::test]
    async fn builder_build_rejects_invalid_table_name() {
        let result = MySqlOutboxWorkerBuilder::new(lazy_pool())
            .table_name("bad name; DROP TABLE")
            .build();
        assert!(matches!(result, Err(OutboxError::Internal(_))));
    }

    #[tokio::test]
    async fn builder_build_with_default_table_name_succeeds() {
        let worker = MySqlOutboxWorkerBuilder::new(lazy_pool()).build();
        assert!(worker.is_ok());
    }

    #[tokio::test]
    async fn builder_dispatch_timeout_overrides_default() {
        let builder =
            MySqlOutboxWorkerBuilder::new(lazy_pool()).dispatch_timeout(Duration::from_secs(60));
        // Check the stored value directly; a successful build alone would not
        // prove that the setter took effect.
        assert_eq!(builder.config.dispatch_timeout, Duration::from_secs(60));
        let worker = builder.build().unwrap();
        drop(worker);
    }

    #[test]
    fn store_claim_sql_embeds_table_name_and_question_mark_placeholders() {
        let sql = DIALECT.claim_sql("audit_outbox", 2);
        assert!(sql.contains("UPDATE `audit_outbox`"));
        assert!(sql.contains("next_retry_at = (UTC_TIMESTAMP(6) + INTERVAL ? MICROSECOND)"));
        assert!(sql.contains("WHERE event_id IN (?, ?)"));
        assert!(sql.contains("attempts = attempts + 1"));
    }
}