1use async_trait::async_trait;
2use outbox_core::prelude::*;
3use serde::Serialize;
4use sqlx::postgres::PgListener;
5use sqlx::types::uuid;
6use sqlx::{Executor, PgPool, Postgres};
7use std::fmt::Debug;
8use std::sync::Arc;
9use tokio::sync::Mutex;
10use tracing::debug;
11
12#[derive(Clone)]
13pub struct PostgresOutbox<P>
14where
15 P: Debug + Clone + Serialize + Send + Sync,
16{
17 inner: Arc<PostgresOutboxInner<P>>,
18}
19
20impl<P> PostgresOutbox<P>
21where
22 P: Debug + Clone + Serialize + Send + Sync,
23{
24 pub fn new(pool: PgPool, config: Arc<OutboxConfig<P>>) -> Self {
25 Self {
26 inner: Arc::new(PostgresOutboxInner {
27 pool,
28 config,
29 listener: Mutex::new(None),
30 }),
31 }
32 }
33}
34
35struct PostgresOutboxInner<P>
36where
37 P: Debug + Clone + Serialize + Send + Sync,
38{
39 pool: PgPool,
40 config: Arc<OutboxConfig<P>>,
41 listener: Mutex<Option<PgListener>>,
42}
43
44#[async_trait]
45impl<P> OutboxStorage<P> for PostgresOutbox<P>
46where
47 P: Debug + Clone + Serialize + Send + Sync + for<'de> serde::Deserialize<'de> + Unpin + 'static,
48{
49 async fn fetch_next_to_process(&self, limit: u32) -> Result<Vec<Event<P>>, OutboxError> {
50 let record = sqlx::query_as::<_, Event<P>>(
51 r"
52 UPDATE outbox_events
53 SET status = 'Processing',
54 locked_until = NOW() + (INTERVAL '1 minute' * $2)
55 WHERE id IN (
56 SELECT id
57 FROM outbox_events
58 WHERE status='Pending'
59 OR (status='Processing' AND locked_until < NOW())
60 ORDER BY locked_until ASC
61 LIMIT $1
62 FOR UPDATE SKIP LOCKED
63 )
64 RETURNING
65 id,
66 idempotency_token,
67 event_type,
68 payload,
69 status,
70 created_at,
71 locked_until
72 ",
73 )
74 .bind(i64::from(limit))
75 .bind(self.inner.config.lock_timeout_mins)
76 .fetch_all(&self.inner.pool)
77 .await
78 .map_err(|e| OutboxError::DatabaseError(e.to_string()))?;
79 Ok(record)
80 }
81
82 async fn update_status(&self, ids: &[EventId], status: EventStatus) -> Result<(), OutboxError> {
83 let raw_ids: Vec<uuid::Uuid> = ids.iter().map(EventId::as_uuid).collect();
84
85 sqlx::query(r"UPDATE outbox_events SET status = $1 WHERE id = ANY($2)")
86 .bind(status)
87 .bind(&raw_ids)
88 .execute(&self.inner.pool)
89 .await
90 .map_err(|e| OutboxError::DatabaseError(e.to_string()))?;
91
92 Ok(())
93 }
94
95 async fn delete_garbage(&self) -> Result<(), OutboxError> {
96 let result = sqlx::query(
97 r"
98 DELETE
99 FROM outbox_events
100 WHERE id IN (
101 SELECT id FROM outbox_events
102 WHERE status='Sent'
103 AND created_at < now() - (INTERVAL '1 day' * $1)
104 LIMIT 5000
105 )",
106 )
107 .bind(self.inner.config.retention_days)
108 .execute(&self.inner.pool)
109 .await
110 .map_err(|e| OutboxError::DatabaseError(e.to_string()))?;
111 debug!(
112 "Garbage collector: deleted {} old messages",
113 result.rows_affected()
114 );
115 Ok(())
116 }
117
118 async fn wait_for_notification(&self, channel: &str) -> Result<(), OutboxError> {
119 let mut guard = self.inner.listener.lock().await;
120
121 if guard.is_none() {
122 let mut listener = PgListener::connect_with(&self.inner.pool)
123 .await
124 .map_err(|e| OutboxError::DatabaseError(e.to_string()))?;
125
126 listener
127 .listen(channel)
128 .await
129 .map_err(|e| OutboxError::DatabaseError(e.to_string()))?;
130
131 *guard = Some(listener);
132 }
133 match guard
134 .as_mut()
135 .expect("Listener initialized above")
136 .recv()
137 .await
138 {
139 Ok(_) => Ok(()),
140 Err(e) => {
141 *guard = None;
142 Err(OutboxError::DatabaseError(e.to_string()))
143 }
144 }
145 }
146
147 #[cfg(feature = "dlq")]
148 async fn quarantine_events(&self, entries: &[DlqEntry]) -> Result<(), OutboxError> {
149 if entries.is_empty() {
150 return Ok(());
151 }
152 let ids: Vec<uuid::Uuid> = entries.iter().map(|e| e.id.as_uuid()).collect();
153 let failure_counts: Vec<i32> = entries
154 .iter()
155 .map(|e| i32::try_from(e.failure_count).unwrap_or(i32::MAX))
156 .collect();
157 let last_errors: Vec<Option<String>> =
158 entries.iter().map(|e| e.last_error.clone()).collect();
159 let result = sqlx::query(
160 r"
161 WITH deleted AS (
162 DELETE FROM outbox_events
163 WHERE id = ANY($1::uuid[])
164 RETURNING
165 id,
166 idempotency_token,
167 event_type,
168 payload,
169 status,
170 created_at,
171 locked_until
172 )
173 INSERT INTO outbox_dead_letters (
174 id,
175 idempotency_token,
176 event_type,
177 payload,
178 original_status,
179 created_at,
180 locked_until,
181 failure_count,
182 last_error
183 )
184 SELECT
185 d.id,
186 d.idempotency_token,
187 d.event_type,
188 d.payload,
189 d.status,
190 d.created_at,
191 d.locked_until,
192 f.failure_count,
193 f.last_error
194 FROM deleted AS d
195 JOIN unnest($1::uuid[], $2::int[], $3::text[])
196 AS f(id, failure_count, last_error)
197 ON d.id = f.id
198 ",
199 )
200 .bind(&ids)
201 .bind(&failure_counts)
202 .bind(&last_errors)
203 .execute(&self.inner.pool)
204 .await
205 .map_err(|e| OutboxError::DatabaseError(e.to_string()))?;
206
207 debug!(
208 "DLQ reaper: quarantined {}/{} events",
209 result.rows_affected(),
210 entries.len()
211 );
212 Ok(())
213 }
214}
215
216pub struct PostgresWriter<E>(pub E);
217
218#[async_trait]
219impl<E, P> OutboxWriter<P> for PostgresWriter<E>
220where
221 for<'c> &'c E: Executor<'c, Database = Postgres>,
222 E: Send + Sync,
223 P: Debug + Clone + Serialize + Send + Sync + 'static,
224{
225 async fn insert_event(&self, event: Event<P>) -> Result<(), OutboxError> {
226 sqlx::query(
227 r"
228 INSERT INTO outbox_events (id, idempotency_token, event_type, payload, status, created_at, locked_until)
229 VALUES ($1, $2, $3, $4, $5, $6, $7)
230 ",
231 )
232 .bind(event.id.as_uuid())
233 .bind(event.idempotency_token)
234 .bind(event.event_type.as_str())
235 .bind(serde_json::to_value(&event.payload).map_err(|e| OutboxError::DatabaseError(e.to_string()))?)
236 .bind(event.status)
237 .bind(event.created_at)
238 .bind(event.locked_until)
239 .execute(&self.0)
240 .await
241 .map_err(|e| OutboxError::DatabaseError(e.to_string()))?;
242
243 Ok(())
244 }
245}