Skip to main content

outbox_postgres/
lib.rs

1use async_trait::async_trait;
2use outbox_core::prelude::*;
3use serde::Serialize;
4use sqlx::postgres::PgListener;
5use sqlx::types::uuid;
6use sqlx::{Executor, PgPool, Postgres};
7use std::fmt::Debug;
8use std::sync::Arc;
9use tokio::sync::Mutex;
10use tracing::debug;
11
12#[derive(Clone)]
13pub struct PostgresOutbox<P>
14where
15    P: Debug + Clone + Serialize + Send + Sync,
16{
17    inner: Arc<PostgresOutboxInner<P>>,
18}
19
20impl<P> PostgresOutbox<P>
21where
22    P: Debug + Clone + Serialize + Send + Sync,
23{
24    pub fn new(pool: PgPool, config: Arc<OutboxConfig<P>>) -> Self {
25        Self {
26            inner: Arc::new(PostgresOutboxInner {
27                pool,
28                config,
29                listener: Mutex::new(None),
30            }),
31        }
32    }
33}
34
35struct PostgresOutboxInner<P>
36where
37    P: Debug + Clone + Serialize + Send + Sync,
38{
39    pool: PgPool,
40    config: Arc<OutboxConfig<P>>,
41    listener: Mutex<Option<PgListener>>,
42}
43
44#[async_trait]
45impl<P> OutboxStorage<P> for PostgresOutbox<P>
46where
47    P: Debug + Clone + Serialize + Send + Sync + for<'de> serde::Deserialize<'de> + Unpin + 'static,
48{
49    async fn fetch_next_to_process(&self, limit: u32) -> Result<Vec<Event<P>>, OutboxError> {
50        let record = sqlx::query_as::<_, Event<P>>(
51            r"
52                UPDATE outbox_events
53                SET status = 'Processing',
54                    locked_until = NOW() + (INTERVAL '1 minute' * $2)
55                WHERE id IN (
56                    SELECT id
57                    FROM outbox_events
58                    WHERE status='Pending'
59                        OR (status='Processing' AND locked_until < NOW())
60                    ORDER BY locked_until ASC
61                    LIMIT $1
62                    FOR UPDATE SKIP LOCKED
63                )
64                RETURNING
65                id,
66                idempotency_token,
67                event_type,
68                payload,
69                status,
70                created_at,
71                locked_until
72            ",
73        )
74        .bind(i64::from(limit))
75        .bind(self.inner.config.lock_timeout_mins)
76        .fetch_all(&self.inner.pool)
77        .await
78        .map_err(|e| OutboxError::DatabaseError(e.to_string()))?;
79        Ok(record)
80    }
81
82    async fn update_status(&self, ids: &[EventId], status: EventStatus) -> Result<(), OutboxError> {
83        let raw_ids: Vec<uuid::Uuid> = ids.iter().map(EventId::as_uuid).collect();
84
85        sqlx::query(r"UPDATE outbox_events SET status = $1 WHERE id = ANY($2)")
86            .bind(status)
87            .bind(&raw_ids)
88            .execute(&self.inner.pool)
89            .await
90            .map_err(|e| OutboxError::DatabaseError(e.to_string()))?;
91
92        Ok(())
93    }
94
95    async fn delete_garbage(&self) -> Result<(), OutboxError> {
96        let result = sqlx::query(
97            r"
98            DELETE
99            FROM outbox_events
100            WHERE id IN (
101                SELECT id FROM outbox_events
102                WHERE status='Sent'
103                    AND created_at < now() - (INTERVAL '1 day' * $1)
104                LIMIT 5000
105            )",
106        )
107        .bind(self.inner.config.retention_days)
108        .execute(&self.inner.pool)
109        .await
110        .map_err(|e| OutboxError::DatabaseError(e.to_string()))?;
111        debug!(
112            "Garbage collector: deleted {} old messages",
113            result.rows_affected()
114        );
115        Ok(())
116    }
117
118    async fn wait_for_notification(&self, channel: &str) -> Result<(), OutboxError> {
119        let mut guard = self.inner.listener.lock().await;
120
121        if guard.is_none() {
122            let mut listener = PgListener::connect_with(&self.inner.pool)
123                .await
124                .map_err(|e| OutboxError::DatabaseError(e.to_string()))?;
125
126            listener
127                .listen(channel)
128                .await
129                .map_err(|e| OutboxError::DatabaseError(e.to_string()))?;
130
131            *guard = Some(listener);
132        }
133        match guard
134            .as_mut()
135            .expect("Listener initialized above")
136            .recv()
137            .await
138        {
139            Ok(_) => Ok(()),
140            Err(e) => {
141                *guard = None;
142                Err(OutboxError::DatabaseError(e.to_string()))
143            }
144        }
145    }
146
147    #[cfg(feature = "dlq")]
148    async fn quarantine_events(&self, entries: &[DlqEntry]) -> Result<(), OutboxError> {
149        if entries.is_empty() {
150            return Ok(());
151        }
152        let ids: Vec<uuid::Uuid> = entries.iter().map(|e| e.id.as_uuid()).collect();
153        let failure_counts: Vec<i32> = entries
154            .iter()
155            .map(|e| i32::try_from(e.failure_count).unwrap_or(i32::MAX))
156            .collect();
157        let last_errors: Vec<Option<String>> =
158            entries.iter().map(|e| e.last_error.clone()).collect();
159        let result = sqlx::query(
160            r"
161            WITH deleted AS (
162                DELETE FROM outbox_events
163                WHERE id = ANY($1::uuid[])
164                RETURNING
165                    id,
166                    idempotency_token,
167                    event_type,
168                    payload,
169                    status,
170                    created_at,
171                    locked_until
172            )
173            INSERT INTO outbox_dead_letters (
174                id,
175                idempotency_token,
176                event_type,
177                payload,
178                original_status,
179                created_at,
180                locked_until,
181                failure_count,
182                last_error
183            )
184            SELECT
185                d.id,
186                d.idempotency_token,
187                d.event_type,
188                d.payload,
189                d.status,
190                d.created_at,
191                d.locked_until,
192                f.failure_count,
193                f.last_error
194            FROM deleted AS d
195            JOIN unnest($1::uuid[], $2::int[], $3::text[])
196                    AS f(id, failure_count, last_error)
197                ON d.id = f.id
198            ",
199        )
200        .bind(&ids)
201        .bind(&failure_counts)
202        .bind(&last_errors)
203        .execute(&self.inner.pool)
204        .await
205        .map_err(|e| OutboxError::DatabaseError(e.to_string()))?;
206
207        debug!(
208            "DLQ reaper: quarantined {}/{} events",
209            result.rows_affected(),
210            entries.len()
211        );
212        Ok(())
213    }
214}
215
/// Outbox writer that inserts events through the wrapped executor `E`.
///
/// The executor is public so callers can construct the writer directly,
/// e.g. `PostgresWriter(executor)`.
pub struct PostgresWriter<E>(pub E);
217
218#[async_trait]
219impl<E, P> OutboxWriter<P> for PostgresWriter<E>
220where
221    for<'c> &'c E: Executor<'c, Database = Postgres>,
222    E: Send + Sync,
223    P: Debug + Clone + Serialize + Send + Sync + 'static,
224{
225    async fn insert_event(&self, event: Event<P>) -> Result<(), OutboxError> {
226        sqlx::query(
227            r"
228        INSERT INTO outbox_events (id, idempotency_token, event_type, payload, status, created_at, locked_until)
229        VALUES ($1, $2, $3, $4, $5, $6, $7)
230        ",
231        )
232            .bind(event.id.as_uuid())
233            .bind(event.idempotency_token)
234            .bind(event.event_type.as_str())
235            .bind(serde_json::to_value(&event.payload).map_err(|e| OutboxError::DatabaseError(e.to_string()))?)
236            .bind(event.status)
237            .bind(event.created_at)
238            .bind(event.locked_until)
239            .execute(&self.0)
240            .await
241            .map_err(|e| OutboxError::DatabaseError(e.to_string()))?;
242
243        Ok(())
244    }
245}