// seesaw_postgres/lib.rs
1// Simplified PostgresStore without compile-time checked queries
2// Uses dynamic queries for easier testing
3
4use anyhow::Result;
5use async_trait::async_trait;
6use chrono::{DateTime, Duration, Utc};
7use seesaw_core::{
8    insight::*, EmittedEvent, QueuedEffectExecution, QueuedEvent, Store, NAMESPACE_SEESAW,
9};
10use serde::{Deserialize, Serialize};
11use sqlx::{FromRow, PgPool};
12use uuid::Uuid;
13
/// How long (seconds) a claimed event stays locked before it becomes
/// eligible for re-delivery (used for `locked_until` in `poll_next`).
const EVENT_CLAIM_SECONDS: i64 = 30;
15
16fn emitted_event_created_at(parent_created_at: DateTime<Utc>) -> DateTime<Utc> {
17    parent_created_at
18        .date_naive()
19        .and_hms_opt(0, 0, 0)
20        .expect("midnight should always be a valid UTC timestamp")
21        .and_utc()
22}
23
/// Exponential backoff for effect retries: 1, 2, 4, … seconds, capped at
/// 2^8 = 256 seconds.
///
/// `attempts` is the number of attempts already made; the first retry
/// (attempts == 1) waits 1 second. Non-positive inputs are treated as 0
/// prior attempts and also yield 1 second.
fn effect_retry_delay_seconds(attempts: i32) -> i64 {
    let prior = if attempts > 1 { attempts - 1 } else { 0 };
    let capped = prior.min(8) as u32;
    2_i64.pow(capped)
}
28
29/// PostgreSQL implementation of Store trait
30pub struct PostgresStore {
31    pool: PgPool,
32}
33
34impl PostgresStore {
35    pub fn new(pool: PgPool) -> Self {
36        Self { pool }
37    }
38
39    pub fn pool(&self) -> &PgPool {
40        &self.pool
41    }
42}
43
44impl Clone for PostgresStore {
45    fn clone(&self) -> Self {
46        Self {
47            pool: self.pool.clone(),
48        }
49    }
50}
51
/// Row shape returned by `seesaw_events` queries.
#[derive(FromRow)]
struct EventRow {
    // Database row id; used for claiming (`poll_next`) and ack/nack.
    id: i64,
    // Public event identifier (deterministic for effect-emitted events).
    event_id: Uuid,
    // Parent event id when this event was emitted by an effect; None for roots.
    parent_id: Option<Uuid>,
    correlation_id: Uuid,
    event_type: String,
    // Arbitrary JSON payload of the event.
    payload: serde_json::Value,
    // Causal depth: child events get parent hops + 1 (see complete_effect_with_events).
    hops: i32,
    created_at: DateTime<Utc>,
}
63
/// Row shape for `seesaw_state` lookups (JSON state + optimistic-lock version).
#[derive(FromRow)]
struct StateRow {
    state: serde_json::Value,
    // Monotonically increasing version used by save_state's compare-and-swap.
    version: i32,
}
69
/// Row shape for `seesaw_effect_executions` queries.
#[derive(FromRow)]
struct EffectRow {
    // Event that scheduled this effect; (event_id, effect_id) is the logical key.
    event_id: Uuid,
    effect_id: String,
    correlation_id: Uuid,
    // Type/payload of the triggering event, denormalized onto the effect row.
    event_type: String,
    event_payload: serde_json::Value,
    parent_event_id: Option<Uuid>,
    // Earliest time the effect may run (pushed forward on retry).
    execute_at: DateTime<Utc>,
    timeout_seconds: i32,
    max_attempts: i32,
    // Lower number = claimed first (ORDER BY priority ASC in poll_next_effect).
    priority: i32,
    attempts: i32,
}
84
/// Minimal projection of a parent event used when inserting emitted children:
/// hop count (child = hops + 1) and created_at (child timestamp derivation).
#[derive(FromRow)]
struct ParentEventRow {
    hops: i32,
    created_at: DateTime<Utc>,
}
90
#[async_trait]
impl Store for PostgresStore {
    /// Publish an event exactly once per `event_id`.
    ///
    /// The `seesaw_processed` ledger insert is the idempotency guard; when
    /// the id is already present the call commits an empty transaction and
    /// returns `Ok(())` without touching `seesaw_events`.
    async fn publish(&self, event: QueuedEvent) -> Result<()> {
        let mut tx = self.pool.begin().await?;

        // Use the non-partitioned ledger as the idempotency guard. This keeps
        // webhook/process_with_id dedupe stable even when created_at differs.
        let inserted: Option<Uuid> = sqlx::query_scalar(
            "INSERT INTO seesaw_processed (event_id, correlation_id, created_at)
             VALUES ($1, $2, $3)
             ON CONFLICT (event_id) DO NOTHING
             RETURNING event_id",
        )
        .bind(event.event_id)
        .bind(event.correlation_id)
        .bind(event.created_at)
        .fetch_optional(&mut *tx)
        .await?;

        // RETURNING yields no row on conflict, i.e. a duplicate publish.
        if inserted.is_none() {
            tx.commit().await?;
            return Ok(());
        }

        sqlx::query(
            "INSERT INTO seesaw_events (
                event_id, parent_id, correlation_id, event_type, payload, hops, created_at
             )
             VALUES ($1, $2, $3, $4, $5, $6, $7)",
        )
        .bind(event.event_id)
        .bind(event.parent_id)
        .bind(event.correlation_id)
        .bind(event.event_type)
        .bind(event.payload)
        .bind(event.hops)
        .bind(event.created_at)
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;

        Ok(())
    }

    /// Claim the next processable event, if any.
    ///
    /// An event is eligible when it is unprocessed, its claim lock has
    /// expired (or was never set), and — via the NOT EXISTS subquery — no
    /// older unprocessed event exists for the same correlation, so events
    /// within a correlation are delivered in order. Claiming sets
    /// `locked_until` EVENT_CLAIM_SECONDS into the future; `FOR UPDATE SKIP
    /// LOCKED` lets concurrent pollers claim different rows without blocking.
    async fn poll_next(&self) -> Result<Option<QueuedEvent>> {
        let row: Option<EventRow> = sqlx::query_as(
            "WITH next_event AS (
                SELECT e.id
                FROM seesaw_events e
                WHERE e.processed_at IS NULL
                  AND (e.locked_until IS NULL OR e.locked_until < NOW())
                  AND NOT EXISTS (
                    SELECT 1
                    FROM seesaw_events older
                    WHERE older.correlation_id = e.correlation_id
                      AND older.processed_at IS NULL
                      AND (
                        older.created_at < e.created_at
                        OR (older.created_at = e.created_at AND older.id < e.id)
                      )
                  )
                ORDER BY e.created_at ASC, e.id ASC
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            )
            UPDATE seesaw_events e
            SET locked_until = NOW() + ($1 * INTERVAL '1 second')
            FROM next_event
            WHERE e.id = next_event.id
            RETURNING e.id, e.event_id, e.parent_id, e.correlation_id, e.event_type, e.payload, e.hops, e.created_at",
        )
        .bind(EVENT_CLAIM_SECONDS)
        .fetch_optional(&self.pool)
        .await?;

        Ok(row.map(|r| QueuedEvent {
            id: r.id,
            event_id: r.event_id,
            parent_id: r.parent_id,
            correlation_id: r.correlation_id,
            event_type: r.event_type,
            payload: r.payload,
            hops: r.hops,
            created_at: r.created_at,
        }))
    }

    /// Acknowledge a claimed event: mark it processed and release its lock.
    async fn ack(&self, id: i64) -> Result<()> {
        sqlx::query(
            "UPDATE seesaw_events SET processed_at = NOW(), locked_until = NULL WHERE id = $1",
        )
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Record a failed processing attempt and delay redelivery by
    /// `retry_after_secs` (bumps `retry_count`, extends `locked_until`).
    ///
    /// NOTE(review): `retry_after_secs as i64` wraps for values above
    /// i64::MAX — harmless for sane delays, but confirm callers bound it.
    async fn nack(&self, id: i64, retry_after_secs: u64) -> Result<()> {
        let locked_until = Utc::now() + Duration::seconds(retry_after_secs as i64);
        sqlx::query(
            "UPDATE seesaw_events
             SET retry_count = retry_count + 1,
                 locked_until = $2
             WHERE id = $1",
        )
        .bind(id)
        .bind(locked_until)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Load the workflow state for `correlation_id`, returning the
    /// deserialized state and its optimistic-lock version, or `None` when no
    /// state row exists yet.
    async fn load_state<S>(&self, correlation_id: Uuid) -> Result<Option<(S, i32)>>
    where
        S: for<'de> Deserialize<'de> + Send,
    {
        let row: Option<StateRow> =
            sqlx::query_as("SELECT state, version FROM seesaw_state WHERE correlation_id = $1")
                .bind(correlation_id)
                .fetch_optional(&self.pool)
                .await?;

        match row {
            Some(r) => {
                let state: S = serde_json::from_value(r.state)?;
                Ok(Some((state, r.version)))
            }
            None => Ok(None),
        }
    }

    /// Persist workflow state with optimistic concurrency control.
    ///
    /// Inserts a fresh row, or updates an existing one only when its stored
    /// version still equals `expected_version` (the WHERE on the DO UPDATE
    /// branch). Returns the new version (`expected_version + 1`); bails with
    /// a version-conflict error when no row was written.
    async fn save_state<S>(
        &self,
        correlation_id: Uuid,
        state: &S,
        expected_version: i32,
    ) -> Result<i32>
    where
        S: Serialize + Send + Sync,
    {
        let state_json = serde_json::to_value(state)?;
        let new_version = expected_version + 1;

        let result = sqlx::query(
            "INSERT INTO seesaw_state (correlation_id, state, version, updated_at)
             VALUES ($1, $2, $3, NOW())
             ON CONFLICT (correlation_id) DO UPDATE
             SET state = $2,
                 version = $3,
                 updated_at = NOW()
             WHERE seesaw_state.version = $4",
        )
        .bind(correlation_id)
        .bind(&state_json)
        .bind(new_version)
        .bind(expected_version)
        .execute(&self.pool)
        .await?;

        // Zero rows means the conditional UPDATE was skipped: someone else
        // advanced the version between our load and save.
        if result.rows_affected() == 0 {
            anyhow::bail!("Version conflict: state was modified concurrently");
        }

        Ok(new_version)
    }

    /// Record an effect to be executed, in status `'pending'`.
    ///
    /// The triggering event's type/payload are denormalized onto the row so
    /// the effect worker does not need to join back to `seesaw_events`.
    async fn insert_effect_intent(
        &self,
        event_id: Uuid,
        effect_id: String,
        correlation_id: Uuid,
        event_type: String,
        event_payload: serde_json::Value,
        parent_event_id: Option<Uuid>,
        execute_at: DateTime<Utc>,
        timeout_seconds: i32,
        max_attempts: i32,
        priority: i32,
    ) -> Result<()> {
        sqlx::query(
            "INSERT INTO seesaw_effect_executions (
                event_id, effect_id, correlation_id, status,
                event_type, event_payload, parent_event_id,
                execute_at, timeout_seconds, max_attempts, priority
             )
             VALUES ($1, $2, $3, 'pending', $4, $5, $6, $7, $8, $9, $10)",
        )
        .bind(event_id)
        .bind(effect_id)
        .bind(correlation_id)
        .bind(event_type)
        .bind(event_payload)
        .bind(parent_event_id)
        .bind(execute_at)
        .bind(timeout_seconds)
        .bind(max_attempts)
        .bind(priority)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Claim the next due effect: `'pending'`, or `'failed'` with attempts
    /// remaining, ordered by priority then due time. Claiming flips the row
    /// to `'executing'`, stamps claim/attempt times, and increments
    /// `attempts`; `FOR UPDATE SKIP LOCKED` keeps concurrent workers from
    /// contending on the same row.
    async fn poll_next_effect(&self) -> Result<Option<QueuedEffectExecution>> {
        let row: Option<EffectRow> = sqlx::query_as(
            "WITH next_effect AS (
                SELECT event_id, effect_id
                FROM seesaw_effect_executions
                WHERE (
                    status = 'pending'
                    OR (status = 'failed' AND attempts < max_attempts)
                )
                  AND execute_at <= NOW()
                ORDER BY priority ASC, execute_at ASC, event_id ASC, effect_id ASC
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            )
            UPDATE seesaw_effect_executions e
            SET status = 'executing',
                claimed_at = NOW(),
                last_attempted_at = NOW(),
                attempts = e.attempts + 1
            FROM next_effect
            WHERE e.event_id = next_effect.event_id
              AND e.effect_id = next_effect.effect_id
            RETURNING
                e.event_id, e.effect_id, e.correlation_id, e.event_type, e.event_payload, e.parent_event_id,
                e.execute_at, e.timeout_seconds, e.max_attempts, e.priority, e.attempts",
        )
        .fetch_optional(&self.pool)
        .await?;

        if let Some(r) = row {
            Ok(Some(QueuedEffectExecution {
                event_id: r.event_id,
                effect_id: r.effect_id,
                correlation_id: r.correlation_id,
                event_type: r.event_type,
                event_payload: r.event_payload,
                parent_event_id: r.parent_event_id,
                execute_at: r.execute_at,
                timeout_seconds: r.timeout_seconds,
                max_attempts: r.max_attempts,
                priority: r.priority,
                attempts: r.attempts,
            }))
        } else {
            Ok(None)
        }
    }

    /// Mark an effect `'completed'` and store its JSON result.
    async fn complete_effect(
        &self,
        event_id: Uuid,
        effect_id: String,
        result: serde_json::Value,
    ) -> Result<()> {
        sqlx::query(
            "UPDATE seesaw_effect_executions
             SET status = 'completed',
                 result = $3,
                 completed_at = NOW()
             WHERE event_id = $1 AND effect_id = $2",
        )
        .bind(event_id)
        .bind(effect_id)
        .bind(result)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Complete an effect and atomically insert the events it emitted.
    ///
    /// Child events get deterministic ids (UUIDv5 over parent event id +
    /// effect id + event type) and a deterministic timestamp (midnight of the
    /// parent's day), so retried completions insert the same rows and the
    /// `ON CONFLICT … DO NOTHING` makes the whole operation idempotent.
    ///
    /// NOTE(review): two emitted events with the same `event_type` from one
    /// effect hash to the same deterministic id, so the second is silently
    /// dropped by ON CONFLICT — confirm emitters never do this.
    /// NOTE(review): the effect/parent lookups run before the transaction
    /// begins; presumably those rows are immutable by this point — verify.
    async fn complete_effect_with_events(
        &self,
        event_id: Uuid,
        effect_id: String,
        result: serde_json::Value,
        emitted_events: Vec<EmittedEvent>,
    ) -> Result<()> {
        // Get correlation_id and hops for emitted events
        let effect: EffectRow = sqlx::query_as(
            "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
                    execute_at, timeout_seconds, max_attempts, priority, attempts
             FROM seesaw_effect_executions
             WHERE event_id = $1 AND effect_id = $2",
        )
        .bind(event_id)
        .bind(&effect_id)
        .fetch_one(&self.pool)
        .await?;

        // Read parent metadata for deterministic hop increment and timestamp.
        let parent: ParentEventRow = sqlx::query_as(
            "SELECT hops, created_at
             FROM seesaw_events
             WHERE event_id = $1
             ORDER BY created_at ASC, id ASC
             LIMIT 1",
        )
        .bind(event_id)
        .fetch_one(&self.pool)
        .await?;

        // Start transaction for atomicity
        let mut tx = self.pool.begin().await?;

        // Insert emitted events with deterministic IDs
        for emitted in emitted_events {
            // Generate deterministic event_id from hash(parent_event_id, effect_id, event_type)
            let deterministic_id = Uuid::new_v5(
                &NAMESPACE_SEESAW,
                format!("{}-{}-{}", event_id, effect_id, emitted.event_type).as_bytes(),
            );

            // Deterministic timestamp keeps retries idempotent while staying in
            // the same partition day as the parent event.
            let deterministic_timestamp = emitted_event_created_at(parent.created_at);

            // Insert event (idempotent via ON CONFLICT on (event_id, created_at))
            sqlx::query(
                "INSERT INTO seesaw_events (
                    event_id, parent_id, correlation_id, event_type, payload, hops, created_at
                 )
                 VALUES ($1, $2, $3, $4, $5, $6, $7)
                 ON CONFLICT (event_id, created_at) DO NOTHING",
            )
            .bind(deterministic_id)
            .bind(Some(event_id))
            .bind(effect.correlation_id)
            .bind(&emitted.event_type)
            .bind(emitted.payload)
            .bind(parent.hops + 1)
            .bind(deterministic_timestamp)
            .execute(&mut *tx)
            .await?;
        }

        // Mark effect as completed (same transaction)
        sqlx::query(
            "UPDATE seesaw_effect_executions
             SET status = 'completed',
                 result = $3,
                 completed_at = NOW()
             WHERE event_id = $1 AND effect_id = $2",
        )
        .bind(event_id)
        .bind(effect_id)
        .bind(result)
        .execute(&mut *tx)
        .await?;

        // Commit transaction - both succeed or both fail
        tx.commit().await?;

        Ok(())
    }

    /// Mark an effect `'failed'` and schedule its retry with exponential
    /// backoff (`effect_retry_delay_seconds`); clearing `claimed_at` releases
    /// the claim. The `attempts >= $4` guard makes the update a no-op if the
    /// stored attempt count has not yet reached the reported one, ignoring
    /// out-of-date failure reports.
    async fn fail_effect(
        &self,
        event_id: Uuid,
        effect_id: String,
        error: String,
        attempts: i32,
    ) -> Result<()> {
        let retry_at = Utc::now() + Duration::seconds(effect_retry_delay_seconds(attempts));
        sqlx::query(
            "UPDATE seesaw_effect_executions
             SET status = 'failed',
                 error = $3,
                 execute_at = $5,
                 claimed_at = NULL
             WHERE event_id = $1 AND effect_id = $2 AND attempts >= $4",
        )
        .bind(event_id)
        .bind(effect_id)
        .bind(error)
        .bind(attempts)
        .bind(retry_at)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Move an exhausted effect into the dead-letter queue and remove it from
    /// the executions table.
    ///
    /// NOTE(review): the DLQ insert and the delete run as separate statements
    /// (no transaction) — a crash in between leaves the row in both tables;
    /// confirm whether that duplication is acceptable or wrap in a tx.
    async fn dlq_effect(
        &self,
        event_id: Uuid,
        effect_id: String,
        error: String,
        reason: String,
        attempts: i32,
    ) -> Result<()> {
        // Get effect details for DLQ
        let effect: EffectRow = sqlx::query_as(
            "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
                    execute_at, timeout_seconds, max_attempts, priority, attempts
             FROM seesaw_effect_executions
             WHERE event_id = $1 AND effect_id = $2",
        )
        .bind(event_id)
        .bind(&effect_id)
        .fetch_one(&self.pool)
        .await?;

        // Insert into DLQ
        sqlx::query(
            "INSERT INTO seesaw_dlq (
                event_id, effect_id, correlation_id, error, event_type, event_payload, reason, attempts
             )
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
        )
        .bind(event_id)
        .bind(&effect_id)
        .bind(effect.correlation_id)
        .bind(error)
        .bind(effect.event_type)
        .bind(effect.event_payload)
        .bind(reason)
        .bind(attempts)
        .execute(&self.pool)
        .await?;

        // Delete from executions table
        sqlx::query("DELETE FROM seesaw_effect_executions WHERE event_id = $1 AND effect_id = $2")
            .bind(event_id)
            .bind(effect_id)
            .execute(&self.pool)
            .await?;

        Ok(())
    }

    /// Subscribe to live workflow events for one correlation via Postgres
    /// LISTEN on the `seesaw_workflow_<correlation_id>` channel.
    ///
    /// Notification payloads are JSON-decoded into `WorkflowEvent`; payloads
    /// that fail to parse and listener errors are silently dropped from the
    /// stream rather than terminating it.
    async fn subscribe_workflow_events(
        &self,
        correlation_id: Uuid,
    ) -> Result<Box<dyn futures::Stream<Item = seesaw_core::WorkflowEvent> + Send + Unpin>> {
        use futures::stream::StreamExt;
        use sqlx::postgres::PgListener;

        let channel = format!("seesaw_workflow_{}", correlation_id);

        // Create a new listener connection
        let mut listener = PgListener::connect_with(&self.pool).await?;
        listener.listen(&channel).await?;

        // Convert listener into a stream of WorkflowEvent
        let stream = listener.into_stream().filter_map(|result| {
            Box::pin(async move {
                match result {
                    Ok(notification) => {
                        // Parse the JSON payload from the notification
                        if let Ok(event) =
                            serde_json::from_str::<seesaw_core::WorkflowEvent>(notification.payload())
                        {
                            Some(event)
                        } else {
                            None
                        }
                    }
                    Err(_) => None,
                }
            })
        });

        Ok(Box::new(stream))
    }

    /// Snapshot a workflow's status: current JSON state (if any), count of
    /// unfinished effects (pending/executing/failed), the most recent event
    /// type, and `is_settled` = no unfinished effects remain.
    async fn get_workflow_status(
        &self,
        correlation_id: Uuid,
    ) -> Result<seesaw_core::WorkflowStatus> {
        let state = sqlx::query_as::<_, (serde_json::Value,)>(
            "SELECT state FROM seesaw_state WHERE correlation_id = $1",
        )
        .bind(correlation_id)
        .fetch_optional(&self.pool)
        .await?
        .map(|r| r.0);

        let pending_effects = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM seesaw_effect_executions
             WHERE correlation_id = $1 AND status IN ('pending', 'executing', 'failed')",
        )
        .bind(correlation_id)
        .fetch_one(&self.pool)
        .await?
        .0;

        let last_event = sqlx::query_as::<_, (String,)>(
            "SELECT event_type FROM seesaw_events
             WHERE correlation_id = $1
             ORDER BY created_at DESC, id DESC
             LIMIT 1",
        )
        .bind(correlation_id)
        .fetch_optional(&self.pool)
        .await?
        .map(|r| r.0);

        Ok(seesaw_core::WorkflowStatus {
            correlation_id,
            state,
            pending_effects,
            is_settled: pending_effects == 0,
            last_event,
        })
    }
}
601
/// Row shape for the `seesaw_stream` activity feed. Most fields are optional
/// because which columns are populated depends on `stream_type` (event rows
/// vs. effect lifecycle rows).
#[derive(FromRow)]
struct StreamRow {
    // Monotonic sequence number; used as the pagination cursor.
    seq: i64,
    // Textual discriminator, e.g. "event_dispatched", "effect_started".
    stream_type: String,
    correlation_id: Uuid,
    event_id: Option<Uuid>,
    effect_event_id: Option<Uuid>,
    effect_id: Option<String>,
    status: Option<String>,
    error: Option<String>,
    payload: Option<serde_json::Value>,
    created_at: DateTime<Utc>,
}
615
#[async_trait]
impl InsightStore for PostgresStore {
    /// Subscribe to the global activity feed via LISTEN on `seesaw_stream`.
    ///
    /// Each notification is treated as a wake-up only: the handler re-reads
    /// the newest `seesaw_stream` row and emits it. Query failures and
    /// listener errors are dropped from the stream.
    ///
    /// NOTE(review): because only the single newest row is fetched per
    /// notification, a burst of inserts between notifications can skip
    /// intermediate entries — confirm consumers tolerate gaps (or page via
    /// `get_recent_events` with the seq cursor).
    async fn subscribe_events(
        &self,
    ) -> Result<Box<dyn futures::Stream<Item = InsightEvent> + Send + Unpin>> {
        use futures::stream::StreamExt;
        use sqlx::postgres::PgListener;

        // Create a new listener connection
        let mut listener = PgListener::connect_with(&self.pool).await?;
        listener.listen("seesaw_stream").await?;

        // Convert listener into a stream of InsightEvent
        let pool = self.pool.clone();
        let stream = listener.into_stream().filter_map(move |result| {
            let pool = pool.clone();
            Box::pin(async move {
                match result {
                    Ok(_notification) => {
                        // Fetch latest entry from stream table
                        // (notification payload is just correlation_id for wake-up)
                        if let Ok(row) = sqlx::query_as::<_, StreamRow>(
                            "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
                                    effect_id, status, error, payload, created_at
                             FROM seesaw_stream
                             ORDER BY seq DESC
                             LIMIT 1",
                        )
                        .fetch_one(&pool)
                        .await
                        {
                            Some(stream_row_to_insight_event(row))
                        } else {
                            None
                        }
                    }
                    Err(_) => None,
                }
            })
        });

        Ok(Box::new(stream))
    }

    /// Build the full event/effect tree for a correlation: all events and
    /// effects are loaded in creation order, stitched into a parent/child
    /// tree by `build_event_tree`, and returned with the current state and
    /// total counts.
    async fn get_workflow_tree(&self, correlation_id: Uuid) -> Result<WorkflowTree> {
        // Get all events for this correlation
        let events = sqlx::query_as::<_, EventRow>(
            "SELECT id, event_id, parent_id, correlation_id, event_type, payload, hops, created_at
             FROM seesaw_events
             WHERE correlation_id = $1
             ORDER BY created_at ASC",
        )
        .bind(correlation_id)
        .fetch_all(&self.pool)
        .await?;

        // Get all effects for this correlation
        let effects = sqlx::query_as::<_, EffectTreeRow>(
            "SELECT event_id, effect_id, status, result, error, attempts, created_at
             FROM seesaw_effect_executions
             WHERE correlation_id = $1
             ORDER BY created_at ASC",
        )
        .bind(correlation_id)
        .fetch_all(&self.pool)
        .await?;

        // Build tree structure (roots are events with no parent)
        let roots = build_event_tree(&events, &effects, None);

        // Get state
        let state = sqlx::query_as::<_, (serde_json::Value,)>(
            "SELECT state FROM seesaw_state WHERE correlation_id = $1",
        )
        .bind(correlation_id)
        .fetch_optional(&self.pool)
        .await?
        .map(|r| r.0);

        Ok(WorkflowTree {
            correlation_id,
            roots,
            state,
            event_count: events.len(),
            effect_count: effects.len(),
        })
    }

    /// Global counters for dashboards: total events, effects still in flight
    /// ('pending'/'executing'), completed effects, and failed effects.
    /// Issued as four independent COUNT queries, so the numbers are not a
    /// single consistent snapshot.
    async fn get_stats(&self) -> Result<InsightStats> {
        let total_events = sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM seesaw_events")
            .fetch_one(&self.pool)
            .await?
            .0;

        let active_effects = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM seesaw_effect_executions
             WHERE status IN ('pending', 'executing')",
        )
        .fetch_one(&self.pool)
        .await?
        .0;

        let completed_effects = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM seesaw_effect_executions WHERE status = 'completed'",
        )
        .fetch_one(&self.pool)
        .await?
        .0;

        let failed_effects = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM seesaw_effect_executions WHERE status = 'failed'",
        )
        .fetch_one(&self.pool)
        .await?
        .0;

        Ok(InsightStats {
            total_events,
            active_effects,
            completed_effects,
            failed_effects,
        })
    }

    /// Page through the activity feed.
    ///
    /// With a cursor, returns up to `limit` entries *after* that `seq`, in
    /// ascending order; without one, returns the newest `limit` entries in
    /// descending order. Note the ordering differs between the two branches.
    async fn get_recent_events(
        &self,
        cursor: Option<i64>,
        limit: usize,
    ) -> Result<Vec<InsightEvent>> {
        let rows = if let Some(cursor_seq) = cursor {
            sqlx::query_as::<_, StreamRow>(
                "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
                        effect_id, status, error, payload, created_at
                 FROM seesaw_stream
                 WHERE seq > $1
                 ORDER BY seq ASC
                 LIMIT $2",
            )
            .bind(cursor_seq)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await?
        } else {
            sqlx::query_as::<_, StreamRow>(
                "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
                        effect_id, status, error, payload, created_at
                 FROM seesaw_stream
                 ORDER BY seq DESC
                 LIMIT $1",
            )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await?
        };

        Ok(rows.into_iter().map(stream_row_to_insight_event).collect())
    }
}
774
/// Projection of `seesaw_effect_executions` used when building workflow trees.
#[derive(FromRow)]
struct EffectTreeRow {
    // Event the effect belongs to; joined against EventRow.event_id.
    event_id: Uuid,
    effect_id: String,
    status: String,
    result: Option<serde_json::Value>,
    error: Option<String>,
    attempts: i32,
    created_at: DateTime<Utc>,
}
785
786fn stream_row_to_insight_event(row: StreamRow) -> InsightEvent {
787    let stream_type = match row.stream_type.as_str() {
788        "event_dispatched" => StreamType::EventDispatched,
789        "effect_started" => StreamType::EffectStarted,
790        "effect_completed" => StreamType::EffectCompleted,
791        "effect_failed" => StreamType::EffectFailed,
792        _ => StreamType::EventDispatched, // Default fallback
793    };
794
795    // Extract event_type from payload if it's an event
796    let event_type = if stream_type == StreamType::EventDispatched {
797        row.payload
798            .as_ref()
799            .and_then(|p| p.get("event_type"))
800            .and_then(|v| v.as_str())
801            .map(|s| s.to_string())
802    } else {
803        None
804    };
805
806    InsightEvent {
807        seq: row.seq,
808        stream_type,
809        correlation_id: row.correlation_id,
810        event_id: row.event_id,
811        effect_event_id: row.effect_event_id,
812        effect_id: row.effect_id,
813        event_type,
814        status: row.status,
815        error: row.error,
816        payload: row.payload,
817        created_at: row.created_at,
818    }
819}
820
821fn build_event_tree(
822    events: &[EventRow],
823    effects: &[EffectTreeRow],
824    parent_id: Option<Uuid>,
825) -> Vec<EventNode> {
826    events
827        .iter()
828        .filter(|e| e.parent_id == parent_id)
829        .map(|event| {
830            // Get effects for this event
831            let event_effects: Vec<EffectNode> = effects
832                .iter()
833                .filter(|eff| eff.event_id == event.event_id)
834                .map(|eff| EffectNode {
835                    effect_id: eff.effect_id.clone(),
836                    event_id: eff.event_id,
837                    status: eff.status.clone(),
838                    result: eff.result.clone(),
839                    error: eff.error.clone(),
840                    attempts: eff.attempts,
841                    created_at: eff.created_at,
842                })
843                .collect();
844
845            // Recursively build children
846            let children = build_event_tree(events, effects, Some(event.event_id));
847
848            EventNode {
849                event_id: event.event_id,
850                event_type: event.event_type.clone(),
851                payload: event.payload.clone(),
852                created_at: event.created_at,
853                children,
854                effects: event_effects,
855            }
856        })
857        .collect()
858}
859
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Timelike};

    /// Shorthand for building a known-valid UTC timestamp in tests.
    fn ts(y: i32, mo: u32, d: u32, h: u32, mi: u32, s: u32) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(y, mo, d, h, mi, s)
            .single()
            .expect("valid timestamp")
    }

    #[test]
    fn emitted_event_created_at_is_midnight_on_parent_day() {
        let parent = ts(2026, 2, 5, 18, 45, 12);
        let emitted = emitted_event_created_at(parent);

        assert_eq!(emitted.date_naive(), parent.date_naive());
        assert_eq!(
            (emitted.hour(), emitted.minute(), emitted.second()),
            (0, 0, 0)
        );
    }

    #[test]
    fn emitted_event_created_at_is_deterministic_for_same_parent_day() {
        // Any two timestamps within the same UTC day collapse to one value.
        assert_eq!(
            emitted_event_created_at(ts(2026, 2, 5, 0, 1, 2)),
            emitted_event_created_at(ts(2026, 2, 5, 23, 59, 59))
        );
    }

    #[test]
    fn effect_retry_delay_seconds_uses_exponential_backoff() {
        for (attempts, expected) in [(1, 1), (2, 2), (3, 4), (4, 8)] {
            assert_eq!(effect_retry_delay_seconds(attempts), expected);
        }
    }

    #[test]
    fn effect_retry_delay_seconds_is_capped() {
        // The exponent clamps at 8, i.e. a maximum delay of 256 seconds.
        assert_eq!(effect_retry_delay_seconds(9), 256);
        assert_eq!(effect_retry_delay_seconds(50), 256);
    }
}