Skip to main content

outbox_postgres/
lib.rs

1use async_trait::async_trait;
2use outbox_core::prelude::*;
3use sqlx::types::uuid;
4use sqlx::{Executor, PgPool, Postgres};
5use std::sync::Arc;
6use sqlx::postgres::PgListener;
7use tokio::sync::Mutex;
8use tracing::debug;
9
10#[derive(Clone)]
11pub struct PostgresOutbox {
12    inner: Arc<PostgresOutboxInner>,
13}
14
15impl PostgresOutbox {
16    pub fn new(pool: PgPool, config: Arc<OutboxConfig>) -> Self {
17        Self {
18            inner: Arc::new(PostgresOutboxInner {
19                pool,
20                config,
21                listener: Mutex::new(None),
22            }),
23        }
24    }
25}
26
27struct PostgresOutboxInner {
28    pool: PgPool,
29    config: Arc<OutboxConfig>,
30    listener: Mutex<Option<PgListener>>,
31}
32
33#[async_trait]
34impl OutboxStorage for PostgresOutbox {
35    async fn fetch_next_to_process(&self, limit: u32) -> Result<Vec<Event>, OutboxError> {
36        let record = sqlx::query_as::<_, Event>(
37            r"
38                UPDATE outbox_events
39                SET status = 'Processing',
40                    locked_until = NOW() + (INTERVAL '1 minute' * $2)
41                WHERE id IN (
42                    SELECT id
43                    FROM outbox_events
44                    WHERE status='Pending'
45                        OR (status='Processing' AND locked_until < NOW())
46                    ORDER BY locked_until ASC
47                    LIMIT $1
48                    FOR UPDATE SKIP LOCKED
49                )
50                RETURNING
51                id,
52                idempotency_token,
53                event_type,
54                payload,
55                status,
56                created_at,
57                locked_until
58            ",
59        )
60            .bind(i64::from(limit))
61            .bind(self.inner.config.lock_timeout_mins)
62            .fetch_all(&self.inner.pool)
63            .await
64            .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
65        Ok(record)
66    }
67
68    async fn updates_status(
69        &self,
70        ids: &Vec<EventId>,
71        status: EventStatus,
72    ) -> Result<(), OutboxError> {
73        let raw_ids: Vec<uuid::Uuid> = ids.iter().map(EventId::as_uuid).collect();
74
75        sqlx::query(r"UPDATE outbox_events SET status = $1 WHERE id = ANY($2)")
76            .bind(status)
77            .bind(&raw_ids)
78            .execute(&self.inner.pool)
79            .await
80            .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
81
82        Ok(())
83    }
84
85    async fn delete_garbage(&self) -> Result<(), OutboxError> {
86        let result = sqlx::query(
87            r"
88            DELETE
89            FROM outbox_events
90            WHERE id IN (
91                SELECT id FROM outbox_events
92                WHERE status='Sent'
93                    AND created_at < now() - (INTERVAL '1 day' * $1)
94                LIMIT 5000
95            )",
96        )
97            .bind(self.inner.config.retention_days)
98            .execute(&self.inner.pool)
99            .await
100            .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
101        debug!(
102            "Garbage collector: deleted {} old messages",
103            result.rows_affected()
104        );
105        Ok(())
106    }
107
108    async fn wait_for_notification(&self, channel: &str) -> Result<(), OutboxError> {
109
110        let mut guard = self.inner.listener.lock().await;
111
112        if guard.is_none() {
113            let mut listener = sqlx::postgres::PgListener::connect_with(&self.inner.pool)
114                .await
115                .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
116
117            listener
118                .listen(channel)
119                .await
120                .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
121
122            *guard = Some(listener);
123        }
124        match guard.as_mut().unwrap().recv().await {
125            Ok(_) => Ok(()),
126            Err(e) => {
127                *guard = None;
128                Err(OutboxError::InfrastructureError(e.to_string()))
129            }
130        }
131    }
132}
133
134pub struct PostgresWriter<E>(pub E);
135
136#[async_trait]
137impl<'a, E> OutboxWriter for PostgresWriter<E>
138where
139        for<'c> &'c E: Executor<'c, Database = Postgres>,
140        E: Send + Sync,
141{
142    async fn insert_event(&self, event: Event) -> Result<(), OutboxError> {
143        sqlx::query(
144            r"
145        INSERT INTO outbox_events (id, event_type, payload, status, created_at, locked_until)
146        VALUES ($1, $2, $3, $4, $5, $6)
147        ",
148        )
149            .bind(event.id.as_uuid())
150            .bind(event.event_type.as_str())
151            .bind(event.payload.as_json())
152            .bind(event.status)
153            .bind(event.created_at)
154            .bind(event.locked_until)
155            .execute(&self.0)
156            .await
157            .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
158
159        Ok(())
160    }
161}
162
163//TODO: Create tests: