// seesaw_postgres/lib.rs
1// Simplified PostgresStore without compile-time checked queries
2// Uses dynamic queries for easier testing
3
4use anyhow::Result;
5use async_trait::async_trait;
6use chrono::{DateTime, Duration, Utc};
7use seesaw_core::{
8    insight::*, EmittedEvent, JoinEntry, QueuedEffectExecution, QueuedEvent, Store,
9    NAMESPACE_SEESAW,
10};
11use serde::{Deserialize, Serialize};
12use sqlx::{FromRow, PgPool};
13use std::collections::HashSet;
14use uuid::Uuid;
15
/// How long a claimed event stays invisible to other pollers; `poll_next`
/// sets `locked_until = NOW() + EVENT_CLAIM_SECONDS` when it claims a row.
const EVENT_CLAIM_SECONDS: i64 = 30;
17
18fn emitted_event_created_at(parent_created_at: DateTime<Utc>) -> DateTime<Utc> {
19    parent_created_at
20        .date_naive()
21        .and_hms_opt(0, 0, 0)
22        .expect("midnight should always be a valid UTC timestamp")
23        .and_utc()
24}
25
/// Exponential backoff for effect retries: 1s, 2s, 4s, ... doubling per prior
/// attempt, capped at 256s (2^8). Attempt counts below 1 are treated as the
/// first attempt.
fn effect_retry_delay_seconds(attempts: i32) -> i64 {
    // Clamp to [1, 9] so the exponent (attempts - 1) stays within [0, 8];
    // this also absorbs zero/negative counts without overflow.
    let bounded_attempts = attempts.clamp(1, 9);
    2_i64.pow((bounded_attempts - 1) as u32)
}
30
/// PostgreSQL implementation of Store trait
///
/// Thin wrapper around a connection pool; all persistence (events, state,
/// effect executions, DLQ, join windows) goes through the `seesaw_*` tables.
pub struct PostgresStore {
    // Shared sqlx pool; cloned handles reference the same pool.
    pool: PgPool,
}
35
36impl PostgresStore {
37    pub fn new(pool: PgPool) -> Self {
38        Self { pool }
39    }
40
41    pub fn pool(&self) -> &PgPool {
42        &self.pool
43    }
44}
45
46impl Clone for PostgresStore {
47    fn clone(&self) -> Self {
48        Self {
49            pool: self.pool.clone(),
50        }
51    }
52}
53
/// Row mapping for reads from `seesaw_events` (see `poll_next`).
/// Field names must match the RETURNING column list exactly.
#[derive(FromRow)]
struct EventRow {
    // Surrogate queue id; used as the ordering tiebreaker alongside created_at.
    id: i64,
    event_id: Uuid,
    parent_id: Option<Uuid>,
    correlation_id: Uuid,
    event_type: String,
    payload: serde_json::Value,
    hops: i32,
    retry_count: i32,
    // Batch columns are NULL for non-batched events.
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
    created_at: DateTime<Utc>,
}
69
/// Row mapping for `seesaw_state` reads (see `load_state`): the JSON state
/// blob plus its optimistic-concurrency version.
#[derive(FromRow)]
struct StateRow {
    state: serde_json::Value,
    version: i32,
}
75
/// Row mapping for `seesaw_effect_executions` reads (see `poll_next_effect`,
/// `complete_effect_with_events`, `dlq_effect_with_events`).
#[derive(FromRow)]
struct EffectRow {
    event_id: Uuid,
    effect_id: String,
    correlation_id: Uuid,
    // Snapshot of the originating event, stored with the execution so the
    // executor needs no extra lookups.
    event_type: String,
    event_payload: serde_json::Value,
    parent_event_id: Option<Uuid>,
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
    // Scheduling/retry bookkeeping.
    execute_at: DateTime<Utc>,
    timeout_seconds: i32,
    max_attempts: i32,
    priority: i32,
    attempts: i32,
}
93
/// Minimal parent-event metadata used when synthesizing child events:
/// `hops` for the deterministic hop increment, `created_at` for the
/// deterministic (partition-stable) child timestamp.
#[derive(FromRow)]
struct ParentEventRow {
    hops: i32,
    created_at: DateTime<Utc>,
}
99
/// Row mapping used by the `subscribe_workflow_events` catch-up queries;
/// `(created_at, id)` doubles as the pagination cursor.
#[derive(FromRow)]
struct WorkflowEventRow {
    id: i64,
    event_id: Uuid,
    correlation_id: Uuid,
    event_type: String,
    payload: serde_json::Value,
    created_at: DateTime<Utc>,
}
109
110#[async_trait]
111impl Store for PostgresStore {
    /// Enqueue `event`, deduplicating on `event_id`.
    ///
    /// The dedupe ledger insert and the queue insert run in one transaction,
    /// so a duplicate can never appear in `seesaw_events`. Re-publishing an
    /// already-seen `event_id` is a silent no-op that still returns `Ok`.
    async fn publish(&self, event: QueuedEvent) -> Result<()> {
        let mut tx = self.pool.begin().await?;

        // Use the non-partitioned ledger as the idempotency guard. This keeps
        // webhook/process_with_id dedupe stable even when created_at differs.
        let inserted: Option<Uuid> = sqlx::query_scalar(
            "INSERT INTO seesaw_processed (event_id, correlation_id, created_at)
             VALUES ($1, $2, $3)
             ON CONFLICT (event_id) DO NOTHING
             RETURNING event_id",
        )
        .bind(event.event_id)
        .bind(event.correlation_id)
        .bind(event.created_at)
        .fetch_optional(&mut *tx)
        .await?;

        // RETURNING yields no row on conflict, i.e. this event_id was already
        // published: commit the (empty) transaction and report success.
        if inserted.is_none() {
            tx.commit().await?;
            return Ok(());
        }

        sqlx::query(
            "INSERT INTO seesaw_events (
                event_id, parent_id, correlation_id, event_type, payload, hops, retry_count,
                batch_id, batch_index, batch_size, created_at
             )
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)",
        )
        .bind(event.event_id)
        .bind(event.parent_id)
        .bind(event.correlation_id)
        .bind(event.event_type)
        .bind(event.payload)
        .bind(event.hops)
        .bind(event.retry_count)
        .bind(event.batch_id)
        .bind(event.batch_index)
        .bind(event.batch_size)
        .bind(event.created_at)
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;

        Ok(())
    }
159
    /// Claim the next processable event, if any.
    ///
    /// Eligibility: not yet processed, not currently claimed (no lock or lock
    /// expired), and the *oldest* unprocessed event of its correlation — the
    /// NOT EXISTS subquery enforces strict in-order processing per
    /// correlation. Claiming pushes `locked_until` EVENT_CLAIM_SECONDS into
    /// the future; `FOR UPDATE SKIP LOCKED` lets concurrent pollers skip rows
    /// another transaction is claiming instead of blocking on them.
    async fn poll_next(&self) -> Result<Option<QueuedEvent>> {
        let row: Option<EventRow> = sqlx::query_as(
            "WITH next_event AS (
                SELECT e.id
                FROM seesaw_events e
                WHERE e.processed_at IS NULL
                  AND (e.locked_until IS NULL OR e.locked_until < NOW())
                  AND NOT EXISTS (
                    SELECT 1
                    FROM seesaw_events older
                    WHERE older.correlation_id = e.correlation_id
                      AND older.processed_at IS NULL
                      AND (
                        older.created_at < e.created_at
                        OR (older.created_at = e.created_at AND older.id < e.id)
                      )
                  )
                ORDER BY e.created_at ASC, e.id ASC
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            )
            UPDATE seesaw_events e
            SET locked_until = NOW() + ($1 * INTERVAL '1 second')
            FROM next_event
            WHERE e.id = next_event.id
            RETURNING e.id, e.event_id, e.parent_id, e.correlation_id, e.event_type, e.payload,
                      e.hops, e.retry_count, e.batch_id, e.batch_index, e.batch_size, e.created_at",
        )
        .bind(EVENT_CLAIM_SECONDS)
        .fetch_optional(&self.pool)
        .await?;

        // Translate the raw row into the queue DTO; None means nothing is due.
        Ok(row.map(|r| QueuedEvent {
            id: r.id,
            event_id: r.event_id,
            parent_id: r.parent_id,
            correlation_id: r.correlation_id,
            event_type: r.event_type,
            payload: r.payload,
            hops: r.hops,
            retry_count: r.retry_count,
            batch_id: r.batch_id,
            batch_index: r.batch_index,
            batch_size: r.batch_size,
            created_at: r.created_at,
        }))
    }
207
208    async fn ack(&self, id: i64) -> Result<()> {
209        sqlx::query(
210            "UPDATE seesaw_events SET processed_at = NOW(), locked_until = NULL WHERE id = $1",
211        )
212        .bind(id)
213        .execute(&self.pool)
214        .await?;
215        Ok(())
216    }
217
218    async fn nack(&self, id: i64, retry_after_secs: u64) -> Result<()> {
219        let locked_until = Utc::now() + Duration::seconds(retry_after_secs as i64);
220        sqlx::query(
221            "UPDATE seesaw_events
222             SET retry_count = retry_count + 1,
223                 locked_until = $2
224             WHERE id = $1",
225        )
226        .bind(id)
227        .bind(locked_until)
228        .execute(&self.pool)
229        .await?;
230        Ok(())
231    }
232
233    async fn load_state<S>(&self, correlation_id: Uuid) -> Result<Option<(S, i32)>>
234    where
235        S: for<'de> Deserialize<'de> + Send,
236    {
237        let row: Option<StateRow> =
238            sqlx::query_as("SELECT state, version FROM seesaw_state WHERE correlation_id = $1")
239                .bind(correlation_id)
240                .fetch_optional(&self.pool)
241                .await?;
242
243        match row {
244            Some(r) => {
245                let state: S = serde_json::from_value(r.state)?;
246                Ok(Some((state, r.version)))
247            }
248            None => Ok(None),
249        }
250    }
251
    /// Persist workflow state with optimistic concurrency control.
    ///
    /// The upsert only updates an existing row when its stored version still
    /// equals `expected_version`; on success the new version
    /// (`expected_version + 1`) is returned, otherwise the call fails with a
    /// version-conflict error.
    ///
    /// NOTE(review): when no row exists yet, the INSERT path succeeds for any
    /// `expected_version` (the WHERE clause only guards the conflict/update
    /// path) — confirm callers always pass 0 for brand-new state.
    async fn save_state<S>(
        &self,
        correlation_id: Uuid,
        state: &S,
        expected_version: i32,
    ) -> Result<i32>
    where
        S: Serialize + Send + Sync,
    {
        let state_json = serde_json::to_value(state)?;
        let new_version = expected_version + 1;

        let result = sqlx::query(
            "INSERT INTO seesaw_state (correlation_id, state, version, updated_at)
             VALUES ($1, $2, $3, NOW())
             ON CONFLICT (correlation_id) DO UPDATE
             SET state = $2,
                 version = $3,
                 updated_at = NOW()
             WHERE seesaw_state.version = $4",
        )
        .bind(correlation_id)
        .bind(&state_json)
        .bind(new_version)
        .bind(expected_version)
        .execute(&self.pool)
        .await?;

        // Zero rows affected means the conditional update was skipped: the
        // stored version no longer matches what the caller read.
        if result.rows_affected() == 0 {
            anyhow::bail!("Version conflict: state was modified concurrently");
        }

        Ok(new_version)
    }
286
    /// Record an intended effect execution in the `'pending'` state.
    ///
    /// The row carries a snapshot of the originating event plus scheduling
    /// knobs (`execute_at`, `timeout_seconds`, `max_attempts`, `priority`);
    /// `poll_next_effect` later claims it once `execute_at` passes, so the
    /// executor needs no extra lookups.
    async fn insert_effect_intent(
        &self,
        event_id: Uuid,
        effect_id: String,
        correlation_id: Uuid,
        event_type: String,
        event_payload: serde_json::Value,
        parent_event_id: Option<Uuid>,
        batch_id: Option<Uuid>,
        batch_index: Option<i32>,
        batch_size: Option<i32>,
        execute_at: DateTime<Utc>,
        timeout_seconds: i32,
        max_attempts: i32,
        priority: i32,
    ) -> Result<()> {
        sqlx::query(
            "INSERT INTO seesaw_effect_executions (
                event_id, effect_id, correlation_id, status,
                event_type, event_payload, parent_event_id,
                batch_id, batch_index, batch_size,
                execute_at, timeout_seconds, max_attempts, priority
             )
             VALUES ($1, $2, $3, 'pending', $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
        )
        .bind(event_id)
        .bind(effect_id)
        .bind(correlation_id)
        .bind(event_type)
        .bind(event_payload)
        .bind(parent_event_id)
        .bind(batch_id)
        .bind(batch_index)
        .bind(batch_size)
        .bind(execute_at)
        .bind(timeout_seconds)
        .bind(max_attempts)
        .bind(priority)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
330
331    async fn poll_next_effect(&self) -> Result<Option<QueuedEffectExecution>> {
332        let row: Option<EffectRow> = sqlx::query_as(
333            "WITH next_effect AS (
334                SELECT event_id, effect_id
335                FROM seesaw_effect_executions
336                WHERE (
337                    status = 'pending'
338                    OR (status = 'failed' AND attempts < max_attempts)
339                )
340                  AND execute_at <= NOW()
341                ORDER BY priority ASC, execute_at ASC, event_id ASC, effect_id ASC
342                LIMIT 1
343                FOR UPDATE SKIP LOCKED
344            )
345            UPDATE seesaw_effect_executions e
346            SET status = 'executing',
347                claimed_at = NOW(),
348                last_attempted_at = NOW(),
349                attempts = e.attempts + 1
350            FROM next_effect
351            WHERE e.event_id = next_effect.event_id
352              AND e.effect_id = next_effect.effect_id
353            RETURNING
354                e.event_id, e.effect_id, e.correlation_id, e.event_type, e.event_payload, e.parent_event_id,
355                e.batch_id, e.batch_index, e.batch_size,
356                e.execute_at, e.timeout_seconds, e.max_attempts, e.priority, e.attempts",
357        )
358        .fetch_optional(&self.pool)
359        .await?;
360
361        if let Some(r) = row {
362            Ok(Some(QueuedEffectExecution {
363                event_id: r.event_id,
364                effect_id: r.effect_id,
365                correlation_id: r.correlation_id,
366                event_type: r.event_type,
367                event_payload: r.event_payload,
368                parent_event_id: r.parent_event_id,
369                batch_id: r.batch_id,
370                batch_index: r.batch_index,
371                batch_size: r.batch_size,
372                execute_at: r.execute_at,
373                timeout_seconds: r.timeout_seconds,
374                max_attempts: r.max_attempts,
375                priority: r.priority,
376                attempts: r.attempts,
377            }))
378        } else {
379            Ok(None)
380        }
381    }
382
383    async fn complete_effect(
384        &self,
385        event_id: Uuid,
386        effect_id: String,
387        result: serde_json::Value,
388    ) -> Result<()> {
389        sqlx::query(
390            "UPDATE seesaw_effect_executions
391             SET status = 'completed',
392                 result = $3,
393                 completed_at = NOW()
394             WHERE event_id = $1 AND effect_id = $2",
395        )
396        .bind(event_id)
397        .bind(effect_id)
398        .bind(result)
399        .execute(&self.pool)
400        .await?;
401
402        Ok(())
403    }
404
    /// Complete an effect and publish the events it emitted, atomically.
    ///
    /// The emitted-event inserts and the completion update share one
    /// transaction, so retries after a crash re-run the whole step. Event ids
    /// and timestamps are deterministic (UUIDv5 + parent-day midnight), and
    /// the inserts use ON CONFLICT DO NOTHING, making the step idempotent.
    ///
    /// NOTE(review): the effect/parent reads happen *before* the transaction
    /// opens — fine as long as those rows are immutable while this runs;
    /// confirm nothing mutates them concurrently.
    async fn complete_effect_with_events(
        &self,
        event_id: Uuid,
        effect_id: String,
        result: serde_json::Value,
        emitted_events: Vec<EmittedEvent>,
    ) -> Result<()> {
        // Get correlation_id and hops for emitted events
        let effect: EffectRow = sqlx::query_as(
            "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
                    batch_id, batch_index, batch_size,
                    execute_at, timeout_seconds, max_attempts, priority, attempts
             FROM seesaw_effect_executions
             WHERE event_id = $1 AND effect_id = $2",
        )
        .bind(event_id)
        .bind(&effect_id)
        .fetch_one(&self.pool)
        .await?;

        // Read parent metadata for deterministic hop increment and timestamp.
        let parent: ParentEventRow = sqlx::query_as(
            "SELECT hops, created_at
             FROM seesaw_events
             WHERE event_id = $1
             ORDER BY created_at ASC, id ASC
             LIMIT 1",
        )
        .bind(event_id)
        .fetch_one(&self.pool)
        .await?;

        // Start transaction for atomicity
        let mut tx = self.pool.begin().await?;

        // Insert emitted events with deterministic IDs
        for (emitted_index, emitted) in emitted_events.into_iter().enumerate() {
            // Generate deterministic event_id from
            // hash(parent_event_id, effect_id, event_type, emitted_index)
            let deterministic_id = Uuid::new_v5(
                &NAMESPACE_SEESAW,
                format!(
                    "{}-{}-{}-{}",
                    event_id, effect_id, emitted.event_type, emitted_index
                )
                .as_bytes(),
            );

            // Deterministic timestamp keeps retries idempotent while staying in
            // the same partition day as the parent event.
            let deterministic_timestamp = emitted_event_created_at(parent.created_at);

            // Insert event (idempotent via ON CONFLICT on (event_id, created_at))
            sqlx::query(
                "INSERT INTO seesaw_events (
                    event_id, parent_id, correlation_id, event_type, payload, hops,
                    batch_id, batch_index, batch_size, created_at
                 )
                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                 ON CONFLICT (event_id, created_at) DO NOTHING",
            )
            .bind(deterministic_id)
            .bind(Some(event_id))
            .bind(effect.correlation_id)
            .bind(&emitted.event_type)
            .bind(emitted.payload)
            .bind(parent.hops + 1)
            .bind(emitted.batch_id)
            .bind(emitted.batch_index)
            .bind(emitted.batch_size)
            .bind(deterministic_timestamp)
            .execute(&mut *tx)
            .await?;
        }

        // Mark effect as completed (same transaction)
        sqlx::query(
            "UPDATE seesaw_effect_executions
             SET status = 'completed',
                 result = $3,
                 completed_at = NOW()
             WHERE event_id = $1 AND effect_id = $2",
        )
        .bind(event_id)
        .bind(effect_id)
        .bind(result)
        .execute(&mut *tx)
        .await?;

        // Commit transaction - both succeed or both fail
        tx.commit().await?;

        Ok(())
    }
499
    /// Record a failed attempt and schedule the retry with exponential backoff.
    ///
    /// Flips the status back to `'failed'`, stores the error, clears the
    /// claim, and pushes `execute_at` forward by
    /// `effect_retry_delay_seconds(attempts)` so `poll_next_effect` can pick
    /// it up again while retry budget remains.
    ///
    /// NOTE(review): the `attempts >= $4` guard presumably discards a stale
    /// failure report when a newer claim has already bumped `attempts` past
    /// the reporter's view — confirm that is the intended semantics.
    async fn fail_effect(
        &self,
        event_id: Uuid,
        effect_id: String,
        error: String,
        attempts: i32,
    ) -> Result<()> {
        let retry_at = Utc::now() + Duration::seconds(effect_retry_delay_seconds(attempts));
        sqlx::query(
            "UPDATE seesaw_effect_executions
             SET status = 'failed',
                 error = $3,
                 execute_at = $5,
                 claimed_at = NULL
             WHERE event_id = $1 AND effect_id = $2 AND attempts >= $4",
        )
        .bind(event_id)
        .bind(effect_id)
        .bind(error)
        .bind(attempts)
        .bind(retry_at)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
526
527    async fn dlq_effect(
528        &self,
529        event_id: Uuid,
530        effect_id: String,
531        error: String,
532        reason: String,
533        attempts: i32,
534    ) -> Result<()> {
535        self.dlq_effect_with_events(event_id, effect_id, error, reason, attempts, Vec::new())
536            .await
537    }
538
    /// Move a permanently-failed effect to the dead-letter queue.
    ///
    /// Steps (DLQ insert, synthetic terminal events, execution-row delete all
    /// share one transaction):
    /// 1. Snapshot the execution row and (optionally) its parent event.
    /// 2. Insert the failure into `seesaw_dlq`.
    /// 3. Emit terminal events so downstream batch joins/consumers are not
    ///    left waiting: either the effect's own event replayed with its batch
    ///    coordinates (when no `emitted_events` were supplied and the effect
    ///    belongs to a batch) or each supplied emitted event. Event ids are
    ///    deterministic UUIDv5 values, inserts idempotent via ON CONFLICT.
    /// 4. Delete the row from `seesaw_effect_executions`.
    async fn dlq_effect_with_events(
        &self,
        event_id: Uuid,
        effect_id: String,
        error: String,
        reason: String,
        attempts: i32,
        emitted_events: Vec<EmittedEvent>,
    ) -> Result<()> {
        // Get effect details for DLQ
        let effect: EffectRow = sqlx::query_as(
            "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
                    batch_id, batch_index, batch_size,
                    execute_at, timeout_seconds, max_attempts, priority, attempts
             FROM seesaw_effect_executions
             WHERE event_id = $1 AND effect_id = $2",
        )
        .bind(event_id)
        .bind(&effect_id)
        .fetch_one(&self.pool)
        .await?;

        // Parent metadata is optional here (unlike complete_effect_with_events):
        // a missing parent falls back to NOW()/hops=0 below.
        let parent = sqlx::query_as::<_, ParentEventRow>(
            "SELECT hops, created_at
             FROM seesaw_events
             WHERE event_id = $1
             ORDER BY created_at ASC, id ASC
             LIMIT 1",
        )
        .bind(event_id)
        .fetch_optional(&self.pool)
        .await?;

        let mut tx = self.pool.begin().await?;

        // Insert into DLQ
        sqlx::query(
            "INSERT INTO seesaw_dlq (
                event_id, effect_id, correlation_id, error, event_type, event_payload, reason, attempts
             )
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
        )
        .bind(event_id)
        .bind(&effect_id)
        .bind(effect.correlation_id)
        .bind(&error)
        .bind(&effect.event_type)
        .bind(&effect.event_payload)
        .bind(&reason)
        .bind(attempts)
        .execute(&mut *tx)
        .await?;

        // Deterministic timestamp/hops for the synthetic events; without a
        // parent row we can only use the current time and hops = 0.
        let synthetic_created_at = parent
            .as_ref()
            .map(|row| emitted_event_created_at(row.created_at))
            .unwrap_or_else(Utc::now);
        let synthetic_hops = parent.as_ref().map(|row| row.hops + 1).unwrap_or(0);

        if emitted_events.is_empty() {
            // No emitted events: if the effect was part of a batch, replay its
            // own event as a terminal marker so the batch join can still close.
            if let (Some(batch_id), Some(batch_index), Some(batch_size)) =
                (effect.batch_id, effect.batch_index, effect.batch_size)
            {
                let synthetic_event_id = Uuid::new_v5(
                    &NAMESPACE_SEESAW,
                    format!("{}-{}-dlq-terminal", event_id, effect_id).as_bytes(),
                );

                sqlx::query(
                    "INSERT INTO seesaw_events (
                        event_id, parent_id, correlation_id, event_type, payload, hops,
                        batch_id, batch_index, batch_size, created_at
                     )
                     VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                     ON CONFLICT (event_id, created_at) DO NOTHING",
                )
                .bind(synthetic_event_id)
                .bind(Some(event_id))
                .bind(effect.correlation_id)
                .bind(&effect.event_type)
                .bind(&effect.event_payload)
                .bind(synthetic_hops)
                .bind(Some(batch_id))
                .bind(Some(batch_index))
                .bind(Some(batch_size))
                .bind(synthetic_created_at)
                .execute(&mut *tx)
                .await?;
            }
        } else {
            // Emit each supplied event, inheriting the effect's batch
            // coordinates wherever the emitted event did not set its own.
            for (emitted_index, emitted) in emitted_events.into_iter().enumerate() {
                let synthetic_event_id = Uuid::new_v5(
                    &NAMESPACE_SEESAW,
                    format!(
                        "{}-{}-dlq-terminal-{}-{}",
                        event_id, effect_id, emitted.event_type, emitted_index
                    )
                    .as_bytes(),
                );

                sqlx::query(
                    "INSERT INTO seesaw_events (
                        event_id, parent_id, correlation_id, event_type, payload, hops,
                        batch_id, batch_index, batch_size, created_at
                     )
                     VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                     ON CONFLICT (event_id, created_at) DO NOTHING",
                )
                .bind(synthetic_event_id)
                .bind(Some(event_id))
                .bind(effect.correlation_id)
                .bind(&emitted.event_type)
                .bind(emitted.payload)
                .bind(synthetic_hops)
                .bind(emitted.batch_id.or(effect.batch_id))
                .bind(emitted.batch_index.or(effect.batch_index))
                .bind(emitted.batch_size.or(effect.batch_size))
                .bind(synthetic_created_at)
                .execute(&mut *tx)
                .await?;
            }
        }

        // Delete from executions table
        sqlx::query("DELETE FROM seesaw_effect_executions WHERE event_id = $1 AND effect_id = $2")
            .bind(event_id)
            .bind(&effect_id)
            .execute(&mut *tx)
            .await?;

        tx.commit().await?;

        Ok(())
    }
673
    /// Stream workflow events for `correlation_id` as they are written.
    ///
    /// A `(created_at, id)` cursor is pinned at subscribe time so only events
    /// newer than the subscription are delivered. A background task then
    /// drains `seesaw_events` in cursor order, waking on either a
    /// LISTEN/NOTIFY ping on the per-correlation channel or a 500ms polling
    /// fallback (so missed notifications cannot stall the stream). The task
    /// exits when the receiver is dropped or a query/listener error occurs.
    async fn subscribe_workflow_events(
        &self,
        correlation_id: Uuid,
    ) -> Result<Box<dyn futures::Stream<Item = seesaw_core::WorkflowEvent> + Send + Unpin>> {
        use sqlx::postgres::PgListener;

        let channel = format!("seesaw_workflow_{}", correlation_id);
        const PAGE_SIZE: i64 = 256;
        const CATCH_UP_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500);

        // Establish a cursor at subscribe time so callers only receive new
        // workflow events emitted after this subscription starts.
        let initial_cursor: Option<(DateTime<Utc>, i64)> = sqlx::query_as(
            "SELECT created_at, id
             FROM seesaw_events
             WHERE correlation_id = $1
             ORDER BY created_at DESC, id DESC
             LIMIT 1",
        )
        .bind(correlation_id)
        .fetch_optional(&self.pool)
        .await?;

        // Create a new listener connection
        let mut listener = PgListener::connect_with(&self.pool).await?;
        listener.listen(&channel).await?;

        let pool = self.pool.clone();
        let (tx, rx) = futures::channel::mpsc::unbounded::<seesaw_core::WorkflowEvent>();

        tokio::spawn(async move {
            let mut cursor = initial_cursor;
            // First iteration drains immediately (events may have landed
            // between the cursor query and the LISTEN registration).
            let mut drain_pending = true;

            loop {
                if !drain_pending {
                    // Wait for a notification, but cap the wait so the polling
                    // fallback still runs even if a NOTIFY was missed.
                    match tokio::time::timeout(CATCH_UP_INTERVAL, listener.recv()).await {
                        Ok(Ok(_notification)) => {}
                        Ok(Err(error)) => {
                            tracing::warn!(
                                "workflow listener recv failed for {}: {}",
                                correlation_id,
                                error
                            );
                            return;
                        }
                        Err(_) => {}
                    }
                }
                drain_pending = false;

                // Page through everything newer than the cursor until empty.
                loop {
                    let rows_result: std::result::Result<Vec<WorkflowEventRow>, sqlx::Error> =
                        if let Some((created_at, id)) = cursor {
                            sqlx::query_as(
                                "SELECT id, event_id, correlation_id, event_type, payload, created_at
                                 FROM seesaw_events
                                 WHERE correlation_id = $1
                                   AND (
                                        created_at > $2
                                        OR (created_at = $2 AND id > $3)
                                   )
                                 ORDER BY created_at ASC, id ASC
                                 LIMIT $4",
                            )
                            .bind(correlation_id)
                            .bind(created_at)
                            .bind(id)
                            .bind(PAGE_SIZE)
                            .fetch_all(&pool)
                            .await
                        } else {
                            sqlx::query_as(
                                "SELECT id, event_id, correlation_id, event_type, payload, created_at
                                 FROM seesaw_events
                                 WHERE correlation_id = $1
                                 ORDER BY created_at ASC, id ASC
                                 LIMIT $2",
                            )
                            .bind(correlation_id)
                            .bind(PAGE_SIZE)
                            .fetch_all(&pool)
                            .await
                        };

                    let rows = match rows_result {
                        Ok(rows) => rows,
                        Err(error) => {
                            tracing::warn!(
                                "workflow event query failed for {}: {}",
                                correlation_id,
                                error
                            );
                            return;
                        }
                    };

                    if rows.is_empty() {
                        break;
                    }

                    for row in rows {
                        // Advance the cursor before sending so a dropped
                        // receiver never causes a re-send on restart.
                        cursor = Some((row.created_at, row.id));
                        if tx
                            .unbounded_send(seesaw_core::WorkflowEvent {
                                event_id: row.event_id,
                                correlation_id: row.correlation_id,
                                event_type: row.event_type,
                                payload: row.payload,
                            })
                            .is_err()
                        {
                            // Receiver dropped: stop the background task.
                            return;
                        }
                    }
                }
            }
        });

        Ok(Box::new(rx))
    }
795
    /// Snapshot the current status of one workflow.
    ///
    /// Gathers: the stored state JSON (if any), the number of unfinished
    /// effect executions ('pending'/'executing'/'failed'), and the most
    /// recent event type. `is_settled` is derived as "no unfinished effects".
    ///
    /// NOTE(review): the three queries are not in one transaction, so the
    /// snapshot can be slightly inconsistent under concurrent writes.
    async fn get_workflow_status(
        &self,
        correlation_id: Uuid,
    ) -> Result<seesaw_core::WorkflowStatus> {
        let state = sqlx::query_as::<_, (serde_json::Value,)>(
            "SELECT state FROM seesaw_state WHERE correlation_id = $1",
        )
        .bind(correlation_id)
        .fetch_optional(&self.pool)
        .await?
        .map(|r| r.0);

        let pending_effects = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM seesaw_effect_executions
             WHERE correlation_id = $1 AND status IN ('pending', 'executing', 'failed')",
        )
        .bind(correlation_id)
        .fetch_one(&self.pool)
        .await?
        .0;

        let last_event = sqlx::query_as::<_, (String,)>(
            "SELECT event_type FROM seesaw_events
             WHERE correlation_id = $1
             ORDER BY created_at DESC, id DESC
             LIMIT 1",
        )
        .bind(correlation_id)
        .fetch_optional(&self.pool)
        .await?
        .map(|r| r.0);

        Ok(seesaw_core::WorkflowStatus {
            correlation_id,
            state,
            pending_effects,
            is_settled: pending_effects == 0,
            last_event,
        })
    }
836
    /// Records one batch member for a same-batch join and, if that makes the
    /// window complete, atomically claims the window for processing.
    ///
    /// All statements run in a single transaction:
    /// 1. insert the join entry (idempotent via `ON CONFLICT DO NOTHING`, so
    ///    redelivered events are safe);
    /// 2. ensure an `'open'` join window exists for this batch;
    /// 3. reconcile `target_count` if the reported `batch_size` differs from
    ///    what the window already holds;
    /// 4. try to flip the window `'open'` -> `'processing'`, but only when the
    ///    stored entry count has reached `target_count`; the `RETURNING`
    ///    clause tells us whether *this* caller won the claim.
    ///
    /// Returns `Ok(None)` when the window is not yet complete (or another
    /// worker already claimed it), and `Ok(Some(entries))` — ordered by
    /// `batch_index` — when the caller owns the window and should run the
    /// join effect.
    async fn join_same_batch_append_and_maybe_claim(
        &self,
        join_effect_id: String,
        correlation_id: Uuid,
        source_event_id: Uuid,
        source_event_type: String,
        source_payload: serde_json::Value,
        source_created_at: DateTime<Utc>,
        batch_id: Uuid,
        batch_index: i32,
        batch_size: i32,
    ) -> Result<Option<Vec<JoinEntry>>> {
        let mut tx = self.pool.begin().await?;

        // Step 1: record this batch member; duplicates are silently ignored.
        sqlx::query(
            "INSERT INTO seesaw_join_entries (
                join_effect_id, correlation_id, source_event_id, source_event_type, source_payload,
                source_created_at, batch_id, batch_index, batch_size
             )
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
             ON CONFLICT (join_effect_id, correlation_id, source_event_id) DO NOTHING",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(source_event_id)
        .bind(&source_event_type)
        .bind(source_payload)
        .bind(source_created_at)
        .bind(batch_id)
        .bind(batch_index)
        .bind(batch_size)
        .execute(&mut *tx)
        .await?;

        // Step 2: lazily create the window the first time any member of this
        // batch arrives; later arrivals hit the ON CONFLICT no-op.
        sqlx::query(
            "INSERT INTO seesaw_join_windows (
                join_effect_id, correlation_id, mode, batch_id, target_count, status
             )
             VALUES ($1, $2, 'same_batch', $3, $4, 'open')
             ON CONFLICT (join_effect_id, correlation_id, batch_id) DO NOTHING",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .bind(batch_size)
        .execute(&mut *tx)
        .await?;

        // Step 3: if this event reports a different batch_size than the one
        // the window was created with, adopt the new value. The `<>` guard
        // avoids a no-op write (and its updated_at churn) in the common case.
        sqlx::query(
            "UPDATE seesaw_join_windows
             SET target_count = $4,
                 updated_at = NOW()
             WHERE join_effect_id = $1
               AND correlation_id = $2
               AND batch_id = $3
               AND target_count <> $4",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .bind(batch_size)
        .execute(&mut *tx)
        .await?;

        // Step 4: claim attempt. The status = 'open' predicate plus the
        // entry-count subquery make this a compare-and-swap: only one worker
        // can transition the window to 'processing', and only once all
        // target_count entries exist. sealed_at is preserved if already set.
        let claimed: Option<(String,)> = sqlx::query_as(
            "UPDATE seesaw_join_windows w
             SET status = 'processing',
                 sealed_at = COALESCE(w.sealed_at, NOW()),
                 processing_started_at = NOW(),
                 updated_at = NOW(),
                 last_error = NULL
             WHERE w.join_effect_id = $1
               AND w.correlation_id = $2
               AND w.batch_id = $3
               AND w.status = 'open'
               AND (
                    SELECT COUNT(*)::int
                    FROM seesaw_join_entries e
                    WHERE e.join_effect_id = w.join_effect_id
                      AND e.correlation_id = w.correlation_id
                      AND e.batch_id = w.batch_id
               ) >= w.target_count
             RETURNING w.join_effect_id",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .fetch_optional(&mut *tx)
        .await?;

        // Window incomplete or claimed elsewhere: still commit so the entry
        // insert (and any target_count fix) from steps 1-3 persists.
        if claimed.is_none() {
            tx.commit().await?;
            return Ok(None);
        }

        // We own the window: load every entry, in deterministic order, for
        // the join effect to consume.
        let rows = sqlx::query_as::<_, (Uuid, String, serde_json::Value, Uuid, i32, i32, DateTime<Utc>)>(
            "SELECT source_event_id, source_event_type, source_payload, batch_id, batch_index, batch_size, source_created_at
             FROM seesaw_join_entries
             WHERE join_effect_id = $1
               AND correlation_id = $2
               AND batch_id = $3
             ORDER BY batch_index ASC, source_created_at ASC, source_event_id ASC",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .fetch_all(&mut *tx)
        .await?;

        // Convert raw tuples into the JoinEntry shape callers expect.
        let entries = rows
            .into_iter()
            .map(
                |(
                    source_event_id,
                    event_type,
                    payload,
                    batch_id,
                    batch_index,
                    batch_size,
                    created_at,
                )| JoinEntry {
                    source_event_id,
                    event_type,
                    payload,
                    batch_id,
                    batch_index,
                    batch_size,
                    created_at,
                },
            )
            .collect::<Vec<_>>();

        tx.commit().await?;
        Ok(Some(entries))
    }
972
973    async fn join_same_batch_complete(
974        &self,
975        join_effect_id: String,
976        correlation_id: Uuid,
977        batch_id: Uuid,
978    ) -> Result<()> {
979        let mut tx = self.pool.begin().await?;
980
981        sqlx::query(
982            "UPDATE seesaw_join_windows
983             SET status = 'completed',
984                 completed_at = NOW(),
985                 updated_at = NOW()
986             WHERE join_effect_id = $1
987               AND correlation_id = $2
988               AND batch_id = $3",
989        )
990        .bind(&join_effect_id)
991        .bind(correlation_id)
992        .bind(batch_id)
993        .execute(&mut *tx)
994        .await?;
995
996        sqlx::query(
997            "DELETE FROM seesaw_join_entries
998             WHERE join_effect_id = $1
999               AND correlation_id = $2
1000               AND batch_id = $3",
1001        )
1002        .bind(&join_effect_id)
1003        .bind(correlation_id)
1004        .bind(batch_id)
1005        .execute(&mut *tx)
1006        .await?;
1007
1008        sqlx::query(
1009            "DELETE FROM seesaw_join_windows
1010             WHERE join_effect_id = $1
1011               AND correlation_id = $2
1012               AND batch_id = $3",
1013        )
1014        .bind(&join_effect_id)
1015        .bind(correlation_id)
1016        .bind(batch_id)
1017        .execute(&mut *tx)
1018        .await?;
1019
1020        tx.commit().await?;
1021        Ok(())
1022    }
1023
1024    async fn join_same_batch_release(
1025        &self,
1026        join_effect_id: String,
1027        correlation_id: Uuid,
1028        batch_id: Uuid,
1029        error: String,
1030    ) -> Result<()> {
1031        sqlx::query(
1032            "UPDATE seesaw_join_windows
1033             SET status = 'open',
1034                 processing_started_at = NULL,
1035                 last_error = $4,
1036                 updated_at = NOW()
1037             WHERE join_effect_id = $1
1038               AND correlation_id = $2
1039               AND batch_id = $3
1040               AND status = 'processing'",
1041        )
1042        .bind(&join_effect_id)
1043        .bind(correlation_id)
1044        .bind(batch_id)
1045        .bind(error)
1046        .execute(&self.pool)
1047        .await?;
1048
1049        Ok(())
1050    }
1051}
1052
/// Row image of the `seesaw_stream` activity log, decoded by column name via
/// `FromRow`. Which optional columns are populated depends on `stream_type`;
/// rows are converted to the public shape by `stream_row_to_insight_event`.
#[derive(FromRow)]
struct StreamRow {
    /// Monotonic sequence number; doubles as the pagination cursor.
    seq: i64,
    /// Raw discriminator: "event_dispatched", "effect_started",
    /// "effect_completed", or "effect_failed" (anything else falls back to
    /// EventDispatched during conversion).
    stream_type: String,
    correlation_id: Uuid,
    /// Presumably set for event entries — verify against the stream writers.
    event_id: Option<Uuid>,
    /// Presumably set for effect entries — verify against the stream writers.
    effect_event_id: Option<Uuid>,
    effect_id: Option<String>,
    status: Option<String>,
    error: Option<String>,
    /// JSON payload; for event entries it carries an "event_type" key that
    /// conversion extracts.
    payload: Option<serde_json::Value>,
    created_at: DateTime<Utc>,
}
1066
/// Row image of `seesaw_effect_executions` as selected by `get_effect_logs`;
/// mapped 1:1 onto `EffectExecutionLog` plus a derived duration.
#[derive(FromRow)]
struct EffectLogRow {
    correlation_id: Uuid,
    event_id: Uuid,
    effect_id: String,
    status: String,
    attempts: i32,
    event_type: String,
    result: Option<serde_json::Value>,
    error: Option<String>,
    created_at: DateTime<Utc>,
    execute_at: DateTime<Utc>,
    /// Start-of-work marker; preferred over last_attempted_at when computing
    /// duration_ms in get_effect_logs.
    claimed_at: Option<DateTime<Utc>>,
    last_attempted_at: Option<DateTime<Utc>>,
    /// End-of-work marker; duration_ms is None until this is set.
    completed_at: Option<DateTime<Utc>>,
}
1083
/// Row image of the `seesaw_dlq` dead-letter table; mapped 1:1 onto
/// `DeadLetterEntry` in `get_dead_letters`.
#[derive(FromRow)]
struct DeadLetterRow {
    correlation_id: Uuid,
    event_id: Uuid,
    effect_id: String,
    event_type: String,
    event_payload: serde_json::Value,
    error: String,
    reason: String,
    attempts: i32,
    failed_at: DateTime<Utc>,
    /// NULL while the entry is still unresolved — the queries in this file
    /// treat `resolved_at IS NULL` as "open".
    resolved_at: Option<DateTime<Utc>>,
}
1097
/// Aggregate produced by the `get_failed_workflows` query: per-workflow
/// failure counters combined from effect executions and the DLQ.
#[derive(FromRow)]
struct FailedWorkflowRow {
    correlation_id: Uuid,
    /// COUNT of effect executions with status = 'failed'.
    failed_effects: i64,
    /// COUNT of effect executions still 'pending' or 'executing'.
    active_effects: i64,
    /// COUNT of unresolved DLQ entries.
    dead_letters: i64,
    /// Latest failure timestamp across both sources (GREATEST of the two).
    last_failed_at: Option<DateTime<Utc>>,
    /// DLQ error takes precedence over effect error (COALESCE order).
    last_error: Option<String>,
}
1107
1108#[async_trait]
1109impl InsightStore for PostgresStore {
1110    async fn subscribe_events(
1111        &self,
1112    ) -> Result<Box<dyn futures::Stream<Item = InsightEvent> + Send + Unpin>> {
1113        use futures::stream::StreamExt;
1114        use sqlx::postgres::PgListener;
1115
1116        // Create a new listener connection
1117        let mut listener = PgListener::connect_with(&self.pool).await?;
1118        listener.listen("seesaw_stream").await?;
1119
1120        // Convert listener into a stream of InsightEvent
1121        let pool = self.pool.clone();
1122        let stream = listener.into_stream().filter_map(move |result| {
1123            let pool = pool.clone();
1124            Box::pin(async move {
1125                match result {
1126                    Ok(_notification) => {
1127                        // Fetch latest entry from stream table
1128                        // (notification payload is just correlation_id for wake-up)
1129                        if let Ok(row) = sqlx::query_as::<_, StreamRow>(
1130                            "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
1131                                    effect_id, status, error, payload, created_at
1132                             FROM seesaw_stream
1133                             ORDER BY seq DESC
1134                             LIMIT 1",
1135                        )
1136                        .fetch_one(&pool)
1137                        .await
1138                        {
1139                            Some(stream_row_to_insight_event(row))
1140                        } else {
1141                            None
1142                        }
1143                    }
1144                    Err(_) => None,
1145                }
1146            })
1147        });
1148
1149        Ok(Box::new(stream))
1150    }
1151
1152    async fn get_workflow_tree(&self, correlation_id: Uuid) -> Result<WorkflowTree> {
1153        // Get all events for this correlation
1154        let events = sqlx::query_as::<_, EventRow>(
1155            "SELECT id, event_id, parent_id, correlation_id, event_type, payload, hops,
1156                    batch_id, batch_index, batch_size, created_at
1157             FROM seesaw_events
1158             WHERE correlation_id = $1
1159             ORDER BY created_at ASC",
1160        )
1161        .bind(correlation_id)
1162        .fetch_all(&self.pool)
1163        .await?;
1164
1165        // Get all effects for this correlation
1166        let effects = sqlx::query_as::<_, EffectTreeRow>(
1167            "SELECT event_id, effect_id, status, result, error, attempts, created_at,
1168                    batch_id, batch_index, batch_size
1169             FROM seesaw_effect_executions
1170             WHERE correlation_id = $1
1171             ORDER BY created_at ASC",
1172        )
1173        .bind(correlation_id)
1174        .fetch_all(&self.pool)
1175        .await?;
1176
1177        // Build tree structure
1178        let event_ids: HashSet<Uuid> = events.iter().map(|event| event.event_id).collect();
1179        let roots = build_event_tree(&events, &effects, None, &event_ids, true);
1180
1181        // Get state
1182        let state = sqlx::query_as::<_, (serde_json::Value,)>(
1183            "SELECT state FROM seesaw_state WHERE correlation_id = $1",
1184        )
1185        .bind(correlation_id)
1186        .fetch_optional(&self.pool)
1187        .await?
1188        .map(|r| r.0);
1189
1190        Ok(WorkflowTree {
1191            correlation_id,
1192            roots,
1193            state,
1194            event_count: events.len(),
1195            effect_count: effects.len(),
1196        })
1197    }
1198
1199    async fn get_stats(&self) -> Result<InsightStats> {
1200        let total_events = sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM seesaw_events")
1201            .fetch_one(&self.pool)
1202            .await?
1203            .0;
1204
1205        let active_effects = sqlx::query_as::<_, (i64,)>(
1206            "SELECT COUNT(*) FROM seesaw_effect_executions
1207             WHERE status IN ('pending', 'executing')",
1208        )
1209        .fetch_one(&self.pool)
1210        .await?
1211        .0;
1212
1213        let completed_effects = sqlx::query_as::<_, (i64,)>(
1214            "SELECT COUNT(*) FROM seesaw_effect_executions WHERE status = 'completed'",
1215        )
1216        .fetch_one(&self.pool)
1217        .await?
1218        .0;
1219
1220        let failed_effects = sqlx::query_as::<_, (i64,)>(
1221            "SELECT COUNT(*) FROM seesaw_effect_executions WHERE status = 'failed'",
1222        )
1223        .fetch_one(&self.pool)
1224        .await?
1225        .0;
1226
1227        Ok(InsightStats {
1228            total_events,
1229            active_effects,
1230            completed_effects,
1231            failed_effects,
1232        })
1233    }
1234
1235    async fn get_recent_events(
1236        &self,
1237        cursor: Option<i64>,
1238        limit: usize,
1239    ) -> Result<Vec<InsightEvent>> {
1240        let rows = if let Some(cursor_seq) = cursor {
1241            sqlx::query_as::<_, StreamRow>(
1242                "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
1243                        effect_id, status, error, payload, created_at
1244                 FROM seesaw_stream
1245                 WHERE seq > $1
1246                 ORDER BY seq ASC
1247                 LIMIT $2",
1248            )
1249            .bind(cursor_seq)
1250            .bind(limit as i64)
1251            .fetch_all(&self.pool)
1252            .await?
1253        } else {
1254            sqlx::query_as::<_, StreamRow>(
1255                "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
1256                        effect_id, status, error, payload, created_at
1257                 FROM seesaw_stream
1258                 ORDER BY seq DESC
1259                 LIMIT $1",
1260            )
1261            .bind(limit as i64)
1262            .fetch_all(&self.pool)
1263            .await?
1264        };
1265
1266        Ok(rows.into_iter().map(stream_row_to_insight_event).collect())
1267    }
1268
1269    async fn get_effect_logs(
1270        &self,
1271        correlation_id: Option<Uuid>,
1272        limit: usize,
1273    ) -> Result<Vec<EffectExecutionLog>> {
1274        let rows = sqlx::query_as::<_, EffectLogRow>(
1275            "SELECT
1276                correlation_id,
1277                event_id,
1278                effect_id,
1279                status,
1280                attempts,
1281                event_type,
1282                result,
1283                error,
1284                created_at,
1285                execute_at,
1286                claimed_at,
1287                last_attempted_at,
1288                completed_at
1289             FROM seesaw_effect_executions
1290             WHERE ($1::uuid IS NULL OR correlation_id = $1)
1291             ORDER BY COALESCE(last_attempted_at, created_at) DESC, event_id DESC
1292             LIMIT $2",
1293        )
1294        .bind(correlation_id)
1295        .bind(limit as i64)
1296        .fetch_all(&self.pool)
1297        .await?;
1298
1299        Ok(rows
1300            .into_iter()
1301            .map(|row| {
1302                let started_at = row.claimed_at.or(row.last_attempted_at);
1303                let duration_ms = match (started_at, row.completed_at) {
1304                    (Some(start), Some(end)) => Some((end - start).num_milliseconds().max(0)),
1305                    _ => None,
1306                };
1307
1308                EffectExecutionLog {
1309                    correlation_id: row.correlation_id,
1310                    event_id: row.event_id,
1311                    effect_id: row.effect_id,
1312                    status: row.status,
1313                    attempts: row.attempts,
1314                    event_type: Some(row.event_type),
1315                    result: row.result,
1316                    error: row.error,
1317                    created_at: row.created_at,
1318                    execute_at: Some(row.execute_at),
1319                    claimed_at: row.claimed_at,
1320                    last_attempted_at: row.last_attempted_at,
1321                    completed_at: row.completed_at,
1322                    duration_ms,
1323                }
1324            })
1325            .collect())
1326    }
1327
1328    async fn get_dead_letters(
1329        &self,
1330        unresolved_only: bool,
1331        limit: usize,
1332    ) -> Result<Vec<DeadLetterEntry>> {
1333        let rows = sqlx::query_as::<_, DeadLetterRow>(
1334            "SELECT
1335                correlation_id,
1336                event_id,
1337                effect_id,
1338                event_type,
1339                event_payload,
1340                error,
1341                reason,
1342                attempts,
1343                failed_at,
1344                resolved_at
1345             FROM seesaw_dlq
1346             WHERE (NOT $1 OR resolved_at IS NULL)
1347             ORDER BY failed_at DESC
1348             LIMIT $2",
1349        )
1350        .bind(unresolved_only)
1351        .bind(limit as i64)
1352        .fetch_all(&self.pool)
1353        .await?;
1354
1355        Ok(rows
1356            .into_iter()
1357            .map(|row| DeadLetterEntry {
1358                correlation_id: row.correlation_id,
1359                event_id: row.event_id,
1360                effect_id: row.effect_id,
1361                event_type: row.event_type,
1362                event_payload: row.event_payload,
1363                error: row.error,
1364                reason: row.reason,
1365                attempts: row.attempts,
1366                failed_at: row.failed_at,
1367                resolved_at: row.resolved_at,
1368            })
1369            .collect())
1370    }
1371
1372    async fn get_failed_workflows(&self, limit: usize) -> Result<Vec<FailedWorkflow>> {
1373        let rows = sqlx::query_as::<_, FailedWorkflowRow>(
1374            "WITH effect_agg AS (
1375                SELECT
1376                    correlation_id,
1377                    COUNT(*) FILTER (WHERE status = 'failed')::BIGINT AS failed_effects,
1378                    COUNT(*) FILTER (WHERE status IN ('pending', 'executing'))::BIGINT AS active_effects,
1379                    MAX(last_attempted_at) FILTER (WHERE status = 'failed') AS last_failed_at,
1380                    MAX(error) FILTER (WHERE status = 'failed') AS last_error
1381                FROM seesaw_effect_executions
1382                GROUP BY correlation_id
1383             ),
1384             dlq_agg AS (
1385                SELECT
1386                    correlation_id,
1387                    COUNT(*) FILTER (WHERE resolved_at IS NULL)::BIGINT AS dead_letters,
1388                    MAX(failed_at) FILTER (WHERE resolved_at IS NULL) AS last_dlq_at,
1389                    MAX(error) FILTER (WHERE resolved_at IS NULL) AS last_dlq_error
1390                FROM seesaw_dlq
1391                GROUP BY correlation_id
1392             )
1393             SELECT
1394                COALESCE(e.correlation_id, d.correlation_id) AS correlation_id,
1395                COALESCE(e.failed_effects, 0) AS failed_effects,
1396                COALESCE(e.active_effects, 0) AS active_effects,
1397                COALESCE(d.dead_letters, 0) AS dead_letters,
1398                GREATEST(e.last_failed_at, d.last_dlq_at) AS last_failed_at,
1399                COALESCE(d.last_dlq_error, e.last_error) AS last_error
1400             FROM effect_agg e
1401             FULL OUTER JOIN dlq_agg d ON d.correlation_id = e.correlation_id
1402             WHERE COALESCE(e.failed_effects, 0) > 0 OR COALESCE(d.dead_letters, 0) > 0
1403             ORDER BY last_failed_at DESC NULLS LAST
1404             LIMIT $1",
1405        )
1406        .bind(limit as i64)
1407        .fetch_all(&self.pool)
1408        .await?;
1409
1410        Ok(rows
1411            .into_iter()
1412            .map(|row| FailedWorkflow {
1413                correlation_id: row.correlation_id,
1414                failed_effects: row.failed_effects,
1415                active_effects: row.active_effects,
1416                dead_letters: row.dead_letters,
1417                last_failed_at: row.last_failed_at,
1418                last_error: row.last_error,
1419            })
1420            .collect())
1421    }
1422}
1423
/// Row image of `seesaw_effect_executions` as selected by `get_workflow_tree`;
/// converted into `EffectNode`s and attached to their owning event in
/// `build_event_tree`.
#[derive(FromRow)]
struct EffectTreeRow {
    /// Event this execution was triggered by (join key into the event tree).
    event_id: Uuid,
    effect_id: String,
    status: String,
    result: Option<serde_json::Value>,
    error: Option<String>,
    attempts: i32,
    created_at: DateTime<Utc>,
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
}
1437
1438fn stream_row_to_insight_event(row: StreamRow) -> InsightEvent {
1439    let stream_type = match row.stream_type.as_str() {
1440        "event_dispatched" => StreamType::EventDispatched,
1441        "effect_started" => StreamType::EffectStarted,
1442        "effect_completed" => StreamType::EffectCompleted,
1443        "effect_failed" => StreamType::EffectFailed,
1444        _ => StreamType::EventDispatched, // Default fallback
1445    };
1446
1447    // Extract event_type from payload if it's an event
1448    let event_type = if stream_type == StreamType::EventDispatched {
1449        row.payload
1450            .as_ref()
1451            .and_then(|p| p.get("event_type"))
1452            .and_then(|v| v.as_str())
1453            .map(|s| s.to_string())
1454    } else {
1455        None
1456    };
1457
1458    InsightEvent {
1459        seq: row.seq,
1460        stream_type,
1461        correlation_id: row.correlation_id,
1462        event_id: row.event_id,
1463        effect_event_id: row.effect_event_id,
1464        effect_id: row.effect_id,
1465        event_type,
1466        status: row.status,
1467        error: row.error,
1468        payload: row.payload,
1469        created_at: row.created_at,
1470    }
1471}
1472
1473fn build_event_tree(
1474    events: &[EventRow],
1475    effects: &[EffectTreeRow],
1476    parent_id: Option<Uuid>,
1477    event_ids: &HashSet<Uuid>,
1478    is_root_pass: bool,
1479) -> Vec<EventNode> {
1480    events
1481        .iter()
1482        .filter(|event| {
1483            if is_root_pass {
1484                event.parent_id.is_none()
1485                    || event
1486                        .parent_id
1487                        .map(|parent| !event_ids.contains(&parent))
1488                        .unwrap_or(false)
1489            } else {
1490                event.parent_id == parent_id
1491            }
1492        })
1493        .map(|event| {
1494            // Get effects for this event
1495            let event_effects: Vec<EffectNode> = effects
1496                .iter()
1497                .filter(|eff| eff.event_id == event.event_id)
1498                .map(|eff| EffectNode {
1499                    effect_id: eff.effect_id.clone(),
1500                    event_id: eff.event_id,
1501                    status: eff.status.clone(),
1502                    result: eff.result.clone(),
1503                    error: eff.error.clone(),
1504                    attempts: eff.attempts,
1505                    created_at: eff.created_at,
1506                    batch_id: eff.batch_id,
1507                    batch_index: eff.batch_index,
1508                    batch_size: eff.batch_size,
1509                })
1510                .collect();
1511
1512            // Recursively build children
1513            let children =
1514                build_event_tree(events, effects, Some(event.event_id), event_ids, false);
1515
1516            EventNode {
1517                event_id: event.event_id,
1518                event_type: event.event_type.clone(),
1519                payload: event.payload.clone(),
1520                created_at: event.created_at,
1521                batch_id: event.batch_id,
1522                batch_index: event.batch_index,
1523                batch_size: event.batch_size,
1524                children,
1525                effects: event_effects,
1526            }
1527        })
1528        .collect()
1529}
1530
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Timelike};

    #[test]
    fn emitted_event_created_at_is_midnight_on_parent_day() {
        let parent = Utc
            .with_ymd_and_hms(2026, 2, 5, 18, 45, 12)
            .single()
            .expect("valid timestamp");

        let emitted = emitted_event_created_at(parent);

        // Same calendar day, clamped to 00:00:00 UTC.
        assert_eq!(emitted.date_naive(), parent.date_naive());
        assert_eq!(
            (emitted.hour(), emitted.minute(), emitted.second()),
            (0, 0, 0)
        );
    }

    #[test]
    fn emitted_event_created_at_is_deterministic_for_same_parent_day() {
        // Two parents at opposite ends of the same day must normalize to the
        // same emitted timestamp.
        let earliest = Utc
            .with_ymd_and_hms(2026, 2, 5, 0, 1, 2)
            .single()
            .expect("valid timestamp");
        let latest = Utc
            .with_ymd_and_hms(2026, 2, 5, 23, 59, 59)
            .single()
            .expect("valid timestamp");

        assert_eq!(
            emitted_event_created_at(earliest),
            emitted_event_created_at(latest)
        );
    }

    #[test]
    fn effect_retry_delay_seconds_uses_exponential_backoff() {
        // Delay doubles per attempt: 2^(attempts - 1).
        for (attempts, expected) in [(1, 1), (2, 2), (3, 4), (4, 8)] {
            assert_eq!(effect_retry_delay_seconds(attempts), expected);
        }
    }

    #[test]
    fn effect_retry_delay_seconds_is_capped() {
        // The exponent clamps at 8, so 2^8 = 256 is the ceiling.
        for attempts in [9, 50] {
            assert_eq!(effect_retry_delay_seconds(attempts), 256);
        }
    }

    #[test]
    fn build_event_tree_treats_orphan_parent_as_root() {
        let now = Utc::now();
        let event_id = Uuid::new_v4();

        // parent_id points at an id that never appears in the event set.
        let events = vec![EventRow {
            id: 1,
            event_id,
            parent_id: Some(Uuid::new_v4()),
            correlation_id: Uuid::new_v4(),
            event_type: "OrphanEvent".to_string(),
            payload: serde_json::json!({"ok": true}),
            hops: 1,
            retry_count: 0,
            batch_id: None,
            batch_index: None,
            batch_size: None,
            created_at: now,
        }];
        let known_ids: HashSet<Uuid> = events.iter().map(|event| event.event_id).collect();

        let roots = build_event_tree(&events, &[], None, &known_ids, true);

        assert_eq!(roots.len(), 1);
        assert_eq!(roots[0].event_id, event_id);
    }
}