1use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::Duration;
16
17use async_trait::async_trait;
18use hexeract_outbox::ErasedHandler;
19use hexeract_outbox::Event;
20use hexeract_outbox::Handler;
21use hexeract_outbox::OutboxEnvelope;
22use hexeract_outbox::OutboxError;
23use hexeract_outbox::OutboxPublisher;
24use hexeract_outbox::OutboxStore;
25use hexeract_outbox::OutboxWorker;
26use hexeract_outbox::OutboxWorkerConfig;
27use hexeract_outbox::TypedHandler;
28use sqlx::Acquire;
29use sqlx::Row;
30use sqlx::Sqlite;
31use sqlx::SqlitePool;
32use sqlx::Transaction;
33use sqlx::pool::PoolConnection;
34use uuid::Uuid;
35
36use crate::DEFAULT_TABLE_NAME;
37use crate::dialect::Dialect;
38use crate::envelope::assemble_envelope;
39use crate::envelope::parse_sqlite_utc;
40use crate::validate::validate_event_type;
41use crate::validate::validate_table_name;
42
43const DIALECT: Dialect = Dialect::Sqlite;
44
/// Largest interval, in whole seconds, that is ever rendered into a SQLite
/// date/time modifier. Longer durations are clamped down to this value.
const MAX_SQLITE_INTERVAL_SECS: u64 = 9_223_372_036;

/// Renders `d` as a relative modifier string of the form `+S.mmm seconds`.
///
/// The duration is first clamped to [`MAX_SQLITE_INTERVAL_SECS`], so even
/// `Duration::MAX` produces a finite number. The fractional part is always
/// printed with exactly three digits.
fn sqlite_seconds_modifier(d: Duration) -> String {
    let ceiling = Duration::from_secs(MAX_SQLITE_INTERVAL_SECS);
    let bounded = if d > ceiling { ceiling } else { d };
    let seconds = bounded.as_secs_f64();
    format!("+{seconds:.3} seconds")
}
65
66fn database_error(error: impl std::error::Error + Send + Sync + 'static) -> OutboxError {
67 OutboxError::Database(Box::new(error))
68}
69
70fn pool_error(error: sqlx::Error) -> OutboxError {
71 if matches!(error, sqlx::Error::PoolTimedOut) {
72 OutboxError::PoolTimeout
73 } else {
74 OutboxError::Database(Box::new(error))
75 }
76}
77
78fn decode_sqlite_row(row: &sqlx::sqlite::SqliteRow) -> Result<OutboxEnvelope, OutboxError> {
84 let event_id: Uuid = row.try_get("event_id").map_err(database_error)?;
85 let event_type: String = row.try_get("event_type").map_err(database_error)?;
86 let payload: serde_json::Value = row.try_get("payload").map_err(database_error)?;
87 let subject_id: Option<Uuid> = row.try_get("subject_id").map_err(database_error)?;
88 let created_at: String = row.try_get("created_at").map_err(database_error)?;
89 let attempts: i64 = row.try_get("attempts").map_err(database_error)?;
90 let last_error: Option<String> = row.try_get("last_error").map_err(database_error)?;
91 let next_retry_at: Option<String> = row.try_get("next_retry_at").map_err(database_error)?;
92
93 let payload = serde_json::to_vec(&payload)?;
94 let next_retry_at = next_retry_at.as_deref().map(parse_sqlite_utc).transpose()?;
95
96 Ok(assemble_envelope(
97 event_id,
98 event_type,
99 payload,
100 subject_id,
101 parse_sqlite_utc(&created_at)?,
102 u32::try_from(attempts.max(0)).unwrap_or(u32::MAX),
103 last_error,
104 next_retry_at,
105 ))
106}
107
/// Pre-rendered statements used when an event is moved to the dead-letter
/// table. Both are executed with the event id as their only bound value.
#[derive(Debug, Clone)]
struct DeadLetterSql {
    /// Statement produced by `Dialect::insert_dead_letter_sql`.
    insert_sql: Arc<str>,
    /// Statement produced by `Dialect::delete_from_main_sql`.
    delete_sql: Arc<str>,
}
113
114pub async fn ensure_schema(pool: &SqlitePool, table_name: &str) -> Result<(), OutboxError> {
125 let ddl = DIALECT.schema_ddl(table_name)?;
126 sqlx::raw_sql(&ddl)
127 .execute(pool)
128 .await
129 .map_err(database_error)?;
130 Ok(())
131}
132
133#[derive(Debug, Clone)]
138pub struct SqliteOutboxStore {
139 pool: SqlitePool,
140 table_name: Arc<str>,
141 poll_sql: Arc<str>,
142 mark_delivered_sql: Arc<str>,
143 mark_failed_sql: Arc<str>,
144 dead_letter: Option<Arc<DeadLetterSql>>,
145}
146
147impl SqliteOutboxStore {
148 pub fn new(pool: SqlitePool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
155 let table_name = table_name.into();
156 validate_table_name(&table_name)?;
157 let poll_sql = DIALECT.poll_sql(&table_name);
158 let mark_delivered_sql = DIALECT.mark_delivered_sql(&table_name);
159 let mark_failed_sql = DIALECT.mark_failed_sql(&table_name);
160 Ok(Self {
161 pool,
162 table_name: Arc::from(table_name),
163 poll_sql: Arc::from(poll_sql),
164 mark_delivered_sql: Arc::from(mark_delivered_sql),
165 mark_failed_sql: Arc::from(mark_failed_sql),
166 dead_letter: None,
167 })
168 }
169
170 #[must_use]
172 pub fn pool(&self) -> &SqlitePool {
173 &self.pool
174 }
175
176 #[must_use]
178 pub fn table_name(&self) -> &str {
179 &self.table_name
180 }
181
182 pub fn with_dead_letter(mut self, dlq_table: impl Into<String>) -> Result<Self, OutboxError> {
188 let dlq = dlq_table.into();
189 validate_table_name(&dlq)?;
190 let insert_sql = DIALECT.insert_dead_letter_sql(&self.table_name, &dlq);
191 let delete_sql = DIALECT.delete_from_main_sql(&self.table_name);
192 self.dead_letter = Some(Arc::new(DeadLetterSql {
193 insert_sql: Arc::from(insert_sql),
194 delete_sql: Arc::from(delete_sql),
195 }));
196 Ok(self)
197 }
198}
199
200#[async_trait]
201impl OutboxStore for SqliteOutboxStore {
202 type Client = PoolConnection<Sqlite>;
203 type Tx<'tx> = Transaction<'tx, Sqlite>;
204
205 async fn acquire(&self) -> Result<Self::Client, OutboxError> {
206 self.pool.acquire().await.map_err(pool_error)
207 }
208
209 async fn begin<'a>(&self, client: &'a mut Self::Client) -> Result<Self::Tx<'a>, OutboxError> {
210 client.begin().await.map_err(database_error)
211 }
212
213 async fn poll<'a>(
214 &self,
215 tx: &mut Self::Tx<'a>,
216 batch_size: usize,
217 max_attempts: u32,
218 ) -> Result<Vec<OutboxEnvelope>, OutboxError> {
219 let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
220 let max = i64::from(max_attempts);
221 let rows = sqlx::query(&self.poll_sql)
222 .bind(max)
223 .bind(limit)
224 .fetch_all(&mut **tx)
225 .await
226 .map_err(database_error)?;
227
228 let mut envelopes = Vec::with_capacity(rows.len());
229 for row in rows {
230 match decode_sqlite_row(&row) {
235 Ok(envelope) => envelopes.push(envelope),
236 Err(error) => {
237 let event_id = row.try_get::<Uuid, _>("event_id").ok();
238 tracing::error!(
239 ?event_id,
240 error = %error,
241 "skipping undecodable outbox row; the rest of the batch continues"
242 );
243 }
244 }
245 }
246 Ok(envelopes)
247 }
248
249 async fn mark_delivered<'a>(
250 &self,
251 tx: &mut Self::Tx<'a>,
252 event_id: Uuid,
253 ) -> Result<(), OutboxError> {
254 sqlx::query(&self.mark_delivered_sql)
255 .bind(event_id)
256 .execute(&mut **tx)
257 .await
258 .map_err(database_error)?;
259 Ok(())
260 }
261
262 async fn mark_failed<'a>(
263 &self,
264 tx: &mut Self::Tx<'a>,
265 event_id: Uuid,
266 error: &str,
267 retry_in: Duration,
268 ) -> Result<(), OutboxError> {
269 sqlx::query(&self.mark_failed_sql)
272 .bind(error)
273 .bind(sqlite_seconds_modifier(retry_in))
274 .bind(event_id)
275 .execute(&mut **tx)
276 .await
277 .map_err(database_error)?;
278 Ok(())
279 }
280
281 async fn commit<'a>(&self, tx: Self::Tx<'a>) -> Result<(), OutboxError> {
282 tx.commit().await.map_err(database_error)
283 }
284
285 async fn mark_dead_lettered<'a>(
286 &self,
287 tx: &mut Self::Tx<'a>,
288 event_id: Uuid,
289 _error: &str,
290 ) -> Result<(), OutboxError> {
291 let Some(dlq) = &self.dead_letter else {
292 return Ok(());
293 };
294 sqlx::query(&dlq.insert_sql)
295 .bind(event_id)
296 .execute(&mut **tx)
297 .await
298 .map_err(database_error)?;
299 sqlx::query(&dlq.delete_sql)
300 .bind(event_id)
301 .execute(&mut **tx)
302 .await
303 .map_err(database_error)?;
304 Ok(())
305 }
306
307 async fn claim<'a>(
316 &self,
317 tx: &mut Self::Tx<'a>,
318 event_ids: &[Uuid],
319 lease_for: Duration,
320 ) -> Result<(), OutboxError> {
321 if event_ids.is_empty() {
322 return Ok(());
323 }
324 let sql = DIALECT.claim_sql(&self.table_name, event_ids.len());
326 let mut query = sqlx::query(&sql).bind(sqlite_seconds_modifier(lease_for));
327 for id in event_ids {
328 query = query.bind(*id);
329 }
330 query.execute(&mut **tx).await.map_err(database_error)?;
331 Ok(())
332 }
333}
334
335#[derive(Debug, Clone)]
339pub struct SqliteOutboxPublisher {
340 pool: SqlitePool,
341 table_name: Arc<str>,
342 insert_sql: Arc<str>,
343}
344
345impl SqliteOutboxPublisher {
346 pub fn new(pool: SqlitePool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
353 let table_name = table_name.into();
354 validate_table_name(&table_name)?;
355 let insert_sql = DIALECT.insert_sql(&table_name);
356 Ok(Self {
357 pool,
358 table_name: Arc::from(table_name),
359 insert_sql: Arc::from(insert_sql),
360 })
361 }
362
363 #[must_use]
365 pub fn pool(&self) -> &SqlitePool {
366 &self.pool
367 }
368
369 #[must_use]
371 pub fn table_name(&self) -> &str {
372 &self.table_name
373 }
374}
375
376impl OutboxPublisher for SqliteOutboxPublisher {
377 type Tx<'tx> = Transaction<'tx, Sqlite>;
378
379 async fn publish_in_tx<E: Event>(
380 &self,
381 tx: &mut Self::Tx<'_>,
382 event: &E,
383 ) -> Result<Uuid, OutboxError> {
384 validate_event_type(E::EVENT_TYPE)?;
385 let event_id = Uuid::now_v7();
386 let payload = serde_json::to_value(event)?;
387 sqlx::query(&self.insert_sql)
388 .bind(event_id)
389 .bind(E::EVENT_TYPE)
390 .bind(payload)
391 .bind(Option::<Uuid>::None)
392 .execute(&mut **tx)
393 .await
394 .map_err(database_error)?;
395 Ok(event_id)
396 }
397
398 async fn publish_in_tx_with_subject<E: Event>(
399 &self,
400 tx: &mut Self::Tx<'_>,
401 subject_id: Uuid,
402 event: &E,
403 ) -> Result<Uuid, OutboxError> {
404 validate_event_type(E::EVENT_TYPE)?;
405 let event_id = Uuid::now_v7();
406 let payload = serde_json::to_value(event)?;
407 sqlx::query(&self.insert_sql)
408 .bind(event_id)
409 .bind(E::EVENT_TYPE)
410 .bind(payload)
411 .bind(Some(subject_id))
412 .execute(&mut **tx)
413 .await
414 .map_err(database_error)?;
415 Ok(event_id)
416 }
417
418 async fn publish<E: Event>(&self, event: &E) -> Result<Uuid, OutboxError> {
419 let mut tx = self.pool.begin().await.map_err(database_error)?;
420 let event_id = self.publish_in_tx(&mut tx, event).await?;
421 tx.commit().await.map_err(database_error)?;
422 Ok(event_id)
423 }
424}
425
426pub struct SqliteOutboxWorkerBuilder {
458 pool: SqlitePool,
459 table_name: String,
460 dead_letter_table: Option<String>,
461 handlers: HashMap<&'static str, Arc<dyn ErasedHandler>>,
462 config: OutboxWorkerConfig,
463}
464
465impl SqliteOutboxWorkerBuilder {
466 #[must_use]
468 pub fn new(pool: SqlitePool) -> Self {
469 Self {
470 pool,
471 table_name: DEFAULT_TABLE_NAME.to_owned(),
472 dead_letter_table: None,
473 handlers: HashMap::new(),
474 config: OutboxWorkerConfig::default(),
475 }
476 }
477
478 #[must_use]
480 pub fn table_name(mut self, name: impl Into<String>) -> Self {
481 self.table_name = name.into();
482 self
483 }
484
485 #[must_use]
487 pub fn dead_letter_table(mut self, name: impl Into<String>) -> Self {
488 self.dead_letter_table = Some(name.into());
489 self
490 }
491
492 #[must_use]
497 pub fn register_handler<E, H>(mut self, handler: H) -> Self
498 where
499 E: Event,
500 H: Handler<E>,
501 {
502 let typed = TypedHandler::<E, H>::new(handler);
503 let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
504 self.handlers.insert(E::EVENT_TYPE, erased);
505 self
506 }
507
508 #[must_use]
510 pub fn shared_handler<E, H>(mut self, handler: Arc<H>) -> Self
511 where
512 E: Event,
513 H: Handler<E>,
514 {
515 let typed = TypedHandler::<E, H>::shared(handler);
516 let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
517 self.handlers.insert(E::EVENT_TYPE, erased);
518 self
519 }
520
521 #[must_use]
523 pub fn poll_interval(mut self, d: Duration) -> Self {
524 self.config.poll_interval = d;
525 self
526 }
527
528 #[must_use]
530 pub fn batch_size(mut self, n: usize) -> Self {
531 self.config.batch_size = n;
532 self
533 }
534
535 #[must_use]
537 pub fn max_attempts(mut self, n: u32) -> Self {
538 self.config.max_attempts = n;
539 self
540 }
541
542 #[must_use]
544 pub fn retry_base_delay(mut self, d: Duration) -> Self {
545 self.config.retry_base_delay = d;
546 self
547 }
548
549 #[must_use]
551 pub fn retry_max_delay(mut self, d: Duration) -> Self {
552 self.config.retry_max_delay = d;
553 self
554 }
555
556 #[must_use]
558 pub fn jitter(mut self, enabled: bool) -> Self {
559 self.config.jitter = enabled;
560 self
561 }
562
563 #[must_use]
572 pub fn dispatch_timeout(mut self, d: Duration) -> Self {
573 self.config.dispatch_timeout = d;
574 self
575 }
576
577 pub fn build(self) -> Result<OutboxWorker<SqliteOutboxStore>, OutboxError> {
584 let mut store = SqliteOutboxStore::new(self.pool, self.table_name)?;
585 if let Some(dlq) = self.dead_letter_table {
586 store = store.with_dead_letter(dlq)?;
587 }
588 Ok(OutboxWorker::new(store, self.handlers, self.config))
589 }
590}
591
592pub async fn ensure_dead_letter_schema(
601 pool: &SqlitePool,
602 table_name: &str,
603) -> Result<(), OutboxError> {
604 let ddl = DIALECT.dead_letter_schema_ddl(table_name)?;
605 sqlx::raw_sql(&ddl)
606 .execute(pool)
607 .await
608 .map_err(database_error)?;
609 Ok(())
610}
611
#[cfg(test)]
mod tests {
    use super::*;
    use hexeract_core::HandlerContext;
    use serde::Deserialize;
    use serde::Serialize;

    /// Pool that defers connecting, so constructors can be exercised
    /// without a database.
    fn lazy_pool() -> SqlitePool {
        let pool = SqlitePool::connect_lazy("sqlite::memory:");
        pool.expect("lazy pool must build from a valid URL")
    }

    /// First sample event type.
    #[derive(Debug, Serialize, Deserialize)]
    struct UserRegistered {
        user_id: Uuid,
    }

    impl Event for UserRegistered {
        const EVENT_TYPE: &'static str = "users.registered";
    }

    /// Second sample event type, used to check that handlers are keyed
    /// per event type.
    #[derive(Debug, Serialize, Deserialize)]
    struct OrderPlaced {
        order_id: Uuid,
    }

    impl Event for OrderPlaced {
        const EVENT_TYPE: &'static str = "orders.placed";
    }

    /// Handler that accepts every event and does nothing.
    struct NoopHandler;

    impl Handler<UserRegistered> for NoopHandler {
        type Error = OutboxError;
        async fn handle(
            &self,
            _event: UserRegistered,
            _ctx: &HandlerContext,
        ) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    impl Handler<OrderPlaced> for NoopHandler {
        type Error = OutboxError;
        async fn handle(
            &self,
            _event: OrderPlaced,
            _ctx: &HandlerContext,
        ) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    #[test]
    fn pool_error_maps_pool_timed_out_to_pool_timeout_variant() {
        let err = pool_error(sqlx::Error::PoolTimedOut);
        let is_timeout = matches!(err, OutboxError::PoolTimeout);
        assert!(
            is_timeout,
            "PoolTimedOut must map to OutboxError::PoolTimeout, got {err:?}"
        );
    }

    #[test]
    fn pool_error_wraps_other_errors_as_database_error() {
        let err = pool_error(sqlx::Error::RowNotFound);
        let is_database = matches!(err, OutboxError::Database(_));
        assert!(
            is_database,
            "non-timeout errors must map to OutboxError::Database, got {err:?}"
        );
    }

    #[tokio::test]
    async fn store_new_rejects_invalid_table_name() {
        let outcome = SqliteOutboxStore::new(lazy_pool(), "bad name; DROP");
        assert!(matches!(outcome, Err(OutboxError::Internal(_))));
    }

    #[tokio::test]
    async fn store_new_caches_sqlite_sql_without_skip_locked() {
        let store = SqliteOutboxStore::new(lazy_pool(), "audit_outbox").unwrap();
        assert_eq!(store.table_name(), "audit_outbox");

        let poll_sql: &str = &store.poll_sql;
        assert!(poll_sql.contains("FROM \"audit_outbox\""));
        assert!(!poll_sql.contains("FOR UPDATE SKIP LOCKED"));
        assert!(poll_sql.contains("strftime"));

        let mark_failed_sql: &str = &store.mark_failed_sql;
        assert!(!mark_failed_sql.contains("attempts = attempts + 1"));
    }

    #[tokio::test]
    async fn publisher_new_caches_insert_sql_with_question_marks() {
        let publisher = SqliteOutboxPublisher::new(lazy_pool(), "audit_outbox").unwrap();
        assert_eq!(publisher.table_name(), "audit_outbox");

        let insert_sql: &str = &publisher.insert_sql;
        assert!(insert_sql.contains("INSERT INTO \"audit_outbox\""));
        assert!(insert_sql.contains("?, ?, ?, ?"));
    }

    #[test]
    fn sqlite_seconds_modifier_caps_huge_duration() {
        let modifier = sqlite_seconds_modifier(Duration::MAX);
        let mentions_inf = modifier.contains("inf");
        assert!(
            !mentions_inf,
            "Duration::MAX must not produce an inf modifier, got: {modifier}"
        );
        assert!(modifier.starts_with('+'), "modifier must start with '+'");
        assert!(
            modifier.ends_with(" seconds"),
            "modifier must end with ' seconds'"
        );
    }

    #[test]
    fn sqlite_seconds_modifier_preserves_ordinary_values() {
        let one_and_a_half_seconds = Duration::from_millis(1_500);
        assert_eq!(
            sqlite_seconds_modifier(one_and_a_half_seconds),
            "+1.500 seconds"
        );
    }

    #[tokio::test]
    async fn builder_register_handler_records_event_types() {
        let builder = SqliteOutboxWorkerBuilder::new(lazy_pool())
            .register_handler::<UserRegistered, _>(NoopHandler)
            .register_handler::<OrderPlaced, _>(NoopHandler);
        assert_eq!(builder.handlers.len(), 2);
        for expected in ["users.registered", "orders.placed"] {
            assert!(builder.handlers.contains_key(expected));
        }
    }

    #[tokio::test]
    async fn builder_build_rejects_invalid_table_name() {
        let builder =
            SqliteOutboxWorkerBuilder::new(lazy_pool()).table_name("bad name; DROP TABLE");
        let result = builder.build();
        assert!(matches!(result, Err(OutboxError::Internal(_))));
    }

    #[tokio::test]
    async fn builder_build_with_default_table_name_succeeds() {
        let builder = SqliteOutboxWorkerBuilder::new(lazy_pool());
        assert!(builder.build().is_ok());
    }
}
758}