// seesaw_postgres/lib.rs
1// Simplified PostgresStore without compile-time checked queries
2// Uses dynamic queries for easier testing
3
4use anyhow::Result;
5use async_trait::async_trait;
6use chrono::{DateTime, Duration, Utc};
7use seesaw_core::{
8    insight::*, EmittedEvent, JoinEntry, QueuedEffectExecution, QueuedEvent, Store,
9    NAMESPACE_SEESAW,
10};
11use serde::{Deserialize, Serialize};
12use sqlx::{FromRow, PgPool};
13use std::collections::HashSet;
14use uuid::Uuid;
15
/// Lease duration (seconds) for a claimed event: `poll_next` sets
/// `locked_until = NOW() + EVENT_CLAIM_SECONDS`, hiding the row from other
/// pollers until the lease expires or the event is acked/nacked.
const EVENT_CLAIM_SECONDS: i64 = 30;
17
18fn emitted_event_created_at(parent_created_at: DateTime<Utc>) -> DateTime<Utc> {
19    parent_created_at
20        .date_naive()
21        .and_hms_opt(0, 0, 0)
22        .expect("midnight should always be a valid UTC timestamp")
23        .and_utc()
24}
25
/// Exponential retry backoff for effects: 1s, 2s, 4s, … doubling per prior
/// attempt and capped at 2^8 = 256 seconds. Non-positive `attempts` yields 1s.
fn effect_retry_delay_seconds(attempts: i32) -> i64 {
    let prior_attempts = attempts.saturating_sub(1);
    let exponent = prior_attempts.max(0).min(8) as u32;
    2_i64.pow(exponent)
}
30
/// PostgreSQL implementation of the `Store` trait.
///
/// Thin wrapper around an `sqlx::PgPool`; all persistence (event queue,
/// workflow state, effect executions, joins, DLQ) lives in `seesaw_*` tables.
pub struct PostgresStore {
    pool: PgPool,
}
35
36impl PostgresStore {
37    pub fn new(pool: PgPool) -> Self {
38        Self { pool }
39    }
40
41    pub fn pool(&self) -> &PgPool {
42        &self.pool
43    }
44}
45
46impl Clone for PostgresStore {
47    fn clone(&self) -> Self {
48        Self {
49            pool: self.pool.clone(),
50        }
51    }
52}
53
/// Row shape for `seesaw_events` queue queries; maps 1:1 onto `QueuedEvent`.
#[derive(FromRow)]
struct EventRow {
    id: i64,
    event_id: Uuid,
    parent_id: Option<Uuid>,
    correlation_id: Uuid,
    event_type: String,
    payload: serde_json::Value,
    hops: i32,
    // batch_* are populated only for events that belong to a fan-out batch.
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
    created_at: DateTime<Utc>,
}
68
/// Row shape for `seesaw_state`: the serialized workflow state plus its
/// optimistic-concurrency version counter.
#[derive(FromRow)]
struct StateRow {
    state: serde_json::Value,
    version: i32,
}
74
/// Row shape for `seesaw_effect_executions`; mirrors `QueuedEffectExecution`
/// field-for-field.
#[derive(FromRow)]
struct EffectRow {
    event_id: Uuid,
    effect_id: String,
    correlation_id: Uuid,
    event_type: String,
    event_payload: serde_json::Value,
    parent_event_id: Option<Uuid>,
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
    // Scheduling/retry metadata.
    execute_at: DateTime<Utc>,
    timeout_seconds: i32,
    max_attempts: i32,
    priority: i32,
    attempts: i32,
}
92
/// Minimal parent-event projection used when deriving hop count and a
/// deterministic timestamp for child (emitted/synthetic) events.
#[derive(FromRow)]
struct ParentEventRow {
    hops: i32,
    created_at: DateTime<Utc>,
}
98
/// Row shape for the workflow-event subscription queries over `seesaw_events`;
/// `(created_at, id)` doubles as the pagination cursor.
#[derive(FromRow)]
struct WorkflowEventRow {
    id: i64,
    event_id: Uuid,
    correlation_id: Uuid,
    event_type: String,
    payload: serde_json::Value,
    created_at: DateTime<Utc>,
}
108
109#[async_trait]
110impl Store for PostgresStore {
    /// Enqueue an event, deduplicating on `event_id`.
    ///
    /// Inserts into the `seesaw_processed` ledger first; if the event_id was
    /// already recorded, the publish is a silent no-op. Otherwise the event is
    /// appended to `seesaw_events` in the same transaction, so the ledger row
    /// and the queue row commit (or roll back) together.
    async fn publish(&self, event: QueuedEvent) -> Result<()> {
        let mut tx = self.pool.begin().await?;

        // Use the non-partitioned ledger as the idempotency guard. This keeps
        // webhook/process_with_id dedupe stable even when created_at differs.
        let inserted: Option<Uuid> = sqlx::query_scalar(
            "INSERT INTO seesaw_processed (event_id, correlation_id, created_at)
             VALUES ($1, $2, $3)
             ON CONFLICT (event_id) DO NOTHING
             RETURNING event_id",
        )
        .bind(event.event_id)
        .bind(event.correlation_id)
        .bind(event.created_at)
        .fetch_optional(&mut *tx)
        .await?;

        // No row returned => duplicate publish; commit the (empty) tx and bail.
        if inserted.is_none() {
            tx.commit().await?;
            return Ok(());
        }

        sqlx::query(
            "INSERT INTO seesaw_events (
                event_id, parent_id, correlation_id, event_type, payload, hops,
                batch_id, batch_index, batch_size, created_at
             )
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
        )
        .bind(event.event_id)
        .bind(event.parent_id)
        .bind(event.correlation_id)
        .bind(event.event_type)
        .bind(event.payload)
        .bind(event.hops)
        .bind(event.batch_id)
        .bind(event.batch_index)
        .bind(event.batch_size)
        .bind(event.created_at)
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;

        Ok(())
    }
157
    /// Claim the next processable event, if any.
    ///
    /// Selection rules, all enforced in SQL:
    /// - skip rows already processed or still under an unexpired lease
    ///   (`locked_until`);
    /// - the `NOT EXISTS` subquery restricts eligibility to the oldest
    ///   unprocessed event per correlation, so one workflow's events are
    ///   always handled in `(created_at, id)` order;
    /// - `FOR UPDATE SKIP LOCKED` lets concurrent workers claim different
    ///   rows without blocking each other.
    ///
    /// A successful claim re-leases the row for `EVENT_CLAIM_SECONDS`.
    async fn poll_next(&self) -> Result<Option<QueuedEvent>> {
        let row: Option<EventRow> = sqlx::query_as(
            "WITH next_event AS (
                SELECT e.id
                FROM seesaw_events e
                WHERE e.processed_at IS NULL
                  AND (e.locked_until IS NULL OR e.locked_until < NOW())
                  AND NOT EXISTS (
                    SELECT 1
                    FROM seesaw_events older
                    WHERE older.correlation_id = e.correlation_id
                      AND older.processed_at IS NULL
                      AND (
                        older.created_at < e.created_at
                        OR (older.created_at = e.created_at AND older.id < e.id)
                      )
                  )
                ORDER BY e.created_at ASC, e.id ASC
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            )
            UPDATE seesaw_events e
            SET locked_until = NOW() + ($1 * INTERVAL '1 second')
            FROM next_event
            WHERE e.id = next_event.id
            RETURNING e.id, e.event_id, e.parent_id, e.correlation_id, e.event_type, e.payload,
                      e.hops, e.batch_id, e.batch_index, e.batch_size, e.created_at",
        )
        .bind(EVENT_CLAIM_SECONDS)
        .fetch_optional(&self.pool)
        .await?;

        Ok(row.map(|r| QueuedEvent {
            id: r.id,
            event_id: r.event_id,
            parent_id: r.parent_id,
            correlation_id: r.correlation_id,
            event_type: r.event_type,
            payload: r.payload,
            hops: r.hops,
            batch_id: r.batch_id,
            batch_index: r.batch_index,
            batch_size: r.batch_size,
            created_at: r.created_at,
        }))
    }
204
205    async fn ack(&self, id: i64) -> Result<()> {
206        sqlx::query(
207            "UPDATE seesaw_events SET processed_at = NOW(), locked_until = NULL WHERE id = $1",
208        )
209        .bind(id)
210        .execute(&self.pool)
211        .await?;
212        Ok(())
213    }
214
215    async fn nack(&self, id: i64, retry_after_secs: u64) -> Result<()> {
216        let locked_until = Utc::now() + Duration::seconds(retry_after_secs as i64);
217        sqlx::query(
218            "UPDATE seesaw_events
219             SET retry_count = retry_count + 1,
220                 locked_until = $2
221             WHERE id = $1",
222        )
223        .bind(id)
224        .bind(locked_until)
225        .execute(&self.pool)
226        .await?;
227        Ok(())
228    }
229
230    async fn load_state<S>(&self, correlation_id: Uuid) -> Result<Option<(S, i32)>>
231    where
232        S: for<'de> Deserialize<'de> + Send,
233    {
234        let row: Option<StateRow> =
235            sqlx::query_as("SELECT state, version FROM seesaw_state WHERE correlation_id = $1")
236                .bind(correlation_id)
237                .fetch_optional(&self.pool)
238                .await?;
239
240        match row {
241            Some(r) => {
242                let state: S = serde_json::from_value(r.state)?;
243                Ok(Some((state, r.version)))
244            }
245            None => Ok(None),
246        }
247    }
248
249    async fn save_state<S>(
250        &self,
251        correlation_id: Uuid,
252        state: &S,
253        expected_version: i32,
254    ) -> Result<i32>
255    where
256        S: Serialize + Send + Sync,
257    {
258        let state_json = serde_json::to_value(state)?;
259        let new_version = expected_version + 1;
260
261        let result = sqlx::query(
262            "INSERT INTO seesaw_state (correlation_id, state, version, updated_at)
263             VALUES ($1, $2, $3, NOW())
264             ON CONFLICT (correlation_id) DO UPDATE
265             SET state = $2,
266                 version = $3,
267                 updated_at = NOW()
268             WHERE seesaw_state.version = $4",
269        )
270        .bind(correlation_id)
271        .bind(&state_json)
272        .bind(new_version)
273        .bind(expected_version)
274        .execute(&self.pool)
275        .await?;
276
277        if result.rows_affected() == 0 {
278            anyhow::bail!("Version conflict: state was modified concurrently");
279        }
280
281        Ok(new_version)
282    }
283
    /// Record a pending effect execution for the effect runner to pick up.
    ///
    /// Inserts one `seesaw_effect_executions` row in status `'pending'`;
    /// `poll_next_effect` later claims it once `execute_at` has passed,
    /// ordering by `priority`. The bind order below must match the SQL
    /// placeholder order exactly.
    async fn insert_effect_intent(
        &self,
        event_id: Uuid,
        effect_id: String,
        correlation_id: Uuid,
        event_type: String,
        event_payload: serde_json::Value,
        parent_event_id: Option<Uuid>,
        batch_id: Option<Uuid>,
        batch_index: Option<i32>,
        batch_size: Option<i32>,
        execute_at: DateTime<Utc>,
        timeout_seconds: i32,
        max_attempts: i32,
        priority: i32,
    ) -> Result<()> {
        sqlx::query(
            "INSERT INTO seesaw_effect_executions (
                event_id, effect_id, correlation_id, status,
                event_type, event_payload, parent_event_id,
                batch_id, batch_index, batch_size,
                execute_at, timeout_seconds, max_attempts, priority
             )
             VALUES ($1, $2, $3, 'pending', $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
        )
        .bind(event_id)
        .bind(effect_id)
        .bind(correlation_id)
        .bind(event_type)
        .bind(event_payload)
        .bind(parent_event_id)
        .bind(batch_id)
        .bind(batch_index)
        .bind(batch_size)
        .bind(execute_at)
        .bind(timeout_seconds)
        .bind(max_attempts)
        .bind(priority)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
327
    /// Claim the next due effect execution, if any.
    ///
    /// Eligible rows are `'pending'`, or `'failed'` with retries remaining,
    /// whose `execute_at` has passed. Claiming flips the row to
    /// `'executing'`, stamps claim/attempt timestamps, and increments
    /// `attempts` — so the returned `attempts` already counts this attempt.
    /// `FOR UPDATE SKIP LOCKED` keeps concurrent runners from contending on
    /// the same row.
    async fn poll_next_effect(&self) -> Result<Option<QueuedEffectExecution>> {
        let row: Option<EffectRow> = sqlx::query_as(
            "WITH next_effect AS (
                SELECT event_id, effect_id
                FROM seesaw_effect_executions
                WHERE (
                    status = 'pending'
                    OR (status = 'failed' AND attempts < max_attempts)
                )
                  AND execute_at <= NOW()
                ORDER BY priority ASC, execute_at ASC, event_id ASC, effect_id ASC
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            )
            UPDATE seesaw_effect_executions e
            SET status = 'executing',
                claimed_at = NOW(),
                last_attempted_at = NOW(),
                attempts = e.attempts + 1
            FROM next_effect
            WHERE e.event_id = next_effect.event_id
              AND e.effect_id = next_effect.effect_id
            RETURNING
                e.event_id, e.effect_id, e.correlation_id, e.event_type, e.event_payload, e.parent_event_id,
                e.batch_id, e.batch_index, e.batch_size,
                e.execute_at, e.timeout_seconds, e.max_attempts, e.priority, e.attempts",
        )
        .fetch_optional(&self.pool)
        .await?;

        if let Some(r) = row {
            Ok(Some(QueuedEffectExecution {
                event_id: r.event_id,
                effect_id: r.effect_id,
                correlation_id: r.correlation_id,
                event_type: r.event_type,
                event_payload: r.event_payload,
                parent_event_id: r.parent_event_id,
                batch_id: r.batch_id,
                batch_index: r.batch_index,
                batch_size: r.batch_size,
                execute_at: r.execute_at,
                timeout_seconds: r.timeout_seconds,
                max_attempts: r.max_attempts,
                priority: r.priority,
                attempts: r.attempts,
            }))
        } else {
            Ok(None)
        }
    }
379
380    async fn complete_effect(
381        &self,
382        event_id: Uuid,
383        effect_id: String,
384        result: serde_json::Value,
385    ) -> Result<()> {
386        sqlx::query(
387            "UPDATE seesaw_effect_executions
388             SET status = 'completed',
389                 result = $3,
390                 completed_at = NOW()
391             WHERE event_id = $1 AND effect_id = $2",
392        )
393        .bind(event_id)
394        .bind(effect_id)
395        .bind(result)
396        .execute(&self.pool)
397        .await?;
398
399        Ok(())
400    }
401
    /// Complete an effect and atomically publish the events it emitted.
    ///
    /// Emitted events get UUIDv5 ids derived from
    /// (parent event_id, effect_id, event_type, index) and a timestamp pinned
    /// to the parent event's day, so a retried completion re-inserts the exact
    /// same rows and the `ON CONFLICT DO NOTHING` makes it a no-op. The event
    /// inserts and the status update share one transaction.
    ///
    /// NOTE(review): the effect/parent lookups run outside the transaction;
    /// both return deterministic values, so this looks intentional — confirm.
    async fn complete_effect_with_events(
        &self,
        event_id: Uuid,
        effect_id: String,
        result: serde_json::Value,
        emitted_events: Vec<EmittedEvent>,
    ) -> Result<()> {
        // Get correlation_id and hops for emitted events
        let effect: EffectRow = sqlx::query_as(
            "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
                    batch_id, batch_index, batch_size,
                    execute_at, timeout_seconds, max_attempts, priority, attempts
             FROM seesaw_effect_executions
             WHERE event_id = $1 AND effect_id = $2",
        )
        .bind(event_id)
        .bind(&effect_id)
        .fetch_one(&self.pool)
        .await?;

        // Read parent metadata for deterministic hop increment and timestamp.
        let parent: ParentEventRow = sqlx::query_as(
            "SELECT hops, created_at
             FROM seesaw_events
             WHERE event_id = $1
             ORDER BY created_at ASC, id ASC
             LIMIT 1",
        )
        .bind(event_id)
        .fetch_one(&self.pool)
        .await?;

        // Start transaction for atomicity
        let mut tx = self.pool.begin().await?;

        // Insert emitted events with deterministic IDs
        for (emitted_index, emitted) in emitted_events.into_iter().enumerate() {
            // Generate deterministic event_id from hash(parent_event_id, effect_id, event_type)
            let deterministic_id = Uuid::new_v5(
                &NAMESPACE_SEESAW,
                format!(
                    "{}-{}-{}-{}",
                    event_id, effect_id, emitted.event_type, emitted_index
                )
                .as_bytes(),
            );

            // Deterministic timestamp keeps retries idempotent while staying in
            // the same partition day as the parent event.
            let deterministic_timestamp = emitted_event_created_at(parent.created_at);

            // Insert event (idempotent via ON CONFLICT on (event_id, created_at))
            sqlx::query(
                "INSERT INTO seesaw_events (
                    event_id, parent_id, correlation_id, event_type, payload, hops,
                    batch_id, batch_index, batch_size, created_at
                 )
                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                 ON CONFLICT (event_id, created_at) DO NOTHING",
            )
            .bind(deterministic_id)
            .bind(Some(event_id))
            .bind(effect.correlation_id)
            .bind(&emitted.event_type)
            .bind(emitted.payload)
            .bind(parent.hops + 1)
            .bind(emitted.batch_id)
            .bind(emitted.batch_index)
            .bind(emitted.batch_size)
            .bind(deterministic_timestamp)
            .execute(&mut *tx)
            .await?;
        }

        // Mark effect as completed (same transaction)
        sqlx::query(
            "UPDATE seesaw_effect_executions
             SET status = 'completed',
                 result = $3,
                 completed_at = NOW()
             WHERE event_id = $1 AND effect_id = $2",
        )
        .bind(event_id)
        .bind(effect_id)
        .bind(result)
        .execute(&mut *tx)
        .await?;

        // Commit transaction - both succeed or both fail
        tx.commit().await?;

        Ok(())
    }
495
    /// Record a failed effect attempt and schedule its retry.
    ///
    /// Sets the row back to `'failed'` with the error and pushes `execute_at`
    /// out by an exponential backoff derived from `attempts`;
    /// `poll_next_effect` will re-claim it once due, while
    /// `attempts < max_attempts`.
    async fn fail_effect(
        &self,
        event_id: Uuid,
        effect_id: String,
        error: String,
        attempts: i32,
    ) -> Result<()> {
        let retry_at = Utc::now() + Duration::seconds(effect_retry_delay_seconds(attempts));
        // NOTE(review): the `attempts >= $4` guard appears to ignore stale
        // failure reports from an earlier claim of the same effect (the row's
        // counter would already exceed the reporter's) — confirm intent.
        sqlx::query(
            "UPDATE seesaw_effect_executions
             SET status = 'failed',
                 error = $3,
                 execute_at = $5,
                 claimed_at = NULL
             WHERE event_id = $1 AND effect_id = $2 AND attempts >= $4",
        )
        .bind(event_id)
        .bind(effect_id)
        .bind(error)
        .bind(attempts)
        .bind(retry_at)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
522
    /// Move a permanently-failed effect to the dead-letter queue.
    ///
    /// Copies the effect's details into `seesaw_dlq`, and — when the effect
    /// belonged to a batch — inserts a synthetic terminal event carrying the
    /// batch coordinates so batch joins can still count the slot. Finally the
    /// execution row is deleted. DLQ insert, synthetic event, and delete share
    /// one transaction; the two lookups run before it.
    async fn dlq_effect(
        &self,
        event_id: Uuid,
        effect_id: String,
        error: String,
        reason: String,
        attempts: i32,
    ) -> Result<()> {
        // Get effect details for DLQ
        let effect: EffectRow = sqlx::query_as(
            "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
                    batch_id, batch_index, batch_size,
                    execute_at, timeout_seconds, max_attempts, priority, attempts
             FROM seesaw_effect_executions
             WHERE event_id = $1 AND effect_id = $2",
        )
        .bind(event_id)
        .bind(&effect_id)
        .fetch_one(&self.pool)
        .await?;

        // Parent may be absent; the synthetic event then falls back to
        // NOW() / hops 0 below.
        let parent = sqlx::query_as::<_, ParentEventRow>(
            "SELECT hops, created_at
             FROM seesaw_events
             WHERE event_id = $1
             ORDER BY created_at ASC, id ASC
             LIMIT 1",
        )
        .bind(event_id)
        .fetch_optional(&self.pool)
        .await?;

        let mut tx = self.pool.begin().await?;

        // Insert into DLQ
        sqlx::query(
            "INSERT INTO seesaw_dlq (
                event_id, effect_id, correlation_id, error, event_type, event_payload, reason, attempts
             )
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
        )
        .bind(event_id)
        .bind(&effect_id)
        .bind(effect.correlation_id)
        .bind(&error)
        .bind(&effect.event_type)
        .bind(&effect.event_payload)
        .bind(&reason)
        .bind(attempts)
        .execute(&mut *tx)
        .await?;

        // Batch members get a deterministic terminal event so retries of this
        // DLQ move stay idempotent (ON CONFLICT DO NOTHING).
        if let (Some(batch_id), Some(batch_index), Some(batch_size)) =
            (effect.batch_id, effect.batch_index, effect.batch_size)
        {
            let synthetic_event_id = Uuid::new_v5(
                &NAMESPACE_SEESAW,
                format!("{}-{}-dlq-terminal", event_id, effect_id).as_bytes(),
            );
            let synthetic_created_at = parent
                .as_ref()
                .map(|row| emitted_event_created_at(row.created_at))
                .unwrap_or_else(Utc::now);
            let synthetic_hops = parent.as_ref().map(|row| row.hops + 1).unwrap_or(0);

            sqlx::query(
                "INSERT INTO seesaw_events (
                    event_id, parent_id, correlation_id, event_type, payload, hops,
                    batch_id, batch_index, batch_size, created_at
                 )
                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                 ON CONFLICT (event_id, created_at) DO NOTHING",
            )
            .bind(synthetic_event_id)
            .bind(Some(event_id))
            .bind(effect.correlation_id)
            .bind(&effect.event_type)
            .bind(&effect.event_payload)
            .bind(synthetic_hops)
            .bind(Some(batch_id))
            .bind(Some(batch_index))
            .bind(Some(batch_size))
            .bind(synthetic_created_at)
            .execute(&mut *tx)
            .await?;
        }

        // Delete from executions table
        sqlx::query("DELETE FROM seesaw_effect_executions WHERE event_id = $1 AND effect_id = $2")
            .bind(event_id)
            .bind(&effect_id)
            .execute(&mut *tx)
            .await?;

        tx.commit().await?;

        Ok(())
    }
621
    /// Stream workflow events for `correlation_id` as they are written.
    ///
    /// Uses Postgres LISTEN on a per-correlation channel as a wake-up signal,
    /// with a 500ms timeout poll as a fallback, then pages new rows out of
    /// `seesaw_events` via a `(created_at, id)` cursor. The cursor starts at
    /// the newest existing row, so only events written after subscription are
    /// delivered. The background task exits when the listener or query errors,
    /// or when the returned stream is dropped.
    async fn subscribe_workflow_events(
        &self,
        correlation_id: Uuid,
    ) -> Result<Box<dyn futures::Stream<Item = seesaw_core::WorkflowEvent> + Send + Unpin>> {
        use sqlx::postgres::PgListener;

        let channel = format!("seesaw_workflow_{}", correlation_id);
        const PAGE_SIZE: i64 = 256;
        const CATCH_UP_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500);

        // Establish a cursor at subscribe time so callers only receive new
        // workflow events emitted after this subscription starts.
        let initial_cursor: Option<(DateTime<Utc>, i64)> = sqlx::query_as(
            "SELECT created_at, id
             FROM seesaw_events
             WHERE correlation_id = $1
             ORDER BY created_at DESC, id DESC
             LIMIT 1",
        )
        .bind(correlation_id)
        .fetch_optional(&self.pool)
        .await?;

        // Create a new listener connection
        let mut listener = PgListener::connect_with(&self.pool).await?;
        listener.listen(&channel).await?;

        let pool = self.pool.clone();
        let (tx, rx) = futures::channel::mpsc::unbounded::<seesaw_core::WorkflowEvent>();

        tokio::spawn(async move {
            let mut cursor = initial_cursor;
            // First iteration drains immediately in case events landed between
            // the cursor snapshot and the LISTEN taking effect.
            let mut drain_pending = true;

            loop {
                if !drain_pending {
                    // Wait for a NOTIFY, or time out and poll anyway so
                    // missed notifications can't stall the stream.
                    match tokio::time::timeout(CATCH_UP_INTERVAL, listener.recv()).await {
                        Ok(Ok(_notification)) => {}
                        Ok(Err(error)) => {
                            tracing::warn!(
                                "workflow listener recv failed for {}: {}",
                                correlation_id,
                                error
                            );
                            return;
                        }
                        Err(_) => {}
                    }
                }
                drain_pending = false;

                // Page through all rows past the cursor until caught up.
                loop {
                    let rows_result: std::result::Result<Vec<WorkflowEventRow>, sqlx::Error> =
                        if let Some((created_at, id)) = cursor {
                            sqlx::query_as(
                                "SELECT id, event_id, correlation_id, event_type, payload, created_at
                                 FROM seesaw_events
                                 WHERE correlation_id = $1
                                   AND (
                                        created_at > $2
                                        OR (created_at = $2 AND id > $3)
                                   )
                                 ORDER BY created_at ASC, id ASC
                                 LIMIT $4",
                            )
                            .bind(correlation_id)
                            .bind(created_at)
                            .bind(id)
                            .bind(PAGE_SIZE)
                            .fetch_all(&pool)
                            .await
                        } else {
                            sqlx::query_as(
                                "SELECT id, event_id, correlation_id, event_type, payload, created_at
                                 FROM seesaw_events
                                 WHERE correlation_id = $1
                                 ORDER BY created_at ASC, id ASC
                                 LIMIT $2",
                            )
                            .bind(correlation_id)
                            .bind(PAGE_SIZE)
                            .fetch_all(&pool)
                            .await
                        };

                    let rows = match rows_result {
                        Ok(rows) => rows,
                        Err(error) => {
                            tracing::warn!(
                                "workflow event query failed for {}: {}",
                                correlation_id,
                                error
                            );
                            return;
                        }
                    };

                    if rows.is_empty() {
                        break;
                    }

                    for row in rows {
                        cursor = Some((row.created_at, row.id));
                        // Receiver dropped => subscriber is gone; stop the task.
                        if tx
                            .unbounded_send(seesaw_core::WorkflowEvent {
                                event_id: row.event_id,
                                correlation_id: row.correlation_id,
                                event_type: row.event_type,
                                payload: row.payload,
                            })
                            .is_err()
                        {
                            return;
                        }
                    }
                }
            }
        });

        Ok(Box::new(rx))
    }
743
    /// Snapshot a workflow's status: its current state JSON (if any), the
    /// number of unfinished effects, and the most recent event type.
    ///
    /// NOTE(review): the three queries run outside a transaction, so the
    /// snapshot is not perfectly consistent under concurrent writes —
    /// presumably acceptable for a status readout.
    async fn get_workflow_status(
        &self,
        correlation_id: Uuid,
    ) -> Result<seesaw_core::WorkflowStatus> {
        let state = sqlx::query_as::<_, (serde_json::Value,)>(
            "SELECT state FROM seesaw_state WHERE correlation_id = $1",
        )
        .bind(correlation_id)
        .fetch_optional(&self.pool)
        .await?
        .map(|r| r.0);

        // 'failed' still counts as pending because it may be retried.
        let pending_effects = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM seesaw_effect_executions
             WHERE correlation_id = $1 AND status IN ('pending', 'executing', 'failed')",
        )
        .bind(correlation_id)
        .fetch_one(&self.pool)
        .await?
        .0;

        let last_event = sqlx::query_as::<_, (String,)>(
            "SELECT event_type FROM seesaw_events
             WHERE correlation_id = $1
             ORDER BY created_at DESC, id DESC
             LIMIT 1",
        )
        .bind(correlation_id)
        .fetch_optional(&self.pool)
        .await?
        .map(|r| r.0);

        Ok(seesaw_core::WorkflowStatus {
            correlation_id,
            state,
            pending_effects,
            is_settled: pending_effects == 0,
            last_event,
        })
    }
784
    /// Append one batch member to a same-batch join window and, if this append
    /// completes the batch, atomically claim the window for processing.
    ///
    /// All steps run in one transaction:
    /// 1. insert the member into `seesaw_join_entries` (idempotent);
    /// 2. create the `seesaw_join_windows` row if it doesn't exist yet;
    /// 3. refresh `target_count` if the reported batch_size changed;
    /// 4. flip the window from 'open' to 'processing' only when the entry
    ///    count has reached `target_count` — the conditional UPDATE makes
    ///    exactly one caller win the claim;
    /// 5. the winner gets all entries back in deterministic order; everyone
    ///    else gets `None`.
    async fn join_same_batch_append_and_maybe_claim(
        &self,
        join_effect_id: String,
        correlation_id: Uuid,
        source_event_id: Uuid,
        source_event_type: String,
        source_payload: serde_json::Value,
        source_created_at: DateTime<Utc>,
        batch_id: Uuid,
        batch_index: i32,
        batch_size: i32,
    ) -> Result<Option<Vec<JoinEntry>>> {
        let mut tx = self.pool.begin().await?;

        // Idempotent append: redelivered source events are absorbed here.
        sqlx::query(
            "INSERT INTO seesaw_join_entries (
                join_effect_id, correlation_id, source_event_id, source_event_type, source_payload,
                source_created_at, batch_id, batch_index, batch_size
             )
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
             ON CONFLICT (join_effect_id, correlation_id, source_event_id) DO NOTHING",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(source_event_id)
        .bind(&source_event_type)
        .bind(source_payload)
        .bind(source_created_at)
        .bind(batch_id)
        .bind(batch_index)
        .bind(batch_size)
        .execute(&mut *tx)
        .await?;

        // Ensure the window row exists (first arrival creates it).
        sqlx::query(
            "INSERT INTO seesaw_join_windows (
                join_effect_id, correlation_id, mode, batch_id, target_count, status
             )
             VALUES ($1, $2, 'same_batch', $3, $4, 'open')
             ON CONFLICT (join_effect_id, correlation_id, batch_id) DO NOTHING",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .bind(batch_size)
        .execute(&mut *tx)
        .await?;

        // Keep target_count in sync if a later member reports a different size.
        sqlx::query(
            "UPDATE seesaw_join_windows
             SET target_count = $4,
                 updated_at = NOW()
             WHERE join_effect_id = $1
               AND correlation_id = $2
               AND batch_id = $3
               AND target_count <> $4",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .bind(batch_size)
        .execute(&mut *tx)
        .await?;

        // Try to claim: succeeds only while the window is still 'open' AND
        // the accumulated entries have reached target_count.
        let claimed: Option<(String,)> = sqlx::query_as(
            "UPDATE seesaw_join_windows w
             SET status = 'processing',
                 sealed_at = COALESCE(w.sealed_at, NOW()),
                 processing_started_at = NOW(),
                 updated_at = NOW(),
                 last_error = NULL
             WHERE w.join_effect_id = $1
               AND w.correlation_id = $2
               AND w.batch_id = $3
               AND w.status = 'open'
               AND (
                    SELECT COUNT(*)::int
                    FROM seesaw_join_entries e
                    WHERE e.join_effect_id = w.join_effect_id
                      AND e.correlation_id = w.correlation_id
                      AND e.batch_id = w.batch_id
               ) >= w.target_count
             RETURNING w.join_effect_id",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .fetch_optional(&mut *tx)
        .await?;

        // Not the claimer (batch incomplete, or someone else won): commit the
        // append and report nothing to process.
        if claimed.is_none() {
            tx.commit().await?;
            return Ok(None);
        }

        // Claimer reads the full, deterministically ordered batch.
        let rows = sqlx::query_as::<_, (Uuid, String, serde_json::Value, Uuid, i32, i32, DateTime<Utc>)>(
            "SELECT source_event_id, source_event_type, source_payload, batch_id, batch_index, batch_size, source_created_at
             FROM seesaw_join_entries
             WHERE join_effect_id = $1
               AND correlation_id = $2
               AND batch_id = $3
             ORDER BY batch_index ASC, source_created_at ASC, source_event_id ASC",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .fetch_all(&mut *tx)
        .await?;

        let entries = rows
            .into_iter()
            .map(
                |(source_event_id, event_type, payload, batch_id, batch_index, batch_size, created_at)| JoinEntry {
                    source_event_id,
                    event_type,
                    payload,
                    batch_id,
                    batch_index,
                    batch_size,
                    created_at,
                },
            )
            .collect::<Vec<_>>();

        tx.commit().await?;
        Ok(Some(entries))
    }
912
    /// Finalize a satisfied same-batch join window.
    ///
    /// Runs three statements in a single transaction: mark the window
    /// completed, delete its buffered entries, then delete the window row
    /// itself — so a crash cannot leave entries behind without their window.
    async fn join_same_batch_complete(
        &self,
        join_effect_id: String,
        correlation_id: Uuid,
        batch_id: Uuid,
    ) -> Result<()> {
        let mut tx = self.pool.begin().await?;

        // Mark the window completed before deleting it.
        // NOTE(review): this same row is deleted two statements below, so the
        // UPDATE is only observable to in-transaction triggers/CDC consumers —
        // confirm it is still required before simplifying.
        sqlx::query(
            "UPDATE seesaw_join_windows
             SET status = 'completed',
                 completed_at = NOW(),
                 updated_at = NOW()
             WHERE join_effect_id = $1
               AND correlation_id = $2
               AND batch_id = $3",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .execute(&mut *tx)
        .await?;

        // Drop the buffered source events for this (effect, correlation, batch).
        sqlx::query(
            "DELETE FROM seesaw_join_entries
             WHERE join_effect_id = $1
               AND correlation_id = $2
               AND batch_id = $3",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .execute(&mut *tx)
        .await?;

        // Finally drop the window row itself.
        sqlx::query(
            "DELETE FROM seesaw_join_windows
             WHERE join_effect_id = $1
               AND correlation_id = $2
               AND batch_id = $3",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;
        Ok(())
    }
963
964    async fn join_same_batch_release(
965        &self,
966        join_effect_id: String,
967        correlation_id: Uuid,
968        batch_id: Uuid,
969        error: String,
970    ) -> Result<()> {
971        sqlx::query(
972            "UPDATE seesaw_join_windows
973             SET status = 'open',
974                 processing_started_at = NULL,
975                 last_error = $4,
976                 updated_at = NOW()
977             WHERE join_effect_id = $1
978               AND correlation_id = $2
979               AND batch_id = $3
980               AND status = 'processing'",
981        )
982        .bind(&join_effect_id)
983        .bind(correlation_id)
984        .bind(batch_id)
985        .bind(error)
986        .execute(&self.pool)
987        .await?;
988
989        Ok(())
990    }
991}
992
/// Row of `seesaw_stream`, the append-only activity feed queried by
/// `subscribe_events` and `get_recent_events`.
#[derive(FromRow)]
struct StreamRow {
    /// Monotonic stream position; used as the paging cursor in
    /// `get_recent_events`.
    seq: i64,
    /// Discriminator decoded by `stream_row_to_insight_event`
    /// ("event_dispatched", "effect_started", "effect_completed",
    /// "effect_failed").
    stream_type: String,
    correlation_id: Uuid,
    // The optional columns below depend on the entry kind; presumably
    // event_* is set for event entries and effect_* for effect entries —
    // confirm against the writer of seesaw_stream.
    event_id: Option<Uuid>,
    effect_event_id: Option<Uuid>,
    effect_id: Option<String>,
    status: Option<String>,
    error: Option<String>,
    payload: Option<serde_json::Value>,
    created_at: DateTime<Utc>,
}
1006
/// Raw row from `seesaw_effect_executions` as selected by `get_effect_logs`;
/// converted into `EffectExecutionLog` (with a derived `duration_ms`).
#[derive(FromRow)]
struct EffectLogRow {
    correlation_id: Uuid,
    event_id: Uuid,
    effect_id: String,
    status: String,
    attempts: i32,
    event_type: String,
    result: Option<serde_json::Value>,
    error: Option<String>,
    created_at: DateTime<Utc>,
    // Scheduling/execution timestamps; claimed_at/last_attempted_at feed the
    // duration calculation in get_effect_logs.
    execute_at: DateTime<Utc>,
    claimed_at: Option<DateTime<Utc>>,
    last_attempted_at: Option<DateTime<Utc>>,
    completed_at: Option<DateTime<Utc>>,
}
1023
/// Raw row from the dead-letter queue table (`seesaw_dlq`) as selected by
/// `get_dead_letters`; mapped 1:1 onto `DeadLetterEntry`.
#[derive(FromRow)]
struct DeadLetterRow {
    correlation_id: Uuid,
    event_id: Uuid,
    effect_id: String,
    event_type: String,
    event_payload: serde_json::Value,
    error: String,
    reason: String,
    attempts: i32,
    failed_at: DateTime<Utc>,
    // NULL while the entry is still unresolved; get_dead_letters can filter
    // on this.
    resolved_at: Option<DateTime<Utc>>,
}
1037
/// Per-correlation failure summary produced by the aggregate query in
/// `get_failed_workflows` (effect counts joined with DLQ counts).
#[derive(FromRow)]
struct FailedWorkflowRow {
    correlation_id: Uuid,
    failed_effects: i64,
    active_effects: i64,
    /// Unresolved dead letters only (resolved ones are filtered out in SQL).
    dead_letters: i64,
    last_failed_at: Option<DateTime<Utc>>,
    last_error: Option<String>,
}
1047
#[async_trait]
impl InsightStore for PostgresStore {
    /// Subscribe to live activity via Postgres LISTEN/NOTIFY on the
    /// `seesaw_stream` channel, yielding one `InsightEvent` per notification.
    ///
    /// NOTE(review): each notification triggers a fetch of only the single
    /// newest `seesaw_stream` row, so bursts can surface the same row twice
    /// and skip intermediate rows — confirm consumers treat this purely as a
    /// "latest activity" wake-up rather than a lossless feed.
    async fn subscribe_events(
        &self,
    ) -> Result<Box<dyn futures::Stream<Item = InsightEvent> + Send + Unpin>> {
        use futures::stream::StreamExt;
        use sqlx::postgres::PgListener;

        // Create a new listener connection
        let mut listener = PgListener::connect_with(&self.pool).await?;
        listener.listen("seesaw_stream").await?;

        // Convert listener into a stream of InsightEvent
        let pool = self.pool.clone();
        let stream = listener.into_stream().filter_map(move |result| {
            let pool = pool.clone();
            Box::pin(async move {
                match result {
                    Ok(_notification) => {
                        // Fetch latest entry from stream table
                        // (notification payload is just correlation_id for wake-up)
                        if let Ok(row) = sqlx::query_as::<_, StreamRow>(
                            "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
                                    effect_id, status, error, payload, created_at
                             FROM seesaw_stream
                             ORDER BY seq DESC
                             LIMIT 1",
                        )
                        .fetch_one(&pool)
                        .await
                        {
                            Some(stream_row_to_insight_event(row))
                        } else {
                            None
                        }
                    }
                    // Listener errors (e.g. dropped connection) yield nothing;
                    // the stream keeps polling for subsequent notifications.
                    Err(_) => None,
                }
            })
        });

        Ok(Box::new(stream))
    }

    /// Build the full event/effect tree plus current state for one workflow.
    async fn get_workflow_tree(&self, correlation_id: Uuid) -> Result<WorkflowTree> {
        // Get all events for this correlation
        let events = sqlx::query_as::<_, EventRow>(
            "SELECT id, event_id, parent_id, correlation_id, event_type, payload, hops,
                    batch_id, batch_index, batch_size, created_at
             FROM seesaw_events
             WHERE correlation_id = $1
             ORDER BY created_at ASC",
        )
        .bind(correlation_id)
        .fetch_all(&self.pool)
        .await?;

        // Get all effects for this correlation
        let effects = sqlx::query_as::<_, EffectTreeRow>(
            "SELECT event_id, effect_id, status, result, error, attempts, created_at,
                    batch_id, batch_index, batch_size
             FROM seesaw_effect_executions
             WHERE correlation_id = $1
             ORDER BY created_at ASC",
        )
        .bind(correlation_id)
        .fetch_all(&self.pool)
        .await?;

        // Build tree structure; the id set lets the root pass treat events
        // whose parent was not fetched (orphans) as roots.
        let event_ids: HashSet<Uuid> = events.iter().map(|event| event.event_id).collect();
        let roots = build_event_tree(&events, &effects, None, &event_ids, true);

        // Get state (absent when the workflow never persisted state)
        let state = sqlx::query_as::<_, (serde_json::Value,)>(
            "SELECT state FROM seesaw_state WHERE correlation_id = $1",
        )
        .bind(correlation_id)
        .fetch_optional(&self.pool)
        .await?
        .map(|r| r.0);

        Ok(WorkflowTree {
            correlation_id,
            roots,
            state,
            event_count: events.len(),
            effect_count: effects.len(),
        })
    }

    /// Global counters: total events plus effects bucketed by status.
    ///
    /// NOTE(review): four independent queries outside a transaction — the
    /// counts can be mutually inconsistent under concurrent writes. Confirm
    /// that is acceptable for dashboard use.
    async fn get_stats(&self) -> Result<InsightStats> {
        let total_events = sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM seesaw_events")
            .fetch_one(&self.pool)
            .await?
            .0;

        let active_effects = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM seesaw_effect_executions
             WHERE status IN ('pending', 'executing')",
        )
        .fetch_one(&self.pool)
        .await?
        .0;

        let completed_effects = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM seesaw_effect_executions WHERE status = 'completed'",
        )
        .fetch_one(&self.pool)
        .await?
        .0;

        let failed_effects = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM seesaw_effect_executions WHERE status = 'failed'",
        )
        .fetch_one(&self.pool)
        .await?
        .0;

        Ok(InsightStats {
            total_events,
            active_effects,
            completed_effects,
            failed_effects,
        })
    }

    /// Page through the activity stream.
    ///
    /// With a cursor: entries strictly after `cursor`, ascending by `seq`
    /// (catch-up reads). Without one: the newest `limit` entries, descending
    /// (initial page) — note the two branches return opposite orders.
    async fn get_recent_events(
        &self,
        cursor: Option<i64>,
        limit: usize,
    ) -> Result<Vec<InsightEvent>> {
        let rows = if let Some(cursor_seq) = cursor {
            sqlx::query_as::<_, StreamRow>(
                "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
                        effect_id, status, error, payload, created_at
                 FROM seesaw_stream
                 WHERE seq > $1
                 ORDER BY seq ASC
                 LIMIT $2",
            )
            .bind(cursor_seq)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await?
        } else {
            sqlx::query_as::<_, StreamRow>(
                "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
                        effect_id, status, error, payload, created_at
                 FROM seesaw_stream
                 ORDER BY seq DESC
                 LIMIT $1",
            )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await?
        };

        Ok(rows.into_iter().map(stream_row_to_insight_event).collect())
    }

    /// Fetch effect execution history, optionally scoped to one workflow.
    ///
    /// The `$1::uuid IS NULL OR correlation_id = $1` predicate makes the
    /// correlation filter optional within a single prepared statement.
    async fn get_effect_logs(
        &self,
        correlation_id: Option<Uuid>,
        limit: usize,
    ) -> Result<Vec<EffectExecutionLog>> {
        let rows = sqlx::query_as::<_, EffectLogRow>(
            "SELECT
                correlation_id,
                event_id,
                effect_id,
                status,
                attempts,
                event_type,
                result,
                error,
                created_at,
                execute_at,
                claimed_at,
                last_attempted_at,
                completed_at
             FROM seesaw_effect_executions
             WHERE ($1::uuid IS NULL OR correlation_id = $1)
             ORDER BY COALESCE(last_attempted_at, created_at) DESC, event_id DESC
             LIMIT $2",
        )
        .bind(correlation_id)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| {
                // Prefer the claim time as the start of execution; fall back
                // to the last attempt timestamp.
                let started_at = row.claimed_at.or(row.last_attempted_at);
                // Wall-clock duration, clamped at zero to absorb clock skew.
                let duration_ms = match (started_at, row.completed_at) {
                    (Some(start), Some(end)) => Some((end - start).num_milliseconds().max(0)),
                    _ => None,
                };

                EffectExecutionLog {
                    correlation_id: row.correlation_id,
                    event_id: row.event_id,
                    effect_id: row.effect_id,
                    status: row.status,
                    attempts: row.attempts,
                    event_type: Some(row.event_type),
                    result: row.result,
                    error: row.error,
                    created_at: row.created_at,
                    execute_at: Some(row.execute_at),
                    claimed_at: row.claimed_at,
                    last_attempted_at: row.last_attempted_at,
                    completed_at: row.completed_at,
                    duration_ms,
                }
            })
            .collect())
    }

    /// List dead-letter entries, newest failures first.
    ///
    /// When `unresolved_only` is true, `(NOT $1 OR resolved_at IS NULL)`
    /// keeps only entries that have not yet been resolved.
    async fn get_dead_letters(
        &self,
        unresolved_only: bool,
        limit: usize,
    ) -> Result<Vec<DeadLetterEntry>> {
        let rows = sqlx::query_as::<_, DeadLetterRow>(
            "SELECT
                correlation_id,
                event_id,
                effect_id,
                event_type,
                event_payload,
                error,
                reason,
                attempts,
                failed_at,
                resolved_at
             FROM seesaw_dlq
             WHERE (NOT $1 OR resolved_at IS NULL)
             ORDER BY failed_at DESC
             LIMIT $2",
        )
        .bind(unresolved_only)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| DeadLetterEntry {
                correlation_id: row.correlation_id,
                event_id: row.event_id,
                effect_id: row.effect_id,
                event_type: row.event_type,
                event_payload: row.event_payload,
                error: row.error,
                reason: row.reason,
                attempts: row.attempts,
                failed_at: row.failed_at,
                resolved_at: row.resolved_at,
            })
            .collect())
    }

    /// Summarize workflows that currently have failures: per correlation, the
    /// failed/active effect counts, unresolved dead-letter count, and the most
    /// recent failure time and error message.
    ///
    /// The FULL OUTER JOIN keeps correlations that appear in only one of the
    /// two aggregates (effect failures vs. DLQ entries); COALESCE fills the
    /// missing side with zeros.
    async fn get_failed_workflows(&self, limit: usize) -> Result<Vec<FailedWorkflow>> {
        let rows = sqlx::query_as::<_, FailedWorkflowRow>(
            "WITH effect_agg AS (
                SELECT
                    correlation_id,
                    COUNT(*) FILTER (WHERE status = 'failed')::BIGINT AS failed_effects,
                    COUNT(*) FILTER (WHERE status IN ('pending', 'executing'))::BIGINT AS active_effects,
                    MAX(last_attempted_at) FILTER (WHERE status = 'failed') AS last_failed_at,
                    MAX(error) FILTER (WHERE status = 'failed') AS last_error
                FROM seesaw_effect_executions
                GROUP BY correlation_id
             ),
             dlq_agg AS (
                SELECT
                    correlation_id,
                    COUNT(*) FILTER (WHERE resolved_at IS NULL)::BIGINT AS dead_letters,
                    MAX(failed_at) FILTER (WHERE resolved_at IS NULL) AS last_dlq_at,
                    MAX(error) FILTER (WHERE resolved_at IS NULL) AS last_dlq_error
                FROM seesaw_dlq
                GROUP BY correlation_id
             )
             SELECT
                COALESCE(e.correlation_id, d.correlation_id) AS correlation_id,
                COALESCE(e.failed_effects, 0) AS failed_effects,
                COALESCE(e.active_effects, 0) AS active_effects,
                COALESCE(d.dead_letters, 0) AS dead_letters,
                GREATEST(e.last_failed_at, d.last_dlq_at) AS last_failed_at,
                COALESCE(d.last_dlq_error, e.last_error) AS last_error
             FROM effect_agg e
             FULL OUTER JOIN dlq_agg d ON d.correlation_id = e.correlation_id
             WHERE COALESCE(e.failed_effects, 0) > 0 OR COALESCE(d.dead_letters, 0) > 0
             ORDER BY last_failed_at DESC NULLS LAST
             LIMIT $1",
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| FailedWorkflow {
                correlation_id: row.correlation_id,
                failed_effects: row.failed_effects,
                active_effects: row.active_effects,
                dead_letters: row.dead_letters,
                last_failed_at: row.last_failed_at,
                last_error: row.last_error,
            })
            .collect())
    }
}
1363
/// Per-event effect execution row used when assembling the workflow tree
/// (`get_workflow_tree` / `build_event_tree`); mapped onto `EffectNode`.
#[derive(FromRow)]
struct EffectTreeRow {
    /// Event this execution belongs to; join key against `EventRow.event_id`.
    event_id: Uuid,
    effect_id: String,
    status: String,
    result: Option<serde_json::Value>,
    error: Option<String>,
    attempts: i32,
    created_at: DateTime<Utc>,
    // Batch metadata is only present for batched executions.
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
}
1377
1378fn stream_row_to_insight_event(row: StreamRow) -> InsightEvent {
1379    let stream_type = match row.stream_type.as_str() {
1380        "event_dispatched" => StreamType::EventDispatched,
1381        "effect_started" => StreamType::EffectStarted,
1382        "effect_completed" => StreamType::EffectCompleted,
1383        "effect_failed" => StreamType::EffectFailed,
1384        _ => StreamType::EventDispatched, // Default fallback
1385    };
1386
1387    // Extract event_type from payload if it's an event
1388    let event_type = if stream_type == StreamType::EventDispatched {
1389        row.payload
1390            .as_ref()
1391            .and_then(|p| p.get("event_type"))
1392            .and_then(|v| v.as_str())
1393            .map(|s| s.to_string())
1394    } else {
1395        None
1396    };
1397
1398    InsightEvent {
1399        seq: row.seq,
1400        stream_type,
1401        correlation_id: row.correlation_id,
1402        event_id: row.event_id,
1403        effect_event_id: row.effect_event_id,
1404        effect_id: row.effect_id,
1405        event_type,
1406        status: row.status,
1407        error: row.error,
1408        payload: row.payload,
1409        created_at: row.created_at,
1410    }
1411}
1412
1413fn build_event_tree(
1414    events: &[EventRow],
1415    effects: &[EffectTreeRow],
1416    parent_id: Option<Uuid>,
1417    event_ids: &HashSet<Uuid>,
1418    is_root_pass: bool,
1419) -> Vec<EventNode> {
1420    events
1421        .iter()
1422        .filter(|event| {
1423            if is_root_pass {
1424                event.parent_id.is_none()
1425                    || event
1426                        .parent_id
1427                        .map(|parent| !event_ids.contains(&parent))
1428                        .unwrap_or(false)
1429            } else {
1430                event.parent_id == parent_id
1431            }
1432        })
1433        .map(|event| {
1434            // Get effects for this event
1435            let event_effects: Vec<EffectNode> = effects
1436                .iter()
1437                .filter(|eff| eff.event_id == event.event_id)
1438                .map(|eff| EffectNode {
1439                    effect_id: eff.effect_id.clone(),
1440                    event_id: eff.event_id,
1441                    status: eff.status.clone(),
1442                    result: eff.result.clone(),
1443                    error: eff.error.clone(),
1444                    attempts: eff.attempts,
1445                    created_at: eff.created_at,
1446                    batch_id: eff.batch_id,
1447                    batch_index: eff.batch_index,
1448                    batch_size: eff.batch_size,
1449                })
1450                .collect();
1451
1452            // Recursively build children
1453            let children =
1454                build_event_tree(events, effects, Some(event.event_id), event_ids, false);
1455
1456            EventNode {
1457                event_id: event.event_id,
1458                event_type: event.event_type.clone(),
1459                payload: event.payload.clone(),
1460                created_at: event.created_at,
1461                batch_id: event.batch_id,
1462                batch_index: event.batch_index,
1463                batch_size: event.batch_size,
1464                children,
1465                effects: event_effects,
1466            }
1467        })
1468        .collect()
1469}
1470
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Timelike};

    /// The synthetic timestamp for emitted events must land exactly at the
    /// start (00:00:00 UTC) of the parent event's calendar day.
    #[test]
    fn emitted_event_created_at_is_midnight_on_parent_day() {
        let parent = Utc
            .with_ymd_and_hms(2026, 2, 5, 18, 45, 12)
            .single()
            .expect("valid timestamp");

        let midnight = emitted_event_created_at(parent);

        assert_eq!(midnight.date_naive(), parent.date_naive());
        assert_eq!(
            (midnight.hour(), midnight.minute(), midnight.second()),
            (0, 0, 0)
        );
    }

    /// Two parents on the same UTC day must collapse to one identical
    /// timestamp, regardless of their time of day.
    #[test]
    fn emitted_event_created_at_is_deterministic_for_same_parent_day() {
        let early = Utc
            .with_ymd_and_hms(2026, 2, 5, 0, 1, 2)
            .single()
            .expect("valid timestamp");
        let late = Utc
            .with_ymd_and_hms(2026, 2, 5, 23, 59, 59)
            .single()
            .expect("valid timestamp");

        assert_eq!(
            emitted_event_created_at(early),
            emitted_event_created_at(late)
        );
    }

    /// The retry delay doubles with each attempt: 1s, 2s, 4s, 8s, ...
    #[test]
    fn effect_retry_delay_seconds_uses_exponential_backoff() {
        for (attempts, expected) in [(1, 1), (2, 2), (3, 4), (4, 8)] {
            assert_eq!(effect_retry_delay_seconds(attempts), expected);
        }
    }

    /// The backoff exponent is clamped, so the delay tops out at 256s.
    #[test]
    fn effect_retry_delay_seconds_is_capped() {
        for attempts in [9, 50] {
            assert_eq!(effect_retry_delay_seconds(attempts), 256);
        }
    }

    /// An event whose parent id is unknown to the correlation must surface
    /// as a root instead of disappearing from the tree.
    #[test]
    fn build_event_tree_treats_orphan_parent_as_root() {
        let correlation_id = Uuid::new_v4();
        let orphan_id = Uuid::new_v4();
        let now = Utc::now();

        let events = vec![EventRow {
            id: 1,
            event_id: orphan_id,
            parent_id: Some(Uuid::new_v4()), // parent was never stored
            correlation_id,
            event_type: "OrphanEvent".to_string(),
            payload: serde_json::json!({"ok": true}),
            hops: 1,
            batch_id: None,
            batch_index: None,
            batch_size: None,
            created_at: now,
        }];

        let known_ids: HashSet<Uuid> = events.iter().map(|event| event.event_id).collect();
        let roots = build_event_tree(&events, &[], None, &known_ids, true);

        assert_eq!(roots.len(), 1);
        assert_eq!(roots[0].event_id, orphan_id);
    }
}
1551}