Skip to main content

seesaw_postgres/
lib.rs

1// Simplified PostgresStore without compile-time checked queries
2// Uses dynamic queries for easier testing
3
4use anyhow::Result;
5use async_trait::async_trait;
6use chrono::{DateTime, Duration, Utc};
7use seesaw_core::{
8    insight::*, EmittedEvent, QueuedEffectExecution, QueuedEvent, Store, NAMESPACE_SEESAW,
9};
10use serde::{Deserialize, Serialize};
11use sqlx::{FromRow, PgPool};
12use uuid::Uuid;
13
14const EVENT_CLAIM_SECONDS: i64 = 30;
15
16fn emitted_event_created_at(parent_created_at: DateTime<Utc>) -> DateTime<Utc> {
17    parent_created_at
18        .date_naive()
19        .and_hms_opt(0, 0, 0)
20        .expect("midnight should always be a valid UTC timestamp")
21        .and_utc()
22}
23
24fn effect_retry_delay_seconds(attempts: i32) -> i64 {
25    let exponent = attempts.saturating_sub(1).clamp(0, 8) as u32;
26    1_i64 << exponent
27}
28
29/// PostgreSQL implementation of Store trait
30pub struct PostgresStore {
31    pool: PgPool,
32}
33
34impl PostgresStore {
35    pub fn new(pool: PgPool) -> Self {
36        Self { pool }
37    }
38
39    pub fn pool(&self) -> &PgPool {
40        &self.pool
41    }
42}
43
44impl Clone for PostgresStore {
45    fn clone(&self) -> Self {
46        Self {
47            pool: self.pool.clone(),
48        }
49    }
50}
51
52#[derive(FromRow)]
53struct EventRow {
54    id: i64,
55    event_id: Uuid,
56    parent_id: Option<Uuid>,
57    correlation_id: Uuid,
58    event_type: String,
59    payload: serde_json::Value,
60    hops: i32,
61    created_at: DateTime<Utc>,
62}
63
64#[derive(FromRow)]
65struct StateRow {
66    state: serde_json::Value,
67    version: i32,
68}
69
70#[derive(FromRow)]
71struct EffectRow {
72    event_id: Uuid,
73    effect_id: String,
74    correlation_id: Uuid,
75    event_type: String,
76    event_payload: serde_json::Value,
77    parent_event_id: Option<Uuid>,
78    execute_at: DateTime<Utc>,
79    timeout_seconds: i32,
80    max_attempts: i32,
81    priority: i32,
82    attempts: i32,
83}
84
85#[derive(FromRow)]
86struct ParentEventRow {
87    hops: i32,
88    created_at: DateTime<Utc>,
89}
90
91#[async_trait]
92impl Store for PostgresStore {
93    async fn publish(&self, event: QueuedEvent) -> Result<()> {
94        let mut tx = self.pool.begin().await?;
95
96        // Use the non-partitioned ledger as the idempotency guard. This keeps
97        // webhook/process_with_id dedupe stable even when created_at differs.
98        let inserted: Option<Uuid> = sqlx::query_scalar(
99            "INSERT INTO seesaw_processed (event_id, correlation_id, created_at)
100             VALUES ($1, $2, $3)
101             ON CONFLICT (event_id) DO NOTHING
102             RETURNING event_id",
103        )
104        .bind(event.event_id)
105        .bind(event.correlation_id)
106        .bind(event.created_at)
107        .fetch_optional(&mut *tx)
108        .await?;
109
110        if inserted.is_none() {
111            tx.commit().await?;
112            return Ok(());
113        }
114
115        sqlx::query(
116            "INSERT INTO seesaw_events (
117                event_id, parent_id, correlation_id, event_type, payload, hops, created_at
118             )
119             VALUES ($1, $2, $3, $4, $5, $6, $7)",
120        )
121        .bind(event.event_id)
122        .bind(event.parent_id)
123        .bind(event.correlation_id)
124        .bind(event.event_type)
125        .bind(event.payload)
126        .bind(event.hops)
127        .bind(event.created_at)
128        .execute(&mut *tx)
129        .await?;
130
131        tx.commit().await?;
132
133        Ok(())
134    }
135
136    async fn poll_next(&self) -> Result<Option<QueuedEvent>> {
137        let row: Option<EventRow> = sqlx::query_as(
138            "WITH next_event AS (
139                SELECT e.id
140                FROM seesaw_events e
141                WHERE e.processed_at IS NULL
142                  AND (e.locked_until IS NULL OR e.locked_until < NOW())
143                  AND NOT EXISTS (
144                    SELECT 1
145                    FROM seesaw_events older
146                    WHERE older.correlation_id = e.correlation_id
147                      AND older.processed_at IS NULL
148                      AND (
149                        older.created_at < e.created_at
150                        OR (older.created_at = e.created_at AND older.id < e.id)
151                      )
152                  )
153                ORDER BY e.created_at ASC, e.id ASC
154                LIMIT 1
155                FOR UPDATE SKIP LOCKED
156            )
157            UPDATE seesaw_events e
158            SET locked_until = NOW() + ($1 * INTERVAL '1 second')
159            FROM next_event
160            WHERE e.id = next_event.id
161            RETURNING e.id, e.event_id, e.parent_id, e.correlation_id, e.event_type, e.payload, e.hops, e.created_at",
162        )
163        .bind(EVENT_CLAIM_SECONDS)
164        .fetch_optional(&self.pool)
165        .await?;
166
167        Ok(row.map(|r| QueuedEvent {
168            id: r.id,
169            event_id: r.event_id,
170            parent_id: r.parent_id,
171            correlation_id: r.correlation_id,
172            event_type: r.event_type,
173            payload: r.payload,
174            hops: r.hops,
175            created_at: r.created_at,
176        }))
177    }
178
179    async fn ack(&self, id: i64) -> Result<()> {
180        sqlx::query(
181            "UPDATE seesaw_events SET processed_at = NOW(), locked_until = NULL WHERE id = $1",
182        )
183        .bind(id)
184        .execute(&self.pool)
185        .await?;
186        Ok(())
187    }
188
189    async fn nack(&self, id: i64, retry_after_secs: u64) -> Result<()> {
190        let locked_until = Utc::now() + Duration::seconds(retry_after_secs as i64);
191        sqlx::query(
192            "UPDATE seesaw_events
193             SET retry_count = retry_count + 1,
194                 locked_until = $2
195             WHERE id = $1",
196        )
197        .bind(id)
198        .bind(locked_until)
199        .execute(&self.pool)
200        .await?;
201        Ok(())
202    }
203
204    async fn load_state<S>(&self, correlation_id: Uuid) -> Result<Option<(S, i32)>>
205    where
206        S: for<'de> Deserialize<'de> + Send,
207    {
208        let row: Option<StateRow> =
209            sqlx::query_as("SELECT state, version FROM seesaw_state WHERE correlation_id = $1")
210                .bind(correlation_id)
211                .fetch_optional(&self.pool)
212                .await?;
213
214        match row {
215            Some(r) => {
216                let state: S = serde_json::from_value(r.state)?;
217                Ok(Some((state, r.version)))
218            }
219            None => Ok(None),
220        }
221    }
222
223    async fn save_state<S>(
224        &self,
225        correlation_id: Uuid,
226        state: &S,
227        expected_version: i32,
228    ) -> Result<i32>
229    where
230        S: Serialize + Send + Sync,
231    {
232        let state_json = serde_json::to_value(state)?;
233        let new_version = expected_version + 1;
234
235        let result = sqlx::query(
236            "INSERT INTO seesaw_state (correlation_id, state, version, updated_at)
237             VALUES ($1, $2, $3, NOW())
238             ON CONFLICT (correlation_id) DO UPDATE
239             SET state = $2,
240                 version = $3,
241                 updated_at = NOW()
242             WHERE seesaw_state.version = $4",
243        )
244        .bind(correlation_id)
245        .bind(&state_json)
246        .bind(new_version)
247        .bind(expected_version)
248        .execute(&self.pool)
249        .await?;
250
251        if result.rows_affected() == 0 {
252            anyhow::bail!("Version conflict: state was modified concurrently");
253        }
254
255        Ok(new_version)
256    }
257
258    async fn insert_effect_intent(
259        &self,
260        event_id: Uuid,
261        effect_id: String,
262        correlation_id: Uuid,
263        event_type: String,
264        event_payload: serde_json::Value,
265        parent_event_id: Option<Uuid>,
266        execute_at: DateTime<Utc>,
267        timeout_seconds: i32,
268        max_attempts: i32,
269        priority: i32,
270    ) -> Result<()> {
271        sqlx::query(
272            "INSERT INTO seesaw_effect_executions (
273                event_id, effect_id, correlation_id, status,
274                event_type, event_payload, parent_event_id,
275                execute_at, timeout_seconds, max_attempts, priority
276             )
277             VALUES ($1, $2, $3, 'pending', $4, $5, $6, $7, $8, $9, $10)",
278        )
279        .bind(event_id)
280        .bind(effect_id)
281        .bind(correlation_id)
282        .bind(event_type)
283        .bind(event_payload)
284        .bind(parent_event_id)
285        .bind(execute_at)
286        .bind(timeout_seconds)
287        .bind(max_attempts)
288        .bind(priority)
289        .execute(&self.pool)
290        .await?;
291
292        Ok(())
293    }
294
295    async fn poll_next_effect(&self) -> Result<Option<QueuedEffectExecution>> {
296        let row: Option<EffectRow> = sqlx::query_as(
297            "WITH next_effect AS (
298                SELECT event_id, effect_id
299                FROM seesaw_effect_executions
300                WHERE (
301                    status = 'pending'
302                    OR (status = 'failed' AND attempts < max_attempts)
303                )
304                  AND execute_at <= NOW()
305                ORDER BY priority ASC, execute_at ASC, event_id ASC, effect_id ASC
306                LIMIT 1
307                FOR UPDATE SKIP LOCKED
308            )
309            UPDATE seesaw_effect_executions e
310            SET status = 'executing',
311                claimed_at = NOW(),
312                last_attempted_at = NOW(),
313                attempts = e.attempts + 1
314            FROM next_effect
315            WHERE e.event_id = next_effect.event_id
316              AND e.effect_id = next_effect.effect_id
317            RETURNING
318                e.event_id, e.effect_id, e.correlation_id, e.event_type, e.event_payload, e.parent_event_id,
319                e.execute_at, e.timeout_seconds, e.max_attempts, e.priority, e.attempts",
320        )
321        .fetch_optional(&self.pool)
322        .await?;
323
324        if let Some(r) = row {
325            Ok(Some(QueuedEffectExecution {
326                event_id: r.event_id,
327                effect_id: r.effect_id,
328                correlation_id: r.correlation_id,
329                event_type: r.event_type,
330                event_payload: r.event_payload,
331                parent_event_id: r.parent_event_id,
332                execute_at: r.execute_at,
333                timeout_seconds: r.timeout_seconds,
334                max_attempts: r.max_attempts,
335                priority: r.priority,
336                attempts: r.attempts,
337            }))
338        } else {
339            Ok(None)
340        }
341    }
342
343    async fn complete_effect(
344        &self,
345        event_id: Uuid,
346        effect_id: String,
347        result: serde_json::Value,
348    ) -> Result<()> {
349        sqlx::query(
350            "UPDATE seesaw_effect_executions
351             SET status = 'completed',
352                 result = $3,
353                 completed_at = NOW()
354             WHERE event_id = $1 AND effect_id = $2",
355        )
356        .bind(event_id)
357        .bind(effect_id)
358        .bind(result)
359        .execute(&self.pool)
360        .await?;
361
362        Ok(())
363    }
364
365    async fn complete_effect_with_events(
366        &self,
367        event_id: Uuid,
368        effect_id: String,
369        result: serde_json::Value,
370        emitted_events: Vec<EmittedEvent>,
371    ) -> Result<()> {
372        // Get correlation_id and hops for emitted events
373        let effect: EffectRow = sqlx::query_as(
374            "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
375                    execute_at, timeout_seconds, max_attempts, priority, attempts
376             FROM seesaw_effect_executions
377             WHERE event_id = $1 AND effect_id = $2",
378        )
379        .bind(event_id)
380        .bind(&effect_id)
381        .fetch_one(&self.pool)
382        .await?;
383
384        // Read parent metadata for deterministic hop increment and timestamp.
385        let parent: ParentEventRow = sqlx::query_as(
386            "SELECT hops, created_at
387             FROM seesaw_events
388             WHERE event_id = $1
389             ORDER BY created_at ASC, id ASC
390             LIMIT 1",
391        )
392        .bind(event_id)
393        .fetch_one(&self.pool)
394        .await?;
395
396        // Start transaction for atomicity
397        let mut tx = self.pool.begin().await?;
398
399        // Insert emitted events with deterministic IDs
400        for emitted in emitted_events {
401            // Generate deterministic event_id from hash(parent_event_id, effect_id, event_type)
402            let deterministic_id = Uuid::new_v5(
403                &NAMESPACE_SEESAW,
404                format!("{}-{}-{}", event_id, effect_id, emitted.event_type).as_bytes(),
405            );
406
407            // Deterministic timestamp keeps retries idempotent while staying in
408            // the same partition day as the parent event.
409            let deterministic_timestamp = emitted_event_created_at(parent.created_at);
410
411            // Insert event (idempotent via ON CONFLICT on (event_id, created_at))
412            sqlx::query(
413                "INSERT INTO seesaw_events (
414                    event_id, parent_id, correlation_id, event_type, payload, hops, created_at
415                 )
416                 VALUES ($1, $2, $3, $4, $5, $6, $7)
417                 ON CONFLICT (event_id, created_at) DO NOTHING",
418            )
419            .bind(deterministic_id)
420            .bind(Some(event_id))
421            .bind(effect.correlation_id)
422            .bind(&emitted.event_type)
423            .bind(emitted.payload)
424            .bind(parent.hops + 1)
425            .bind(deterministic_timestamp)
426            .execute(&mut *tx)
427            .await?;
428        }
429
430        // Mark effect as completed (same transaction)
431        sqlx::query(
432            "UPDATE seesaw_effect_executions
433             SET status = 'completed',
434                 result = $3,
435                 completed_at = NOW()
436             WHERE event_id = $1 AND effect_id = $2",
437        )
438        .bind(event_id)
439        .bind(effect_id)
440        .bind(result)
441        .execute(&mut *tx)
442        .await?;
443
444        // Commit transaction - both succeed or both fail
445        tx.commit().await?;
446
447        Ok(())
448    }
449
450    async fn fail_effect(
451        &self,
452        event_id: Uuid,
453        effect_id: String,
454        error: String,
455        attempts: i32,
456    ) -> Result<()> {
457        let retry_at = Utc::now() + Duration::seconds(effect_retry_delay_seconds(attempts));
458        sqlx::query(
459            "UPDATE seesaw_effect_executions
460             SET status = 'failed',
461                 error = $3,
462                 execute_at = $5,
463                 claimed_at = NULL
464             WHERE event_id = $1 AND effect_id = $2 AND attempts >= $4",
465        )
466        .bind(event_id)
467        .bind(effect_id)
468        .bind(error)
469        .bind(attempts)
470        .bind(retry_at)
471        .execute(&self.pool)
472        .await?;
473
474        Ok(())
475    }
476
477    async fn dlq_effect(
478        &self,
479        event_id: Uuid,
480        effect_id: String,
481        error: String,
482        reason: String,
483        attempts: i32,
484    ) -> Result<()> {
485        // Get effect details for DLQ
486        let effect: EffectRow = sqlx::query_as(
487            "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
488                    execute_at, timeout_seconds, max_attempts, priority, attempts
489             FROM seesaw_effect_executions
490             WHERE event_id = $1 AND effect_id = $2",
491        )
492        .bind(event_id)
493        .bind(&effect_id)
494        .fetch_one(&self.pool)
495        .await?;
496
497        // Insert into DLQ
498        sqlx::query(
499            "INSERT INTO seesaw_dlq (
500                event_id, effect_id, correlation_id, error, event_type, event_payload, reason, attempts
501             )
502             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
503        )
504        .bind(event_id)
505        .bind(&effect_id)
506        .bind(effect.correlation_id)
507        .bind(error)
508        .bind(effect.event_type)
509        .bind(effect.event_payload)
510        .bind(reason)
511        .bind(attempts)
512        .execute(&self.pool)
513        .await?;
514
515        // Delete from executions table
516        sqlx::query("DELETE FROM seesaw_effect_executions WHERE event_id = $1 AND effect_id = $2")
517            .bind(event_id)
518            .bind(effect_id)
519            .execute(&self.pool)
520            .await?;
521
522        Ok(())
523    }
524
525    async fn subscribe_workflow_events(
526        &self,
527        correlation_id: Uuid,
528    ) -> Result<Box<dyn futures::Stream<Item = seesaw_core::WorkflowEvent> + Send + Unpin>> {
529        use futures::stream::StreamExt;
530        use sqlx::postgres::PgListener;
531
532        let channel = format!("seesaw_workflow_{}", correlation_id);
533
534        // Create a new listener connection
535        let mut listener = PgListener::connect_with(&self.pool).await?;
536        listener.listen(&channel).await?;
537
538        let pool = self.pool.clone();
539
540        // Convert listener into a stream of WorkflowEvent
541        let stream = listener.into_stream().filter_map(move |result| {
542            let pool = pool.clone();
543            Box::pin(async move {
544                match result {
545                    Ok(notification) => {
546                        // Parse notification metadata (no payload due to 8000-byte pg_notify limit)
547                        #[derive(serde::Deserialize)]
548                        struct NotificationMeta {
549                            event_id: Uuid,
550                            correlation_id: Uuid,
551                            event_type: String,
552                        }
553
554                        let meta = serde_json::from_str::<NotificationMeta>(notification.payload()).ok()?;
555
556                        // Fetch full event from database
557                        sqlx::query_as::<_, (Uuid, Uuid, String, serde_json::Value)>(
558                            "SELECT event_id, correlation_id, event_type, payload
559                             FROM seesaw_events
560                             WHERE event_id = $1"
561                        )
562                        .bind(meta.event_id)
563                        .fetch_optional(&pool)
564                        .await
565                        .ok()?
566                        .map(|(event_id, correlation_id, event_type, payload)| {
567                            seesaw_core::WorkflowEvent {
568                                event_id,
569                                correlation_id,
570                                event_type,
571                                payload,
572                            }
573                        })
574                    }
575                    Err(_) => None,
576                }
577            })
578        });
579
580        Ok(Box::new(stream))
581    }
582
583    async fn get_workflow_status(
584        &self,
585        correlation_id: Uuid,
586    ) -> Result<seesaw_core::WorkflowStatus> {
587        let state = sqlx::query_as::<_, (serde_json::Value,)>(
588            "SELECT state FROM seesaw_state WHERE correlation_id = $1",
589        )
590        .bind(correlation_id)
591        .fetch_optional(&self.pool)
592        .await?
593        .map(|r| r.0);
594
595        let pending_effects = sqlx::query_as::<_, (i64,)>(
596            "SELECT COUNT(*) FROM seesaw_effect_executions
597             WHERE correlation_id = $1 AND status IN ('pending', 'executing', 'failed')",
598        )
599        .bind(correlation_id)
600        .fetch_one(&self.pool)
601        .await?
602        .0;
603
604        let last_event = sqlx::query_as::<_, (String,)>(
605            "SELECT event_type FROM seesaw_events
606             WHERE correlation_id = $1
607             ORDER BY created_at DESC, id DESC
608             LIMIT 1",
609        )
610        .bind(correlation_id)
611        .fetch_optional(&self.pool)
612        .await?
613        .map(|r| r.0);
614
615        Ok(seesaw_core::WorkflowStatus {
616            correlation_id,
617            state,
618            pending_effects,
619            is_settled: pending_effects == 0,
620            last_event,
621        })
622    }
623}
624
625#[derive(FromRow)]
626struct StreamRow {
627    seq: i64,
628    stream_type: String,
629    correlation_id: Uuid,
630    event_id: Option<Uuid>,
631    effect_event_id: Option<Uuid>,
632    effect_id: Option<String>,
633    status: Option<String>,
634    error: Option<String>,
635    payload: Option<serde_json::Value>,
636    created_at: DateTime<Utc>,
637}
638
639#[async_trait]
640impl InsightStore for PostgresStore {
641    async fn subscribe_events(
642        &self,
643    ) -> Result<Box<dyn futures::Stream<Item = InsightEvent> + Send + Unpin>> {
644        use futures::stream::StreamExt;
645        use sqlx::postgres::PgListener;
646
647        // Create a new listener connection
648        let mut listener = PgListener::connect_with(&self.pool).await?;
649        listener.listen("seesaw_stream").await?;
650
651        // Convert listener into a stream of InsightEvent
652        let pool = self.pool.clone();
653        let stream = listener.into_stream().filter_map(move |result| {
654            let pool = pool.clone();
655            Box::pin(async move {
656                match result {
657                    Ok(_notification) => {
658                        // Fetch latest entry from stream table
659                        // (notification payload is just correlation_id for wake-up)
660                        if let Ok(row) = sqlx::query_as::<_, StreamRow>(
661                            "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
662                                    effect_id, status, error, payload, created_at
663                             FROM seesaw_stream
664                             ORDER BY seq DESC
665                             LIMIT 1",
666                        )
667                        .fetch_one(&pool)
668                        .await
669                        {
670                            Some(stream_row_to_insight_event(row))
671                        } else {
672                            None
673                        }
674                    }
675                    Err(_) => None,
676                }
677            })
678        });
679
680        Ok(Box::new(stream))
681    }
682
683    async fn get_workflow_tree(&self, correlation_id: Uuid) -> Result<WorkflowTree> {
684        // Get all events for this correlation
685        let events = sqlx::query_as::<_, EventRow>(
686            "SELECT id, event_id, parent_id, correlation_id, event_type, payload, hops, created_at
687             FROM seesaw_events
688             WHERE correlation_id = $1
689             ORDER BY created_at ASC",
690        )
691        .bind(correlation_id)
692        .fetch_all(&self.pool)
693        .await?;
694
695        // Get all effects for this correlation
696        let effects = sqlx::query_as::<_, EffectTreeRow>(
697            "SELECT event_id, effect_id, status, result, error, attempts, created_at
698             FROM seesaw_effect_executions
699             WHERE correlation_id = $1
700             ORDER BY created_at ASC",
701        )
702        .bind(correlation_id)
703        .fetch_all(&self.pool)
704        .await?;
705
706        // Build tree structure
707        let roots = build_event_tree(&events, &effects, None);
708
709        // Get state
710        let state = sqlx::query_as::<_, (serde_json::Value,)>(
711            "SELECT state FROM seesaw_state WHERE correlation_id = $1",
712        )
713        .bind(correlation_id)
714        .fetch_optional(&self.pool)
715        .await?
716        .map(|r| r.0);
717
718        Ok(WorkflowTree {
719            correlation_id,
720            roots,
721            state,
722            event_count: events.len(),
723            effect_count: effects.len(),
724        })
725    }
726
727    async fn get_stats(&self) -> Result<InsightStats> {
728        let total_events = sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM seesaw_events")
729            .fetch_one(&self.pool)
730            .await?
731            .0;
732
733        let active_effects = sqlx::query_as::<_, (i64,)>(
734            "SELECT COUNT(*) FROM seesaw_effect_executions
735             WHERE status IN ('pending', 'executing')",
736        )
737        .fetch_one(&self.pool)
738        .await?
739        .0;
740
741        let completed_effects = sqlx::query_as::<_, (i64,)>(
742            "SELECT COUNT(*) FROM seesaw_effect_executions WHERE status = 'completed'",
743        )
744        .fetch_one(&self.pool)
745        .await?
746        .0;
747
748        let failed_effects = sqlx::query_as::<_, (i64,)>(
749            "SELECT COUNT(*) FROM seesaw_effect_executions WHERE status = 'failed'",
750        )
751        .fetch_one(&self.pool)
752        .await?
753        .0;
754
755        Ok(InsightStats {
756            total_events,
757            active_effects,
758            completed_effects,
759            failed_effects,
760        })
761    }
762
763    async fn get_recent_events(
764        &self,
765        cursor: Option<i64>,
766        limit: usize,
767    ) -> Result<Vec<InsightEvent>> {
768        let rows = if let Some(cursor_seq) = cursor {
769            sqlx::query_as::<_, StreamRow>(
770                "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
771                        effect_id, status, error, payload, created_at
772                 FROM seesaw_stream
773                 WHERE seq > $1
774                 ORDER BY seq ASC
775                 LIMIT $2",
776            )
777            .bind(cursor_seq)
778            .bind(limit as i64)
779            .fetch_all(&self.pool)
780            .await?
781        } else {
782            sqlx::query_as::<_, StreamRow>(
783                "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
784                        effect_id, status, error, payload, created_at
785                 FROM seesaw_stream
786                 ORDER BY seq DESC
787                 LIMIT $1",
788            )
789            .bind(limit as i64)
790            .fetch_all(&self.pool)
791            .await?
792        };
793
794        Ok(rows.into_iter().map(stream_row_to_insight_event).collect())
795    }
796}
797
798#[derive(FromRow)]
799struct EffectTreeRow {
800    event_id: Uuid,
801    effect_id: String,
802    status: String,
803    result: Option<serde_json::Value>,
804    error: Option<String>,
805    attempts: i32,
806    created_at: DateTime<Utc>,
807}
808
809fn stream_row_to_insight_event(row: StreamRow) -> InsightEvent {
810    let stream_type = match row.stream_type.as_str() {
811        "event_dispatched" => StreamType::EventDispatched,
812        "effect_started" => StreamType::EffectStarted,
813        "effect_completed" => StreamType::EffectCompleted,
814        "effect_failed" => StreamType::EffectFailed,
815        _ => StreamType::EventDispatched, // Default fallback
816    };
817
818    // Extract event_type from payload if it's an event
819    let event_type = if stream_type == StreamType::EventDispatched {
820        row.payload
821            .as_ref()
822            .and_then(|p| p.get("event_type"))
823            .and_then(|v| v.as_str())
824            .map(|s| s.to_string())
825    } else {
826        None
827    };
828
829    InsightEvent {
830        seq: row.seq,
831        stream_type,
832        correlation_id: row.correlation_id,
833        event_id: row.event_id,
834        effect_event_id: row.effect_event_id,
835        effect_id: row.effect_id,
836        event_type,
837        status: row.status,
838        error: row.error,
839        payload: row.payload,
840        created_at: row.created_at,
841    }
842}
843
844fn build_event_tree(
845    events: &[EventRow],
846    effects: &[EffectTreeRow],
847    parent_id: Option<Uuid>,
848) -> Vec<EventNode> {
849    events
850        .iter()
851        .filter(|e| e.parent_id == parent_id)
852        .map(|event| {
853            // Get effects for this event
854            let event_effects: Vec<EffectNode> = effects
855                .iter()
856                .filter(|eff| eff.event_id == event.event_id)
857                .map(|eff| EffectNode {
858                    effect_id: eff.effect_id.clone(),
859                    event_id: eff.event_id,
860                    status: eff.status.clone(),
861                    result: eff.result.clone(),
862                    error: eff.error.clone(),
863                    attempts: eff.attempts,
864                    created_at: eff.created_at,
865                })
866                .collect();
867
868            // Recursively build children
869            let children = build_event_tree(events, effects, Some(event.event_id));
870
871            EventNode {
872                event_id: event.event_id,
873                event_type: event.event_type.clone(),
874                payload: event.payload.clone(),
875                created_at: event.created_at,
876                children,
877                effects: event_effects,
878            }
879        })
880        .collect()
881}
882
883#[cfg(test)]
884mod tests {
885    use super::*;
886    use chrono::{TimeZone, Timelike};
887
888    #[test]
889    fn emitted_event_created_at_is_midnight_on_parent_day() {
890        let parent = Utc
891            .with_ymd_and_hms(2026, 2, 5, 18, 45, 12)
892            .single()
893            .expect("valid timestamp");
894
895        let emitted = emitted_event_created_at(parent);
896
897        assert_eq!(emitted.date_naive(), parent.date_naive());
898        assert_eq!(emitted.hour(), 0);
899        assert_eq!(emitted.minute(), 0);
900        assert_eq!(emitted.second(), 0);
901    }
902
903    #[test]
904    fn emitted_event_created_at_is_deterministic_for_same_parent_day() {
905        let first_parent = Utc
906            .with_ymd_and_hms(2026, 2, 5, 0, 1, 2)
907            .single()
908            .expect("valid timestamp");
909        let second_parent = Utc
910            .with_ymd_and_hms(2026, 2, 5, 23, 59, 59)
911            .single()
912            .expect("valid timestamp");
913
914        let first_emitted = emitted_event_created_at(first_parent);
915        let second_emitted = emitted_event_created_at(second_parent);
916
917        assert_eq!(first_emitted, second_emitted);
918    }
919
920    #[test]
921    fn effect_retry_delay_seconds_uses_exponential_backoff() {
922        assert_eq!(effect_retry_delay_seconds(1), 1);
923        assert_eq!(effect_retry_delay_seconds(2), 2);
924        assert_eq!(effect_retry_delay_seconds(3), 4);
925        assert_eq!(effect_retry_delay_seconds(4), 8);
926    }
927
928    #[test]
929    fn effect_retry_delay_seconds_is_capped() {
930        assert_eq!(effect_retry_delay_seconds(9), 256);
931        assert_eq!(effect_retry_delay_seconds(50), 256);
932    }
933}