1use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::Duration;
16use std::time::SystemTime;
17
18use async_trait::async_trait;
19use hexeract_outbox::ErasedHandler;
20use hexeract_outbox::Event;
21use hexeract_outbox::Handler;
22use hexeract_outbox::OutboxEnvelope;
23use hexeract_outbox::OutboxError;
24use hexeract_outbox::OutboxPublisher;
25use hexeract_outbox::OutboxStore;
26use hexeract_outbox::OutboxWorker;
27use hexeract_outbox::OutboxWorkerConfig;
28use hexeract_outbox::TypedHandler;
29use sqlx::Acquire;
30use sqlx::Row;
31use sqlx::Sqlite;
32use sqlx::SqlitePool;
33use sqlx::Transaction;
34use sqlx::pool::PoolConnection;
35use uuid::Uuid;
36
37use crate::DEFAULT_TABLE_NAME;
38use crate::dialect::Dialect;
39use crate::envelope::assemble_envelope;
40use crate::envelope::format_sqlite_utc;
41use crate::envelope::parse_sqlite_utc;
42use crate::validate::validate_table_name;
43
44const DIALECT: Dialect = Dialect::Sqlite;
45
46fn database_error(error: impl std::error::Error + Send + Sync + 'static) -> OutboxError {
47 OutboxError::Database(Box::new(error))
48}
49
50pub async fn ensure_schema(pool: &SqlitePool, table_name: &str) -> Result<(), OutboxError> {
61 let ddl = DIALECT.schema_ddl(table_name)?;
62 sqlx::raw_sql(&ddl)
63 .execute(pool)
64 .await
65 .map_err(database_error)?;
66 Ok(())
67}
68
69#[derive(Debug, Clone)]
74pub struct SqliteOutboxStore {
75 pool: SqlitePool,
76 table_name: Arc<str>,
77 poll_sql: Arc<str>,
78 mark_delivered_sql: Arc<str>,
79 mark_failed_sql: Arc<str>,
80}
81
82impl SqliteOutboxStore {
83 pub fn new(pool: SqlitePool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
90 let table_name = table_name.into();
91 validate_table_name(&table_name)?;
92 let poll_sql = DIALECT.poll_sql(&table_name);
93 let mark_delivered_sql = DIALECT.mark_delivered_sql(&table_name);
94 let mark_failed_sql = DIALECT.mark_failed_sql(&table_name);
95 Ok(Self {
96 pool,
97 table_name: Arc::from(table_name),
98 poll_sql: Arc::from(poll_sql),
99 mark_delivered_sql: Arc::from(mark_delivered_sql),
100 mark_failed_sql: Arc::from(mark_failed_sql),
101 })
102 }
103
104 #[must_use]
106 pub fn pool(&self) -> &SqlitePool {
107 &self.pool
108 }
109
110 #[must_use]
112 pub fn table_name(&self) -> &str {
113 &self.table_name
114 }
115}
116
117#[async_trait]
118impl OutboxStore for SqliteOutboxStore {
119 type Client = PoolConnection<Sqlite>;
120 type Tx<'tx> = Transaction<'tx, Sqlite>;
121
122 async fn acquire(&self) -> Result<Self::Client, OutboxError> {
123 self.pool.acquire().await.map_err(database_error)
124 }
125
126 async fn begin<'a>(&self, client: &'a mut Self::Client) -> Result<Self::Tx<'a>, OutboxError> {
127 client.begin().await.map_err(database_error)
128 }
129
130 async fn poll<'a>(
131 &self,
132 tx: &mut Self::Tx<'a>,
133 batch_size: usize,
134 max_attempts: u32,
135 ) -> Result<Vec<OutboxEnvelope>, OutboxError> {
136 let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
137 let max = i64::from(max_attempts);
138 let rows = sqlx::query(&self.poll_sql)
139 .bind(max)
140 .bind(limit)
141 .fetch_all(&mut **tx)
142 .await
143 .map_err(database_error)?;
144
145 let mut envelopes = Vec::with_capacity(rows.len());
146 for row in rows {
147 let event_id: Uuid = row.try_get("event_id").map_err(database_error)?;
148 let event_type: String = row.try_get("event_type").map_err(database_error)?;
149 let payload: serde_json::Value = row.try_get("payload").map_err(database_error)?;
150 let subject_id: Option<Uuid> = row.try_get("subject_id").map_err(database_error)?;
151 let created_at: String = row.try_get("created_at").map_err(database_error)?;
152 let attempts: i64 = row.try_get("attempts").map_err(database_error)?;
153 let last_error: Option<String> = row.try_get("last_error").map_err(database_error)?;
154 let next_retry_at: Option<String> =
155 row.try_get("next_retry_at").map_err(database_error)?;
156
157 let payload = serde_json::to_vec(&payload)?;
158 let next_retry_at = next_retry_at.as_deref().map(parse_sqlite_utc).transpose()?;
159
160 envelopes.push(assemble_envelope(
161 event_id,
162 event_type,
163 payload,
164 subject_id,
165 parse_sqlite_utc(&created_at)?,
166 u32::try_from(attempts.max(0)).unwrap_or(u32::MAX),
167 last_error,
168 next_retry_at,
169 ));
170 }
171 Ok(envelopes)
172 }
173
174 async fn mark_delivered<'a>(
175 &self,
176 tx: &mut Self::Tx<'a>,
177 event_id: Uuid,
178 ) -> Result<(), OutboxError> {
179 sqlx::query(&self.mark_delivered_sql)
180 .bind(event_id)
181 .execute(&mut **tx)
182 .await
183 .map_err(database_error)?;
184 Ok(())
185 }
186
187 async fn mark_failed<'a>(
188 &self,
189 tx: &mut Self::Tx<'a>,
190 event_id: Uuid,
191 error: &str,
192 next_retry_at: SystemTime,
193 ) -> Result<(), OutboxError> {
194 let next_retry_at = format_sqlite_utc(next_retry_at)?;
195 sqlx::query(&self.mark_failed_sql)
196 .bind(error)
197 .bind(next_retry_at)
198 .bind(event_id)
199 .execute(&mut **tx)
200 .await
201 .map_err(database_error)?;
202 Ok(())
203 }
204
205 async fn commit<'a>(&self, tx: Self::Tx<'a>) -> Result<(), OutboxError> {
206 tx.commit().await.map_err(database_error)
207 }
208}
209
210#[derive(Debug, Clone)]
214pub struct SqliteOutboxPublisher {
215 pool: SqlitePool,
216 table_name: Arc<str>,
217 insert_sql: Arc<str>,
218}
219
220impl SqliteOutboxPublisher {
221 pub fn new(pool: SqlitePool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
228 let table_name = table_name.into();
229 validate_table_name(&table_name)?;
230 let insert_sql = DIALECT.insert_sql(&table_name);
231 Ok(Self {
232 pool,
233 table_name: Arc::from(table_name),
234 insert_sql: Arc::from(insert_sql),
235 })
236 }
237
238 #[must_use]
240 pub fn pool(&self) -> &SqlitePool {
241 &self.pool
242 }
243
244 #[must_use]
246 pub fn table_name(&self) -> &str {
247 &self.table_name
248 }
249}
250
251impl OutboxPublisher for SqliteOutboxPublisher {
252 type Tx<'tx> = Transaction<'tx, Sqlite>;
253
254 async fn publish_in_tx<E: Event>(
255 &self,
256 tx: &mut Self::Tx<'_>,
257 event: &E,
258 ) -> Result<Uuid, OutboxError> {
259 let event_id = Uuid::now_v7();
260 let payload = serde_json::to_value(event)?;
261 sqlx::query(&self.insert_sql)
262 .bind(event_id)
263 .bind(E::EVENT_TYPE)
264 .bind(payload)
265 .bind(Option::<Uuid>::None)
266 .execute(&mut **tx)
267 .await
268 .map_err(database_error)?;
269 Ok(event_id)
270 }
271
272 async fn publish_in_tx_with_subject<E: Event>(
273 &self,
274 tx: &mut Self::Tx<'_>,
275 subject_id: Uuid,
276 event: &E,
277 ) -> Result<Uuid, OutboxError> {
278 let event_id = Uuid::now_v7();
279 let payload = serde_json::to_value(event)?;
280 sqlx::query(&self.insert_sql)
281 .bind(event_id)
282 .bind(E::EVENT_TYPE)
283 .bind(payload)
284 .bind(Some(subject_id))
285 .execute(&mut **tx)
286 .await
287 .map_err(database_error)?;
288 Ok(event_id)
289 }
290
291 async fn publish<E: Event>(&self, event: &E) -> Result<Uuid, OutboxError> {
292 let mut tx = self.pool.begin().await.map_err(database_error)?;
293 let event_id = self.publish_in_tx(&mut tx, event).await?;
294 tx.commit().await.map_err(database_error)?;
295 Ok(event_id)
296 }
297}
298
299pub struct SqliteOutboxWorkerBuilder {
303 pool: SqlitePool,
304 table_name: String,
305 handlers: HashMap<&'static str, Arc<dyn ErasedHandler>>,
306 config: OutboxWorkerConfig,
307}
308
309impl SqliteOutboxWorkerBuilder {
310 #[must_use]
312 pub fn new(pool: SqlitePool) -> Self {
313 Self {
314 pool,
315 table_name: DEFAULT_TABLE_NAME.to_owned(),
316 handlers: HashMap::new(),
317 config: OutboxWorkerConfig::default(),
318 }
319 }
320
321 #[must_use]
323 pub fn table_name(mut self, name: impl Into<String>) -> Self {
324 self.table_name = name.into();
325 self
326 }
327
328 #[must_use]
333 pub fn register_handler<E, H>(mut self, handler: H) -> Self
334 where
335 E: Event,
336 H: Handler<E>,
337 {
338 let typed = TypedHandler::<E, H>::new(handler);
339 let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
340 self.handlers.insert(E::EVENT_TYPE, erased);
341 self
342 }
343
344 #[must_use]
346 pub fn shared_handler<E, H>(mut self, handler: Arc<H>) -> Self
347 where
348 E: Event,
349 H: Handler<E>,
350 {
351 let typed = TypedHandler::<E, H>::shared(handler);
352 let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
353 self.handlers.insert(E::EVENT_TYPE, erased);
354 self
355 }
356
357 #[must_use]
359 pub fn poll_interval(mut self, d: Duration) -> Self {
360 self.config.poll_interval = d;
361 self
362 }
363
364 #[must_use]
366 pub fn batch_size(mut self, n: usize) -> Self {
367 self.config.batch_size = n;
368 self
369 }
370
371 #[must_use]
373 pub fn max_attempts(mut self, n: u32) -> Self {
374 self.config.max_attempts = n;
375 self
376 }
377
378 #[must_use]
380 pub fn retry_delay(mut self, d: Duration) -> Self {
381 self.config.retry_delay = d;
382 self
383 }
384
385 pub fn build(self) -> Result<OutboxWorker<SqliteOutboxStore>, OutboxError> {
392 let store = SqliteOutboxStore::new(self.pool, self.table_name)?;
393 Ok(OutboxWorker::new(store, self.handlers, self.config))
394 }
395}
396
#[cfg(test)]
mod tests {
    use super::*;
    use hexeract_core::HandlerContext;
    use serde::Deserialize;
    use serde::Serialize;

    /// Builds a pool with `connect_lazy`; none of the tests below issue a
    /// query against it.
    fn lazy_pool() -> SqlitePool {
        SqlitePool::connect_lazy("sqlite::memory:").expect("lazy pool must build from a valid URL")
    }

    /// Sample event used to exercise handler registration.
    #[derive(Debug, Serialize, Deserialize)]
    struct UserRegistered {
        user_id: Uuid,
    }

    impl Event for UserRegistered {
        const EVENT_TYPE: &'static str = "users.registered";
    }

    /// Second sample event, so that two distinct event types get registered.
    #[derive(Debug, Serialize, Deserialize)]
    struct OrderPlaced {
        order_id: Uuid,
    }

    impl Event for OrderPlaced {
        const EVENT_TYPE: &'static str = "orders.placed";
    }

    /// Handler that accepts both sample events and always succeeds.
    struct NoopHandler;

    impl Handler<UserRegistered> for NoopHandler {
        type Error = OutboxError;

        async fn handle(
            &self,
            _event: UserRegistered,
            _ctx: &HandlerContext,
        ) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    impl Handler<OrderPlaced> for NoopHandler {
        type Error = OutboxError;

        async fn handle(
            &self,
            _event: OrderPlaced,
            _ctx: &HandlerContext,
        ) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    #[tokio::test]
    async fn store_new_rejects_invalid_table_name() {
        let outcome = SqliteOutboxStore::new(lazy_pool(), "bad name; DROP");
        assert!(matches!(outcome, Err(OutboxError::Internal(_))));
    }

    #[tokio::test]
    async fn store_new_caches_sqlite_sql_without_skip_locked() {
        let store = SqliteOutboxStore::new(lazy_pool(), "audit_outbox")
            .expect("a valid table name must be accepted");
        assert_eq!(store.table_name(), "audit_outbox");

        let poll_sql: &str = &store.poll_sql;
        assert!(poll_sql.contains("FROM audit_outbox"));
        assert!(!poll_sql.contains("FOR UPDATE SKIP LOCKED"));
        assert!(poll_sql.contains("strftime"));

        assert!(store.mark_failed_sql.contains("attempts = attempts + 1"));
    }

    #[tokio::test]
    async fn publisher_new_caches_insert_sql_with_question_marks() {
        let publisher = SqliteOutboxPublisher::new(lazy_pool(), "audit_outbox")
            .expect("a valid table name must be accepted");
        assert_eq!(publisher.table_name(), "audit_outbox");

        let insert_sql: &str = &publisher.insert_sql;
        assert!(insert_sql.contains("INSERT INTO audit_outbox"));
        assert!(insert_sql.contains("?, ?, ?, ?"));
    }

    #[tokio::test]
    async fn builder_register_handler_records_event_types() {
        let builder = SqliteOutboxWorkerBuilder::new(lazy_pool())
            .register_handler::<UserRegistered, _>(NoopHandler)
            .register_handler::<OrderPlaced, _>(NoopHandler);

        assert_eq!(builder.handlers.len(), 2);
        for event_type in ["users.registered", "orders.placed"] {
            assert!(builder.handlers.contains_key(event_type));
        }
    }

    #[tokio::test]
    async fn builder_build_rejects_invalid_table_name() {
        let built = SqliteOutboxWorkerBuilder::new(lazy_pool())
            .table_name("bad name; DROP TABLE")
            .build();
        assert!(matches!(built, Err(OutboxError::Internal(_))));
    }

    #[tokio::test]
    async fn builder_build_with_default_table_name_succeeds() {
        let built = SqliteOutboxWorkerBuilder::new(lazy_pool()).build();
        assert!(built.is_ok());
    }
}