Skip to main content

hexeract_outbox_sql/
postgres.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4use std::time::SystemTime;
5
6use async_trait::async_trait;
7use hexeract_outbox::ErasedHandler;
8use hexeract_outbox::Event;
9use hexeract_outbox::Handler;
10use hexeract_outbox::OutboxEnvelope;
11use hexeract_outbox::OutboxError;
12use hexeract_outbox::OutboxPublisher;
13use hexeract_outbox::OutboxStore;
14use hexeract_outbox::OutboxWorker;
15use hexeract_outbox::OutboxWorkerConfig;
16use hexeract_outbox::TypedHandler;
17use sqlx::Acquire;
18use sqlx::PgPool;
19use sqlx::Postgres;
20use sqlx::Row;
21use sqlx::Transaction;
22use sqlx::pool::PoolConnection;
23use time::OffsetDateTime;
24use uuid::Uuid;
25
26use crate::DEFAULT_TABLE_NAME;
27use crate::dialect::Dialect;
28use crate::envelope::assemble_envelope;
29use crate::envelope::to_offset;
30use crate::envelope::to_system_time;
31use crate::validate::validate_table_name;
32
33const DIALECT: Dialect = Dialect::Postgres;
34
35fn database_error(error: impl std::error::Error + Send + Sync + 'static) -> OutboxError {
36    OutboxError::Database(Box::new(error))
37}
38
39/// Apply the canonical Postgres outbox schema to the target database.
40///
41/// **Intended for POCs, integration tests and local development.**
42/// Production deployments should run their own migration tooling against the
43/// SQL rendered by [`Dialect::schema_ddl`]. Applying DDL from the running
44/// application typically requires elevated privileges that the runtime role
45/// should not own, and clashes with versioned migration workflows.
46///
47/// # Errors
48///
49/// - [`OutboxError::Internal`] if `table_name` is not a valid identifier.
50/// - [`OutboxError::Database`] if the connection or the DDL statement fails.
51pub async fn ensure_schema(pool: &PgPool, table_name: &str) -> Result<(), OutboxError> {
52    let ddl = DIALECT.schema_ddl(table_name)?;
53    sqlx::raw_sql(&ddl)
54        .execute(pool)
55        .await
56        .map_err(database_error)?;
57    Ok(())
58}
59
60/// PostgreSQL implementation of [`OutboxStore`] backed by `sqlx::PgPool`.
61///
62/// Cheap to clone (the pool and the cached SQL strings are reference-counted).
63#[derive(Debug, Clone)]
64pub struct PgOutboxStore {
65    pool: PgPool,
66    table_name: Arc<str>,
67    poll_sql: Arc<str>,
68    mark_delivered_sql: Arc<str>,
69    mark_failed_sql: Arc<str>,
70}
71
72impl PgOutboxStore {
73    /// Build a store for the given pool and table.
74    ///
75    /// SQL statements are templated and cached at construction so each poll
76    /// cycle re-uses the same strings.
77    ///
78    /// # Errors
79    ///
80    /// Returns [`OutboxError::Internal`] if `table_name` is not a valid
81    /// identifier matching `^[a-zA-Z_][a-zA-Z0-9_]*$`.
82    pub fn new(pool: PgPool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
83        let table_name = table_name.into();
84        validate_table_name(&table_name)?;
85        let poll_sql = DIALECT.poll_sql(&table_name);
86        let mark_delivered_sql = DIALECT.mark_delivered_sql(&table_name);
87        let mark_failed_sql = DIALECT.mark_failed_sql(&table_name);
88        Ok(Self {
89            pool,
90            table_name: Arc::from(table_name),
91            poll_sql: Arc::from(poll_sql),
92            mark_delivered_sql: Arc::from(mark_delivered_sql),
93            mark_failed_sql: Arc::from(mark_failed_sql),
94        })
95    }
96
97    /// Underlying pool.
98    #[must_use]
99    pub fn pool(&self) -> &PgPool {
100        &self.pool
101    }
102
103    /// Configured table name.
104    #[must_use]
105    pub fn table_name(&self) -> &str {
106        &self.table_name
107    }
108}
109
110#[async_trait]
111impl OutboxStore for PgOutboxStore {
112    type Client = PoolConnection<Postgres>;
113    type Tx<'tx> = Transaction<'tx, Postgres>;
114
115    async fn acquire(&self) -> Result<Self::Client, OutboxError> {
116        self.pool.acquire().await.map_err(database_error)
117    }
118
119    async fn begin<'a>(&self, client: &'a mut Self::Client) -> Result<Self::Tx<'a>, OutboxError> {
120        client.begin().await.map_err(database_error)
121    }
122
123    async fn poll<'a>(
124        &self,
125        tx: &mut Self::Tx<'a>,
126        batch_size: usize,
127        max_attempts: u32,
128    ) -> Result<Vec<OutboxEnvelope>, OutboxError> {
129        let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
130        let max = i32::try_from(max_attempts).unwrap_or(i32::MAX);
131        let rows = sqlx::query(&self.poll_sql)
132            .bind(max)
133            .bind(limit)
134            .fetch_all(&mut **tx)
135            .await
136            .map_err(database_error)?;
137
138        let mut envelopes = Vec::with_capacity(rows.len());
139        for row in rows {
140            let event_id: Uuid = row.try_get("event_id").map_err(database_error)?;
141            let event_type: String = row.try_get("event_type").map_err(database_error)?;
142            let payload: serde_json::Value = row.try_get("payload").map_err(database_error)?;
143            let subject_id: Option<Uuid> = row.try_get("subject_id").map_err(database_error)?;
144            let created_at: OffsetDateTime = row.try_get("created_at").map_err(database_error)?;
145            let attempts: i32 = row.try_get("attempts").map_err(database_error)?;
146            let last_error: Option<String> = row.try_get("last_error").map_err(database_error)?;
147            let next_retry_at: Option<OffsetDateTime> =
148                row.try_get("next_retry_at").map_err(database_error)?;
149
150            let payload = serde_json::to_vec(&payload)?;
151
152            envelopes.push(assemble_envelope(
153                event_id,
154                event_type,
155                payload,
156                subject_id,
157                to_system_time(created_at),
158                u32::try_from(attempts.max(0)).unwrap_or(u32::MAX),
159                last_error,
160                next_retry_at.map(to_system_time),
161            ));
162        }
163        Ok(envelopes)
164    }
165
166    async fn mark_delivered<'a>(
167        &self,
168        tx: &mut Self::Tx<'a>,
169        event_id: Uuid,
170    ) -> Result<(), OutboxError> {
171        sqlx::query(&self.mark_delivered_sql)
172            .bind(event_id)
173            .execute(&mut **tx)
174            .await
175            .map_err(database_error)?;
176        Ok(())
177    }
178
179    async fn mark_failed<'a>(
180        &self,
181        tx: &mut Self::Tx<'a>,
182        event_id: Uuid,
183        error: &str,
184        next_retry_at: SystemTime,
185    ) -> Result<(), OutboxError> {
186        sqlx::query(&self.mark_failed_sql)
187            .bind(error)
188            .bind(to_offset(next_retry_at))
189            .bind(event_id)
190            .execute(&mut **tx)
191            .await
192            .map_err(database_error)?;
193        Ok(())
194    }
195
196    async fn commit<'a>(&self, tx: Self::Tx<'a>) -> Result<(), OutboxError> {
197        tx.commit().await.map_err(database_error)
198    }
199}
200
201/// PostgreSQL implementation of [`OutboxPublisher`] backed by `sqlx::PgPool`.
202///
203/// Cheap to clone (the pool and the cached insert statement are reference-counted).
204#[derive(Debug, Clone)]
205pub struct PgOutboxPublisher {
206    pool: PgPool,
207    table_name: Arc<str>,
208    insert_sql: Arc<str>,
209}
210
211impl PgOutboxPublisher {
212    /// Create a new publisher for the given pool and table.
213    ///
214    /// # Errors
215    ///
216    /// Returns [`OutboxError::Internal`] if `table_name` is not a valid
217    /// identifier matching `^[a-zA-Z_][a-zA-Z0-9_]*$`.
218    pub fn new(pool: PgPool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
219        let table_name = table_name.into();
220        validate_table_name(&table_name)?;
221        let insert_sql = DIALECT.insert_sql(&table_name);
222        Ok(Self {
223            pool,
224            table_name: Arc::from(table_name),
225            insert_sql: Arc::from(insert_sql),
226        })
227    }
228
229    /// Underlying pool, exposed for callers that open their own transactions.
230    #[must_use]
231    pub fn pool(&self) -> &PgPool {
232        &self.pool
233    }
234
235    /// Configured table name.
236    #[must_use]
237    pub fn table_name(&self) -> &str {
238        &self.table_name
239    }
240}
241
242impl OutboxPublisher for PgOutboxPublisher {
243    type Tx<'tx> = Transaction<'tx, Postgres>;
244
245    async fn publish_in_tx<E: Event>(
246        &self,
247        tx: &mut Self::Tx<'_>,
248        event: &E,
249    ) -> Result<Uuid, OutboxError> {
250        let event_id = Uuid::now_v7();
251        let payload = serde_json::to_value(event)?;
252        sqlx::query(&self.insert_sql)
253            .bind(event_id)
254            .bind(E::EVENT_TYPE)
255            .bind(payload)
256            .bind(Option::<Uuid>::None)
257            .execute(&mut **tx)
258            .await
259            .map_err(database_error)?;
260        Ok(event_id)
261    }
262
263    async fn publish_in_tx_with_subject<E: Event>(
264        &self,
265        tx: &mut Self::Tx<'_>,
266        subject_id: Uuid,
267        event: &E,
268    ) -> Result<Uuid, OutboxError> {
269        let event_id = Uuid::now_v7();
270        let payload = serde_json::to_value(event)?;
271        sqlx::query(&self.insert_sql)
272            .bind(event_id)
273            .bind(E::EVENT_TYPE)
274            .bind(payload)
275            .bind(Some(subject_id))
276            .execute(&mut **tx)
277            .await
278            .map_err(database_error)?;
279        Ok(event_id)
280    }
281
282    async fn publish<E: Event>(&self, event: &E) -> Result<Uuid, OutboxError> {
283        let mut tx = self.pool.begin().await.map_err(database_error)?;
284        let event_id = self.publish_in_tx(&mut tx, event).await?;
285        tx.commit().await.map_err(database_error)?;
286        Ok(event_id)
287    }
288}
289
290/// Fluent builder for an [`OutboxWorker`] backed by [`PgOutboxStore`].
291pub struct PgOutboxWorkerBuilder {
292    pool: PgPool,
293    table_name: String,
294    handlers: HashMap<&'static str, Arc<dyn ErasedHandler>>,
295    config: OutboxWorkerConfig,
296}
297
298impl PgOutboxWorkerBuilder {
299    /// Start a new builder for the given pool.
300    #[must_use]
301    pub fn new(pool: PgPool) -> Self {
302        Self {
303            pool,
304            table_name: DEFAULT_TABLE_NAME.to_owned(),
305            handlers: HashMap::new(),
306            config: OutboxWorkerConfig::default(),
307        }
308    }
309
310    /// Override the outbox table name (default `"audit_outbox"`).
311    #[must_use]
312    pub fn table_name(mut self, name: impl Into<String>) -> Self {
313        self.table_name = name.into();
314        self
315    }
316
317    /// Register a typed handler for the event type `E`.
318    ///
319    /// Registering twice for the same event type silently replaces the
320    /// previous handler.
321    #[must_use]
322    pub fn register_handler<E, H>(mut self, handler: H) -> Self
323    where
324        E: Event,
325        H: Handler<E>,
326    {
327        let typed = TypedHandler::<E, H>::new(handler);
328        let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
329        self.handlers.insert(E::EVENT_TYPE, erased);
330        self
331    }
332
333    /// Register a handler already shared behind an `Arc`.
334    #[must_use]
335    pub fn shared_handler<E, H>(mut self, handler: Arc<H>) -> Self
336    where
337        E: Event,
338        H: Handler<E>,
339    {
340        let typed = TypedHandler::<E, H>::shared(handler);
341        let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
342        self.handlers.insert(E::EVENT_TYPE, erased);
343        self
344    }
345
346    /// Override the poll interval (default 100 ms).
347    #[must_use]
348    pub fn poll_interval(mut self, d: Duration) -> Self {
349        self.config.poll_interval = d;
350        self
351    }
352
353    /// Override the batch size per poll (default 10).
354    #[must_use]
355    pub fn batch_size(mut self, n: usize) -> Self {
356        self.config.batch_size = n;
357        self
358    }
359
360    /// Override the maximum number of attempts per envelope (default 5).
361    #[must_use]
362    pub fn max_attempts(mut self, n: u32) -> Self {
363        self.config.max_attempts = n;
364        self
365    }
366
367    /// Override the constant retry delay between failed attempts (default 5 s).
368    #[must_use]
369    pub fn retry_delay(mut self, d: Duration) -> Self {
370        self.config.retry_delay = d;
371        self
372    }
373
374    /// Consume the builder and produce an [`OutboxWorker`] ready to spawn.
375    ///
376    /// # Errors
377    ///
378    /// Returns [`OutboxError::Internal`] if the configured `table_name`
379    /// is not a valid identifier.
380    pub fn build(self) -> Result<OutboxWorker<PgOutboxStore>, OutboxError> {
381        let store = PgOutboxStore::new(self.pool, self.table_name)?;
382        Ok(OutboxWorker::new(store, self.handlers, self.config))
383    }
384}
385
386#[cfg(test)]
387mod tests {
388    use super::*;
389    use hexeract_core::HandlerContext;
390    use serde::Deserialize;
391    use serde::Serialize;
392
393    fn lazy_pool() -> PgPool {
394        PgPool::connect_lazy("postgres://nobody:nobody@127.0.0.1:1/nobody")
395            .expect("lazy pool must build from a valid URL")
396    }
397
398    #[derive(Debug, Serialize, Deserialize)]
399    struct UserRegistered {
400        user_id: Uuid,
401    }
402
403    impl Event for UserRegistered {
404        const EVENT_TYPE: &'static str = "users.registered";
405    }
406
407    #[derive(Debug, Serialize, Deserialize)]
408    struct OrderPlaced {
409        order_id: Uuid,
410    }
411
412    impl Event for OrderPlaced {
413        const EVENT_TYPE: &'static str = "orders.placed";
414    }
415
416    struct NoopHandler;
417
418    impl Handler<UserRegistered> for NoopHandler {
419        type Error = OutboxError;
420        async fn handle(
421            &self,
422            _event: UserRegistered,
423            _ctx: &HandlerContext,
424        ) -> Result<(), Self::Error> {
425            Ok(())
426        }
427    }
428
429    impl Handler<OrderPlaced> for NoopHandler {
430        type Error = OutboxError;
431        async fn handle(
432            &self,
433            _event: OrderPlaced,
434            _ctx: &HandlerContext,
435        ) -> Result<(), Self::Error> {
436            Ok(())
437        }
438    }
439
440    #[tokio::test]
441    async fn store_new_rejects_invalid_table_name() {
442        let err = PgOutboxStore::new(lazy_pool(), "bad name; DROP").unwrap_err();
443        assert!(matches!(err, OutboxError::Internal(_)));
444    }
445
446    #[tokio::test]
447    async fn store_new_caches_postgres_sql_with_validated_table_name() {
448        let store = PgOutboxStore::new(lazy_pool(), "audit_outbox").unwrap();
449        assert_eq!(store.table_name(), "audit_outbox");
450        assert!(store.poll_sql.contains("FROM audit_outbox"));
451        assert!(store.poll_sql.contains("FOR UPDATE SKIP LOCKED"));
452        assert!(store.mark_delivered_sql.contains("UPDATE audit_outbox"));
453        assert!(store.mark_failed_sql.contains("attempts = attempts + 1"));
454    }
455
456    #[tokio::test]
457    async fn publisher_new_rejects_invalid_table_name() {
458        let err = PgOutboxPublisher::new(lazy_pool(), "bad name; DROP").unwrap_err();
459        assert!(matches!(err, OutboxError::Internal(_)));
460    }
461
462    #[tokio::test]
463    async fn publisher_new_caches_insert_sql_with_validated_table_name() {
464        let publisher = PgOutboxPublisher::new(lazy_pool(), "audit_outbox").unwrap();
465        assert_eq!(publisher.table_name(), "audit_outbox");
466        assert!(publisher.insert_sql.contains("INSERT INTO audit_outbox"));
467        assert!(publisher.insert_sql.contains("$1, $2, $3, $4"));
468    }
469
470    #[tokio::test]
471    async fn builder_starts_with_default_table_and_empty_handlers() {
472        let builder = PgOutboxWorkerBuilder::new(lazy_pool());
473        assert_eq!(builder.table_name, DEFAULT_TABLE_NAME);
474        assert!(builder.handlers.is_empty());
475        let default_cfg = OutboxWorkerConfig::default();
476        assert_eq!(builder.config.batch_size, default_cfg.batch_size);
477        assert_eq!(builder.config.max_attempts, default_cfg.max_attempts);
478    }
479
480    #[tokio::test]
481    async fn builder_table_name_can_be_customized() {
482        let builder = PgOutboxWorkerBuilder::new(lazy_pool()).table_name("my_outbox");
483        assert_eq!(builder.table_name, "my_outbox");
484    }
485
486    #[tokio::test]
487    async fn builder_register_handler_records_event_types() {
488        let builder = PgOutboxWorkerBuilder::new(lazy_pool())
489            .register_handler::<UserRegistered, _>(NoopHandler)
490            .register_handler::<OrderPlaced, _>(NoopHandler);
491        assert_eq!(builder.handlers.len(), 2);
492        assert!(builder.handlers.contains_key("users.registered"));
493        assert!(builder.handlers.contains_key("orders.placed"));
494    }
495
496    #[tokio::test]
497    async fn builder_register_handler_twice_replaces_silently() {
498        let builder = PgOutboxWorkerBuilder::new(lazy_pool())
499            .register_handler::<UserRegistered, _>(NoopHandler)
500            .register_handler::<UserRegistered, _>(NoopHandler);
501        assert_eq!(builder.handlers.len(), 1);
502    }
503
504    #[tokio::test]
505    async fn builder_build_rejects_invalid_table_name() {
506        let result = PgOutboxWorkerBuilder::new(lazy_pool())
507            .table_name("bad name; DROP TABLE")
508            .build();
509        assert!(matches!(result, Err(OutboxError::Internal(_))));
510    }
511
512    #[tokio::test]
513    async fn builder_build_with_default_table_name_succeeds() {
514        let worker = PgOutboxWorkerBuilder::new(lazy_pool()).build();
515        assert!(worker.is_ok());
516    }
517}