1use async_trait::async_trait;
2use outbox_core::prelude::*;
3use sqlx::types::uuid;
4use sqlx::{Executor, PgPool, Postgres};
5use std::sync::Arc;
6use sqlx::postgres::PgListener;
7use tokio::sync::Mutex;
8use tracing::debug;
9
10#[derive(Clone)]
11pub struct PostgresOutbox {
12 inner: Arc<PostgresOutboxInner>,
13}
14
15impl PostgresOutbox {
16 pub fn new(pool: PgPool, config: Arc<OutboxConfig>) -> Self {
17 Self {
18 inner: Arc::new(PostgresOutboxInner {
19 pool,
20 config,
21 listener: Mutex::new(None),
22 }),
23 }
24 }
25}
26
27struct PostgresOutboxInner {
28 pool: PgPool,
29 config: Arc<OutboxConfig>,
30 listener: Mutex<Option<PgListener>>,
31}
32
33#[async_trait]
34impl OutboxStorage for PostgresOutbox {
35 async fn fetch_next_to_process(&self, limit: u32) -> Result<Vec<Event>, OutboxError> {
36 let record = sqlx::query_as::<_, Event>(
37 r"
38 UPDATE outbox_events
39 SET status = 'Processing',
40 locked_until = NOW() + (INTERVAL '1 minute' * $2)
41 WHERE id IN (
42 SELECT id
43 FROM outbox_events
44 WHERE status='Pending'
45 OR (status='Processing' AND locked_until < NOW())
46 ORDER BY locked_until ASC
47 LIMIT $1
48 FOR UPDATE SKIP LOCKED
49 )
50 RETURNING
51 id,
52 idempotency_token,
53 event_type,
54 payload,
55 status,
56 created_at,
57 locked_until
58 ",
59 )
60 .bind(i64::from(limit))
61 .bind(self.inner.config.lock_timeout_mins)
62 .fetch_all(&self.inner.pool)
63 .await
64 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
65 Ok(record)
66 }
67
68 async fn updates_status(
69 &self,
70 ids: &Vec<EventId>,
71 status: EventStatus,
72 ) -> Result<(), OutboxError> {
73 let raw_ids: Vec<uuid::Uuid> = ids.iter().map(EventId::as_uuid).collect();
74
75 sqlx::query(r"UPDATE outbox_events SET status = $1 WHERE id = ANY($2)")
76 .bind(status)
77 .bind(&raw_ids)
78 .execute(&self.inner.pool)
79 .await
80 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
81
82 Ok(())
83 }
84
85 async fn delete_garbage(&self) -> Result<(), OutboxError> {
86 let result = sqlx::query(
87 r"
88 DELETE
89 FROM outbox_events
90 WHERE id IN (
91 SELECT id FROM outbox_events
92 WHERE status='Sent'
93 AND created_at < now() - (INTERVAL '1 day' * $1)
94 LIMIT 5000
95 )",
96 )
97 .bind(self.inner.config.retention_days)
98 .execute(&self.inner.pool)
99 .await
100 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
101 debug!(
102 "Garbage collector: deleted {} old messages",
103 result.rows_affected()
104 );
105 Ok(())
106 }
107
108 async fn wait_for_notification(&self, channel: &str) -> Result<(), OutboxError> {
109
110 let mut guard = self.inner.listener.lock().await;
111
112 if guard.is_none() {
113 let mut listener = sqlx::postgres::PgListener::connect_with(&self.inner.pool)
114 .await
115 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
116
117 listener
118 .listen(channel)
119 .await
120 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
121
122 *guard = Some(listener);
123 }
124 match guard.as_mut().unwrap().recv().await {
125 Ok(_) => Ok(()),
126 Err(e) => {
127 *guard = None;
128 Err(OutboxError::InfrastructureError(e.to_string()))
129 }
130 }
131 }
132}
133
134pub struct PostgresWriter<E>(pub E);
135
136#[async_trait]
137impl<'a, E> OutboxWriter for PostgresWriter<E>
138where
139 for<'c> &'c E: Executor<'c, Database = Postgres>,
140 E: Send + Sync,
141{
142 async fn insert_event(&self, event: Event) -> Result<(), OutboxError> {
143 sqlx::query(
144 r"
145 INSERT INTO outbox_events (id, event_type, payload, status, created_at, locked_until)
146 VALUES ($1, $2, $3, $4, $5, $6)
147 ",
148 )
149 .bind(event.id.as_uuid())
150 .bind(event.event_type.as_str())
151 .bind(event.payload.as_json())
152 .bind(event.status)
153 .bind(event.created_at)
154 .bind(event.locked_until)
155 .execute(&self.0)
156 .await
157 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
158
159 Ok(())
160 }
161}
162
163