Skip to main content

outbox_postgres/
lib.rs

1use async_trait::async_trait;
2use outbox_core::prelude::*;
3use serde::Serialize;
4use sqlx::postgres::PgListener;
5use sqlx::types::uuid;
6use sqlx::{Executor, PgPool, Postgres};
7use std::fmt::Debug;
8use std::sync::Arc;
9use tokio::sync::Mutex;
10use tracing::debug;
11
12#[derive(Clone)]
13pub struct PostgresOutbox<P>
14where
15    P: Debug + Clone + Serialize + Send + Sync,
16{
17    inner: Arc<PostgresOutboxInner<P>>,
18}
19
20impl<P> PostgresOutbox<P>
21where
22    P: Debug + Clone + Serialize + Send + Sync,
23{
24    pub fn new(pool: PgPool, config: Arc<OutboxConfig<P>>) -> Self {
25        Self {
26            inner: Arc::new(PostgresOutboxInner {
27                pool,
28                config,
29                listener: Mutex::new(None),
30            }),
31        }
32    }
33}
34
35struct PostgresOutboxInner<P>
36where
37    P: Debug + Clone + Serialize + Send + Sync,
38{
39    pool: PgPool,
40    config: Arc<OutboxConfig<P>>,
41    listener: Mutex<Option<PgListener>>,
42}
43
44#[async_trait]
45impl<P> OutboxStorage<P> for PostgresOutbox<P>
46where
47    P: Debug + Clone + Serialize + Send + Sync + for<'de> serde::Deserialize<'de> + Unpin + 'static,
48{
49    async fn fetch_next_to_process(&self, limit: u32) -> Result<Vec<Event<P>>, OutboxError> {
50        let record = sqlx::query_as::<_, Event<P>>(
51            r"
52                UPDATE outbox_events
53                SET status = 'Processing',
54                    locked_until = NOW() + (INTERVAL '1 minute' * $2)
55                WHERE id IN (
56                    SELECT id
57                    FROM outbox_events
58                    WHERE status='Pending'
59                        OR (status='Processing' AND locked_until < NOW())
60                    ORDER BY locked_until ASC
61                    LIMIT $1
62                    FOR UPDATE SKIP LOCKED
63                )
64                RETURNING
65                id,
66                idempotency_token,
67                event_type,
68                payload,
69                status,
70                created_at,
71                locked_until
72            ",
73        )
74        .bind(i64::from(limit))
75        .bind(self.inner.config.lock_timeout_mins)
76        .fetch_all(&self.inner.pool)
77        .await
78        .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
79        Ok(record)
80    }
81
82    async fn updates_status(
83        &self,
84        ids: &[EventId],
85        status: EventStatus,
86    ) -> Result<(), OutboxError> {
87        let raw_ids: Vec<uuid::Uuid> = ids.iter().map(EventId::as_uuid).collect();
88
89        sqlx::query(r"UPDATE outbox_events SET status = $1 WHERE id = ANY($2)")
90            .bind(status)
91            .bind(&raw_ids)
92            .execute(&self.inner.pool)
93            .await
94            .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
95
96        Ok(())
97    }
98
99    async fn delete_garbage(&self) -> Result<(), OutboxError> {
100        let result = sqlx::query(
101            r"
102            DELETE
103            FROM outbox_events
104            WHERE id IN (
105                SELECT id FROM outbox_events
106                WHERE status='Sent'
107                    AND created_at < now() - (INTERVAL '1 day' * $1)
108                LIMIT 5000
109            )",
110        )
111        .bind(self.inner.config.retention_days)
112        .execute(&self.inner.pool)
113        .await
114        .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
115        debug!(
116            "Garbage collector: deleted {} old messages",
117            result.rows_affected()
118        );
119        Ok(())
120    }
121
122    async fn wait_for_notification(&self, channel: &str) -> Result<(), OutboxError> {
123        let mut guard = self.inner.listener.lock().await;
124
125        if guard.is_none() {
126            let mut listener = PgListener::connect_with(&self.inner.pool)
127                .await
128                .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
129
130            listener
131                .listen(channel)
132                .await
133                .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
134
135            *guard = Some(listener);
136        }
137        match guard
138            .as_mut()
139            .expect("Listener initialized above")
140            .recv()
141            .await
142        {
143            Ok(_) => Ok(()),
144            Err(e) => {
145                *guard = None;
146                Err(OutboxError::InfrastructureError(e.to_string()))
147            }
148        }
149    }
150}
151
152pub struct PostgresWriter<E>(pub E);
153
154#[async_trait]
155impl<E, P> OutboxWriter<P> for PostgresWriter<E>
156where
157    for<'c> &'c E: Executor<'c, Database = Postgres>,
158    E: Send + Sync,
159    P: Debug + Clone + Serialize + Send + Sync + 'static,
160{
161    async fn insert_event(&self, event: Event<P>) -> Result<(), OutboxError> {
162        sqlx::query(
163            r"
164        INSERT INTO outbox_events (id, idempotency_token, event_type, payload, status, created_at, locked_until)
165        VALUES ($1, $2, $3, $4, $5, $6, $7)
166        ",
167        )
168            .bind(event.id.as_uuid())
169            .bind(event.idempotency_token)
170            .bind(event.event_type.as_str())
171            .bind(serde_json::to_value(&event.payload).map_err(|e| OutboxError::InfrastructureError(e.to_string()))?)
172            .bind(event.status)
173            .bind(event.created_at)
174            .bind(event.locked_until)
175            .execute(&self.0)
176            .await
177            .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
178
179        Ok(())
180    }
181}
182
183//TODO: Create tests: