1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4use std::time::SystemTime;
5
6use async_trait::async_trait;
7use hexeract_outbox::ErasedHandler;
8use hexeract_outbox::Event;
9use hexeract_outbox::Handler;
10use hexeract_outbox::OutboxEnvelope;
11use hexeract_outbox::OutboxError;
12use hexeract_outbox::OutboxPublisher;
13use hexeract_outbox::OutboxStore;
14use hexeract_outbox::OutboxWorker;
15use hexeract_outbox::OutboxWorkerConfig;
16use hexeract_outbox::TypedHandler;
17use sqlx::Acquire;
18use sqlx::MySql;
19use sqlx::MySqlPool;
20use sqlx::Row;
21use sqlx::Transaction;
22use sqlx::pool::PoolConnection;
23use time::PrimitiveDateTime;
24use uuid::Uuid;
25
26use crate::DEFAULT_TABLE_NAME;
27use crate::dialect::Dialect;
28use crate::envelope::assemble_envelope;
29use crate::envelope::primitive_utc_to_system_time;
30use crate::envelope::to_primitive_utc;
31use crate::validate::validate_table_name;
32
33const DIALECT: Dialect = Dialect::MySql;
34
35fn database_error(error: impl std::error::Error + Send + Sync + 'static) -> OutboxError {
36 OutboxError::Database(Box::new(error))
37}
38
39pub async fn ensure_schema(pool: &MySqlPool, table_name: &str) -> Result<(), OutboxError> {
50 let ddl = DIALECT.schema_ddl(table_name)?;
51 sqlx::raw_sql(&ddl)
52 .execute(pool)
53 .await
54 .map_err(database_error)?;
55 Ok(())
56}
57
58#[derive(Debug, Clone)]
62pub struct MySqlOutboxStore {
63 pool: MySqlPool,
64 table_name: Arc<str>,
65 poll_sql: Arc<str>,
66 mark_delivered_sql: Arc<str>,
67 mark_failed_sql: Arc<str>,
68}
69
70impl MySqlOutboxStore {
71 pub fn new(pool: MySqlPool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
78 let table_name = table_name.into();
79 validate_table_name(&table_name)?;
80 let poll_sql = DIALECT.poll_sql(&table_name);
81 let mark_delivered_sql = DIALECT.mark_delivered_sql(&table_name);
82 let mark_failed_sql = DIALECT.mark_failed_sql(&table_name);
83 Ok(Self {
84 pool,
85 table_name: Arc::from(table_name),
86 poll_sql: Arc::from(poll_sql),
87 mark_delivered_sql: Arc::from(mark_delivered_sql),
88 mark_failed_sql: Arc::from(mark_failed_sql),
89 })
90 }
91
92 #[must_use]
94 pub fn pool(&self) -> &MySqlPool {
95 &self.pool
96 }
97
98 #[must_use]
100 pub fn table_name(&self) -> &str {
101 &self.table_name
102 }
103}
104
105#[async_trait]
106impl OutboxStore for MySqlOutboxStore {
107 type Client = PoolConnection<MySql>;
108 type Tx<'tx> = Transaction<'tx, MySql>;
109
110 async fn acquire(&self) -> Result<Self::Client, OutboxError> {
111 self.pool.acquire().await.map_err(database_error)
112 }
113
114 async fn begin<'a>(&self, client: &'a mut Self::Client) -> Result<Self::Tx<'a>, OutboxError> {
115 client.begin().await.map_err(database_error)
116 }
117
118 async fn poll<'a>(
119 &self,
120 tx: &mut Self::Tx<'a>,
121 batch_size: usize,
122 max_attempts: u32,
123 ) -> Result<Vec<OutboxEnvelope>, OutboxError> {
124 let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
125 let max = i32::try_from(max_attempts).unwrap_or(i32::MAX);
126 let rows = sqlx::query(&self.poll_sql)
127 .bind(max)
128 .bind(limit)
129 .fetch_all(&mut **tx)
130 .await
131 .map_err(database_error)?;
132
133 let mut envelopes = Vec::with_capacity(rows.len());
134 for row in rows {
135 let event_id: Uuid = row.try_get("event_id").map_err(database_error)?;
136 let event_type: String = row.try_get("event_type").map_err(database_error)?;
137 let payload: serde_json::Value = row.try_get("payload").map_err(database_error)?;
138 let subject_id: Option<Uuid> = row.try_get("subject_id").map_err(database_error)?;
139 let created_at: PrimitiveDateTime =
140 row.try_get("created_at").map_err(database_error)?;
141 let attempts: i32 = row.try_get("attempts").map_err(database_error)?;
142 let last_error: Option<String> = row.try_get("last_error").map_err(database_error)?;
143 let next_retry_at: Option<PrimitiveDateTime> =
144 row.try_get("next_retry_at").map_err(database_error)?;
145
146 let payload = serde_json::to_vec(&payload)?;
147
148 envelopes.push(assemble_envelope(
149 event_id,
150 event_type,
151 payload,
152 subject_id,
153 primitive_utc_to_system_time(created_at),
154 u32::try_from(attempts.max(0)).unwrap_or(u32::MAX),
155 last_error,
156 next_retry_at.map(primitive_utc_to_system_time),
157 ));
158 }
159 Ok(envelopes)
160 }
161
162 async fn mark_delivered<'a>(
163 &self,
164 tx: &mut Self::Tx<'a>,
165 event_id: Uuid,
166 ) -> Result<(), OutboxError> {
167 sqlx::query(&self.mark_delivered_sql)
168 .bind(event_id)
169 .execute(&mut **tx)
170 .await
171 .map_err(database_error)?;
172 Ok(())
173 }
174
175 async fn mark_failed<'a>(
176 &self,
177 tx: &mut Self::Tx<'a>,
178 event_id: Uuid,
179 error: &str,
180 next_retry_at: SystemTime,
181 ) -> Result<(), OutboxError> {
182 sqlx::query(&self.mark_failed_sql)
183 .bind(error)
184 .bind(to_primitive_utc(next_retry_at))
185 .bind(event_id)
186 .execute(&mut **tx)
187 .await
188 .map_err(database_error)?;
189 Ok(())
190 }
191
192 async fn commit<'a>(&self, tx: Self::Tx<'a>) -> Result<(), OutboxError> {
193 tx.commit().await.map_err(database_error)
194 }
195}
196
197#[derive(Debug, Clone)]
201pub struct MySqlOutboxPublisher {
202 pool: MySqlPool,
203 table_name: Arc<str>,
204 insert_sql: Arc<str>,
205}
206
207impl MySqlOutboxPublisher {
208 pub fn new(pool: MySqlPool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
215 let table_name = table_name.into();
216 validate_table_name(&table_name)?;
217 let insert_sql = DIALECT.insert_sql(&table_name);
218 Ok(Self {
219 pool,
220 table_name: Arc::from(table_name),
221 insert_sql: Arc::from(insert_sql),
222 })
223 }
224
225 #[must_use]
227 pub fn pool(&self) -> &MySqlPool {
228 &self.pool
229 }
230
231 #[must_use]
233 pub fn table_name(&self) -> &str {
234 &self.table_name
235 }
236}
237
238impl OutboxPublisher for MySqlOutboxPublisher {
239 type Tx<'tx> = Transaction<'tx, MySql>;
240
241 async fn publish_in_tx<E: Event>(
242 &self,
243 tx: &mut Self::Tx<'_>,
244 event: &E,
245 ) -> Result<Uuid, OutboxError> {
246 let event_id = Uuid::now_v7();
247 let payload = serde_json::to_value(event)?;
248 sqlx::query(&self.insert_sql)
249 .bind(event_id)
250 .bind(E::EVENT_TYPE)
251 .bind(payload)
252 .bind(Option::<Uuid>::None)
253 .execute(&mut **tx)
254 .await
255 .map_err(database_error)?;
256 Ok(event_id)
257 }
258
259 async fn publish_in_tx_with_subject<E: Event>(
260 &self,
261 tx: &mut Self::Tx<'_>,
262 subject_id: Uuid,
263 event: &E,
264 ) -> Result<Uuid, OutboxError> {
265 let event_id = Uuid::now_v7();
266 let payload = serde_json::to_value(event)?;
267 sqlx::query(&self.insert_sql)
268 .bind(event_id)
269 .bind(E::EVENT_TYPE)
270 .bind(payload)
271 .bind(Some(subject_id))
272 .execute(&mut **tx)
273 .await
274 .map_err(database_error)?;
275 Ok(event_id)
276 }
277
278 async fn publish<E: Event>(&self, event: &E) -> Result<Uuid, OutboxError> {
279 let mut tx = self.pool.begin().await.map_err(database_error)?;
280 let event_id = self.publish_in_tx(&mut tx, event).await?;
281 tx.commit().await.map_err(database_error)?;
282 Ok(event_id)
283 }
284}
285
286pub struct MySqlOutboxWorkerBuilder {
288 pool: MySqlPool,
289 table_name: String,
290 handlers: HashMap<&'static str, Arc<dyn ErasedHandler>>,
291 config: OutboxWorkerConfig,
292}
293
294impl MySqlOutboxWorkerBuilder {
295 #[must_use]
297 pub fn new(pool: MySqlPool) -> Self {
298 Self {
299 pool,
300 table_name: DEFAULT_TABLE_NAME.to_owned(),
301 handlers: HashMap::new(),
302 config: OutboxWorkerConfig::default(),
303 }
304 }
305
306 #[must_use]
308 pub fn table_name(mut self, name: impl Into<String>) -> Self {
309 self.table_name = name.into();
310 self
311 }
312
313 #[must_use]
318 pub fn register_handler<E, H>(mut self, handler: H) -> Self
319 where
320 E: Event,
321 H: Handler<E>,
322 {
323 let typed = TypedHandler::<E, H>::new(handler);
324 let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
325 self.handlers.insert(E::EVENT_TYPE, erased);
326 self
327 }
328
329 #[must_use]
331 pub fn shared_handler<E, H>(mut self, handler: Arc<H>) -> Self
332 where
333 E: Event,
334 H: Handler<E>,
335 {
336 let typed = TypedHandler::<E, H>::shared(handler);
337 let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
338 self.handlers.insert(E::EVENT_TYPE, erased);
339 self
340 }
341
342 #[must_use]
344 pub fn poll_interval(mut self, d: Duration) -> Self {
345 self.config.poll_interval = d;
346 self
347 }
348
349 #[must_use]
351 pub fn batch_size(mut self, n: usize) -> Self {
352 self.config.batch_size = n;
353 self
354 }
355
356 #[must_use]
358 pub fn max_attempts(mut self, n: u32) -> Self {
359 self.config.max_attempts = n;
360 self
361 }
362
363 #[must_use]
365 pub fn retry_delay(mut self, d: Duration) -> Self {
366 self.config.retry_delay = d;
367 self
368 }
369
370 pub fn build(self) -> Result<OutboxWorker<MySqlOutboxStore>, OutboxError> {
377 let store = MySqlOutboxStore::new(self.pool, self.table_name)?;
378 Ok(OutboxWorker::new(store, self.handlers, self.config))
379 }
380}
381
382#[cfg(test)]
383mod tests {
384 use super::*;
385 use hexeract_core::HandlerContext;
386 use serde::Deserialize;
387 use serde::Serialize;
388
389 fn lazy_pool() -> MySqlPool {
390 MySqlPool::connect_lazy("mysql://nobody:nobody@127.0.0.1:1/nobody")
391 .expect("lazy pool must build from a valid URL")
392 }
393
394 #[derive(Debug, Serialize, Deserialize)]
395 struct UserRegistered {
396 user_id: Uuid,
397 }
398
399 impl Event for UserRegistered {
400 const EVENT_TYPE: &'static str = "users.registered";
401 }
402
403 #[derive(Debug, Serialize, Deserialize)]
404 struct OrderPlaced {
405 order_id: Uuid,
406 }
407
408 impl Event for OrderPlaced {
409 const EVENT_TYPE: &'static str = "orders.placed";
410 }
411
412 struct NoopHandler;
413
414 impl Handler<UserRegistered> for NoopHandler {
415 type Error = OutboxError;
416 async fn handle(
417 &self,
418 _event: UserRegistered,
419 _ctx: &HandlerContext,
420 ) -> Result<(), Self::Error> {
421 Ok(())
422 }
423 }
424
425 impl Handler<OrderPlaced> for NoopHandler {
426 type Error = OutboxError;
427 async fn handle(
428 &self,
429 _event: OrderPlaced,
430 _ctx: &HandlerContext,
431 ) -> Result<(), Self::Error> {
432 Ok(())
433 }
434 }
435
436 #[tokio::test]
437 async fn store_new_rejects_invalid_table_name() {
438 let err = MySqlOutboxStore::new(lazy_pool(), "bad name; DROP").unwrap_err();
439 assert!(matches!(err, OutboxError::Internal(_)));
440 }
441
442 #[tokio::test]
443 async fn store_new_caches_mysql_sql_with_validated_table_name() {
444 let store = MySqlOutboxStore::new(lazy_pool(), "audit_outbox").unwrap();
445 assert_eq!(store.table_name(), "audit_outbox");
446 assert!(store.poll_sql.contains("FROM audit_outbox"));
447 assert!(store.poll_sql.contains("FOR UPDATE SKIP LOCKED"));
448 assert!(store.poll_sql.contains("UTC_TIMESTAMP()"));
449 assert!(store.mark_delivered_sql.contains("UPDATE audit_outbox"));
450 assert!(store.mark_failed_sql.contains("attempts = attempts + 1"));
451 }
452
453 #[tokio::test]
454 async fn publisher_new_caches_insert_sql_with_question_marks() {
455 let publisher = MySqlOutboxPublisher::new(lazy_pool(), "audit_outbox").unwrap();
456 assert_eq!(publisher.table_name(), "audit_outbox");
457 assert!(publisher.insert_sql.contains("INSERT INTO audit_outbox"));
458 assert!(publisher.insert_sql.contains("?, ?, ?, ?"));
459 }
460
461 #[tokio::test]
462 async fn builder_starts_with_default_table_and_empty_handlers() {
463 let builder = MySqlOutboxWorkerBuilder::new(lazy_pool());
464 assert_eq!(builder.table_name, DEFAULT_TABLE_NAME);
465 assert!(builder.handlers.is_empty());
466 }
467
468 #[tokio::test]
469 async fn builder_register_handler_records_event_types() {
470 let builder = MySqlOutboxWorkerBuilder::new(lazy_pool())
471 .register_handler::<UserRegistered, _>(NoopHandler)
472 .register_handler::<OrderPlaced, _>(NoopHandler);
473 assert_eq!(builder.handlers.len(), 2);
474 assert!(builder.handlers.contains_key("users.registered"));
475 assert!(builder.handlers.contains_key("orders.placed"));
476 }
477
478 #[tokio::test]
479 async fn builder_build_rejects_invalid_table_name() {
480 let result = MySqlOutboxWorkerBuilder::new(lazy_pool())
481 .table_name("bad name; DROP TABLE")
482 .build();
483 assert!(matches!(result, Err(OutboxError::Internal(_))));
484 }
485
486 #[tokio::test]
487 async fn builder_build_with_default_table_name_succeeds() {
488 let worker = MySqlOutboxWorkerBuilder::new(lazy_pool()).build();
489 assert!(worker.is_ok());
490 }
491}