1use async_trait::async_trait;
38use klauthed_core::time::Timestamp;
39use sqlx::AnyPool;
40use sqlx::Row;
41
42use crate::error::DataError;
43use crate::outbox::{Outbox, OutboxEntry, OutboxId};
44
45#[derive(Clone)]
49pub struct SqlOutbox {
50 pool: AnyPool,
51 table: String,
52}
53
54impl SqlOutbox {
55 pub const DEFAULT_TABLE: &'static str = "outbox";
57
58 pub const CREATE_TABLE_SQL: &'static str = "\
62CREATE TABLE IF NOT EXISTS outbox (
63 id TEXT NOT NULL PRIMARY KEY,
64 aggregate_type TEXT NOT NULL,
65 aggregate_id TEXT NOT NULL,
66 event_type TEXT NOT NULL,
67 sequence BIGINT NOT NULL,
68 payload TEXT NOT NULL,
69 occurred_at TEXT NOT NULL,
70 published INTEGER NOT NULL DEFAULT 0,
71 published_at TEXT
72)";
73
74 pub fn new(pool: AnyPool) -> Self {
77 Self { pool, table: Self::DEFAULT_TABLE.to_owned() }
78 }
79
80 pub fn pool(&self) -> &AnyPool {
82 &self.pool
83 }
84
85 pub async fn ensure_schema(&self) -> Result<(), DataError> {
90 sqlx::query(Self::CREATE_TABLE_SQL).execute(&self.pool).await?;
91 Ok(())
92 }
93
94 fn select_prefix(&self) -> String {
96 format!(
97 "SELECT id, aggregate_type, aggregate_id, event_type, sequence, \
98 payload, occurred_at, published, published_at FROM {}",
99 self.table
100 )
101 }
102
103 #[cfg(feature = "postgres")]
115 pub async fn fetch_unpublished_skip_locked(
116 &self,
117 limit: usize,
118 ) -> Result<Vec<OutboxEntry>, DataError> {
119 let sql = format!(
120 "{prefix} WHERE published = 0 ORDER BY sequence ASC LIMIT {limit} FOR UPDATE SKIP LOCKED",
121 prefix = self.select_prefix(),
122 limit = limit as i64,
123 );
124 let rows = sqlx::query(sqlx::AssertSqlSafe(&*sql)).fetch_all(&self.pool).await?;
125 rows.iter().map(row_to_entry).collect()
126 }
127}
128
129fn row_to_entry(row: &sqlx::any::AnyRow) -> Result<OutboxEntry, DataError> {
131 let id_str: String = row.try_get("id")?;
132 let id: OutboxId = id_str
133 .parse()
134 .map_err(|e| DataError::Outbox(format!("invalid outbox id '{id_str}': {e}")))?;
135
136 let payload_str: String = row.try_get("payload")?;
137 let payload: serde_json::Value = serde_json::from_str(&payload_str)
138 .map_err(|e| DataError::Outbox(format!("invalid stored payload json: {e}")))?;
139
140 let occurred_at_str: String = row.try_get("occurred_at")?;
141 let occurred_at = parse_timestamp(&occurred_at_str)?;
142
143 let published_at_str: Option<String> = row.try_get("published_at")?;
144 let published_at = match published_at_str {
145 Some(s) => Some(parse_timestamp(&s)?),
146 None => None,
147 };
148
149 let sequence: i64 = row.try_get("sequence")?;
150 let published: i64 = row.try_get("published")?;
151
152 Ok(OutboxEntry {
153 id,
154 aggregate_type: row.try_get("aggregate_type")?,
155 aggregate_id: row.try_get("aggregate_id")?,
156 event_type: row.try_get("event_type")?,
157 sequence: sequence as u64,
158 payload,
159 occurred_at,
160 published: published != 0,
161 published_at,
162 })
163}
164
165fn parse_timestamp(s: &str) -> Result<Timestamp, DataError> {
167 serde_json::from_value(serde_json::Value::String(s.to_owned()))
168 .map_err(|e| DataError::Outbox(format!("invalid stored timestamp '{s}': {e}")))
169}
170
171#[async_trait]
172impl Outbox for SqlOutbox {
173 async fn enqueue(&self, entries: Vec<OutboxEntry>) -> Result<(), DataError> {
174 if entries.is_empty() {
175 return Ok(());
176 }
177
178 let sql = format!(
179 "INSERT INTO {} \
180 (id, aggregate_type, aggregate_id, event_type, sequence, payload, occurred_at, published, published_at) \
181 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
182 self.table
183 );
184
185 let mut tx = self.pool.begin().await?;
187 for entry in entries {
188 let payload = serde_json::to_string(&entry.payload).map_err(|e| {
189 DataError::Outbox(format!("failed to serialize outbox payload: {e}"))
190 })?;
191 let published_at = entry.published_at.map(|t| t.to_rfc3339());
192 sqlx::query(sqlx::AssertSqlSafe(&*sql))
193 .bind(entry.id.to_string())
194 .bind(entry.aggregate_type)
195 .bind(entry.aggregate_id)
196 .bind(entry.event_type)
197 .bind(entry.sequence as i64)
198 .bind(payload)
199 .bind(entry.occurred_at.to_rfc3339())
200 .bind(i64::from(entry.published))
201 .bind(published_at)
202 .execute(&mut *tx)
203 .await?;
204 }
205 tx.commit().await?;
206 Ok(())
207 }
208
209 async fn fetch_unpublished(&self, limit: usize) -> Result<Vec<OutboxEntry>, DataError> {
210 let sql = format!(
211 "{prefix} WHERE published = 0 ORDER BY sequence ASC LIMIT {limit}",
212 prefix = self.select_prefix(),
213 limit = limit as i64,
214 );
215 let rows = sqlx::query(sqlx::AssertSqlSafe(&*sql)).fetch_all(&self.pool).await?;
216 rows.iter().map(row_to_entry).collect()
217 }
218
219 async fn mark_published(&self, ids: &[OutboxId]) -> Result<(), DataError> {
220 if ids.is_empty() {
221 return Ok(());
222 }
223 let now = Timestamp::now().to_rfc3339();
224 let sql = format!(
225 "UPDATE {} SET published = 1, published_at = ? WHERE id = ? AND published = 0",
226 self.table
227 );
228 let mut tx = self.pool.begin().await?;
229 for id in ids {
230 sqlx::query(sqlx::AssertSqlSafe(&*sql))
231 .bind(now.clone())
232 .bind(id.to_string())
233 .execute(&mut *tx)
234 .await?;
235 }
236 tx.commit().await?;
237 Ok(())
238 }
239}
240
241#[cfg(all(test, feature = "sqlite"))]
242mod tests {
243 use super::*;
244 use klauthed_core::domain::{DomainEvent, EventEnvelope};
245 use klauthed_core::id::Id;
246 use serde::Serialize;
247 use std::borrow::Cow;
248
249 #[derive(Debug, Serialize)]
250 struct Opened {
251 owner: String,
252 }
253
254 impl DomainEvent for Opened {
255 fn event_type(&self) -> &'static str {
256 "account.opened"
257 }
258 }
259
260 fn entry(seq: u64) -> OutboxEntry {
261 let envelope = EventEnvelope {
262 event_id: Id::new(),
263 event_type: Cow::Borrowed("account.opened"),
264 aggregate_id: "acct-1".to_owned(),
265 aggregate_type: Cow::Borrowed("account"),
266 sequence: seq,
267 occurred_at: Timestamp::from_unix_millis(1_000 + seq as i64),
268 payload: Opened { owner: format!("owner-{seq}") },
269 };
270 OutboxEntry::from_envelope(&envelope).unwrap()
271 }
272
273 async fn memory_outbox() -> SqlOutbox {
274 sqlx::any::install_default_drivers();
275 let pool = sqlx::pool::PoolOptions::new()
279 .max_connections(1)
280 .connect("sqlite::memory:")
281 .await
282 .expect("connect in-memory sqlite");
283 let outbox = SqlOutbox::new(pool);
284 outbox.ensure_schema().await.expect("ensure schema");
285 outbox
286 }
287
288 #[tokio::test]
289 async fn ensure_schema_is_idempotent() {
290 let outbox = memory_outbox().await;
291 outbox.ensure_schema().await.unwrap();
293 assert!(outbox.fetch_unpublished(10).await.unwrap().is_empty());
294 }
295
296 #[tokio::test]
297 async fn enqueue_fetch_mark_round_trip_over_any_sqlite() {
298 let outbox = memory_outbox().await;
299 let e1 = entry(1);
300 let e2 = entry(2);
301 let (id1, id2) = (e1.id, e2.id);
302
303 outbox.enqueue(vec![e1.clone(), e2.clone()]).await.unwrap();
304
305 let unpublished = outbox.fetch_unpublished(10).await.unwrap();
306 assert_eq!(unpublished.len(), 2);
307 assert_eq!(unpublished[0], e1);
309 assert_eq!(unpublished[1].id, id2);
310 assert_eq!(unpublished[0].payload["owner"], "owner-1");
311 assert!(!unpublished[0].published);
312
313 outbox.mark_published(&[id1]).await.unwrap();
315 let remaining = outbox.fetch_unpublished(10).await.unwrap();
316 assert_eq!(remaining.len(), 1);
317 assert_eq!(remaining[0].id, id2);
318
319 outbox.mark_published(&[id2]).await.unwrap();
320 assert!(outbox.fetch_unpublished(10).await.unwrap().is_empty());
321 }
322
323 #[tokio::test]
324 async fn fetch_honors_limit_and_sequence_order() {
325 let outbox = memory_outbox().await;
326 let entries: Vec<_> = (1..=5).map(entry).collect();
327 outbox.enqueue(entries).await.unwrap();
328
329 let two = outbox.fetch_unpublished(2).await.unwrap();
330 assert_eq!(two.len(), 2);
331 assert_eq!(two[0].sequence, 1);
332 assert_eq!(two[1].sequence, 2);
333
334 assert_eq!(outbox.fetch_unpublished(100).await.unwrap().len(), 5);
335 }
336
337 #[tokio::test]
338 async fn marking_published_stores_published_at() {
339 let outbox = memory_outbox().await;
340 let e = entry(1);
341 let id = e.id;
342 outbox.enqueue(vec![e]).await.unwrap();
343 outbox.mark_published(&[id]).await.unwrap();
344
345 outbox.mark_published(&[id]).await.unwrap();
347 assert!(outbox.fetch_unpublished(10).await.unwrap().is_empty());
348 }
349
350 #[tokio::test]
351 async fn empty_batches_are_noops() {
352 let outbox = memory_outbox().await;
353 outbox.enqueue(vec![]).await.unwrap();
354 outbox.mark_published(&[]).await.unwrap();
355 assert!(outbox.fetch_unpublished(10).await.unwrap().is_empty());
356 }
357}