Skip to main content

seesaw_postgres/
lib.rs

1// Simplified PostgresStore without compile-time checked queries
2// Uses dynamic queries for easier testing
3
4use anyhow::Result;
5use async_trait::async_trait;
6use chrono::{DateTime, Duration, Utc};
7use seesaw_core::{EmittedEvent, QueuedEffectExecution, QueuedEvent, Store, NAMESPACE_SEESAW};
8use serde::{Deserialize, Serialize};
9use sqlx::{FromRow, PgPool};
10use uuid::Uuid;
11
12const EVENT_CLAIM_SECONDS: i64 = 30;
13
14fn emitted_event_created_at(parent_created_at: DateTime<Utc>) -> DateTime<Utc> {
15    parent_created_at
16        .date_naive()
17        .and_hms_opt(0, 0, 0)
18        .expect("midnight should always be a valid UTC timestamp")
19        .and_utc()
20}
21
22/// PostgreSQL implementation of Store trait
23pub struct PostgresStore {
24    pool: PgPool,
25}
26
27impl PostgresStore {
28    pub fn new(pool: PgPool) -> Self {
29        Self { pool }
30    }
31
32    pub fn pool(&self) -> &PgPool {
33        &self.pool
34    }
35}
36
37impl Clone for PostgresStore {
38    fn clone(&self) -> Self {
39        Self {
40            pool: self.pool.clone(),
41        }
42    }
43}
44
45#[derive(FromRow)]
46struct EventRow {
47    id: i64,
48    event_id: Uuid,
49    parent_id: Option<Uuid>,
50    correlation_id: Uuid,
51    event_type: String,
52    payload: serde_json::Value,
53    hops: i32,
54    created_at: DateTime<Utc>,
55}
56
57#[derive(FromRow)]
58struct StateRow {
59    state: serde_json::Value,
60    version: i32,
61}
62
63#[derive(FromRow)]
64struct EffectRow {
65    event_id: Uuid,
66    effect_id: String,
67    correlation_id: Uuid,
68    event_type: String,
69    event_payload: serde_json::Value,
70    parent_event_id: Option<Uuid>,
71    execute_at: DateTime<Utc>,
72    timeout_seconds: i32,
73    max_attempts: i32,
74    priority: i32,
75    attempts: i32,
76}
77
78#[derive(FromRow)]
79struct ParentEventRow {
80    hops: i32,
81    created_at: DateTime<Utc>,
82}
83
84#[async_trait]
85impl Store for PostgresStore {
86    async fn publish(&self, event: QueuedEvent) -> Result<()> {
87        let mut tx = self.pool.begin().await?;
88
89        // Use the non-partitioned ledger as the idempotency guard. This keeps
90        // webhook/process_with_id dedupe stable even when created_at differs.
91        let inserted: Option<Uuid> = sqlx::query_scalar(
92            "INSERT INTO seesaw_processed (event_id, correlation_id, created_at)
93             VALUES ($1, $2, $3)
94             ON CONFLICT (event_id) DO NOTHING
95             RETURNING event_id",
96        )
97        .bind(event.event_id)
98        .bind(event.correlation_id)
99        .bind(event.created_at)
100        .fetch_optional(&mut *tx)
101        .await?;
102
103        if inserted.is_none() {
104            tx.commit().await?;
105            return Ok(());
106        }
107
108        sqlx::query(
109            "INSERT INTO seesaw_events (
110                event_id, parent_id, correlation_id, event_type, payload, hops, created_at
111             )
112             VALUES ($1, $2, $3, $4, $5, $6, $7)",
113        )
114        .bind(event.event_id)
115        .bind(event.parent_id)
116        .bind(event.correlation_id)
117        .bind(event.event_type)
118        .bind(event.payload)
119        .bind(event.hops)
120        .bind(event.created_at)
121        .execute(&mut *tx)
122        .await?;
123
124        tx.commit().await?;
125
126        Ok(())
127    }
128
129    async fn poll_next(&self) -> Result<Option<QueuedEvent>> {
130        let row: Option<EventRow> = sqlx::query_as(
131            "WITH next_event AS (
132                SELECT e.id
133                FROM seesaw_events e
134                WHERE e.processed_at IS NULL
135                  AND (e.locked_until IS NULL OR e.locked_until < NOW())
136                  AND NOT EXISTS (
137                    SELECT 1
138                    FROM seesaw_events older
139                    WHERE older.correlation_id = e.correlation_id
140                      AND older.processed_at IS NULL
141                      AND (
142                        older.created_at < e.created_at
143                        OR (older.created_at = e.created_at AND older.id < e.id)
144                      )
145                  )
146                ORDER BY e.created_at ASC, e.id ASC
147                LIMIT 1
148                FOR UPDATE SKIP LOCKED
149            )
150            UPDATE seesaw_events e
151            SET locked_until = NOW() + ($1 * INTERVAL '1 second')
152            FROM next_event
153            WHERE e.id = next_event.id
154            RETURNING e.id, e.event_id, e.parent_id, e.correlation_id, e.event_type, e.payload, e.hops, e.created_at",
155        )
156        .bind(EVENT_CLAIM_SECONDS)
157        .fetch_optional(&self.pool)
158        .await?;
159
160        Ok(row.map(|r| QueuedEvent {
161            id: r.id,
162            event_id: r.event_id,
163            parent_id: r.parent_id,
164            correlation_id: r.correlation_id,
165            event_type: r.event_type,
166            payload: r.payload,
167            hops: r.hops,
168            created_at: r.created_at,
169        }))
170    }
171
172    async fn ack(&self, id: i64) -> Result<()> {
173        sqlx::query("UPDATE seesaw_events SET processed_at = NOW(), locked_until = NULL WHERE id = $1")
174            .bind(id)
175            .execute(&self.pool)
176            .await?;
177        Ok(())
178    }
179
180    async fn nack(&self, id: i64, retry_after_secs: u64) -> Result<()> {
181        let locked_until = Utc::now() + Duration::seconds(retry_after_secs as i64);
182        sqlx::query(
183            "UPDATE seesaw_events
184             SET retry_count = retry_count + 1,
185                 locked_until = $2
186             WHERE id = $1",
187        )
188        .bind(id)
189        .bind(locked_until)
190        .execute(&self.pool)
191        .await?;
192        Ok(())
193    }
194
195    async fn load_state<S>(&self, correlation_id: Uuid) -> Result<Option<(S, i32)>>
196    where
197        S: for<'de> Deserialize<'de> + Send,
198    {
199        let row: Option<StateRow> =
200            sqlx::query_as("SELECT state, version FROM seesaw_state WHERE correlation_id = $1")
201                .bind(correlation_id)
202                .fetch_optional(&self.pool)
203                .await?;
204
205        match row {
206            Some(r) => {
207                let state: S = serde_json::from_value(r.state)?;
208                Ok(Some((state, r.version)))
209            }
210            None => Ok(None),
211        }
212    }
213
214    async fn save_state<S>(&self, correlation_id: Uuid, state: &S, expected_version: i32) -> Result<i32>
215    where
216        S: Serialize + Send + Sync,
217    {
218        let state_json = serde_json::to_value(state)?;
219        let new_version = expected_version + 1;
220
221        let result = sqlx::query(
222            "INSERT INTO seesaw_state (correlation_id, state, version, updated_at)
223             VALUES ($1, $2, $3, NOW())
224             ON CONFLICT (correlation_id) DO UPDATE
225             SET state = $2,
226                 version = $3,
227                 updated_at = NOW()
228             WHERE seesaw_state.version = $4",
229        )
230        .bind(correlation_id)
231        .bind(&state_json)
232        .bind(new_version)
233        .bind(expected_version)
234        .execute(&self.pool)
235        .await?;
236
237        if result.rows_affected() == 0 {
238            anyhow::bail!("Version conflict: state was modified concurrently");
239        }
240
241        Ok(new_version)
242    }
243
244    async fn insert_effect_intent(
245        &self,
246        event_id: Uuid,
247        effect_id: String,
248        correlation_id: Uuid,
249        event_type: String,
250        event_payload: serde_json::Value,
251        parent_event_id: Option<Uuid>,
252        execute_at: DateTime<Utc>,
253        timeout_seconds: i32,
254        max_attempts: i32,
255        priority: i32,
256    ) -> Result<()> {
257        sqlx::query(
258            "INSERT INTO seesaw_effect_executions (
259                event_id, effect_id, correlation_id, status,
260                event_type, event_payload, parent_event_id,
261                execute_at, timeout_seconds, max_attempts, priority
262             )
263             VALUES ($1, $2, $3, 'pending', $4, $5, $6, $7, $8, $9, $10)",
264        )
265        .bind(event_id)
266        .bind(effect_id)
267        .bind(correlation_id)
268        .bind(event_type)
269        .bind(event_payload)
270        .bind(parent_event_id)
271        .bind(execute_at)
272        .bind(timeout_seconds)
273        .bind(max_attempts)
274        .bind(priority)
275        .execute(&self.pool)
276        .await?;
277
278        Ok(())
279    }
280
281    async fn poll_next_effect(&self) -> Result<Option<QueuedEffectExecution>> {
282        let row: Option<EffectRow> = sqlx::query_as(
283            "WITH next_effect AS (
284                SELECT event_id, effect_id
285                FROM seesaw_effect_executions
286                WHERE status = 'pending'
287                  AND execute_at <= NOW()
288                ORDER BY priority ASC, execute_at ASC, event_id ASC, effect_id ASC
289                LIMIT 1
290                FOR UPDATE SKIP LOCKED
291            )
292            UPDATE seesaw_effect_executions e
293            SET status = 'executing',
294                claimed_at = NOW(),
295                last_attempted_at = NOW(),
296                attempts = e.attempts + 1
297            FROM next_effect
298            WHERE e.event_id = next_effect.event_id
299              AND e.effect_id = next_effect.effect_id
300            RETURNING
301                e.event_id, e.effect_id, e.correlation_id, e.event_type, e.event_payload, e.parent_event_id,
302                e.execute_at, e.timeout_seconds, e.max_attempts, e.priority, e.attempts",
303        )
304        .fetch_optional(&self.pool)
305        .await?;
306
307        if let Some(r) = row {
308            Ok(Some(QueuedEffectExecution {
309                event_id: r.event_id,
310                effect_id: r.effect_id,
311                correlation_id: r.correlation_id,
312                event_type: r.event_type,
313                event_payload: r.event_payload,
314                parent_event_id: r.parent_event_id,
315                execute_at: r.execute_at,
316                timeout_seconds: r.timeout_seconds,
317                max_attempts: r.max_attempts,
318                priority: r.priority,
319                attempts: r.attempts,
320            }))
321        } else {
322            Ok(None)
323        }
324    }
325
326    async fn complete_effect(
327        &self,
328        event_id: Uuid,
329        effect_id: String,
330        result: serde_json::Value,
331    ) -> Result<()> {
332        sqlx::query(
333            "UPDATE seesaw_effect_executions
334             SET status = 'completed',
335                 result = $3,
336                 completed_at = NOW()
337             WHERE event_id = $1 AND effect_id = $2",
338        )
339        .bind(event_id)
340        .bind(effect_id)
341        .bind(result)
342        .execute(&self.pool)
343        .await?;
344
345        Ok(())
346    }
347
348    async fn complete_effect_with_events(
349        &self,
350        event_id: Uuid,
351        effect_id: String,
352        result: serde_json::Value,
353        emitted_events: Vec<EmittedEvent>,
354    ) -> Result<()> {
355        // Get correlation_id and hops for emitted events
356        let effect: EffectRow = sqlx::query_as(
357            "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
358                    execute_at, timeout_seconds, max_attempts, priority, attempts
359             FROM seesaw_effect_executions
360             WHERE event_id = $1 AND effect_id = $2",
361        )
362        .bind(event_id)
363        .bind(&effect_id)
364        .fetch_one(&self.pool)
365        .await?;
366
367        // Read parent metadata for deterministic hop increment and timestamp.
368        let parent: ParentEventRow = sqlx::query_as(
369            "SELECT hops, created_at
370             FROM seesaw_events
371             WHERE event_id = $1
372             ORDER BY created_at ASC, id ASC
373             LIMIT 1",
374        )
375        .bind(event_id)
376        .fetch_one(&self.pool)
377        .await?;
378
379        // Start transaction for atomicity
380        let mut tx = self.pool.begin().await?;
381
382        // Insert emitted events with deterministic IDs
383        for emitted in emitted_events {
384            // Generate deterministic event_id from hash(parent_event_id, effect_id, event_type)
385            let deterministic_id = Uuid::new_v5(
386                &NAMESPACE_SEESAW,
387                format!("{}-{}-{}", event_id, effect_id, emitted.event_type).as_bytes(),
388            );
389
390            // Deterministic timestamp keeps retries idempotent while staying in
391            // the same partition day as the parent event.
392            let deterministic_timestamp = emitted_event_created_at(parent.created_at);
393
394            // Insert event (idempotent via ON CONFLICT on (event_id, created_at))
395            sqlx::query(
396                "INSERT INTO seesaw_events (
397                    event_id, parent_id, correlation_id, event_type, payload, hops, created_at
398                 )
399                 VALUES ($1, $2, $3, $4, $5, $6, $7)
400                 ON CONFLICT (event_id, created_at) DO NOTHING",
401            )
402            .bind(deterministic_id)
403            .bind(Some(event_id))
404            .bind(effect.correlation_id)
405            .bind(&emitted.event_type)
406            .bind(emitted.payload)
407            .bind(parent.hops + 1)
408            .bind(deterministic_timestamp)
409            .execute(&mut *tx)
410            .await?;
411        }
412
413        // Mark effect as completed (same transaction)
414        sqlx::query(
415            "UPDATE seesaw_effect_executions
416             SET status = 'completed',
417                 result = $3,
418                 completed_at = NOW()
419             WHERE event_id = $1 AND effect_id = $2",
420        )
421        .bind(event_id)
422        .bind(effect_id)
423        .bind(result)
424        .execute(&mut *tx)
425        .await?;
426
427        // Commit transaction - both succeed or both fail
428        tx.commit().await?;
429
430        Ok(())
431    }
432
433    async fn fail_effect(
434        &self,
435        event_id: Uuid,
436        effect_id: String,
437        error: String,
438        attempts: i32,
439    ) -> Result<()> {
440        sqlx::query(
441            "UPDATE seesaw_effect_executions
442             SET status = 'failed',
443                 error = $3
444             WHERE event_id = $1 AND effect_id = $2 AND attempts >= $4",
445        )
446        .bind(event_id)
447        .bind(effect_id)
448        .bind(error)
449        .bind(attempts)
450        .execute(&self.pool)
451        .await?;
452
453        Ok(())
454    }
455
456    async fn dlq_effect(
457        &self,
458        event_id: Uuid,
459        effect_id: String,
460        error: String,
461        reason: String,
462        attempts: i32,
463    ) -> Result<()> {
464        // Get effect details for DLQ
465        let effect: EffectRow = sqlx::query_as(
466            "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
467                    execute_at, timeout_seconds, max_attempts, priority, attempts
468             FROM seesaw_effect_executions
469             WHERE event_id = $1 AND effect_id = $2",
470        )
471        .bind(event_id)
472        .bind(&effect_id)
473        .fetch_one(&self.pool)
474        .await?;
475
476        // Insert into DLQ
477        sqlx::query(
478            "INSERT INTO seesaw_dlq (
479                event_id, effect_id, correlation_id, error, event_type, event_payload, reason, attempts
480             )
481             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
482        )
483        .bind(event_id)
484        .bind(&effect_id)
485        .bind(effect.correlation_id)
486        .bind(error)
487        .bind(effect.event_type)
488        .bind(effect.event_payload)
489        .bind(reason)
490        .bind(attempts)
491        .execute(&self.pool)
492        .await?;
493
494        // Delete from executions table
495        sqlx::query("DELETE FROM seesaw_effect_executions WHERE event_id = $1 AND effect_id = $2")
496            .bind(event_id)
497            .bind(effect_id)
498            .execute(&self.pool)
499            .await?;
500
501        Ok(())
502    }
503
504    async fn subscribe_saga_events(
505        &self,
506        correlation_id: Uuid,
507    ) -> Result<Box<dyn futures::Stream<Item = seesaw_core::SagaEvent> + Send + Unpin>> {
508        use futures::stream::StreamExt;
509        use sqlx::postgres::PgListener;
510
511        let channel = format!("seesaw_saga_{}", correlation_id);
512
513        // Create a new listener connection
514        let mut listener = PgListener::connect_with(&self.pool).await?;
515        listener.listen(&channel).await?;
516
517        // Convert listener into a stream of SagaEvent
518        let stream = listener.into_stream().filter_map(|result| {
519            Box::pin(async move {
520                match result {
521                    Ok(notification) => {
522                        // Parse the JSON payload from the notification
523                        if let Ok(event) =
524                            serde_json::from_str::<seesaw_core::SagaEvent>(notification.payload())
525                        {
526                            Some(event)
527                        } else {
528                            None
529                        }
530                    }
531                    Err(_) => None,
532                }
533            })
534        });
535
536        Ok(Box::new(stream))
537    }
538
539    async fn get_workflow_status(
540        &self,
541        correlation_id: Uuid,
542    ) -> Result<seesaw_core::WorkflowStatus> {
543        let state = sqlx::query_as::<_, (serde_json::Value,)>(
544            "SELECT state FROM seesaw_state WHERE saga_id = $1"
545        )
546        .bind(correlation_id)
547        .fetch_optional(&self.pool)
548        .await?
549        .map(|r| r.0);
550
551        let pending_effects = sqlx::query_as::<_, (i64,)>(
552            "SELECT COUNT(*) FROM seesaw_effect_executions
553             WHERE saga_id = $1 AND status IN ('pending', 'executing')"
554        )
555        .bind(correlation_id)
556        .fetch_one(&self.pool)
557        .await?
558        .0;
559
560        let last_event = sqlx::query_as::<_, (String,)>(
561            "SELECT event_type FROM seesaw_events
562             WHERE saga_id = $1
563             ORDER BY created_at DESC, id DESC
564             LIMIT 1"
565        )
566        .bind(correlation_id)
567        .fetch_optional(&self.pool)
568        .await?
569        .map(|r| r.0);
570
571        Ok(seesaw_core::WorkflowStatus {
572            correlation_id,
573            state,
574            pending_effects,
575            is_settled: pending_effects == 0,
576            last_event,
577        })
578    }
579}
580
581#[cfg(test)]
582mod tests {
583    use super::*;
584    use chrono::{TimeZone, Timelike};
585
586    #[test]
587    fn emitted_event_created_at_is_midnight_on_parent_day() {
588        let parent = Utc
589            .with_ymd_and_hms(2026, 2, 5, 18, 45, 12)
590            .single()
591            .expect("valid timestamp");
592
593        let emitted = emitted_event_created_at(parent);
594
595        assert_eq!(emitted.date_naive(), parent.date_naive());
596        assert_eq!(emitted.hour(), 0);
597        assert_eq!(emitted.minute(), 0);
598        assert_eq!(emitted.second(), 0);
599    }
600
601    #[test]
602    fn emitted_event_created_at_is_deterministic_for_same_parent_day() {
603        let first_parent = Utc
604            .with_ymd_and_hms(2026, 2, 5, 0, 1, 2)
605            .single()
606            .expect("valid timestamp");
607        let second_parent = Utc
608            .with_ymd_and_hms(2026, 2, 5, 23, 59, 59)
609            .single()
610            .expect("valid timestamp");
611
612        let first_emitted = emitted_event_created_at(first_parent);
613        let second_emitted = emitted_event_created_at(second_parent);
614
615        assert_eq!(first_emitted, second_emitted);
616    }
617}