1use async_trait::async_trait;
2use outbox_core::prelude::*;
3use sqlx::postgres::PgListener;
4use sqlx::types::uuid;
5use sqlx::{Executor, PgPool, Postgres};
6use std::sync::Arc;
7use tokio::sync::Mutex;
8use tracing::debug;
9
10#[derive(Clone)]
11pub struct PostgresOutbox {
12 inner: Arc<PostgresOutboxInner>,
13}
14
15impl PostgresOutbox {
16 pub fn new(pool: PgPool, config: Arc<OutboxConfig>) -> Self {
17 Self {
18 inner: Arc::new(PostgresOutboxInner {
19 pool,
20 config,
21 listener: Mutex::new(None),
22 }),
23 }
24 }
25}
26
27struct PostgresOutboxInner {
28 pool: PgPool,
29 config: Arc<OutboxConfig>,
30 listener: Mutex<Option<PgListener>>,
31}
32
33#[async_trait]
34impl OutboxStorage for PostgresOutbox {
35 async fn fetch_next_to_process(&self, limit: u32) -> Result<Vec<Event>, OutboxError> {
36 let record = sqlx::query_as::<_, Event>(
37 r"
38 UPDATE outbox_events
39 SET status = 'Processing',
40 locked_until = NOW() + (INTERVAL '1 minute' * $2)
41 WHERE id IN (
42 SELECT id
43 FROM outbox_events
44 WHERE status='Pending'
45 OR (status='Processing' AND locked_until < NOW())
46 ORDER BY locked_until ASC
47 LIMIT $1
48 FOR UPDATE SKIP LOCKED
49 )
50 RETURNING
51 id,
52 idempotency_token,
53 event_type,
54 payload,
55 status,
56 created_at,
57 locked_until
58 ",
59 )
60 .bind(i64::from(limit))
61 .bind(self.inner.config.lock_timeout_mins)
62 .fetch_all(&self.inner.pool)
63 .await
64 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
65 Ok(record)
66 }
67
68 async fn updates_status(
69 &self,
70 ids: &[EventId],
71 status: EventStatus,
72 ) -> Result<(), OutboxError> {
73 let raw_ids: Vec<uuid::Uuid> = ids.iter().map(EventId::as_uuid).collect();
74
75 sqlx::query(r"UPDATE outbox_events SET status = $1 WHERE id = ANY($2)")
76 .bind(status)
77 .bind(&raw_ids)
78 .execute(&self.inner.pool)
79 .await
80 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
81
82 Ok(())
83 }
84
85 async fn delete_garbage(&self) -> Result<(), OutboxError> {
86 let result = sqlx::query(
87 r"
88 DELETE
89 FROM outbox_events
90 WHERE id IN (
91 SELECT id FROM outbox_events
92 WHERE status='Sent'
93 AND created_at < now() - (INTERVAL '1 day' * $1)
94 LIMIT 5000
95 )",
96 )
97 .bind(self.inner.config.retention_days)
98 .execute(&self.inner.pool)
99 .await
100 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
101 debug!(
102 "Garbage collector: deleted {} old messages",
103 result.rows_affected()
104 );
105 Ok(())
106 }
107
108 async fn wait_for_notification(&self, channel: &str) -> Result<(), OutboxError> {
109 let mut guard = self.inner.listener.lock().await;
110
111 if guard.is_none() {
112 let mut listener = PgListener::connect_with(&self.inner.pool)
113 .await
114 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
115
116 listener
117 .listen(channel)
118 .await
119 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
120
121 *guard = Some(listener);
122 }
123 match guard
124 .as_mut()
125 .expect("Listener initialized above")
126 .recv()
127 .await
128 {
129 Ok(_) => Ok(()),
130 Err(e) => {
131 *guard = None;
132 Err(OutboxError::InfrastructureError(e.to_string()))
133 }
134 }
135 }
136}
137
138pub struct PostgresWriter<E>(pub E);
139
140#[async_trait]
141impl<E> OutboxWriter for PostgresWriter<E>
142where
143 for<'c> &'c E: Executor<'c, Database = Postgres>,
144 E: Send + Sync,
145{
146 async fn insert_event(&self, event: Event) -> Result<(), OutboxError> {
147 sqlx::query(
148 r"
149 INSERT INTO outbox_events (id, idempotency_token, event_type, payload, status, created_at, locked_until)
150 VALUES ($1, $2, $3, $4, $5, $6, $7)
151 ",
152 )
153 .bind(event.id.as_uuid())
154 .bind(event.idempotency_token)
155 .bind(event.event_type.as_str())
156 .bind(event.payload.as_json())
157 .bind(event.status)
158 .bind(event.created_at)
159 .bind(event.locked_until)
160 .execute(&self.0)
161 .await
162 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
163
164 Ok(())
165 }
166}
167
168