1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4use std::time::SystemTime;
5
6use async_trait::async_trait;
7use hexeract_outbox::ErasedHandler;
8use hexeract_outbox::Event;
9use hexeract_outbox::Handler;
10use hexeract_outbox::OutboxEnvelope;
11use hexeract_outbox::OutboxError;
12use hexeract_outbox::OutboxPublisher;
13use hexeract_outbox::OutboxStore;
14use hexeract_outbox::OutboxWorker;
15use hexeract_outbox::OutboxWorkerConfig;
16use hexeract_outbox::TypedHandler;
17use sqlx::Acquire;
18use sqlx::PgPool;
19use sqlx::Postgres;
20use sqlx::Row;
21use sqlx::Transaction;
22use sqlx::pool::PoolConnection;
23use time::OffsetDateTime;
24use uuid::Uuid;
25
26use crate::DEFAULT_TABLE_NAME;
27use crate::dialect::Dialect;
28use crate::envelope::assemble_envelope;
29use crate::envelope::to_offset;
30use crate::envelope::to_system_time;
31use crate::validate::validate_table_name;
32
33const DIALECT: Dialect = Dialect::Postgres;
34
35fn database_error(error: impl std::error::Error + Send + Sync + 'static) -> OutboxError {
36 OutboxError::Database(Box::new(error))
37}
38
39pub async fn ensure_schema(pool: &PgPool, table_name: &str) -> Result<(), OutboxError> {
52 let ddl = DIALECT.schema_ddl(table_name)?;
53 sqlx::raw_sql(&ddl)
54 .execute(pool)
55 .await
56 .map_err(database_error)?;
57 Ok(())
58}
59
60#[derive(Debug, Clone)]
64pub struct PgOutboxStore {
65 pool: PgPool,
66 table_name: Arc<str>,
67 poll_sql: Arc<str>,
68 mark_delivered_sql: Arc<str>,
69 mark_failed_sql: Arc<str>,
70}
71
72impl PgOutboxStore {
73 pub fn new(pool: PgPool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
83 let table_name = table_name.into();
84 validate_table_name(&table_name)?;
85 let poll_sql = DIALECT.poll_sql(&table_name);
86 let mark_delivered_sql = DIALECT.mark_delivered_sql(&table_name);
87 let mark_failed_sql = DIALECT.mark_failed_sql(&table_name);
88 Ok(Self {
89 pool,
90 table_name: Arc::from(table_name),
91 poll_sql: Arc::from(poll_sql),
92 mark_delivered_sql: Arc::from(mark_delivered_sql),
93 mark_failed_sql: Arc::from(mark_failed_sql),
94 })
95 }
96
97 #[must_use]
99 pub fn pool(&self) -> &PgPool {
100 &self.pool
101 }
102
103 #[must_use]
105 pub fn table_name(&self) -> &str {
106 &self.table_name
107 }
108}
109
110#[async_trait]
111impl OutboxStore for PgOutboxStore {
112 type Client = PoolConnection<Postgres>;
113 type Tx<'tx> = Transaction<'tx, Postgres>;
114
115 async fn acquire(&self) -> Result<Self::Client, OutboxError> {
116 self.pool.acquire().await.map_err(database_error)
117 }
118
119 async fn begin<'a>(&self, client: &'a mut Self::Client) -> Result<Self::Tx<'a>, OutboxError> {
120 client.begin().await.map_err(database_error)
121 }
122
123 async fn poll<'a>(
124 &self,
125 tx: &mut Self::Tx<'a>,
126 batch_size: usize,
127 max_attempts: u32,
128 ) -> Result<Vec<OutboxEnvelope>, OutboxError> {
129 let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
130 let max = i32::try_from(max_attempts).unwrap_or(i32::MAX);
131 let rows = sqlx::query(&self.poll_sql)
132 .bind(max)
133 .bind(limit)
134 .fetch_all(&mut **tx)
135 .await
136 .map_err(database_error)?;
137
138 let mut envelopes = Vec::with_capacity(rows.len());
139 for row in rows {
140 let event_id: Uuid = row.try_get("event_id").map_err(database_error)?;
141 let event_type: String = row.try_get("event_type").map_err(database_error)?;
142 let payload: serde_json::Value = row.try_get("payload").map_err(database_error)?;
143 let subject_id: Option<Uuid> = row.try_get("subject_id").map_err(database_error)?;
144 let created_at: OffsetDateTime = row.try_get("created_at").map_err(database_error)?;
145 let attempts: i32 = row.try_get("attempts").map_err(database_error)?;
146 let last_error: Option<String> = row.try_get("last_error").map_err(database_error)?;
147 let next_retry_at: Option<OffsetDateTime> =
148 row.try_get("next_retry_at").map_err(database_error)?;
149
150 let payload = serde_json::to_vec(&payload)?;
151
152 envelopes.push(assemble_envelope(
153 event_id,
154 event_type,
155 payload,
156 subject_id,
157 to_system_time(created_at),
158 u32::try_from(attempts.max(0)).unwrap_or(u32::MAX),
159 last_error,
160 next_retry_at.map(to_system_time),
161 ));
162 }
163 Ok(envelopes)
164 }
165
166 async fn mark_delivered<'a>(
167 &self,
168 tx: &mut Self::Tx<'a>,
169 event_id: Uuid,
170 ) -> Result<(), OutboxError> {
171 sqlx::query(&self.mark_delivered_sql)
172 .bind(event_id)
173 .execute(&mut **tx)
174 .await
175 .map_err(database_error)?;
176 Ok(())
177 }
178
179 async fn mark_failed<'a>(
180 &self,
181 tx: &mut Self::Tx<'a>,
182 event_id: Uuid,
183 error: &str,
184 next_retry_at: SystemTime,
185 ) -> Result<(), OutboxError> {
186 sqlx::query(&self.mark_failed_sql)
187 .bind(error)
188 .bind(to_offset(next_retry_at))
189 .bind(event_id)
190 .execute(&mut **tx)
191 .await
192 .map_err(database_error)?;
193 Ok(())
194 }
195
196 async fn commit<'a>(&self, tx: Self::Tx<'a>) -> Result<(), OutboxError> {
197 tx.commit().await.map_err(database_error)
198 }
199}
200
201#[derive(Debug, Clone)]
205pub struct PgOutboxPublisher {
206 pool: PgPool,
207 table_name: Arc<str>,
208 insert_sql: Arc<str>,
209}
210
211impl PgOutboxPublisher {
212 pub fn new(pool: PgPool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
219 let table_name = table_name.into();
220 validate_table_name(&table_name)?;
221 let insert_sql = DIALECT.insert_sql(&table_name);
222 Ok(Self {
223 pool,
224 table_name: Arc::from(table_name),
225 insert_sql: Arc::from(insert_sql),
226 })
227 }
228
229 #[must_use]
231 pub fn pool(&self) -> &PgPool {
232 &self.pool
233 }
234
235 #[must_use]
237 pub fn table_name(&self) -> &str {
238 &self.table_name
239 }
240}
241
242impl OutboxPublisher for PgOutboxPublisher {
243 type Tx<'tx> = Transaction<'tx, Postgres>;
244
245 async fn publish_in_tx<E: Event>(
246 &self,
247 tx: &mut Self::Tx<'_>,
248 event: &E,
249 ) -> Result<Uuid, OutboxError> {
250 let event_id = Uuid::now_v7();
251 let payload = serde_json::to_value(event)?;
252 sqlx::query(&self.insert_sql)
253 .bind(event_id)
254 .bind(E::EVENT_TYPE)
255 .bind(payload)
256 .bind(Option::<Uuid>::None)
257 .execute(&mut **tx)
258 .await
259 .map_err(database_error)?;
260 Ok(event_id)
261 }
262
263 async fn publish_in_tx_with_subject<E: Event>(
264 &self,
265 tx: &mut Self::Tx<'_>,
266 subject_id: Uuid,
267 event: &E,
268 ) -> Result<Uuid, OutboxError> {
269 let event_id = Uuid::now_v7();
270 let payload = serde_json::to_value(event)?;
271 sqlx::query(&self.insert_sql)
272 .bind(event_id)
273 .bind(E::EVENT_TYPE)
274 .bind(payload)
275 .bind(Some(subject_id))
276 .execute(&mut **tx)
277 .await
278 .map_err(database_error)?;
279 Ok(event_id)
280 }
281
282 async fn publish<E: Event>(&self, event: &E) -> Result<Uuid, OutboxError> {
283 let mut tx = self.pool.begin().await.map_err(database_error)?;
284 let event_id = self.publish_in_tx(&mut tx, event).await?;
285 tx.commit().await.map_err(database_error)?;
286 Ok(event_id)
287 }
288}
289
290pub struct PgOutboxWorkerBuilder {
292 pool: PgPool,
293 table_name: String,
294 handlers: HashMap<&'static str, Arc<dyn ErasedHandler>>,
295 config: OutboxWorkerConfig,
296}
297
298impl PgOutboxWorkerBuilder {
299 #[must_use]
301 pub fn new(pool: PgPool) -> Self {
302 Self {
303 pool,
304 table_name: DEFAULT_TABLE_NAME.to_owned(),
305 handlers: HashMap::new(),
306 config: OutboxWorkerConfig::default(),
307 }
308 }
309
310 #[must_use]
312 pub fn table_name(mut self, name: impl Into<String>) -> Self {
313 self.table_name = name.into();
314 self
315 }
316
317 #[must_use]
322 pub fn register_handler<E, H>(mut self, handler: H) -> Self
323 where
324 E: Event,
325 H: Handler<E>,
326 {
327 let typed = TypedHandler::<E, H>::new(handler);
328 let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
329 self.handlers.insert(E::EVENT_TYPE, erased);
330 self
331 }
332
333 #[must_use]
335 pub fn shared_handler<E, H>(mut self, handler: Arc<H>) -> Self
336 where
337 E: Event,
338 H: Handler<E>,
339 {
340 let typed = TypedHandler::<E, H>::shared(handler);
341 let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
342 self.handlers.insert(E::EVENT_TYPE, erased);
343 self
344 }
345
346 #[must_use]
348 pub fn poll_interval(mut self, d: Duration) -> Self {
349 self.config.poll_interval = d;
350 self
351 }
352
353 #[must_use]
355 pub fn batch_size(mut self, n: usize) -> Self {
356 self.config.batch_size = n;
357 self
358 }
359
360 #[must_use]
362 pub fn max_attempts(mut self, n: u32) -> Self {
363 self.config.max_attempts = n;
364 self
365 }
366
367 #[must_use]
369 pub fn retry_delay(mut self, d: Duration) -> Self {
370 self.config.retry_delay = d;
371 self
372 }
373
374 pub fn build(self) -> Result<OutboxWorker<PgOutboxStore>, OutboxError> {
381 let store = PgOutboxStore::new(self.pool, self.table_name)?;
382 Ok(OutboxWorker::new(store, self.handlers, self.config))
383 }
384}
385
#[cfg(test)]
mod tests {
    use super::*;
    use hexeract_core::HandlerContext;
    use serde::Deserialize;
    use serde::Serialize;

    /// Builds a lazily connecting pool; none of these tests run a query on it.
    fn lazy_pool() -> PgPool {
        let url = "postgres://nobody:nobody@127.0.0.1:1/nobody";
        PgPool::connect_lazy(url).expect("lazy pool must build from a valid URL")
    }

    /// First sample event type used to exercise handler registration.
    #[derive(Debug, Serialize, Deserialize)]
    struct UserRegistered {
        user_id: Uuid,
    }

    impl Event for UserRegistered {
        const EVENT_TYPE: &'static str = "users.registered";
    }

    /// Second sample event type, so two distinct keys can be registered.
    #[derive(Debug, Serialize, Deserialize)]
    struct OrderPlaced {
        order_id: Uuid,
    }

    impl Event for OrderPlaced {
        const EVENT_TYPE: &'static str = "orders.placed";
    }

    /// Handler that accepts both sample events and always succeeds.
    struct NoopHandler;

    impl Handler<UserRegistered> for NoopHandler {
        type Error = OutboxError;

        async fn handle(
            &self,
            _event: UserRegistered,
            _ctx: &HandlerContext,
        ) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    impl Handler<OrderPlaced> for NoopHandler {
        type Error = OutboxError;

        async fn handle(
            &self,
            _event: OrderPlaced,
            _ctx: &HandlerContext,
        ) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    #[tokio::test]
    async fn store_new_rejects_invalid_table_name() {
        let outcome = PgOutboxStore::new(lazy_pool(), "bad name; DROP");
        assert!(matches!(outcome, Err(OutboxError::Internal(_))));
    }

    #[tokio::test]
    async fn store_new_caches_postgres_sql_with_validated_table_name() {
        let store = PgOutboxStore::new(lazy_pool(), "audit_outbox")
            .expect("a plain identifier must be accepted as table name");

        assert_eq!(store.table_name(), "audit_outbox");
        for fragment in ["FROM audit_outbox", "FOR UPDATE SKIP LOCKED"] {
            assert!(
                store.poll_sql.contains(fragment),
                "poll SQL is missing {fragment:?}"
            );
        }
        assert!(store.mark_delivered_sql.contains("UPDATE audit_outbox"));
        assert!(store.mark_failed_sql.contains("attempts = attempts + 1"));
    }

    #[tokio::test]
    async fn publisher_new_rejects_invalid_table_name() {
        let outcome = PgOutboxPublisher::new(lazy_pool(), "bad name; DROP");
        assert!(matches!(outcome, Err(OutboxError::Internal(_))));
    }

    #[tokio::test]
    async fn publisher_new_caches_insert_sql_with_validated_table_name() {
        let publisher = PgOutboxPublisher::new(lazy_pool(), "audit_outbox")
            .expect("a plain identifier must be accepted as table name");

        assert_eq!(publisher.table_name(), "audit_outbox");
        for fragment in ["INSERT INTO audit_outbox", "$1, $2, $3, $4"] {
            assert!(
                publisher.insert_sql.contains(fragment),
                "insert SQL is missing {fragment:?}"
            );
        }
    }

    #[tokio::test]
    async fn builder_starts_with_default_table_and_empty_handlers() {
        let expected = OutboxWorkerConfig::default();
        let builder = PgOutboxWorkerBuilder::new(lazy_pool());

        assert_eq!(builder.table_name, DEFAULT_TABLE_NAME);
        assert!(builder.handlers.is_empty());
        assert_eq!(builder.config.batch_size, expected.batch_size);
        assert_eq!(builder.config.max_attempts, expected.max_attempts);
    }

    #[tokio::test]
    async fn builder_table_name_can_be_customized() {
        let builder = PgOutboxWorkerBuilder::new(lazy_pool()).table_name("my_outbox");
        assert_eq!(builder.table_name, "my_outbox");
    }

    #[tokio::test]
    async fn builder_register_handler_records_event_types() {
        let builder = PgOutboxWorkerBuilder::new(lazy_pool())
            .register_handler::<UserRegistered, _>(NoopHandler)
            .register_handler::<OrderPlaced, _>(NoopHandler);

        assert_eq!(builder.handlers.len(), 2);
        for event_type in ["users.registered", "orders.placed"] {
            assert!(
                builder.handlers.contains_key(event_type),
                "no handler recorded for {event_type:?}"
            );
        }
    }

    #[tokio::test]
    async fn builder_register_handler_twice_replaces_silently() {
        let builder = PgOutboxWorkerBuilder::new(lazy_pool())
            .register_handler::<UserRegistered, _>(NoopHandler)
            .register_handler::<UserRegistered, _>(NoopHandler);

        // Both registrations share one event type, so only one entry remains.
        assert_eq!(builder.handlers.len(), 1);
    }

    #[tokio::test]
    async fn builder_build_rejects_invalid_table_name() {
        let builder = PgOutboxWorkerBuilder::new(lazy_pool()).table_name("bad name; DROP TABLE");
        let outcome = builder.build();
        assert!(matches!(outcome, Err(OutboxError::Internal(_))));
    }

    #[tokio::test]
    async fn builder_build_with_default_table_name_succeeds() {
        let outcome = PgOutboxWorkerBuilder::new(lazy_pool()).build();
        assert!(outcome.is_ok());
    }
}