1use async_trait::async_trait;
2use outbox_core::prelude::*;
3use sqlx::types::uuid;
4use sqlx::{Executor, PgPool, Postgres};
5use std::sync::Arc;
6use tracing::debug;
7
8#[derive(Clone)]
9pub struct PostgresOutbox {
10 pool: PgPool,
11 config: Arc<OutboxConfig>,
12}
13
14impl PostgresOutbox {
15 pub fn new(pool: PgPool, config: Arc<OutboxConfig>) -> Self {
16 Self { pool, config }
17 }
18}
19
20#[async_trait]
21impl OutboxStorage for PostgresOutbox {
22 async fn fetch_next_to_process(&self, limit: u32) -> Result<Vec<OutboxSlot>, OutboxError> {
23 let record = sqlx::query_as::<_, OutboxSlot>(
24 r"
25 UPDATE outbox_events
26 SET status = 'Processing',
27 locked_until = NOW() + (INTERVAL '1 minute' * $2)
28 WHERE id IN (
29 SELECT id
30 FROM outbox_events
31 WHERE status='Pending'
32 OR (status='Processing' AND locked_until < NOW())
33 ORDER BY locked_until ASC
34 LIMIT $1
35 FOR UPDATE SKIP LOCKED
36 )
37 RETURNING
38 id,
39 event_type,
40 payload,
41 status,
42 created_at,
43 locked_until
44 ",
45 )
46 .bind(i64::from(limit))
47 .bind(self.config.lock_timeout_mins)
48 .fetch_all(&self.pool)
49 .await
50 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
51 Ok(record)
52 }
53
54 async fn updates_status(
55 &self,
56 ids: &Vec<SlotId>,
57 status: SlotStatus,
58 ) -> Result<(), OutboxError> {
59 let raw_ids: Vec<uuid::Uuid> = ids.iter().map(SlotId::as_uuid).collect();
60
61 sqlx::query(r"UPDATE outbox_events SET status = $1 WHERE id = ANY($2)")
62 .bind(status)
63 .bind(&raw_ids)
64 .execute(&self.pool)
65 .await
66 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
67
68 Ok(())
69 }
70
71 async fn delete_garbage(&self) -> Result<(), OutboxError> {
72 let result = sqlx::query(
73 r"
74 DELETE
75 FROM outbox_events
76 WHERE id IN (
77 SELECT id FROM outbox_events
78 WHERE status='Sent'
79 AND created_at < now() - (INTERVAL '1 day' * $1)
80 LIMIT 5000
81 )",
82 )
83 .bind(self.config.retention_days)
84 .execute(&self.pool)
85 .await
86 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
87 debug!(
88 "Garbage collector: deleted {} old messages",
89 result.rows_affected()
90 );
91 Ok(())
92 }
93
94 async fn wait_for_notification(&self, channel: &str) -> Result<(), OutboxError> {
95 let mut listener = sqlx::postgres::PgListener::connect_with(&self.pool)
96 .await
97 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
98
99 listener
100 .listen(channel)
101 .await
102 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
103
104 listener
105 .recv()
106 .await
107 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
108
109 Ok(())
110 }
111}
112
113pub struct PostgresWriter<E>(pub E);
114
115#[async_trait]
116impl<'a, E> OutboxWriter for PostgresWriter<E>
117where
118 for<'c> &'c E: Executor<'c, Database = Postgres>,
119 E: Send + Sync,
120{
121 async fn insert_event(&self, event: OutboxSlot) -> Result<(), OutboxError> {
122 sqlx::query(
123 r"
124 INSERT INTO outbox_events (id, event_type, payload, status, created_at, locked_until)
125 VALUES ($1, $2, $3, $4, $5, $6)
126 ",
127 )
128 .bind(event.id.as_uuid())
129 .bind(event.event_type.as_str())
130 .bind(event.payload.as_json())
131 .bind(event.status)
132 .bind(event.created_at)
133 .bind(event.locked_until)
134 .execute(&self.0)
135 .await
136 .map_err(|e| OutboxError::InfrastructureError(e.to_string()))?;
137
138 Ok(())
139 }
140}
141
142