1use anyhow::Result;
5use async_trait::async_trait;
6use chrono::{DateTime, Duration, Utc};
7use seesaw_core::{EmittedEvent, QueuedEffectExecution, QueuedEvent, Store, NAMESPACE_SEESAW};
8use serde::{Deserialize, Serialize};
9use sqlx::{FromRow, PgPool};
10use uuid::Uuid;
11
/// How long, in seconds, an event claimed by `poll_next` stays locked.
///
/// `poll_next` sets `locked_until = NOW() + EVENT_CLAIM_SECONDS` on the row it
/// claims and only considers rows whose `locked_until` is NULL or in the past,
/// so an event that is neither acked nor nacked becomes claimable again once
/// this many seconds have elapsed.
const EVENT_CLAIM_SECONDS: i64 = 30;
13
14fn emitted_event_created_at(parent_created_at: DateTime<Utc>) -> DateTime<Utc> {
15 parent_created_at
16 .date_naive()
17 .and_hms_opt(0, 0, 0)
18 .expect("midnight should always be a valid UTC timestamp")
19 .and_utc()
20}
21
22pub struct PostgresStore {
24 pool: PgPool,
25}
26
27impl PostgresStore {
28 pub fn new(pool: PgPool) -> Self {
29 Self { pool }
30 }
31
32 pub fn pool(&self) -> &PgPool {
33 &self.pool
34 }
35}
36
37impl Clone for PostgresStore {
38 fn clone(&self) -> Self {
39 Self {
40 pool: self.pool.clone(),
41 }
42 }
43}
44
45#[derive(FromRow)]
46struct EventRow {
47 id: i64,
48 event_id: Uuid,
49 parent_id: Option<Uuid>,
50 correlation_id: Uuid,
51 event_type: String,
52 payload: serde_json::Value,
53 hops: i32,
54 created_at: DateTime<Utc>,
55}
56
57#[derive(FromRow)]
58struct StateRow {
59 state: serde_json::Value,
60 version: i32,
61}
62
63#[derive(FromRow)]
64struct EffectRow {
65 event_id: Uuid,
66 effect_id: String,
67 correlation_id: Uuid,
68 event_type: String,
69 event_payload: serde_json::Value,
70 parent_event_id: Option<Uuid>,
71 execute_at: DateTime<Utc>,
72 timeout_seconds: i32,
73 max_attempts: i32,
74 priority: i32,
75 attempts: i32,
76}
77
78#[derive(FromRow)]
79struct ParentEventRow {
80 hops: i32,
81 created_at: DateTime<Utc>,
82}
83
84#[async_trait]
85impl Store for PostgresStore {
86 async fn publish(&self, event: QueuedEvent) -> Result<()> {
87 let mut tx = self.pool.begin().await?;
88
89 let inserted: Option<Uuid> = sqlx::query_scalar(
92 "INSERT INTO seesaw_processed (event_id, correlation_id, created_at)
93 VALUES ($1, $2, $3)
94 ON CONFLICT (event_id) DO NOTHING
95 RETURNING event_id",
96 )
97 .bind(event.event_id)
98 .bind(event.correlation_id)
99 .bind(event.created_at)
100 .fetch_optional(&mut *tx)
101 .await?;
102
103 if inserted.is_none() {
104 tx.commit().await?;
105 return Ok(());
106 }
107
108 sqlx::query(
109 "INSERT INTO seesaw_events (
110 event_id, parent_id, correlation_id, event_type, payload, hops, created_at
111 )
112 VALUES ($1, $2, $3, $4, $5, $6, $7)",
113 )
114 .bind(event.event_id)
115 .bind(event.parent_id)
116 .bind(event.correlation_id)
117 .bind(event.event_type)
118 .bind(event.payload)
119 .bind(event.hops)
120 .bind(event.created_at)
121 .execute(&mut *tx)
122 .await?;
123
124 tx.commit().await?;
125
126 Ok(())
127 }
128
129 async fn poll_next(&self) -> Result<Option<QueuedEvent>> {
130 let row: Option<EventRow> = sqlx::query_as(
131 "WITH next_event AS (
132 SELECT e.id
133 FROM seesaw_events e
134 WHERE e.processed_at IS NULL
135 AND (e.locked_until IS NULL OR e.locked_until < NOW())
136 AND NOT EXISTS (
137 SELECT 1
138 FROM seesaw_events older
139 WHERE older.correlation_id = e.correlation_id
140 AND older.processed_at IS NULL
141 AND (
142 older.created_at < e.created_at
143 OR (older.created_at = e.created_at AND older.id < e.id)
144 )
145 )
146 ORDER BY e.created_at ASC, e.id ASC
147 LIMIT 1
148 FOR UPDATE SKIP LOCKED
149 )
150 UPDATE seesaw_events e
151 SET locked_until = NOW() + ($1 * INTERVAL '1 second')
152 FROM next_event
153 WHERE e.id = next_event.id
154 RETURNING e.id, e.event_id, e.parent_id, e.correlation_id, e.event_type, e.payload, e.hops, e.created_at",
155 )
156 .bind(EVENT_CLAIM_SECONDS)
157 .fetch_optional(&self.pool)
158 .await?;
159
160 Ok(row.map(|r| QueuedEvent {
161 id: r.id,
162 event_id: r.event_id,
163 parent_id: r.parent_id,
164 correlation_id: r.correlation_id,
165 event_type: r.event_type,
166 payload: r.payload,
167 hops: r.hops,
168 created_at: r.created_at,
169 }))
170 }
171
172 async fn ack(&self, id: i64) -> Result<()> {
173 sqlx::query("UPDATE seesaw_events SET processed_at = NOW(), locked_until = NULL WHERE id = $1")
174 .bind(id)
175 .execute(&self.pool)
176 .await?;
177 Ok(())
178 }
179
180 async fn nack(&self, id: i64, retry_after_secs: u64) -> Result<()> {
181 let locked_until = Utc::now() + Duration::seconds(retry_after_secs as i64);
182 sqlx::query(
183 "UPDATE seesaw_events
184 SET retry_count = retry_count + 1,
185 locked_until = $2
186 WHERE id = $1",
187 )
188 .bind(id)
189 .bind(locked_until)
190 .execute(&self.pool)
191 .await?;
192 Ok(())
193 }
194
195 async fn load_state<S>(&self, correlation_id: Uuid) -> Result<Option<(S, i32)>>
196 where
197 S: for<'de> Deserialize<'de> + Send,
198 {
199 let row: Option<StateRow> =
200 sqlx::query_as("SELECT state, version FROM seesaw_state WHERE correlation_id = $1")
201 .bind(correlation_id)
202 .fetch_optional(&self.pool)
203 .await?;
204
205 match row {
206 Some(r) => {
207 let state: S = serde_json::from_value(r.state)?;
208 Ok(Some((state, r.version)))
209 }
210 None => Ok(None),
211 }
212 }
213
214 async fn save_state<S>(&self, correlation_id: Uuid, state: &S, expected_version: i32) -> Result<i32>
215 where
216 S: Serialize + Send + Sync,
217 {
218 let state_json = serde_json::to_value(state)?;
219 let new_version = expected_version + 1;
220
221 let result = sqlx::query(
222 "INSERT INTO seesaw_state (correlation_id, state, version, updated_at)
223 VALUES ($1, $2, $3, NOW())
224 ON CONFLICT (correlation_id) DO UPDATE
225 SET state = $2,
226 version = $3,
227 updated_at = NOW()
228 WHERE seesaw_state.version = $4",
229 )
230 .bind(correlation_id)
231 .bind(&state_json)
232 .bind(new_version)
233 .bind(expected_version)
234 .execute(&self.pool)
235 .await?;
236
237 if result.rows_affected() == 0 {
238 anyhow::bail!("Version conflict: state was modified concurrently");
239 }
240
241 Ok(new_version)
242 }
243
244 async fn insert_effect_intent(
245 &self,
246 event_id: Uuid,
247 effect_id: String,
248 correlation_id: Uuid,
249 event_type: String,
250 event_payload: serde_json::Value,
251 parent_event_id: Option<Uuid>,
252 execute_at: DateTime<Utc>,
253 timeout_seconds: i32,
254 max_attempts: i32,
255 priority: i32,
256 ) -> Result<()> {
257 sqlx::query(
258 "INSERT INTO seesaw_effect_executions (
259 event_id, effect_id, correlation_id, status,
260 event_type, event_payload, parent_event_id,
261 execute_at, timeout_seconds, max_attempts, priority
262 )
263 VALUES ($1, $2, $3, 'pending', $4, $5, $6, $7, $8, $9, $10)",
264 )
265 .bind(event_id)
266 .bind(effect_id)
267 .bind(correlation_id)
268 .bind(event_type)
269 .bind(event_payload)
270 .bind(parent_event_id)
271 .bind(execute_at)
272 .bind(timeout_seconds)
273 .bind(max_attempts)
274 .bind(priority)
275 .execute(&self.pool)
276 .await?;
277
278 Ok(())
279 }
280
281 async fn poll_next_effect(&self) -> Result<Option<QueuedEffectExecution>> {
282 let row: Option<EffectRow> = sqlx::query_as(
283 "WITH next_effect AS (
284 SELECT event_id, effect_id
285 FROM seesaw_effect_executions
286 WHERE status = 'pending'
287 AND execute_at <= NOW()
288 ORDER BY priority ASC, execute_at ASC, event_id ASC, effect_id ASC
289 LIMIT 1
290 FOR UPDATE SKIP LOCKED
291 )
292 UPDATE seesaw_effect_executions e
293 SET status = 'executing',
294 claimed_at = NOW(),
295 last_attempted_at = NOW(),
296 attempts = e.attempts + 1
297 FROM next_effect
298 WHERE e.event_id = next_effect.event_id
299 AND e.effect_id = next_effect.effect_id
300 RETURNING
301 e.event_id, e.effect_id, e.correlation_id, e.event_type, e.event_payload, e.parent_event_id,
302 e.execute_at, e.timeout_seconds, e.max_attempts, e.priority, e.attempts",
303 )
304 .fetch_optional(&self.pool)
305 .await?;
306
307 if let Some(r) = row {
308 Ok(Some(QueuedEffectExecution {
309 event_id: r.event_id,
310 effect_id: r.effect_id,
311 correlation_id: r.correlation_id,
312 event_type: r.event_type,
313 event_payload: r.event_payload,
314 parent_event_id: r.parent_event_id,
315 execute_at: r.execute_at,
316 timeout_seconds: r.timeout_seconds,
317 max_attempts: r.max_attempts,
318 priority: r.priority,
319 attempts: r.attempts,
320 }))
321 } else {
322 Ok(None)
323 }
324 }
325
326 async fn complete_effect(
327 &self,
328 event_id: Uuid,
329 effect_id: String,
330 result: serde_json::Value,
331 ) -> Result<()> {
332 sqlx::query(
333 "UPDATE seesaw_effect_executions
334 SET status = 'completed',
335 result = $3,
336 completed_at = NOW()
337 WHERE event_id = $1 AND effect_id = $2",
338 )
339 .bind(event_id)
340 .bind(effect_id)
341 .bind(result)
342 .execute(&self.pool)
343 .await?;
344
345 Ok(())
346 }
347
348 async fn complete_effect_with_events(
349 &self,
350 event_id: Uuid,
351 effect_id: String,
352 result: serde_json::Value,
353 emitted_events: Vec<EmittedEvent>,
354 ) -> Result<()> {
355 let effect: EffectRow = sqlx::query_as(
357 "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
358 execute_at, timeout_seconds, max_attempts, priority, attempts
359 FROM seesaw_effect_executions
360 WHERE event_id = $1 AND effect_id = $2",
361 )
362 .bind(event_id)
363 .bind(&effect_id)
364 .fetch_one(&self.pool)
365 .await?;
366
367 let parent: ParentEventRow = sqlx::query_as(
369 "SELECT hops, created_at
370 FROM seesaw_events
371 WHERE event_id = $1
372 ORDER BY created_at ASC, id ASC
373 LIMIT 1",
374 )
375 .bind(event_id)
376 .fetch_one(&self.pool)
377 .await?;
378
379 let mut tx = self.pool.begin().await?;
381
382 for emitted in emitted_events {
384 let deterministic_id = Uuid::new_v5(
386 &NAMESPACE_SEESAW,
387 format!("{}-{}-{}", event_id, effect_id, emitted.event_type).as_bytes(),
388 );
389
390 let deterministic_timestamp = emitted_event_created_at(parent.created_at);
393
394 sqlx::query(
396 "INSERT INTO seesaw_events (
397 event_id, parent_id, correlation_id, event_type, payload, hops, created_at
398 )
399 VALUES ($1, $2, $3, $4, $5, $6, $7)
400 ON CONFLICT (event_id, created_at) DO NOTHING",
401 )
402 .bind(deterministic_id)
403 .bind(Some(event_id))
404 .bind(effect.correlation_id)
405 .bind(&emitted.event_type)
406 .bind(emitted.payload)
407 .bind(parent.hops + 1)
408 .bind(deterministic_timestamp)
409 .execute(&mut *tx)
410 .await?;
411 }
412
413 sqlx::query(
415 "UPDATE seesaw_effect_executions
416 SET status = 'completed',
417 result = $3,
418 completed_at = NOW()
419 WHERE event_id = $1 AND effect_id = $2",
420 )
421 .bind(event_id)
422 .bind(effect_id)
423 .bind(result)
424 .execute(&mut *tx)
425 .await?;
426
427 tx.commit().await?;
429
430 Ok(())
431 }
432
433 async fn fail_effect(
434 &self,
435 event_id: Uuid,
436 effect_id: String,
437 error: String,
438 attempts: i32,
439 ) -> Result<()> {
440 sqlx::query(
441 "UPDATE seesaw_effect_executions
442 SET status = 'failed',
443 error = $3
444 WHERE event_id = $1 AND effect_id = $2 AND attempts >= $4",
445 )
446 .bind(event_id)
447 .bind(effect_id)
448 .bind(error)
449 .bind(attempts)
450 .execute(&self.pool)
451 .await?;
452
453 Ok(())
454 }
455
456 async fn dlq_effect(
457 &self,
458 event_id: Uuid,
459 effect_id: String,
460 error: String,
461 reason: String,
462 attempts: i32,
463 ) -> Result<()> {
464 let effect: EffectRow = sqlx::query_as(
466 "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
467 execute_at, timeout_seconds, max_attempts, priority, attempts
468 FROM seesaw_effect_executions
469 WHERE event_id = $1 AND effect_id = $2",
470 )
471 .bind(event_id)
472 .bind(&effect_id)
473 .fetch_one(&self.pool)
474 .await?;
475
476 sqlx::query(
478 "INSERT INTO seesaw_dlq (
479 event_id, effect_id, correlation_id, error, event_type, event_payload, reason, attempts
480 )
481 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
482 )
483 .bind(event_id)
484 .bind(&effect_id)
485 .bind(effect.correlation_id)
486 .bind(error)
487 .bind(effect.event_type)
488 .bind(effect.event_payload)
489 .bind(reason)
490 .bind(attempts)
491 .execute(&self.pool)
492 .await?;
493
494 sqlx::query("DELETE FROM seesaw_effect_executions WHERE event_id = $1 AND effect_id = $2")
496 .bind(event_id)
497 .bind(effect_id)
498 .execute(&self.pool)
499 .await?;
500
501 Ok(())
502 }
503
504 async fn subscribe_saga_events(
505 &self,
506 correlation_id: Uuid,
507 ) -> Result<Box<dyn futures::Stream<Item = seesaw_core::SagaEvent> + Send + Unpin>> {
508 use futures::stream::StreamExt;
509 use sqlx::postgres::PgListener;
510
511 let channel = format!("seesaw_saga_{}", correlation_id);
512
513 let mut listener = PgListener::connect_with(&self.pool).await?;
515 listener.listen(&channel).await?;
516
517 let stream = listener.into_stream().filter_map(|result| {
519 Box::pin(async move {
520 match result {
521 Ok(notification) => {
522 if let Ok(event) =
524 serde_json::from_str::<seesaw_core::SagaEvent>(notification.payload())
525 {
526 Some(event)
527 } else {
528 None
529 }
530 }
531 Err(_) => None,
532 }
533 })
534 });
535
536 Ok(Box::new(stream))
537 }
538
539 async fn get_workflow_status(
540 &self,
541 correlation_id: Uuid,
542 ) -> Result<seesaw_core::WorkflowStatus> {
543 let state = sqlx::query_as::<_, (serde_json::Value,)>(
544 "SELECT state FROM seesaw_state WHERE saga_id = $1"
545 )
546 .bind(correlation_id)
547 .fetch_optional(&self.pool)
548 .await?
549 .map(|r| r.0);
550
551 let pending_effects = sqlx::query_as::<_, (i64,)>(
552 "SELECT COUNT(*) FROM seesaw_effect_executions
553 WHERE saga_id = $1 AND status IN ('pending', 'executing')"
554 )
555 .bind(correlation_id)
556 .fetch_one(&self.pool)
557 .await?
558 .0;
559
560 let last_event = sqlx::query_as::<_, (String,)>(
561 "SELECT event_type FROM seesaw_events
562 WHERE saga_id = $1
563 ORDER BY created_at DESC, id DESC
564 LIMIT 1"
565 )
566 .bind(correlation_id)
567 .fetch_optional(&self.pool)
568 .await?
569 .map(|r| r.0);
570
571 Ok(seesaw_core::WorkflowStatus {
572 correlation_id,
573 state,
574 pending_effects,
575 is_settled: pending_effects == 0,
576 last_event,
577 })
578 }
579}
580
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Timelike};

    /// Builds a UTC timestamp on 2026-02-05 at the given wall-clock time.
    fn on_reference_day(hour: u32, minute: u32, second: u32) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 2, 5, hour, minute, second)
            .single()
            .expect("valid timestamp")
    }

    /// The emitted timestamp keeps the parent's date and zeroes the time of day.
    #[test]
    fn emitted_event_created_at_is_midnight_on_parent_day() {
        let parent = on_reference_day(18, 45, 12);

        let emitted = emitted_event_created_at(parent);

        assert_eq!(emitted.date_naive(), parent.date_naive());
        assert_eq!(emitted.hour(), 0);
        assert_eq!(emitted.minute(), 0);
        assert_eq!(emitted.second(), 0);
    }

    /// Parents created at different times on the same day yield the same
    /// emitted timestamp.
    #[test]
    fn emitted_event_created_at_is_deterministic_for_same_parent_day() {
        let early_parent = on_reference_day(0, 1, 2);
        let late_parent = on_reference_day(23, 59, 59);

        let from_early = emitted_event_created_at(early_parent);
        let from_late = emitted_event_created_at(late_parent);

        assert_eq!(from_early, from_late);
    }
}