1use async_trait::async_trait;
2use outbox_core::prelude::*;
3use serde::Serialize;
4use sqlx::postgres::PgListener;
5use sqlx::types::uuid;
6use sqlx::{Executor, PgPool, Postgres};
7use std::fmt::Debug;
8use std::sync::Arc;
9use tokio::sync::Mutex;
10use tracing::debug;
11
12#[derive(Clone)]
13pub struct PostgresOutbox<P>
14where
15 P: Debug + Clone + Serialize + Send + Sync,
16{
17 inner: Arc<PostgresOutboxInner<P>>,
18}
19
20impl<P> PostgresOutbox<P>
21where
22 P: Debug + Clone + Serialize + Send + Sync,
23{
24 pub fn new(pool: PgPool, config: Arc<OutboxConfig<P>>) -> Self {
25 Self {
26 inner: Arc::new(PostgresOutboxInner {
27 pool,
28 config,
29 listener: Mutex::new(None),
30 }),
31 }
32 }
33}
34
35struct PostgresOutboxInner<P>
36where
37 P: Debug + Clone + Serialize + Send + Sync,
38{
39 pool: PgPool,
40 config: Arc<OutboxConfig<P>>,
41 listener: Mutex<Option<PgListener>>,
42}
43
44#[async_trait]
45impl<P> OutboxStorage<P> for PostgresOutbox<P>
46where
47 P: Debug + Clone + Serialize + Send + Sync + for<'de> serde::Deserialize<'de> + Unpin + 'static,
48{
49 async fn fetch_next_to_process(&self, limit: u32) -> Result<Vec<Event<P>>, OutboxError> {
50 let record = sqlx::query_as::<_, Event<P>>(
51 r"
52 UPDATE outbox_events
53 SET status = 'Processing',
54 locked_until = NOW() + (INTERVAL '1 minute' * $2)
55 WHERE id IN (
56 SELECT id
57 FROM outbox_events
58 WHERE status='Pending'
59 OR (status='Processing' AND locked_until < NOW())
60 ORDER BY locked_until ASC
61 LIMIT $1
62 FOR UPDATE SKIP LOCKED
63 )
64 RETURNING
65 id,
66 idempotency_token,
67 event_type,
68 payload,
69 status,
70 created_at,
71 locked_until
72 ",
73 )
74 .bind(i64::from(limit))
75 .bind(self.inner.config.lock_timeout_mins)
76 .fetch_all(&self.inner.pool)
77 .await
78 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
79 Ok(record)
80 }
81
82 async fn updates_status(
83 &self,
84 ids: &[EventId],
85 status: EventStatus,
86 ) -> Result<(), OutboxError> {
87 let raw_ids: Vec<uuid::Uuid> = ids.iter().map(EventId::as_uuid).collect();
88
89 sqlx::query(r"UPDATE outbox_events SET status = $1 WHERE id = ANY($2)")
90 .bind(status)
91 .bind(&raw_ids)
92 .execute(&self.inner.pool)
93 .await
94 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
95
96 Ok(())
97 }
98
99 async fn delete_garbage(&self) -> Result<(), OutboxError> {
100 let result = sqlx::query(
101 r"
102 DELETE
103 FROM outbox_events
104 WHERE id IN (
105 SELECT id FROM outbox_events
106 WHERE status='Sent'
107 AND created_at < now() - (INTERVAL '1 day' * $1)
108 LIMIT 5000
109 )",
110 )
111 .bind(self.inner.config.retention_days)
112 .execute(&self.inner.pool)
113 .await
114 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
115 debug!(
116 "Garbage collector: deleted {} old messages",
117 result.rows_affected()
118 );
119 Ok(())
120 }
121
122 async fn wait_for_notification(&self, channel: &str) -> Result<(), OutboxError> {
123 let mut guard = self.inner.listener.lock().await;
124
125 if guard.is_none() {
126 let mut listener = PgListener::connect_with(&self.inner.pool)
127 .await
128 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
129
130 listener
131 .listen(channel)
132 .await
133 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
134
135 *guard = Some(listener);
136 }
137 match guard
138 .as_mut()
139 .expect("Listener initialized above")
140 .recv()
141 .await
142 {
143 Ok(_) => Ok(()),
144 Err(e) => {
145 *guard = None;
146 Err(OutboxError::InfrastructureError(e.to_string()))
147 }
148 }
149 }
150}
151
152pub struct PostgresWriter<E>(pub E);
153
154#[async_trait]
155impl<E, P> OutboxWriter<P> for PostgresWriter<E>
156where
157 for<'c> &'c E: Executor<'c, Database = Postgres>,
158 E: Send + Sync,
159 P: Debug + Clone + Serialize + Send + Sync + 'static,
160{
161 async fn insert_event(&self, event: Event<P>) -> Result<(), OutboxError> {
162 sqlx::query(
163 r"
164 INSERT INTO outbox_events (id, idempotency_token, event_type, payload, status, created_at, locked_until)
165 VALUES ($1, $2, $3, $4, $5, $6, $7)
166 ",
167 )
168 .bind(event.id.as_uuid())
169 .bind(event.idempotency_token)
170 .bind(event.event_type.as_str())
171 .bind(serde_json::to_value(&event.payload).map_err(|e| OutboxError::InfrastructureError(e.to_string()))?)
172 .bind(event.status)
173 .bind(event.created_at)
174 .bind(event.locked_until)
175 .execute(&self.0)
176 .await
177 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
178
179 Ok(())
180 }
181}
182
183