Skip to main content

outbox_postgres/
lib.rs

1use async_trait::async_trait;
2use outbox_core::prelude::*;
3use sqlx::types::uuid;
4use sqlx::{Executor, PgPool, Postgres};
5use std::sync::Arc;
6use tracing::debug;
7
8#[derive(Clone)]
9pub struct PostgresOutbox {
10    pool: PgPool,
11    config: Arc<OutboxConfig>,
12}
13
14impl PostgresOutbox {
15    pub fn new(pool: PgPool, config: Arc<OutboxConfig>) -> Self {
16        Self { pool, config }
17    }
18}
19
20#[async_trait]
21impl OutboxStorage for PostgresOutbox {
22    async fn fetch_next_to_process(&self, limit: u32) -> Result<Vec<OutboxSlot>, OutboxError> {
23        let record = sqlx::query_as::<_, OutboxSlot>(
24            r"
25                UPDATE outbox_events
26                SET status = 'Processing',
27                    locked_until = NOW() + (INTERVAL '1 minute' * $2)
28                WHERE id IN (
29                    SELECT id
30                    FROM outbox_events
31                    WHERE status='Pending'
32                        OR (status='Processing' AND locked_until < NOW())
33                    ORDER BY locked_until ASC
34                    LIMIT $1
35                    FOR UPDATE SKIP LOCKED
36                )
37                RETURNING
38                id,
39                event_type,
40                payload,
41                status,
42                created_at,
43                locked_until
44            ",
45        )
46        .bind(i64::from(limit))
47        .bind(self.config.lock_timeout_mins)
48        .fetch_all(&self.pool)
49        .await
50        .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
51        Ok(record)
52    }
53
54    async fn updates_status(
55        &self,
56        ids: &Vec<SlotId>,
57        status: SlotStatus,
58    ) -> Result<(), OutboxError> {
59        let raw_ids: Vec<uuid::Uuid> = ids.iter().map(SlotId::as_uuid).collect();
60
61        sqlx::query(r"UPDATE outbox_events SET status = $1 WHERE id = ANY($2)")
62            .bind(status)
63            .bind(&raw_ids)
64            .execute(&self.pool)
65            .await
66            .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
67
68        Ok(())
69    }
70
71    async fn delete_garbage(&self) -> Result<(), OutboxError> {
72        let result = sqlx::query(
73            r"
74            DELETE
75            FROM outbox_events
76            WHERE id IN (
77                SELECT id FROM outbox_events
78                WHERE status='Sent'
79                    AND created_at < now() - (INTERVAL '1 day' * $1)
80                LIMIT 5000
81            )",
82        )
83        .bind(self.config.retention_days)
84        .execute(&self.pool)
85        .await
86        .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
87        debug!(
88            "Garbage collector: deleted {} old messages",
89            result.rows_affected()
90        );
91        Ok(())
92    }
93
94    async fn wait_for_notification(&self, channel: &str) -> Result<(), OutboxError> {
95        let mut listener = sqlx::postgres::PgListener::connect_with(&self.pool)
96            .await
97            .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
98
99        listener
100            .listen(channel)
101            .await
102            .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
103
104        listener
105            .recv()
106            .await
107            .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
108
109        Ok(())
110    }
111}
112
/// Wrapper around a database executor of type `E`, used to write outbox events.
///
/// The wrapped executor is exposed as the public tuple field `0`.
pub struct PostgresWriter<E>(pub E);
114
115#[async_trait]
116impl<'a, E> OutboxWriter for PostgresWriter<E>
117where
118    for<'c> &'c E: Executor<'c, Database = Postgres>,
119    E: Send + Sync,
120{
121    async fn insert_event(&self, event: OutboxSlot) -> Result<(), OutboxError> {
122        sqlx::query(
123            r"
124        INSERT INTO outbox_events (id, event_type, payload, status, created_at, locked_until)
125        VALUES ($1, $2, $3, $4, $5, $6)
126        ",
127        )
128        .bind(event.id.as_uuid())
129        .bind(event.event_type.as_str())
130        .bind(event.payload.as_json())
131        .bind(event.status)
132        .bind(event.created_at)
133        .bind(event.locked_until)
134        .execute(&self.0)
135        .await
136        .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
137
138        Ok(())
139    }
140}
141
142//TODO: Create tests: