Skip to main content

seesaw_postgres/
lib.rs

1// Simplified PostgresStore without compile-time checked queries
2// Uses dynamic queries for easier testing
3
4use anyhow::Result;
5use async_trait::async_trait;
6use chrono::{DateTime, Duration, Utc};
7use seesaw_core::{
8    insight::*, EmittedEvent, JoinEntry, QueuedEffectExecution, QueuedEvent, Store,
9    NAMESPACE_SEESAW,
10};
11use serde::{Deserialize, Serialize};
12use sqlx::{FromRow, PgPool};
13use std::collections::HashSet;
14use uuid::Uuid;
15
16const EVENT_CLAIM_SECONDS: i64 = 30;
17
18fn emitted_event_created_at(parent_created_at: DateTime<Utc>) -> DateTime<Utc> {
19    parent_created_at
20        .date_naive()
21        .and_hms_opt(0, 0, 0)
22        .expect("midnight should always be a valid UTC timestamp")
23        .and_utc()
24}
25
26fn effect_retry_delay_seconds(attempts: i32) -> i64 {
27    let exponent = attempts.saturating_sub(1).clamp(0, 8) as u32;
28    1_i64 << exponent
29}
30
31/// PostgreSQL implementation of Store trait
32pub struct PostgresStore {
33    pool: PgPool,
34}
35
36impl PostgresStore {
37    pub fn new(pool: PgPool) -> Self {
38        Self { pool }
39    }
40
41    pub fn pool(&self) -> &PgPool {
42        &self.pool
43    }
44}
45
46impl Clone for PostgresStore {
47    fn clone(&self) -> Self {
48        Self {
49            pool: self.pool.clone(),
50        }
51    }
52}
53
54#[derive(FromRow)]
55struct EventRow {
56    id: i64,
57    event_id: Uuid,
58    parent_id: Option<Uuid>,
59    correlation_id: Uuid,
60    event_type: String,
61    payload: serde_json::Value,
62    hops: i32,
63    batch_id: Option<Uuid>,
64    batch_index: Option<i32>,
65    batch_size: Option<i32>,
66    created_at: DateTime<Utc>,
67}
68
69#[derive(FromRow)]
70struct StateRow {
71    state: serde_json::Value,
72    version: i32,
73}
74
75#[derive(FromRow)]
76struct EffectRow {
77    event_id: Uuid,
78    effect_id: String,
79    correlation_id: Uuid,
80    event_type: String,
81    event_payload: serde_json::Value,
82    parent_event_id: Option<Uuid>,
83    batch_id: Option<Uuid>,
84    batch_index: Option<i32>,
85    batch_size: Option<i32>,
86    execute_at: DateTime<Utc>,
87    timeout_seconds: i32,
88    max_attempts: i32,
89    priority: i32,
90    attempts: i32,
91}
92
93#[derive(FromRow)]
94struct ParentEventRow {
95    hops: i32,
96    created_at: DateTime<Utc>,
97}
98
99#[derive(FromRow)]
100struct WorkflowEventRow {
101    id: i64,
102    event_id: Uuid,
103    correlation_id: Uuid,
104    event_type: String,
105    payload: serde_json::Value,
106    created_at: DateTime<Utc>,
107}
108
109#[async_trait]
110impl Store for PostgresStore {
111    async fn publish(&self, event: QueuedEvent) -> Result<()> {
112        let mut tx = self.pool.begin().await?;
113
114        // Use the non-partitioned ledger as the idempotency guard. This keeps
115        // webhook/process_with_id dedupe stable even when created_at differs.
116        let inserted: Option<Uuid> = sqlx::query_scalar(
117            "INSERT INTO seesaw_processed (event_id, correlation_id, created_at)
118             VALUES ($1, $2, $3)
119             ON CONFLICT (event_id) DO NOTHING
120             RETURNING event_id",
121        )
122        .bind(event.event_id)
123        .bind(event.correlation_id)
124        .bind(event.created_at)
125        .fetch_optional(&mut *tx)
126        .await?;
127
128        if inserted.is_none() {
129            tx.commit().await?;
130            return Ok(());
131        }
132
133        sqlx::query(
134            "INSERT INTO seesaw_events (
135                event_id, parent_id, correlation_id, event_type, payload, hops,
136                batch_id, batch_index, batch_size, created_at
137             )
138             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
139        )
140        .bind(event.event_id)
141        .bind(event.parent_id)
142        .bind(event.correlation_id)
143        .bind(event.event_type)
144        .bind(event.payload)
145        .bind(event.hops)
146        .bind(event.batch_id)
147        .bind(event.batch_index)
148        .bind(event.batch_size)
149        .bind(event.created_at)
150        .execute(&mut *tx)
151        .await?;
152
153        tx.commit().await?;
154
155        Ok(())
156    }
157
158    async fn poll_next(&self) -> Result<Option<QueuedEvent>> {
159        let row: Option<EventRow> = sqlx::query_as(
160            "WITH next_event AS (
161                SELECT e.id
162                FROM seesaw_events e
163                WHERE e.processed_at IS NULL
164                  AND (e.locked_until IS NULL OR e.locked_until < NOW())
165                  AND NOT EXISTS (
166                    SELECT 1
167                    FROM seesaw_events older
168                    WHERE older.correlation_id = e.correlation_id
169                      AND older.processed_at IS NULL
170                      AND (
171                        older.created_at < e.created_at
172                        OR (older.created_at = e.created_at AND older.id < e.id)
173                      )
174                  )
175                ORDER BY e.created_at ASC, e.id ASC
176                LIMIT 1
177                FOR UPDATE SKIP LOCKED
178            )
179            UPDATE seesaw_events e
180            SET locked_until = NOW() + ($1 * INTERVAL '1 second')
181            FROM next_event
182            WHERE e.id = next_event.id
183            RETURNING e.id, e.event_id, e.parent_id, e.correlation_id, e.event_type, e.payload,
184                      e.hops, e.batch_id, e.batch_index, e.batch_size, e.created_at",
185        )
186        .bind(EVENT_CLAIM_SECONDS)
187        .fetch_optional(&self.pool)
188        .await?;
189
190        Ok(row.map(|r| QueuedEvent {
191            id: r.id,
192            event_id: r.event_id,
193            parent_id: r.parent_id,
194            correlation_id: r.correlation_id,
195            event_type: r.event_type,
196            payload: r.payload,
197            hops: r.hops,
198            batch_id: r.batch_id,
199            batch_index: r.batch_index,
200            batch_size: r.batch_size,
201            created_at: r.created_at,
202        }))
203    }
204
205    async fn ack(&self, id: i64) -> Result<()> {
206        sqlx::query(
207            "UPDATE seesaw_events SET processed_at = NOW(), locked_until = NULL WHERE id = $1",
208        )
209        .bind(id)
210        .execute(&self.pool)
211        .await?;
212        Ok(())
213    }
214
215    async fn nack(&self, id: i64, retry_after_secs: u64) -> Result<()> {
216        let locked_until = Utc::now() + Duration::seconds(retry_after_secs as i64);
217        sqlx::query(
218            "UPDATE seesaw_events
219             SET retry_count = retry_count + 1,
220                 locked_until = $2
221             WHERE id = $1",
222        )
223        .bind(id)
224        .bind(locked_until)
225        .execute(&self.pool)
226        .await?;
227        Ok(())
228    }
229
230    async fn load_state<S>(&self, correlation_id: Uuid) -> Result<Option<(S, i32)>>
231    where
232        S: for<'de> Deserialize<'de> + Send,
233    {
234        let row: Option<StateRow> =
235            sqlx::query_as("SELECT state, version FROM seesaw_state WHERE correlation_id = $1")
236                .bind(correlation_id)
237                .fetch_optional(&self.pool)
238                .await?;
239
240        match row {
241            Some(r) => {
242                let state: S = serde_json::from_value(r.state)?;
243                Ok(Some((state, r.version)))
244            }
245            None => Ok(None),
246        }
247    }
248
249    async fn save_state<S>(
250        &self,
251        correlation_id: Uuid,
252        state: &S,
253        expected_version: i32,
254    ) -> Result<i32>
255    where
256        S: Serialize + Send + Sync,
257    {
258        let state_json = serde_json::to_value(state)?;
259        let new_version = expected_version + 1;
260
261        let result = sqlx::query(
262            "INSERT INTO seesaw_state (correlation_id, state, version, updated_at)
263             VALUES ($1, $2, $3, NOW())
264             ON CONFLICT (correlation_id) DO UPDATE
265             SET state = $2,
266                 version = $3,
267                 updated_at = NOW()
268             WHERE seesaw_state.version = $4",
269        )
270        .bind(correlation_id)
271        .bind(&state_json)
272        .bind(new_version)
273        .bind(expected_version)
274        .execute(&self.pool)
275        .await?;
276
277        if result.rows_affected() == 0 {
278            anyhow::bail!("Version conflict: state was modified concurrently");
279        }
280
281        Ok(new_version)
282    }
283
284    async fn insert_effect_intent(
285        &self,
286        event_id: Uuid,
287        effect_id: String,
288        correlation_id: Uuid,
289        event_type: String,
290        event_payload: serde_json::Value,
291        parent_event_id: Option<Uuid>,
292        batch_id: Option<Uuid>,
293        batch_index: Option<i32>,
294        batch_size: Option<i32>,
295        execute_at: DateTime<Utc>,
296        timeout_seconds: i32,
297        max_attempts: i32,
298        priority: i32,
299    ) -> Result<()> {
300        sqlx::query(
301            "INSERT INTO seesaw_effect_executions (
302                event_id, effect_id, correlation_id, status,
303                event_type, event_payload, parent_event_id,
304                batch_id, batch_index, batch_size,
305                execute_at, timeout_seconds, max_attempts, priority
306             )
307             VALUES ($1, $2, $3, 'pending', $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
308        )
309        .bind(event_id)
310        .bind(effect_id)
311        .bind(correlation_id)
312        .bind(event_type)
313        .bind(event_payload)
314        .bind(parent_event_id)
315        .bind(batch_id)
316        .bind(batch_index)
317        .bind(batch_size)
318        .bind(execute_at)
319        .bind(timeout_seconds)
320        .bind(max_attempts)
321        .bind(priority)
322        .execute(&self.pool)
323        .await?;
324
325        Ok(())
326    }
327
328    async fn poll_next_effect(&self) -> Result<Option<QueuedEffectExecution>> {
329        let row: Option<EffectRow> = sqlx::query_as(
330            "WITH next_effect AS (
331                SELECT event_id, effect_id
332                FROM seesaw_effect_executions
333                WHERE (
334                    status = 'pending'
335                    OR (status = 'failed' AND attempts < max_attempts)
336                )
337                  AND execute_at <= NOW()
338                ORDER BY priority ASC, execute_at ASC, event_id ASC, effect_id ASC
339                LIMIT 1
340                FOR UPDATE SKIP LOCKED
341            )
342            UPDATE seesaw_effect_executions e
343            SET status = 'executing',
344                claimed_at = NOW(),
345                last_attempted_at = NOW(),
346                attempts = e.attempts + 1
347            FROM next_effect
348            WHERE e.event_id = next_effect.event_id
349              AND e.effect_id = next_effect.effect_id
350            RETURNING
351                e.event_id, e.effect_id, e.correlation_id, e.event_type, e.event_payload, e.parent_event_id,
352                e.batch_id, e.batch_index, e.batch_size,
353                e.execute_at, e.timeout_seconds, e.max_attempts, e.priority, e.attempts",
354        )
355        .fetch_optional(&self.pool)
356        .await?;
357
358        if let Some(r) = row {
359            Ok(Some(QueuedEffectExecution {
360                event_id: r.event_id,
361                effect_id: r.effect_id,
362                correlation_id: r.correlation_id,
363                event_type: r.event_type,
364                event_payload: r.event_payload,
365                parent_event_id: r.parent_event_id,
366                batch_id: r.batch_id,
367                batch_index: r.batch_index,
368                batch_size: r.batch_size,
369                execute_at: r.execute_at,
370                timeout_seconds: r.timeout_seconds,
371                max_attempts: r.max_attempts,
372                priority: r.priority,
373                attempts: r.attempts,
374            }))
375        } else {
376            Ok(None)
377        }
378    }
379
380    async fn complete_effect(
381        &self,
382        event_id: Uuid,
383        effect_id: String,
384        result: serde_json::Value,
385    ) -> Result<()> {
386        sqlx::query(
387            "UPDATE seesaw_effect_executions
388             SET status = 'completed',
389                 result = $3,
390                 completed_at = NOW()
391             WHERE event_id = $1 AND effect_id = $2",
392        )
393        .bind(event_id)
394        .bind(effect_id)
395        .bind(result)
396        .execute(&self.pool)
397        .await?;
398
399        Ok(())
400    }
401
402    async fn complete_effect_with_events(
403        &self,
404        event_id: Uuid,
405        effect_id: String,
406        result: serde_json::Value,
407        emitted_events: Vec<EmittedEvent>,
408    ) -> Result<()> {
409        // Get correlation_id and hops for emitted events
410        let effect: EffectRow = sqlx::query_as(
411            "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
412                    batch_id, batch_index, batch_size,
413                    execute_at, timeout_seconds, max_attempts, priority, attempts
414             FROM seesaw_effect_executions
415             WHERE event_id = $1 AND effect_id = $2",
416        )
417        .bind(event_id)
418        .bind(&effect_id)
419        .fetch_one(&self.pool)
420        .await?;
421
422        // Read parent metadata for deterministic hop increment and timestamp.
423        let parent: ParentEventRow = sqlx::query_as(
424            "SELECT hops, created_at
425             FROM seesaw_events
426             WHERE event_id = $1
427             ORDER BY created_at ASC, id ASC
428             LIMIT 1",
429        )
430        .bind(event_id)
431        .fetch_one(&self.pool)
432        .await?;
433
434        // Start transaction for atomicity
435        let mut tx = self.pool.begin().await?;
436
437        // Insert emitted events with deterministic IDs
438        for (emitted_index, emitted) in emitted_events.into_iter().enumerate() {
439            // Generate deterministic event_id from
440            // hash(parent_event_id, effect_id, event_type, emitted_index)
441            let deterministic_id = Uuid::new_v5(
442                &NAMESPACE_SEESAW,
443                format!(
444                    "{}-{}-{}-{}",
445                    event_id, effect_id, emitted.event_type, emitted_index
446                )
447                .as_bytes(),
448            );
449
450            // Deterministic timestamp keeps retries idempotent while staying in
451            // the same partition day as the parent event.
452            let deterministic_timestamp = emitted_event_created_at(parent.created_at);
453
454            // Insert event (idempotent via ON CONFLICT on (event_id, created_at))
455            sqlx::query(
456                "INSERT INTO seesaw_events (
457                    event_id, parent_id, correlation_id, event_type, payload, hops,
458                    batch_id, batch_index, batch_size, created_at
459                 )
460                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
461                 ON CONFLICT (event_id, created_at) DO NOTHING",
462            )
463            .bind(deterministic_id)
464            .bind(Some(event_id))
465            .bind(effect.correlation_id)
466            .bind(&emitted.event_type)
467            .bind(emitted.payload)
468            .bind(parent.hops + 1)
469            .bind(emitted.batch_id)
470            .bind(emitted.batch_index)
471            .bind(emitted.batch_size)
472            .bind(deterministic_timestamp)
473            .execute(&mut *tx)
474            .await?;
475        }
476
477        // Mark effect as completed (same transaction)
478        sqlx::query(
479            "UPDATE seesaw_effect_executions
480             SET status = 'completed',
481                 result = $3,
482                 completed_at = NOW()
483             WHERE event_id = $1 AND effect_id = $2",
484        )
485        .bind(event_id)
486        .bind(effect_id)
487        .bind(result)
488        .execute(&mut *tx)
489        .await?;
490
491        // Commit transaction - both succeed or both fail
492        tx.commit().await?;
493
494        Ok(())
495    }
496
497    async fn fail_effect(
498        &self,
499        event_id: Uuid,
500        effect_id: String,
501        error: String,
502        attempts: i32,
503    ) -> Result<()> {
504        let retry_at = Utc::now() + Duration::seconds(effect_retry_delay_seconds(attempts));
505        sqlx::query(
506            "UPDATE seesaw_effect_executions
507             SET status = 'failed',
508                 error = $3,
509                 execute_at = $5,
510                 claimed_at = NULL
511             WHERE event_id = $1 AND effect_id = $2 AND attempts >= $4",
512        )
513        .bind(event_id)
514        .bind(effect_id)
515        .bind(error)
516        .bind(attempts)
517        .bind(retry_at)
518        .execute(&self.pool)
519        .await?;
520
521        Ok(())
522    }
523
524    async fn dlq_effect(
525        &self,
526        event_id: Uuid,
527        effect_id: String,
528        error: String,
529        reason: String,
530        attempts: i32,
531    ) -> Result<()> {
532        self.dlq_effect_with_events(event_id, effect_id, error, reason, attempts, Vec::new())
533            .await
534    }
535
536    async fn dlq_effect_with_events(
537        &self,
538        event_id: Uuid,
539        effect_id: String,
540        error: String,
541        reason: String,
542        attempts: i32,
543        emitted_events: Vec<EmittedEvent>,
544    ) -> Result<()> {
545        // Get effect details for DLQ
546        let effect: EffectRow = sqlx::query_as(
547            "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
548                    batch_id, batch_index, batch_size,
549                    execute_at, timeout_seconds, max_attempts, priority, attempts
550             FROM seesaw_effect_executions
551             WHERE event_id = $1 AND effect_id = $2",
552        )
553        .bind(event_id)
554        .bind(&effect_id)
555        .fetch_one(&self.pool)
556        .await?;
557
558        let parent = sqlx::query_as::<_, ParentEventRow>(
559            "SELECT hops, created_at
560             FROM seesaw_events
561             WHERE event_id = $1
562             ORDER BY created_at ASC, id ASC
563             LIMIT 1",
564        )
565        .bind(event_id)
566        .fetch_optional(&self.pool)
567        .await?;
568
569        let mut tx = self.pool.begin().await?;
570
571        // Insert into DLQ
572        sqlx::query(
573            "INSERT INTO seesaw_dlq (
574                event_id, effect_id, correlation_id, error, event_type, event_payload, reason, attempts
575             )
576             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
577        )
578        .bind(event_id)
579        .bind(&effect_id)
580        .bind(effect.correlation_id)
581        .bind(&error)
582        .bind(&effect.event_type)
583        .bind(&effect.event_payload)
584        .bind(&reason)
585        .bind(attempts)
586        .execute(&mut *tx)
587        .await?;
588
589        let synthetic_created_at = parent
590            .as_ref()
591            .map(|row| emitted_event_created_at(row.created_at))
592            .unwrap_or_else(Utc::now);
593        let synthetic_hops = parent.as_ref().map(|row| row.hops + 1).unwrap_or(0);
594
595        if emitted_events.is_empty() {
596            if let (Some(batch_id), Some(batch_index), Some(batch_size)) =
597                (effect.batch_id, effect.batch_index, effect.batch_size)
598            {
599                let synthetic_event_id = Uuid::new_v5(
600                    &NAMESPACE_SEESAW,
601                    format!("{}-{}-dlq-terminal", event_id, effect_id).as_bytes(),
602                );
603
604                sqlx::query(
605                    "INSERT INTO seesaw_events (
606                        event_id, parent_id, correlation_id, event_type, payload, hops,
607                        batch_id, batch_index, batch_size, created_at
608                     )
609                     VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
610                     ON CONFLICT (event_id, created_at) DO NOTHING",
611                )
612                .bind(synthetic_event_id)
613                .bind(Some(event_id))
614                .bind(effect.correlation_id)
615                .bind(&effect.event_type)
616                .bind(&effect.event_payload)
617                .bind(synthetic_hops)
618                .bind(Some(batch_id))
619                .bind(Some(batch_index))
620                .bind(Some(batch_size))
621                .bind(synthetic_created_at)
622                .execute(&mut *tx)
623                .await?;
624            }
625        } else {
626            for (emitted_index, emitted) in emitted_events.into_iter().enumerate() {
627                let synthetic_event_id = Uuid::new_v5(
628                    &NAMESPACE_SEESAW,
629                    format!(
630                        "{}-{}-dlq-terminal-{}-{}",
631                        event_id, effect_id, emitted.event_type, emitted_index
632                    )
633                    .as_bytes(),
634                );
635
636                sqlx::query(
637                    "INSERT INTO seesaw_events (
638                        event_id, parent_id, correlation_id, event_type, payload, hops,
639                        batch_id, batch_index, batch_size, created_at
640                     )
641                     VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
642                     ON CONFLICT (event_id, created_at) DO NOTHING",
643                )
644                .bind(synthetic_event_id)
645                .bind(Some(event_id))
646                .bind(effect.correlation_id)
647                .bind(&emitted.event_type)
648                .bind(emitted.payload)
649                .bind(synthetic_hops)
650                .bind(emitted.batch_id.or(effect.batch_id))
651                .bind(emitted.batch_index.or(effect.batch_index))
652                .bind(emitted.batch_size.or(effect.batch_size))
653                .bind(synthetic_created_at)
654                .execute(&mut *tx)
655                .await?;
656            }
657        }
658
659        // Delete from executions table
660        sqlx::query("DELETE FROM seesaw_effect_executions WHERE event_id = $1 AND effect_id = $2")
661            .bind(event_id)
662            .bind(&effect_id)
663            .execute(&mut *tx)
664            .await?;
665
666        tx.commit().await?;
667
668        Ok(())
669    }
670
671    async fn subscribe_workflow_events(
672        &self,
673        correlation_id: Uuid,
674    ) -> Result<Box<dyn futures::Stream<Item = seesaw_core::WorkflowEvent> + Send + Unpin>> {
675        use sqlx::postgres::PgListener;
676
677        let channel = format!("seesaw_workflow_{}", correlation_id);
678        const PAGE_SIZE: i64 = 256;
679        const CATCH_UP_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500);
680
681        // Establish a cursor at subscribe time so callers only receive new
682        // workflow events emitted after this subscription starts.
683        let initial_cursor: Option<(DateTime<Utc>, i64)> = sqlx::query_as(
684            "SELECT created_at, id
685             FROM seesaw_events
686             WHERE correlation_id = $1
687             ORDER BY created_at DESC, id DESC
688             LIMIT 1",
689        )
690        .bind(correlation_id)
691        .fetch_optional(&self.pool)
692        .await?;
693
694        // Create a new listener connection
695        let mut listener = PgListener::connect_with(&self.pool).await?;
696        listener.listen(&channel).await?;
697
698        let pool = self.pool.clone();
699        let (tx, rx) = futures::channel::mpsc::unbounded::<seesaw_core::WorkflowEvent>();
700
701        tokio::spawn(async move {
702            let mut cursor = initial_cursor;
703            let mut drain_pending = true;
704
705            loop {
706                if !drain_pending {
707                    match tokio::time::timeout(CATCH_UP_INTERVAL, listener.recv()).await {
708                        Ok(Ok(_notification)) => {}
709                        Ok(Err(error)) => {
710                            tracing::warn!(
711                                "workflow listener recv failed for {}: {}",
712                                correlation_id,
713                                error
714                            );
715                            return;
716                        }
717                        Err(_) => {}
718                    }
719                }
720                drain_pending = false;
721
722                loop {
723                    let rows_result: std::result::Result<Vec<WorkflowEventRow>, sqlx::Error> =
724                        if let Some((created_at, id)) = cursor {
725                            sqlx::query_as(
726                                "SELECT id, event_id, correlation_id, event_type, payload, created_at
727                                 FROM seesaw_events
728                                 WHERE correlation_id = $1
729                                   AND (
730                                        created_at > $2
731                                        OR (created_at = $2 AND id > $3)
732                                   )
733                                 ORDER BY created_at ASC, id ASC
734                                 LIMIT $4",
735                            )
736                            .bind(correlation_id)
737                            .bind(created_at)
738                            .bind(id)
739                            .bind(PAGE_SIZE)
740                            .fetch_all(&pool)
741                            .await
742                        } else {
743                            sqlx::query_as(
744                                "SELECT id, event_id, correlation_id, event_type, payload, created_at
745                                 FROM seesaw_events
746                                 WHERE correlation_id = $1
747                                 ORDER BY created_at ASC, id ASC
748                                 LIMIT $2",
749                            )
750                            .bind(correlation_id)
751                            .bind(PAGE_SIZE)
752                            .fetch_all(&pool)
753                            .await
754                        };
755
756                    let rows = match rows_result {
757                        Ok(rows) => rows,
758                        Err(error) => {
759                            tracing::warn!(
760                                "workflow event query failed for {}: {}",
761                                correlation_id,
762                                error
763                            );
764                            return;
765                        }
766                    };
767
768                    if rows.is_empty() {
769                        break;
770                    }
771
772                    for row in rows {
773                        cursor = Some((row.created_at, row.id));
774                        if tx
775                            .unbounded_send(seesaw_core::WorkflowEvent {
776                                event_id: row.event_id,
777                                correlation_id: row.correlation_id,
778                                event_type: row.event_type,
779                                payload: row.payload,
780                            })
781                            .is_err()
782                        {
783                            return;
784                        }
785                    }
786                }
787            }
788        });
789
790        Ok(Box::new(rx))
791    }
792
793    async fn get_workflow_status(
794        &self,
795        correlation_id: Uuid,
796    ) -> Result<seesaw_core::WorkflowStatus> {
797        let state = sqlx::query_as::<_, (serde_json::Value,)>(
798            "SELECT state FROM seesaw_state WHERE correlation_id = $1",
799        )
800        .bind(correlation_id)
801        .fetch_optional(&self.pool)
802        .await?
803        .map(|r| r.0);
804
805        let pending_effects = sqlx::query_as::<_, (i64,)>(
806            "SELECT COUNT(*) FROM seesaw_effect_executions
807             WHERE correlation_id = $1 AND status IN ('pending', 'executing', 'failed')",
808        )
809        .bind(correlation_id)
810        .fetch_one(&self.pool)
811        .await?
812        .0;
813
814        let last_event = sqlx::query_as::<_, (String,)>(
815            "SELECT event_type FROM seesaw_events
816             WHERE correlation_id = $1
817             ORDER BY created_at DESC, id DESC
818             LIMIT 1",
819        )
820        .bind(correlation_id)
821        .fetch_optional(&self.pool)
822        .await?
823        .map(|r| r.0);
824
825        Ok(seesaw_core::WorkflowStatus {
826            correlation_id,
827            state,
828            pending_effects,
829            is_settled: pending_effects == 0,
830            last_event,
831        })
832    }
833
834    async fn join_same_batch_append_and_maybe_claim(
835        &self,
836        join_effect_id: String,
837        correlation_id: Uuid,
838        source_event_id: Uuid,
839        source_event_type: String,
840        source_payload: serde_json::Value,
841        source_created_at: DateTime<Utc>,
842        batch_id: Uuid,
843        batch_index: i32,
844        batch_size: i32,
845    ) -> Result<Option<Vec<JoinEntry>>> {
846        let mut tx = self.pool.begin().await?;
847
848        sqlx::query(
849            "INSERT INTO seesaw_join_entries (
850                join_effect_id, correlation_id, source_event_id, source_event_type, source_payload,
851                source_created_at, batch_id, batch_index, batch_size
852             )
853             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
854             ON CONFLICT (join_effect_id, correlation_id, source_event_id) DO NOTHING",
855        )
856        .bind(&join_effect_id)
857        .bind(correlation_id)
858        .bind(source_event_id)
859        .bind(&source_event_type)
860        .bind(source_payload)
861        .bind(source_created_at)
862        .bind(batch_id)
863        .bind(batch_index)
864        .bind(batch_size)
865        .execute(&mut *tx)
866        .await?;
867
868        sqlx::query(
869            "INSERT INTO seesaw_join_windows (
870                join_effect_id, correlation_id, mode, batch_id, target_count, status
871             )
872             VALUES ($1, $2, 'same_batch', $3, $4, 'open')
873             ON CONFLICT (join_effect_id, correlation_id, batch_id) DO NOTHING",
874        )
875        .bind(&join_effect_id)
876        .bind(correlation_id)
877        .bind(batch_id)
878        .bind(batch_size)
879        .execute(&mut *tx)
880        .await?;
881
882        sqlx::query(
883            "UPDATE seesaw_join_windows
884             SET target_count = $4,
885                 updated_at = NOW()
886             WHERE join_effect_id = $1
887               AND correlation_id = $2
888               AND batch_id = $3
889               AND target_count <> $4",
890        )
891        .bind(&join_effect_id)
892        .bind(correlation_id)
893        .bind(batch_id)
894        .bind(batch_size)
895        .execute(&mut *tx)
896        .await?;
897
898        let claimed: Option<(String,)> = sqlx::query_as(
899            "UPDATE seesaw_join_windows w
900             SET status = 'processing',
901                 sealed_at = COALESCE(w.sealed_at, NOW()),
902                 processing_started_at = NOW(),
903                 updated_at = NOW(),
904                 last_error = NULL
905             WHERE w.join_effect_id = $1
906               AND w.correlation_id = $2
907               AND w.batch_id = $3
908               AND w.status = 'open'
909               AND (
910                    SELECT COUNT(*)::int
911                    FROM seesaw_join_entries e
912                    WHERE e.join_effect_id = w.join_effect_id
913                      AND e.correlation_id = w.correlation_id
914                      AND e.batch_id = w.batch_id
915               ) >= w.target_count
916             RETURNING w.join_effect_id",
917        )
918        .bind(&join_effect_id)
919        .bind(correlation_id)
920        .bind(batch_id)
921        .fetch_optional(&mut *tx)
922        .await?;
923
924        if claimed.is_none() {
925            tx.commit().await?;
926            return Ok(None);
927        }
928
929        let rows = sqlx::query_as::<_, (Uuid, String, serde_json::Value, Uuid, i32, i32, DateTime<Utc>)>(
930            "SELECT source_event_id, source_event_type, source_payload, batch_id, batch_index, batch_size, source_created_at
931             FROM seesaw_join_entries
932             WHERE join_effect_id = $1
933               AND correlation_id = $2
934               AND batch_id = $3
935             ORDER BY batch_index ASC, source_created_at ASC, source_event_id ASC",
936        )
937        .bind(&join_effect_id)
938        .bind(correlation_id)
939        .bind(batch_id)
940        .fetch_all(&mut *tx)
941        .await?;
942
943        let entries = rows
944            .into_iter()
945            .map(
946                |(
947                    source_event_id,
948                    event_type,
949                    payload,
950                    batch_id,
951                    batch_index,
952                    batch_size,
953                    created_at,
954                )| JoinEntry {
955                    source_event_id,
956                    event_type,
957                    payload,
958                    batch_id,
959                    batch_index,
960                    batch_size,
961                    created_at,
962                },
963            )
964            .collect::<Vec<_>>();
965
966        tx.commit().await?;
967        Ok(Some(entries))
968    }
969
970    async fn join_same_batch_complete(
971        &self,
972        join_effect_id: String,
973        correlation_id: Uuid,
974        batch_id: Uuid,
975    ) -> Result<()> {
976        let mut tx = self.pool.begin().await?;
977
978        sqlx::query(
979            "UPDATE seesaw_join_windows
980             SET status = 'completed',
981                 completed_at = NOW(),
982                 updated_at = NOW()
983             WHERE join_effect_id = $1
984               AND correlation_id = $2
985               AND batch_id = $3",
986        )
987        .bind(&join_effect_id)
988        .bind(correlation_id)
989        .bind(batch_id)
990        .execute(&mut *tx)
991        .await?;
992
993        sqlx::query(
994            "DELETE FROM seesaw_join_entries
995             WHERE join_effect_id = $1
996               AND correlation_id = $2
997               AND batch_id = $3",
998        )
999        .bind(&join_effect_id)
1000        .bind(correlation_id)
1001        .bind(batch_id)
1002        .execute(&mut *tx)
1003        .await?;
1004
1005        sqlx::query(
1006            "DELETE FROM seesaw_join_windows
1007             WHERE join_effect_id = $1
1008               AND correlation_id = $2
1009               AND batch_id = $3",
1010        )
1011        .bind(&join_effect_id)
1012        .bind(correlation_id)
1013        .bind(batch_id)
1014        .execute(&mut *tx)
1015        .await?;
1016
1017        tx.commit().await?;
1018        Ok(())
1019    }
1020
1021    async fn join_same_batch_release(
1022        &self,
1023        join_effect_id: String,
1024        correlation_id: Uuid,
1025        batch_id: Uuid,
1026        error: String,
1027    ) -> Result<()> {
1028        sqlx::query(
1029            "UPDATE seesaw_join_windows
1030             SET status = 'open',
1031                 processing_started_at = NULL,
1032                 last_error = $4,
1033                 updated_at = NOW()
1034             WHERE join_effect_id = $1
1035               AND correlation_id = $2
1036               AND batch_id = $3
1037               AND status = 'processing'",
1038        )
1039        .bind(&join_effect_id)
1040        .bind(correlation_id)
1041        .bind(batch_id)
1042        .bind(error)
1043        .execute(&self.pool)
1044        .await?;
1045
1046        Ok(())
1047    }
1048}
1049
1050#[derive(FromRow)]
1051struct StreamRow {
1052    seq: i64,
1053    stream_type: String,
1054    correlation_id: Uuid,
1055    event_id: Option<Uuid>,
1056    effect_event_id: Option<Uuid>,
1057    effect_id: Option<String>,
1058    status: Option<String>,
1059    error: Option<String>,
1060    payload: Option<serde_json::Value>,
1061    created_at: DateTime<Utc>,
1062}
1063
1064#[derive(FromRow)]
1065struct EffectLogRow {
1066    correlation_id: Uuid,
1067    event_id: Uuid,
1068    effect_id: String,
1069    status: String,
1070    attempts: i32,
1071    event_type: String,
1072    result: Option<serde_json::Value>,
1073    error: Option<String>,
1074    created_at: DateTime<Utc>,
1075    execute_at: DateTime<Utc>,
1076    claimed_at: Option<DateTime<Utc>>,
1077    last_attempted_at: Option<DateTime<Utc>>,
1078    completed_at: Option<DateTime<Utc>>,
1079}
1080
1081#[derive(FromRow)]
1082struct DeadLetterRow {
1083    correlation_id: Uuid,
1084    event_id: Uuid,
1085    effect_id: String,
1086    event_type: String,
1087    event_payload: serde_json::Value,
1088    error: String,
1089    reason: String,
1090    attempts: i32,
1091    failed_at: DateTime<Utc>,
1092    resolved_at: Option<DateTime<Utc>>,
1093}
1094
1095#[derive(FromRow)]
1096struct FailedWorkflowRow {
1097    correlation_id: Uuid,
1098    failed_effects: i64,
1099    active_effects: i64,
1100    dead_letters: i64,
1101    last_failed_at: Option<DateTime<Utc>>,
1102    last_error: Option<String>,
1103}
1104
1105#[async_trait]
1106impl InsightStore for PostgresStore {
1107    async fn subscribe_events(
1108        &self,
1109    ) -> Result<Box<dyn futures::Stream<Item = InsightEvent> + Send + Unpin>> {
1110        use futures::stream::StreamExt;
1111        use sqlx::postgres::PgListener;
1112
1113        // Create a new listener connection
1114        let mut listener = PgListener::connect_with(&self.pool).await?;
1115        listener.listen("seesaw_stream").await?;
1116
1117        // Convert listener into a stream of InsightEvent
1118        let pool = self.pool.clone();
1119        let stream = listener.into_stream().filter_map(move |result| {
1120            let pool = pool.clone();
1121            Box::pin(async move {
1122                match result {
1123                    Ok(_notification) => {
1124                        // Fetch latest entry from stream table
1125                        // (notification payload is just correlation_id for wake-up)
1126                        if let Ok(row) = sqlx::query_as::<_, StreamRow>(
1127                            "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
1128                                    effect_id, status, error, payload, created_at
1129                             FROM seesaw_stream
1130                             ORDER BY seq DESC
1131                             LIMIT 1",
1132                        )
1133                        .fetch_one(&pool)
1134                        .await
1135                        {
1136                            Some(stream_row_to_insight_event(row))
1137                        } else {
1138                            None
1139                        }
1140                    }
1141                    Err(_) => None,
1142                }
1143            })
1144        });
1145
1146        Ok(Box::new(stream))
1147    }
1148
1149    async fn get_workflow_tree(&self, correlation_id: Uuid) -> Result<WorkflowTree> {
1150        // Get all events for this correlation
1151        let events = sqlx::query_as::<_, EventRow>(
1152            "SELECT id, event_id, parent_id, correlation_id, event_type, payload, hops,
1153                    batch_id, batch_index, batch_size, created_at
1154             FROM seesaw_events
1155             WHERE correlation_id = $1
1156             ORDER BY created_at ASC",
1157        )
1158        .bind(correlation_id)
1159        .fetch_all(&self.pool)
1160        .await?;
1161
1162        // Get all effects for this correlation
1163        let effects = sqlx::query_as::<_, EffectTreeRow>(
1164            "SELECT event_id, effect_id, status, result, error, attempts, created_at,
1165                    batch_id, batch_index, batch_size
1166             FROM seesaw_effect_executions
1167             WHERE correlation_id = $1
1168             ORDER BY created_at ASC",
1169        )
1170        .bind(correlation_id)
1171        .fetch_all(&self.pool)
1172        .await?;
1173
1174        // Build tree structure
1175        let event_ids: HashSet<Uuid> = events.iter().map(|event| event.event_id).collect();
1176        let roots = build_event_tree(&events, &effects, None, &event_ids, true);
1177
1178        // Get state
1179        let state = sqlx::query_as::<_, (serde_json::Value,)>(
1180            "SELECT state FROM seesaw_state WHERE correlation_id = $1",
1181        )
1182        .bind(correlation_id)
1183        .fetch_optional(&self.pool)
1184        .await?
1185        .map(|r| r.0);
1186
1187        Ok(WorkflowTree {
1188            correlation_id,
1189            roots,
1190            state,
1191            event_count: events.len(),
1192            effect_count: effects.len(),
1193        })
1194    }
1195
1196    async fn get_stats(&self) -> Result<InsightStats> {
1197        let total_events = sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM seesaw_events")
1198            .fetch_one(&self.pool)
1199            .await?
1200            .0;
1201
1202        let active_effects = sqlx::query_as::<_, (i64,)>(
1203            "SELECT COUNT(*) FROM seesaw_effect_executions
1204             WHERE status IN ('pending', 'executing')",
1205        )
1206        .fetch_one(&self.pool)
1207        .await?
1208        .0;
1209
1210        let completed_effects = sqlx::query_as::<_, (i64,)>(
1211            "SELECT COUNT(*) FROM seesaw_effect_executions WHERE status = 'completed'",
1212        )
1213        .fetch_one(&self.pool)
1214        .await?
1215        .0;
1216
1217        let failed_effects = sqlx::query_as::<_, (i64,)>(
1218            "SELECT COUNT(*) FROM seesaw_effect_executions WHERE status = 'failed'",
1219        )
1220        .fetch_one(&self.pool)
1221        .await?
1222        .0;
1223
1224        Ok(InsightStats {
1225            total_events,
1226            active_effects,
1227            completed_effects,
1228            failed_effects,
1229        })
1230    }
1231
1232    async fn get_recent_events(
1233        &self,
1234        cursor: Option<i64>,
1235        limit: usize,
1236    ) -> Result<Vec<InsightEvent>> {
1237        let rows = if let Some(cursor_seq) = cursor {
1238            sqlx::query_as::<_, StreamRow>(
1239                "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
1240                        effect_id, status, error, payload, created_at
1241                 FROM seesaw_stream
1242                 WHERE seq > $1
1243                 ORDER BY seq ASC
1244                 LIMIT $2",
1245            )
1246            .bind(cursor_seq)
1247            .bind(limit as i64)
1248            .fetch_all(&self.pool)
1249            .await?
1250        } else {
1251            sqlx::query_as::<_, StreamRow>(
1252                "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
1253                        effect_id, status, error, payload, created_at
1254                 FROM seesaw_stream
1255                 ORDER BY seq DESC
1256                 LIMIT $1",
1257            )
1258            .bind(limit as i64)
1259            .fetch_all(&self.pool)
1260            .await?
1261        };
1262
1263        Ok(rows.into_iter().map(stream_row_to_insight_event).collect())
1264    }
1265
1266    async fn get_effect_logs(
1267        &self,
1268        correlation_id: Option<Uuid>,
1269        limit: usize,
1270    ) -> Result<Vec<EffectExecutionLog>> {
1271        let rows = sqlx::query_as::<_, EffectLogRow>(
1272            "SELECT
1273                correlation_id,
1274                event_id,
1275                effect_id,
1276                status,
1277                attempts,
1278                event_type,
1279                result,
1280                error,
1281                created_at,
1282                execute_at,
1283                claimed_at,
1284                last_attempted_at,
1285                completed_at
1286             FROM seesaw_effect_executions
1287             WHERE ($1::uuid IS NULL OR correlation_id = $1)
1288             ORDER BY COALESCE(last_attempted_at, created_at) DESC, event_id DESC
1289             LIMIT $2",
1290        )
1291        .bind(correlation_id)
1292        .bind(limit as i64)
1293        .fetch_all(&self.pool)
1294        .await?;
1295
1296        Ok(rows
1297            .into_iter()
1298            .map(|row| {
1299                let started_at = row.claimed_at.or(row.last_attempted_at);
1300                let duration_ms = match (started_at, row.completed_at) {
1301                    (Some(start), Some(end)) => Some((end - start).num_milliseconds().max(0)),
1302                    _ => None,
1303                };
1304
1305                EffectExecutionLog {
1306                    correlation_id: row.correlation_id,
1307                    event_id: row.event_id,
1308                    effect_id: row.effect_id,
1309                    status: row.status,
1310                    attempts: row.attempts,
1311                    event_type: Some(row.event_type),
1312                    result: row.result,
1313                    error: row.error,
1314                    created_at: row.created_at,
1315                    execute_at: Some(row.execute_at),
1316                    claimed_at: row.claimed_at,
1317                    last_attempted_at: row.last_attempted_at,
1318                    completed_at: row.completed_at,
1319                    duration_ms,
1320                }
1321            })
1322            .collect())
1323    }
1324
1325    async fn get_dead_letters(
1326        &self,
1327        unresolved_only: bool,
1328        limit: usize,
1329    ) -> Result<Vec<DeadLetterEntry>> {
1330        let rows = sqlx::query_as::<_, DeadLetterRow>(
1331            "SELECT
1332                correlation_id,
1333                event_id,
1334                effect_id,
1335                event_type,
1336                event_payload,
1337                error,
1338                reason,
1339                attempts,
1340                failed_at,
1341                resolved_at
1342             FROM seesaw_dlq
1343             WHERE (NOT $1 OR resolved_at IS NULL)
1344             ORDER BY failed_at DESC
1345             LIMIT $2",
1346        )
1347        .bind(unresolved_only)
1348        .bind(limit as i64)
1349        .fetch_all(&self.pool)
1350        .await?;
1351
1352        Ok(rows
1353            .into_iter()
1354            .map(|row| DeadLetterEntry {
1355                correlation_id: row.correlation_id,
1356                event_id: row.event_id,
1357                effect_id: row.effect_id,
1358                event_type: row.event_type,
1359                event_payload: row.event_payload,
1360                error: row.error,
1361                reason: row.reason,
1362                attempts: row.attempts,
1363                failed_at: row.failed_at,
1364                resolved_at: row.resolved_at,
1365            })
1366            .collect())
1367    }
1368
1369    async fn get_failed_workflows(&self, limit: usize) -> Result<Vec<FailedWorkflow>> {
1370        let rows = sqlx::query_as::<_, FailedWorkflowRow>(
1371            "WITH effect_agg AS (
1372                SELECT
1373                    correlation_id,
1374                    COUNT(*) FILTER (WHERE status = 'failed')::BIGINT AS failed_effects,
1375                    COUNT(*) FILTER (WHERE status IN ('pending', 'executing'))::BIGINT AS active_effects,
1376                    MAX(last_attempted_at) FILTER (WHERE status = 'failed') AS last_failed_at,
1377                    MAX(error) FILTER (WHERE status = 'failed') AS last_error
1378                FROM seesaw_effect_executions
1379                GROUP BY correlation_id
1380             ),
1381             dlq_agg AS (
1382                SELECT
1383                    correlation_id,
1384                    COUNT(*) FILTER (WHERE resolved_at IS NULL)::BIGINT AS dead_letters,
1385                    MAX(failed_at) FILTER (WHERE resolved_at IS NULL) AS last_dlq_at,
1386                    MAX(error) FILTER (WHERE resolved_at IS NULL) AS last_dlq_error
1387                FROM seesaw_dlq
1388                GROUP BY correlation_id
1389             )
1390             SELECT
1391                COALESCE(e.correlation_id, d.correlation_id) AS correlation_id,
1392                COALESCE(e.failed_effects, 0) AS failed_effects,
1393                COALESCE(e.active_effects, 0) AS active_effects,
1394                COALESCE(d.dead_letters, 0) AS dead_letters,
1395                GREATEST(e.last_failed_at, d.last_dlq_at) AS last_failed_at,
1396                COALESCE(d.last_dlq_error, e.last_error) AS last_error
1397             FROM effect_agg e
1398             FULL OUTER JOIN dlq_agg d ON d.correlation_id = e.correlation_id
1399             WHERE COALESCE(e.failed_effects, 0) > 0 OR COALESCE(d.dead_letters, 0) > 0
1400             ORDER BY last_failed_at DESC NULLS LAST
1401             LIMIT $1",
1402        )
1403        .bind(limit as i64)
1404        .fetch_all(&self.pool)
1405        .await?;
1406
1407        Ok(rows
1408            .into_iter()
1409            .map(|row| FailedWorkflow {
1410                correlation_id: row.correlation_id,
1411                failed_effects: row.failed_effects,
1412                active_effects: row.active_effects,
1413                dead_letters: row.dead_letters,
1414                last_failed_at: row.last_failed_at,
1415                last_error: row.last_error,
1416            })
1417            .collect())
1418    }
1419}
1420
1421#[derive(FromRow)]
1422struct EffectTreeRow {
1423    event_id: Uuid,
1424    effect_id: String,
1425    status: String,
1426    result: Option<serde_json::Value>,
1427    error: Option<String>,
1428    attempts: i32,
1429    created_at: DateTime<Utc>,
1430    batch_id: Option<Uuid>,
1431    batch_index: Option<i32>,
1432    batch_size: Option<i32>,
1433}
1434
1435fn stream_row_to_insight_event(row: StreamRow) -> InsightEvent {
1436    let stream_type = match row.stream_type.as_str() {
1437        "event_dispatched" => StreamType::EventDispatched,
1438        "effect_started" => StreamType::EffectStarted,
1439        "effect_completed" => StreamType::EffectCompleted,
1440        "effect_failed" => StreamType::EffectFailed,
1441        _ => StreamType::EventDispatched, // Default fallback
1442    };
1443
1444    // Extract event_type from payload if it's an event
1445    let event_type = if stream_type == StreamType::EventDispatched {
1446        row.payload
1447            .as_ref()
1448            .and_then(|p| p.get("event_type"))
1449            .and_then(|v| v.as_str())
1450            .map(|s| s.to_string())
1451    } else {
1452        None
1453    };
1454
1455    InsightEvent {
1456        seq: row.seq,
1457        stream_type,
1458        correlation_id: row.correlation_id,
1459        event_id: row.event_id,
1460        effect_event_id: row.effect_event_id,
1461        effect_id: row.effect_id,
1462        event_type,
1463        status: row.status,
1464        error: row.error,
1465        payload: row.payload,
1466        created_at: row.created_at,
1467    }
1468}
1469
1470fn build_event_tree(
1471    events: &[EventRow],
1472    effects: &[EffectTreeRow],
1473    parent_id: Option<Uuid>,
1474    event_ids: &HashSet<Uuid>,
1475    is_root_pass: bool,
1476) -> Vec<EventNode> {
1477    events
1478        .iter()
1479        .filter(|event| {
1480            if is_root_pass {
1481                event.parent_id.is_none()
1482                    || event
1483                        .parent_id
1484                        .map(|parent| !event_ids.contains(&parent))
1485                        .unwrap_or(false)
1486            } else {
1487                event.parent_id == parent_id
1488            }
1489        })
1490        .map(|event| {
1491            // Get effects for this event
1492            let event_effects: Vec<EffectNode> = effects
1493                .iter()
1494                .filter(|eff| eff.event_id == event.event_id)
1495                .map(|eff| EffectNode {
1496                    effect_id: eff.effect_id.clone(),
1497                    event_id: eff.event_id,
1498                    status: eff.status.clone(),
1499                    result: eff.result.clone(),
1500                    error: eff.error.clone(),
1501                    attempts: eff.attempts,
1502                    created_at: eff.created_at,
1503                    batch_id: eff.batch_id,
1504                    batch_index: eff.batch_index,
1505                    batch_size: eff.batch_size,
1506                })
1507                .collect();
1508
1509            // Recursively build children
1510            let children =
1511                build_event_tree(events, effects, Some(event.event_id), event_ids, false);
1512
1513            EventNode {
1514                event_id: event.event_id,
1515                event_type: event.event_type.clone(),
1516                payload: event.payload.clone(),
1517                created_at: event.created_at,
1518                batch_id: event.batch_id,
1519                batch_index: event.batch_index,
1520                batch_size: event.batch_size,
1521                children,
1522                effects: event_effects,
1523            }
1524        })
1525        .collect()
1526}
1527
1528#[cfg(test)]
1529mod tests {
1530    use super::*;
1531    use chrono::{TimeZone, Timelike};
1532
1533    #[test]
1534    fn emitted_event_created_at_is_midnight_on_parent_day() {
1535        let parent = Utc
1536            .with_ymd_and_hms(2026, 2, 5, 18, 45, 12)
1537            .single()
1538            .expect("valid timestamp");
1539
1540        let emitted = emitted_event_created_at(parent);
1541
1542        assert_eq!(emitted.date_naive(), parent.date_naive());
1543        assert_eq!(emitted.hour(), 0);
1544        assert_eq!(emitted.minute(), 0);
1545        assert_eq!(emitted.second(), 0);
1546    }
1547
1548    #[test]
1549    fn emitted_event_created_at_is_deterministic_for_same_parent_day() {
1550        let first_parent = Utc
1551            .with_ymd_and_hms(2026, 2, 5, 0, 1, 2)
1552            .single()
1553            .expect("valid timestamp");
1554        let second_parent = Utc
1555            .with_ymd_and_hms(2026, 2, 5, 23, 59, 59)
1556            .single()
1557            .expect("valid timestamp");
1558
1559        let first_emitted = emitted_event_created_at(first_parent);
1560        let second_emitted = emitted_event_created_at(second_parent);
1561
1562        assert_eq!(first_emitted, second_emitted);
1563    }
1564
1565    #[test]
1566    fn effect_retry_delay_seconds_uses_exponential_backoff() {
1567        assert_eq!(effect_retry_delay_seconds(1), 1);
1568        assert_eq!(effect_retry_delay_seconds(2), 2);
1569        assert_eq!(effect_retry_delay_seconds(3), 4);
1570        assert_eq!(effect_retry_delay_seconds(4), 8);
1571    }
1572
1573    #[test]
1574    fn effect_retry_delay_seconds_is_capped() {
1575        assert_eq!(effect_retry_delay_seconds(9), 256);
1576        assert_eq!(effect_retry_delay_seconds(50), 256);
1577    }
1578
1579    #[test]
1580    fn build_event_tree_treats_orphan_parent_as_root() {
1581        let correlation_id = Uuid::new_v4();
1582        let event_id = Uuid::new_v4();
1583        let missing_parent = Uuid::new_v4();
1584        let now = Utc::now();
1585
1586        let events = vec![EventRow {
1587            id: 1,
1588            event_id,
1589            parent_id: Some(missing_parent),
1590            correlation_id,
1591            event_type: "OrphanEvent".to_string(),
1592            payload: serde_json::json!({"ok": true}),
1593            hops: 1,
1594            batch_id: None,
1595            batch_index: None,
1596            batch_size: None,
1597            created_at: now,
1598        }];
1599
1600        let effects: Vec<EffectTreeRow> = Vec::new();
1601        let event_ids: HashSet<Uuid> = events.iter().map(|event| event.event_id).collect();
1602
1603        let roots = build_event_tree(&events, &effects, None, &event_ids, true);
1604
1605        assert_eq!(roots.len(), 1);
1606        assert_eq!(roots[0].event_id, event_id);
1607    }
1608}