use anyhow::Result;
use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc};
use seesaw_core::{
    insight::*, EmittedEvent, JoinEntry, QueuedEffectExecution, QueuedEvent, Store,
    NAMESPACE_SEESAW,
};
use serde::{Deserialize, Serialize};
use sqlx::{FromRow, PgPool};
use std::collections::HashSet;
use uuid::Uuid;

const EVENT_CLAIM_SECONDS: i64 = 30;

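/// Collapses a parent event's timestamp to midnight (UTC) of the same day, so events
/// emitted by an effect get a deterministic `created_at` regardless of when the effect
/// actually ran; re-running the same completion on the same parent day produces
/// identical rows.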
fn emitted_event_created_at(parent_created_at: DateTime<Utc>) -> DateTime<Utc> {
    parent_created_at
        .date_naive()
        .and_hms_opt(0, 0, 0)
        .expect("midnight should always be a valid UTC timestamp")
        .and_utc()
}

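/// Exponential backoff for failed effects: 1s, 2s, 4s, ... capped at 256s (2^8)
/// from the ninth attempt onwards.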
fn effect_retry_delay_seconds(attempts: i32) -> i64 {
    let exponent = attempts.saturating_sub(1).clamp(0, 8) as u32;
    1_i64 << exponent
}

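/// Postgres-backed implementation of the seesaw `Store` and `InsightStore` traits.
/// All durable queue, state, effect, join, and insight data lives in `seesaw_*` tables.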
pub struct PostgresStore {
    pool: PgPool,
}

impl PostgresStore {
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }

    pub fn pool(&self) -> &PgPool {
        &self.pool
    }
}

impl Clone for PostgresStore {
    fn clone(&self) -> Self {
        Self {
            pool: self.pool.clone(),
        }
    }
}

#[derive(FromRow)]
struct EventRow {
    id: i64,
    event_id: Uuid,
    parent_id: Option<Uuid>,
    correlation_id: Uuid,
    event_type: String,
    payload: serde_json::Value,
    hops: i32,
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
    created_at: DateTime<Utc>,
}

#[derive(FromRow)]
struct StateRow {
    state: serde_json::Value,
    version: i32,
}

#[derive(FromRow)]
struct EffectRow {
    event_id: Uuid,
    effect_id: String,
    correlation_id: Uuid,
    event_type: String,
    event_payload: serde_json::Value,
    parent_event_id: Option<Uuid>,
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
    execute_at: DateTime<Utc>,
    timeout_seconds: i32,
    max_attempts: i32,
    priority: i32,
    attempts: i32,
}

#[derive(FromRow)]
struct ParentEventRow {
    hops: i32,
    created_at: DateTime<Utc>,
}

#[derive(FromRow)]
struct WorkflowEventRow {
    id: i64,
    event_id: Uuid,
    correlation_id: Uuid,
    event_type: String,
    payload: serde_json::Value,
    created_at: DateTime<Utc>,
}

#[async_trait]
impl Store for PostgresStore {
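    /// Enqueues an event. The insert into `seesaw_processed` acts as an idempotency
    /// guard: if the event id has been seen before, the queue insert is skipped.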
    async fn publish(&self, event: QueuedEvent) -> Result<()> {
        let mut tx = self.pool.begin().await?;

        let inserted: Option<Uuid> = sqlx::query_scalar(
            "INSERT INTO seesaw_processed (event_id, correlation_id, created_at)
             VALUES ($1, $2, $3)
             ON CONFLICT (event_id) DO NOTHING
             RETURNING event_id",
        )
        .bind(event.event_id)
        .bind(event.correlation_id)
        .bind(event.created_at)
        .fetch_optional(&mut *tx)
        .await?;

        if inserted.is_none() {
            tx.commit().await?;
            return Ok(());
        }

        sqlx::query(
            "INSERT INTO seesaw_events (
                 event_id, parent_id, correlation_id, event_type, payload, hops,
                 batch_id, batch_index, batch_size, created_at
             )
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
        )
        .bind(event.event_id)
        .bind(event.parent_id)
        .bind(event.correlation_id)
        .bind(event.event_type)
        .bind(event.payload)
        .bind(event.hops)
        .bind(event.batch_id)
        .bind(event.batch_index)
        .bind(event.batch_size)
        .bind(event.created_at)
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;

        Ok(())
    }

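    /// Claims the next unprocessed event. Events are delivered in FIFO order per
    /// correlation id: an event is only eligible once no older unprocessed event
    /// exists for the same correlation. `FOR UPDATE SKIP LOCKED` lets concurrent
    /// pollers skip rows another worker is claiming, and the claim expires after
    /// `EVENT_CLAIM_SECONDS` via `locked_until`.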
    async fn poll_next(&self) -> Result<Option<QueuedEvent>> {
        let row: Option<EventRow> = sqlx::query_as(
            "WITH next_event AS (
                 SELECT e.id
                 FROM seesaw_events e
                 WHERE e.processed_at IS NULL
                   AND (e.locked_until IS NULL OR e.locked_until < NOW())
                   AND NOT EXISTS (
                       SELECT 1
                       FROM seesaw_events older
                       WHERE older.correlation_id = e.correlation_id
                         AND older.processed_at IS NULL
                         AND (
                             older.created_at < e.created_at
                             OR (older.created_at = e.created_at AND older.id < e.id)
                         )
                   )
                 ORDER BY e.created_at ASC, e.id ASC
                 LIMIT 1
                 FOR UPDATE SKIP LOCKED
             )
             UPDATE seesaw_events e
             SET locked_until = NOW() + ($1 * INTERVAL '1 second')
             FROM next_event
             WHERE e.id = next_event.id
             RETURNING e.id, e.event_id, e.parent_id, e.correlation_id, e.event_type, e.payload,
                       e.hops, e.batch_id, e.batch_index, e.batch_size, e.created_at",
        )
        .bind(EVENT_CLAIM_SECONDS)
        .fetch_optional(&self.pool)
        .await?;

        Ok(row.map(|r| QueuedEvent {
            id: r.id,
            event_id: r.event_id,
            parent_id: r.parent_id,
            correlation_id: r.correlation_id,
            event_type: r.event_type,
            payload: r.payload,
            hops: r.hops,
            batch_id: r.batch_id,
            batch_index: r.batch_index,
            batch_size: r.batch_size,
            created_at: r.created_at,
        }))
    }

    async fn ack(&self, id: i64) -> Result<()> {
        sqlx::query(
            "UPDATE seesaw_events SET processed_at = NOW(), locked_until = NULL WHERE id = $1",
        )
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn nack(&self, id: i64, retry_after_secs: u64) -> Result<()> {
        let locked_until = Utc::now() + Duration::seconds(retry_after_secs as i64);
        sqlx::query(
            "UPDATE seesaw_events
             SET retry_count = retry_count + 1,
                 locked_until = $2
             WHERE id = $1",
        )
        .bind(id)
        .bind(locked_until)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn load_state<S>(&self, correlation_id: Uuid) -> Result<Option<(S, i32)>>
    where
        S: for<'de> Deserialize<'de> + Send,
    {
        let row: Option<StateRow> =
            sqlx::query_as("SELECT state, version FROM seesaw_state WHERE correlation_id = $1")
                .bind(correlation_id)
                .fetch_optional(&self.pool)
                .await?;

        match row {
            Some(r) => {
                let state: S = serde_json::from_value(r.state)?;
                Ok(Some((state, r.version)))
            }
            None => Ok(None),
        }
    }

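    /// Upserts workflow state with optimistic concurrency control: the update only
    /// applies while the stored version still matches `expected_version`, otherwise
    /// the call fails with a version-conflict error.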
    async fn save_state<S>(
        &self,
        correlation_id: Uuid,
        state: &S,
        expected_version: i32,
    ) -> Result<i32>
    where
        S: Serialize + Send + Sync,
    {
        let state_json = serde_json::to_value(state)?;
        let new_version = expected_version + 1;

        let result = sqlx::query(
            "INSERT INTO seesaw_state (correlation_id, state, version, updated_at)
             VALUES ($1, $2, $3, NOW())
             ON CONFLICT (correlation_id) DO UPDATE
             SET state = $2,
                 version = $3,
                 updated_at = NOW()
             WHERE seesaw_state.version = $4",
        )
        .bind(correlation_id)
        .bind(&state_json)
        .bind(new_version)
        .bind(expected_version)
        .execute(&self.pool)
        .await?;

        if result.rows_affected() == 0 {
            anyhow::bail!("Version conflict: state was modified concurrently");
        }

        Ok(new_version)
    }

    async fn insert_effect_intent(
        &self,
        event_id: Uuid,
        effect_id: String,
        correlation_id: Uuid,
        event_type: String,
        event_payload: serde_json::Value,
        parent_event_id: Option<Uuid>,
        batch_id: Option<Uuid>,
        batch_index: Option<i32>,
        batch_size: Option<i32>,
        execute_at: DateTime<Utc>,
        timeout_seconds: i32,
        max_attempts: i32,
        priority: i32,
    ) -> Result<()> {
        sqlx::query(
            "INSERT INTO seesaw_effect_executions (
                 event_id, effect_id, correlation_id, status,
                 event_type, event_payload, parent_event_id,
                 batch_id, batch_index, batch_size,
                 execute_at, timeout_seconds, max_attempts, priority
             )
             VALUES ($1, $2, $3, 'pending', $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
        )
        .bind(event_id)
        .bind(effect_id)
        .bind(correlation_id)
        .bind(event_type)
        .bind(event_payload)
        .bind(parent_event_id)
        .bind(batch_id)
        .bind(batch_index)
        .bind(batch_size)
        .bind(execute_at)
        .bind(timeout_seconds)
        .bind(max_attempts)
        .bind(priority)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

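    /// Claims the next due effect (pending, or failed with attempts remaining),
    /// marks it as executing, and increments its attempt counter in the same
    /// statement. `FOR UPDATE SKIP LOCKED` keeps concurrent workers from claiming
    /// the same row.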
    async fn poll_next_effect(&self) -> Result<Option<QueuedEffectExecution>> {
        let row: Option<EffectRow> = sqlx::query_as(
            "WITH next_effect AS (
                 SELECT event_id, effect_id
                 FROM seesaw_effect_executions
                 WHERE (
                     status = 'pending'
                     OR (status = 'failed' AND attempts < max_attempts)
                 )
                 AND execute_at <= NOW()
                 ORDER BY priority ASC, execute_at ASC, event_id ASC, effect_id ASC
                 LIMIT 1
                 FOR UPDATE SKIP LOCKED
             )
             UPDATE seesaw_effect_executions e
             SET status = 'executing',
                 claimed_at = NOW(),
                 last_attempted_at = NOW(),
                 attempts = e.attempts + 1
             FROM next_effect
             WHERE e.event_id = next_effect.event_id
               AND e.effect_id = next_effect.effect_id
             RETURNING
                 e.event_id, e.effect_id, e.correlation_id, e.event_type, e.event_payload, e.parent_event_id,
                 e.batch_id, e.batch_index, e.batch_size,
                 e.execute_at, e.timeout_seconds, e.max_attempts, e.priority, e.attempts",
        )
        .fetch_optional(&self.pool)
        .await?;

        if let Some(r) = row {
            Ok(Some(QueuedEffectExecution {
                event_id: r.event_id,
                effect_id: r.effect_id,
                correlation_id: r.correlation_id,
                event_type: r.event_type,
                event_payload: r.event_payload,
                parent_event_id: r.parent_event_id,
                batch_id: r.batch_id,
                batch_index: r.batch_index,
                batch_size: r.batch_size,
                execute_at: r.execute_at,
                timeout_seconds: r.timeout_seconds,
                max_attempts: r.max_attempts,
                priority: r.priority,
                attempts: r.attempts,
            }))
        } else {
            Ok(None)
        }
    }

    async fn complete_effect(
        &self,
        event_id: Uuid,
        effect_id: String,
        result: serde_json::Value,
    ) -> Result<()> {
        sqlx::query(
            "UPDATE seesaw_effect_executions
             SET status = 'completed',
                 result = $3,
                 completed_at = NOW()
             WHERE event_id = $1 AND effect_id = $2",
        )
        .bind(event_id)
        .bind(effect_id)
        .bind(result)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

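    /// Completes an effect and atomically inserts any events it emitted. Emitted
    /// events get a deterministic UUIDv5 id (derived from the parent event, effect
    /// id, event type, and position) and a deterministic timestamp, so replaying the
    /// same completion is idempotent via `ON CONFLICT ... DO NOTHING`.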
    async fn complete_effect_with_events(
        &self,
        event_id: Uuid,
        effect_id: String,
        result: serde_json::Value,
        emitted_events: Vec<EmittedEvent>,
    ) -> Result<()> {
        let effect: EffectRow = sqlx::query_as(
            "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
                    batch_id, batch_index, batch_size,
                    execute_at, timeout_seconds, max_attempts, priority, attempts
             FROM seesaw_effect_executions
             WHERE event_id = $1 AND effect_id = $2",
        )
        .bind(event_id)
        .bind(&effect_id)
        .fetch_one(&self.pool)
        .await?;

        let parent: ParentEventRow = sqlx::query_as(
            "SELECT hops, created_at
             FROM seesaw_events
             WHERE event_id = $1
             ORDER BY created_at ASC, id ASC
             LIMIT 1",
        )
        .bind(event_id)
        .fetch_one(&self.pool)
        .await?;

        let mut tx = self.pool.begin().await?;

        for (emitted_index, emitted) in emitted_events.into_iter().enumerate() {
            let deterministic_id = Uuid::new_v5(
                &NAMESPACE_SEESAW,
                format!(
                    "{}-{}-{}-{}",
                    event_id, effect_id, emitted.event_type, emitted_index
                )
                .as_bytes(),
            );

            let deterministic_timestamp = emitted_event_created_at(parent.created_at);

            sqlx::query(
                "INSERT INTO seesaw_events (
                     event_id, parent_id, correlation_id, event_type, payload, hops,
                     batch_id, batch_index, batch_size, created_at
                 )
                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                 ON CONFLICT (event_id, created_at) DO NOTHING",
            )
            .bind(deterministic_id)
            .bind(Some(event_id))
            .bind(effect.correlation_id)
            .bind(&emitted.event_type)
            .bind(emitted.payload)
            .bind(parent.hops + 1)
            .bind(emitted.batch_id)
            .bind(emitted.batch_index)
            .bind(emitted.batch_size)
            .bind(deterministic_timestamp)
            .execute(&mut *tx)
            .await?;
        }

        sqlx::query(
            "UPDATE seesaw_effect_executions
             SET status = 'completed',
                 result = $3,
                 completed_at = NOW()
             WHERE event_id = $1 AND effect_id = $2",
        )
        .bind(event_id)
        .bind(effect_id)
        .bind(result)
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;

        Ok(())
    }

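    /// Records a failed attempt and schedules the retry using exponential backoff.
    /// The `attempts >= $4` guard keeps a stale failure report from overwriting the
    /// bookkeeping of a newer attempt.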
    async fn fail_effect(
        &self,
        event_id: Uuid,
        effect_id: String,
        error: String,
        attempts: i32,
    ) -> Result<()> {
        let retry_at = Utc::now() + Duration::seconds(effect_retry_delay_seconds(attempts));
        sqlx::query(
            "UPDATE seesaw_effect_executions
             SET status = 'failed',
                 error = $3,
                 execute_at = $5,
                 claimed_at = NULL
             WHERE event_id = $1 AND effect_id = $2 AND attempts >= $4",
        )
        .bind(event_id)
        .bind(effect_id)
        .bind(error)
        .bind(attempts)
        .bind(retry_at)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    async fn dlq_effect(
        &self,
        event_id: Uuid,
        effect_id: String,
        error: String,
        reason: String,
        attempts: i32,
    ) -> Result<()> {
        self.dlq_effect_with_events(event_id, effect_id, error, reason, attempts, Vec::new())
            .await
    }

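    /// Moves an effect to the dead-letter queue. In the same transaction it inserts
    /// terminal events: either the events the effect emitted before dead-lettering,
    /// or, for a batched effect that emitted nothing, a synthetic copy of the source
    /// event so the batch member is still represented downstream. The effect
    /// execution row is then deleted.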
    async fn dlq_effect_with_events(
        &self,
        event_id: Uuid,
        effect_id: String,
        error: String,
        reason: String,
        attempts: i32,
        emitted_events: Vec<EmittedEvent>,
    ) -> Result<()> {
        let effect: EffectRow = sqlx::query_as(
            "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
                    batch_id, batch_index, batch_size,
                    execute_at, timeout_seconds, max_attempts, priority, attempts
             FROM seesaw_effect_executions
             WHERE event_id = $1 AND effect_id = $2",
        )
        .bind(event_id)
        .bind(&effect_id)
        .fetch_one(&self.pool)
        .await?;

        let parent = sqlx::query_as::<_, ParentEventRow>(
            "SELECT hops, created_at
             FROM seesaw_events
             WHERE event_id = $1
             ORDER BY created_at ASC, id ASC
             LIMIT 1",
        )
        .bind(event_id)
        .fetch_optional(&self.pool)
        .await?;

        let mut tx = self.pool.begin().await?;

        sqlx::query(
            "INSERT INTO seesaw_dlq (
                 event_id, effect_id, correlation_id, error, event_type, event_payload, reason, attempts
             )
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
        )
        .bind(event_id)
        .bind(&effect_id)
        .bind(effect.correlation_id)
        .bind(&error)
        .bind(&effect.event_type)
        .bind(&effect.event_payload)
        .bind(&reason)
        .bind(attempts)
        .execute(&mut *tx)
        .await?;

        let synthetic_created_at = parent
            .as_ref()
            .map(|row| emitted_event_created_at(row.created_at))
            .unwrap_or_else(Utc::now);
        let synthetic_hops = parent.as_ref().map(|row| row.hops + 1).unwrap_or(0);

        if emitted_events.is_empty() {
            if let (Some(batch_id), Some(batch_index), Some(batch_size)) =
                (effect.batch_id, effect.batch_index, effect.batch_size)
            {
                let synthetic_event_id = Uuid::new_v5(
                    &NAMESPACE_SEESAW,
                    format!("{}-{}-dlq-terminal", event_id, effect_id).as_bytes(),
                );

                sqlx::query(
                    "INSERT INTO seesaw_events (
                         event_id, parent_id, correlation_id, event_type, payload, hops,
                         batch_id, batch_index, batch_size, created_at
                     )
                     VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                     ON CONFLICT (event_id, created_at) DO NOTHING",
                )
                .bind(synthetic_event_id)
                .bind(Some(event_id))
                .bind(effect.correlation_id)
                .bind(&effect.event_type)
                .bind(&effect.event_payload)
                .bind(synthetic_hops)
                .bind(Some(batch_id))
                .bind(Some(batch_index))
                .bind(Some(batch_size))
                .bind(synthetic_created_at)
                .execute(&mut *tx)
                .await?;
            }
        } else {
            for (emitted_index, emitted) in emitted_events.into_iter().enumerate() {
                let synthetic_event_id = Uuid::new_v5(
                    &NAMESPACE_SEESAW,
                    format!(
                        "{}-{}-dlq-terminal-{}-{}",
                        event_id, effect_id, emitted.event_type, emitted_index
                    )
                    .as_bytes(),
                );

                sqlx::query(
                    "INSERT INTO seesaw_events (
                         event_id, parent_id, correlation_id, event_type, payload, hops,
                         batch_id, batch_index, batch_size, created_at
                     )
                     VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                     ON CONFLICT (event_id, created_at) DO NOTHING",
                )
                .bind(synthetic_event_id)
                .bind(Some(event_id))
                .bind(effect.correlation_id)
                .bind(&emitted.event_type)
                .bind(emitted.payload)
                .bind(synthetic_hops)
                .bind(emitted.batch_id.or(effect.batch_id))
                .bind(emitted.batch_index.or(effect.batch_index))
                .bind(emitted.batch_size.or(effect.batch_size))
                .bind(synthetic_created_at)
                .execute(&mut *tx)
                .await?;
            }
        }

        sqlx::query("DELETE FROM seesaw_effect_executions WHERE event_id = $1 AND effect_id = $2")
            .bind(event_id)
            .bind(&effect_id)
            .execute(&mut *tx)
            .await?;

        tx.commit().await?;

        Ok(())
    }

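    /// Streams events for a single workflow. A LISTEN/NOTIFY channel wakes the
    /// background task, which then pages through `seesaw_events` from the last seen
    /// `(created_at, id)` cursor; a periodic timeout also drives catch-up polling so
    /// a missed notification cannot stall the stream. Only events created after the
    /// subscription point are delivered.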
    async fn subscribe_workflow_events(
        &self,
        correlation_id: Uuid,
    ) -> Result<Box<dyn futures::Stream<Item = seesaw_core::WorkflowEvent> + Send + Unpin>> {
        use sqlx::postgres::PgListener;

        let channel = format!("seesaw_workflow_{}", correlation_id);
        const PAGE_SIZE: i64 = 256;
        const CATCH_UP_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500);

        let initial_cursor: Option<(DateTime<Utc>, i64)> = sqlx::query_as(
            "SELECT created_at, id
             FROM seesaw_events
             WHERE correlation_id = $1
             ORDER BY created_at DESC, id DESC
             LIMIT 1",
        )
        .bind(correlation_id)
        .fetch_optional(&self.pool)
        .await?;

        let mut listener = PgListener::connect_with(&self.pool).await?;
        listener.listen(&channel).await?;

        let pool = self.pool.clone();
        let (tx, rx) = futures::channel::mpsc::unbounded::<seesaw_core::WorkflowEvent>();

        tokio::spawn(async move {
            let mut cursor = initial_cursor;
            let mut drain_pending = true;

            loop {
                if !drain_pending {
                    match tokio::time::timeout(CATCH_UP_INTERVAL, listener.recv()).await {
                        Ok(Ok(_notification)) => {}
                        Ok(Err(error)) => {
                            tracing::warn!(
                                "workflow listener recv failed for {}: {}",
                                correlation_id,
                                error
                            );
                            return;
                        }
                        Err(_) => {}
                    }
                }
                drain_pending = false;

                loop {
                    let rows_result: std::result::Result<Vec<WorkflowEventRow>, sqlx::Error> =
                        if let Some((created_at, id)) = cursor {
                            sqlx::query_as(
                                "SELECT id, event_id, correlation_id, event_type, payload, created_at
                                 FROM seesaw_events
                                 WHERE correlation_id = $1
                                   AND (
                                       created_at > $2
                                       OR (created_at = $2 AND id > $3)
                                   )
                                 ORDER BY created_at ASC, id ASC
                                 LIMIT $4",
                            )
                            .bind(correlation_id)
                            .bind(created_at)
                            .bind(id)
                            .bind(PAGE_SIZE)
                            .fetch_all(&pool)
                            .await
                        } else {
                            sqlx::query_as(
                                "SELECT id, event_id, correlation_id, event_type, payload, created_at
                                 FROM seesaw_events
                                 WHERE correlation_id = $1
                                 ORDER BY created_at ASC, id ASC
                                 LIMIT $2",
                            )
                            .bind(correlation_id)
                            .bind(PAGE_SIZE)
                            .fetch_all(&pool)
                            .await
                        };

                    let rows = match rows_result {
                        Ok(rows) => rows,
                        Err(error) => {
                            tracing::warn!(
                                "workflow event query failed for {}: {}",
                                correlation_id,
                                error
                            );
                            return;
                        }
                    };

                    if rows.is_empty() {
                        break;
                    }

                    for row in rows {
                        cursor = Some((row.created_at, row.id));
                        if tx
                            .unbounded_send(seesaw_core::WorkflowEvent {
                                event_id: row.event_id,
                                correlation_id: row.correlation_id,
                                event_type: row.event_type,
                                payload: row.payload,
                            })
                            .is_err()
                        {
                            return;
                        }
                    }
                }
            }
        });

        Ok(Box::new(rx))
    }

    async fn get_workflow_status(
        &self,
        correlation_id: Uuid,
    ) -> Result<seesaw_core::WorkflowStatus> {
        let state = sqlx::query_as::<_, (serde_json::Value,)>(
            "SELECT state FROM seesaw_state WHERE correlation_id = $1",
        )
        .bind(correlation_id)
        .fetch_optional(&self.pool)
        .await?
        .map(|r| r.0);

        let pending_effects = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM seesaw_effect_executions
             WHERE correlation_id = $1 AND status IN ('pending', 'executing', 'failed')",
        )
        .bind(correlation_id)
        .fetch_one(&self.pool)
        .await?
        .0;

        let last_event = sqlx::query_as::<_, (String,)>(
            "SELECT event_type FROM seesaw_events
             WHERE correlation_id = $1
             ORDER BY created_at DESC, id DESC
             LIMIT 1",
        )
        .bind(correlation_id)
        .fetch_optional(&self.pool)
        .await?
        .map(|r| r.0);

        Ok(seesaw_core::WorkflowStatus {
            correlation_id,
            state,
            pending_effects,
            is_settled: pending_effects == 0,
            last_event,
        })
    }

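    /// Appends a batch member to a same-batch join window and, if this call makes
    /// the window complete (entry count >= target batch size), atomically claims the
    /// window by flipping it to 'processing' and returns all entries ordered by
    /// batch index. Returns `None` when the window is not yet complete or another
    /// worker has already claimed it.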
    async fn join_same_batch_append_and_maybe_claim(
        &self,
        join_effect_id: String,
        correlation_id: Uuid,
        source_event_id: Uuid,
        source_event_type: String,
        source_payload: serde_json::Value,
        source_created_at: DateTime<Utc>,
        batch_id: Uuid,
        batch_index: i32,
        batch_size: i32,
    ) -> Result<Option<Vec<JoinEntry>>> {
        let mut tx = self.pool.begin().await?;

        sqlx::query(
            "INSERT INTO seesaw_join_entries (
                 join_effect_id, correlation_id, source_event_id, source_event_type, source_payload,
                 source_created_at, batch_id, batch_index, batch_size
             )
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
             ON CONFLICT (join_effect_id, correlation_id, source_event_id) DO NOTHING",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(source_event_id)
        .bind(&source_event_type)
        .bind(source_payload)
        .bind(source_created_at)
        .bind(batch_id)
        .bind(batch_index)
        .bind(batch_size)
        .execute(&mut *tx)
        .await?;

        sqlx::query(
            "INSERT INTO seesaw_join_windows (
                 join_effect_id, correlation_id, mode, batch_id, target_count, status
             )
             VALUES ($1, $2, 'same_batch', $3, $4, 'open')
             ON CONFLICT (join_effect_id, correlation_id, batch_id) DO NOTHING",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .bind(batch_size)
        .execute(&mut *tx)
        .await?;

        sqlx::query(
            "UPDATE seesaw_join_windows
             SET target_count = $4,
                 updated_at = NOW()
             WHERE join_effect_id = $1
               AND correlation_id = $2
               AND batch_id = $3
               AND target_count <> $4",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .bind(batch_size)
        .execute(&mut *tx)
        .await?;

        let claimed: Option<(String,)> = sqlx::query_as(
            "UPDATE seesaw_join_windows w
             SET status = 'processing',
                 sealed_at = COALESCE(w.sealed_at, NOW()),
                 processing_started_at = NOW(),
                 updated_at = NOW(),
                 last_error = NULL
             WHERE w.join_effect_id = $1
               AND w.correlation_id = $2
               AND w.batch_id = $3
               AND w.status = 'open'
               AND (
                   SELECT COUNT(*)::int
                   FROM seesaw_join_entries e
                   WHERE e.join_effect_id = w.join_effect_id
                     AND e.correlation_id = w.correlation_id
                     AND e.batch_id = w.batch_id
               ) >= w.target_count
             RETURNING w.join_effect_id",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .fetch_optional(&mut *tx)
        .await?;

        if claimed.is_none() {
            tx.commit().await?;
            return Ok(None);
        }

        let rows = sqlx::query_as::<_, (Uuid, String, serde_json::Value, Uuid, i32, i32, DateTime<Utc>)>(
            "SELECT source_event_id, source_event_type, source_payload, batch_id, batch_index, batch_size, source_created_at
             FROM seesaw_join_entries
             WHERE join_effect_id = $1
               AND correlation_id = $2
               AND batch_id = $3
             ORDER BY batch_index ASC, source_created_at ASC, source_event_id ASC",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .fetch_all(&mut *tx)
        .await?;

        let entries = rows
            .into_iter()
            .map(
                |(
                    source_event_id,
                    event_type,
                    payload,
                    batch_id,
                    batch_index,
                    batch_size,
                    created_at,
                )| JoinEntry {
                    source_event_id,
                    event_type,
                    payload,
                    batch_id,
                    batch_index,
                    batch_size,
                    created_at,
                },
            )
            .collect::<Vec<_>>();

        tx.commit().await?;
        Ok(Some(entries))
    }

    async fn join_same_batch_complete(
        &self,
        join_effect_id: String,
        correlation_id: Uuid,
        batch_id: Uuid,
    ) -> Result<()> {
        let mut tx = self.pool.begin().await?;

        sqlx::query(
            "UPDATE seesaw_join_windows
             SET status = 'completed',
                 completed_at = NOW(),
                 updated_at = NOW()
             WHERE join_effect_id = $1
               AND correlation_id = $2
               AND batch_id = $3",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .execute(&mut *tx)
        .await?;

        sqlx::query(
            "DELETE FROM seesaw_join_entries
             WHERE join_effect_id = $1
               AND correlation_id = $2
               AND batch_id = $3",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .execute(&mut *tx)
        .await?;

        sqlx::query(
            "DELETE FROM seesaw_join_windows
             WHERE join_effect_id = $1
               AND correlation_id = $2
               AND batch_id = $3",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;
        Ok(())
    }

    async fn join_same_batch_release(
        &self,
        join_effect_id: String,
        correlation_id: Uuid,
        batch_id: Uuid,
        error: String,
    ) -> Result<()> {
        sqlx::query(
            "UPDATE seesaw_join_windows
             SET status = 'open',
                 processing_started_at = NULL,
                 last_error = $4,
                 updated_at = NOW()
             WHERE join_effect_id = $1
               AND correlation_id = $2
               AND batch_id = $3
               AND status = 'processing'",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .bind(error)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
}

#[derive(FromRow)]
struct StreamRow {
    seq: i64,
    stream_type: String,
    correlation_id: Uuid,
    event_id: Option<Uuid>,
    effect_event_id: Option<Uuid>,
    effect_id: Option<String>,
    status: Option<String>,
    error: Option<String>,
    payload: Option<serde_json::Value>,
    created_at: DateTime<Utc>,
}

#[derive(FromRow)]
struct EffectLogRow {
    correlation_id: Uuid,
    event_id: Uuid,
    effect_id: String,
    status: String,
    attempts: i32,
    event_type: String,
    result: Option<serde_json::Value>,
    error: Option<String>,
    created_at: DateTime<Utc>,
    execute_at: DateTime<Utc>,
    claimed_at: Option<DateTime<Utc>>,
    last_attempted_at: Option<DateTime<Utc>>,
    completed_at: Option<DateTime<Utc>>,
}

#[derive(FromRow)]
struct DeadLetterRow {
    correlation_id: Uuid,
    event_id: Uuid,
    effect_id: String,
    event_type: String,
    event_payload: serde_json::Value,
    error: String,
    reason: String,
    attempts: i32,
    failed_at: DateTime<Utc>,
    resolved_at: Option<DateTime<Utc>>,
}

#[derive(FromRow)]
struct FailedWorkflowRow {
    correlation_id: Uuid,
    failed_effects: i64,
    active_effects: i64,
    dead_letters: i64,
    last_failed_at: Option<DateTime<Utc>>,
    last_error: Option<String>,
}

#[async_trait]
impl InsightStore for PostgresStore {
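    /// Streams insight events for observability consumers. Each NOTIFY on
    /// `seesaw_stream` triggers a fetch of the most recent stream row only, so under
    /// bursty load intermediate rows between notifications may be skipped.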
    async fn subscribe_events(
        &self,
    ) -> Result<Box<dyn futures::Stream<Item = InsightEvent> + Send + Unpin>> {
        use futures::stream::StreamExt;
        use sqlx::postgres::PgListener;

        let mut listener = PgListener::connect_with(&self.pool).await?;
        listener.listen("seesaw_stream").await?;

        let pool = self.pool.clone();
        let stream = listener.into_stream().filter_map(move |result| {
            let pool = pool.clone();
            Box::pin(async move {
                match result {
                    Ok(_notification) => {
                        if let Ok(row) = sqlx::query_as::<_, StreamRow>(
                            "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
                                    effect_id, status, error, payload, created_at
                             FROM seesaw_stream
                             ORDER BY seq DESC
                             LIMIT 1",
                        )
                        .fetch_one(&pool)
                        .await
                        {
                            Some(stream_row_to_insight_event(row))
                        } else {
                            None
                        }
                    }
                    Err(_) => None,
                }
            })
        });

        Ok(Box::new(stream))
    }

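    /// Builds the full event/effect tree for a workflow: all events for the
    /// correlation id, their effects grouped per event, and the current state blob.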
    async fn get_workflow_tree(&self, correlation_id: Uuid) -> Result<WorkflowTree> {
        let events = sqlx::query_as::<_, EventRow>(
            "SELECT id, event_id, parent_id, correlation_id, event_type, payload, hops,
                    batch_id, batch_index, batch_size, created_at
             FROM seesaw_events
             WHERE correlation_id = $1
             ORDER BY created_at ASC",
        )
        .bind(correlation_id)
        .fetch_all(&self.pool)
        .await?;

        let effects = sqlx::query_as::<_, EffectTreeRow>(
            "SELECT event_id, effect_id, status, result, error, attempts, created_at,
                    batch_id, batch_index, batch_size
             FROM seesaw_effect_executions
             WHERE correlation_id = $1
             ORDER BY created_at ASC",
        )
        .bind(correlation_id)
        .fetch_all(&self.pool)
        .await?;

        let event_ids: HashSet<Uuid> = events.iter().map(|event| event.event_id).collect();
        let roots = build_event_tree(&events, &effects, None, &event_ids, true);

        let state = sqlx::query_as::<_, (serde_json::Value,)>(
            "SELECT state FROM seesaw_state WHERE correlation_id = $1",
        )
        .bind(correlation_id)
        .fetch_optional(&self.pool)
        .await?
        .map(|r| r.0);

        Ok(WorkflowTree {
            correlation_id,
            roots,
            state,
            event_count: events.len(),
            effect_count: effects.len(),
        })
    }

    async fn get_stats(&self) -> Result<InsightStats> {
        let total_events = sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM seesaw_events")
            .fetch_one(&self.pool)
            .await?
            .0;

        let active_effects = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM seesaw_effect_executions
             WHERE status IN ('pending', 'executing')",
        )
        .fetch_one(&self.pool)
        .await?
        .0;

        let completed_effects = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM seesaw_effect_executions WHERE status = 'completed'",
        )
        .fetch_one(&self.pool)
        .await?
        .0;

        let failed_effects = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM seesaw_effect_executions WHERE status = 'failed'",
        )
        .fetch_one(&self.pool)
        .await?
        .0;

        Ok(InsightStats {
            total_events,
            active_effects,
            completed_effects,
            failed_effects,
        })
    }

    async fn get_recent_events(
        &self,
        cursor: Option<i64>,
        limit: usize,
    ) -> Result<Vec<InsightEvent>> {
        let rows = if let Some(cursor_seq) = cursor {
            sqlx::query_as::<_, StreamRow>(
                "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
                        effect_id, status, error, payload, created_at
                 FROM seesaw_stream
                 WHERE seq > $1
                 ORDER BY seq ASC
                 LIMIT $2",
            )
            .bind(cursor_seq)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await?
        } else {
            sqlx::query_as::<_, StreamRow>(
                "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
                        effect_id, status, error, payload, created_at
                 FROM seesaw_stream
                 ORDER BY seq DESC
                 LIMIT $1",
            )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await?
        };

        Ok(rows.into_iter().map(stream_row_to_insight_event).collect())
    }

    async fn get_effect_logs(
        &self,
        correlation_id: Option<Uuid>,
        limit: usize,
    ) -> Result<Vec<EffectExecutionLog>> {
        let rows = sqlx::query_as::<_, EffectLogRow>(
            "SELECT
                 correlation_id,
                 event_id,
                 effect_id,
                 status,
                 attempts,
                 event_type,
                 result,
                 error,
                 created_at,
                 execute_at,
                 claimed_at,
                 last_attempted_at,
                 completed_at
             FROM seesaw_effect_executions
             WHERE ($1::uuid IS NULL OR correlation_id = $1)
             ORDER BY COALESCE(last_attempted_at, created_at) DESC, event_id DESC
             LIMIT $2",
        )
        .bind(correlation_id)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| {
                let started_at = row.claimed_at.or(row.last_attempted_at);
                let duration_ms = match (started_at, row.completed_at) {
                    (Some(start), Some(end)) => Some((end - start).num_milliseconds().max(0)),
                    _ => None,
                };

                EffectExecutionLog {
                    correlation_id: row.correlation_id,
                    event_id: row.event_id,
                    effect_id: row.effect_id,
                    status: row.status,
                    attempts: row.attempts,
                    event_type: Some(row.event_type),
                    result: row.result,
                    error: row.error,
                    created_at: row.created_at,
                    execute_at: Some(row.execute_at),
                    claimed_at: row.claimed_at,
                    last_attempted_at: row.last_attempted_at,
                    completed_at: row.completed_at,
                    duration_ms,
                }
            })
            .collect())
    }

    async fn get_dead_letters(
        &self,
        unresolved_only: bool,
        limit: usize,
    ) -> Result<Vec<DeadLetterEntry>> {
        let rows = sqlx::query_as::<_, DeadLetterRow>(
            "SELECT
                 correlation_id,
                 event_id,
                 effect_id,
                 event_type,
                 event_payload,
                 error,
                 reason,
                 attempts,
                 failed_at,
                 resolved_at
             FROM seesaw_dlq
             WHERE (NOT $1 OR resolved_at IS NULL)
             ORDER BY failed_at DESC
             LIMIT $2",
        )
        .bind(unresolved_only)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| DeadLetterEntry {
                correlation_id: row.correlation_id,
                event_id: row.event_id,
                effect_id: row.effect_id,
                event_type: row.event_type,
                event_payload: row.event_payload,
                error: row.error,
                reason: row.reason,
                attempts: row.attempts,
                failed_at: row.failed_at,
                resolved_at: row.resolved_at,
            })
            .collect())
    }

    async fn get_failed_workflows(&self, limit: usize) -> Result<Vec<FailedWorkflow>> {
        let rows = sqlx::query_as::<_, FailedWorkflowRow>(
            "WITH effect_agg AS (
                 SELECT
                     correlation_id,
                     COUNT(*) FILTER (WHERE status = 'failed')::BIGINT AS failed_effects,
                     COUNT(*) FILTER (WHERE status IN ('pending', 'executing'))::BIGINT AS active_effects,
                     MAX(last_attempted_at) FILTER (WHERE status = 'failed') AS last_failed_at,
                     MAX(error) FILTER (WHERE status = 'failed') AS last_error
                 FROM seesaw_effect_executions
                 GROUP BY correlation_id
             ),
             dlq_agg AS (
                 SELECT
                     correlation_id,
                     COUNT(*) FILTER (WHERE resolved_at IS NULL)::BIGINT AS dead_letters,
                     MAX(failed_at) FILTER (WHERE resolved_at IS NULL) AS last_dlq_at,
                     MAX(error) FILTER (WHERE resolved_at IS NULL) AS last_dlq_error
                 FROM seesaw_dlq
                 GROUP BY correlation_id
             )
             SELECT
                 COALESCE(e.correlation_id, d.correlation_id) AS correlation_id,
                 COALESCE(e.failed_effects, 0) AS failed_effects,
                 COALESCE(e.active_effects, 0) AS active_effects,
                 COALESCE(d.dead_letters, 0) AS dead_letters,
                 GREATEST(e.last_failed_at, d.last_dlq_at) AS last_failed_at,
                 COALESCE(d.last_dlq_error, e.last_error) AS last_error
             FROM effect_agg e
             FULL OUTER JOIN dlq_agg d ON d.correlation_id = e.correlation_id
             WHERE COALESCE(e.failed_effects, 0) > 0 OR COALESCE(d.dead_letters, 0) > 0
             ORDER BY last_failed_at DESC NULLS LAST
             LIMIT $1",
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| FailedWorkflow {
                correlation_id: row.correlation_id,
                failed_effects: row.failed_effects,
                active_effects: row.active_effects,
                dead_letters: row.dead_letters,
                last_failed_at: row.last_failed_at,
                last_error: row.last_error,
            })
            .collect())
    }
}

#[derive(FromRow)]
struct EffectTreeRow {
    event_id: Uuid,
    effect_id: String,
    status: String,
    result: Option<serde_json::Value>,
    error: Option<String>,
    attempts: i32,
    created_at: DateTime<Utc>,
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
}

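/// Maps a raw `seesaw_stream` row into an `InsightEvent`, defaulting unknown stream
/// types to `EventDispatched` and pulling the event type out of the payload for
/// dispatched events.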
fn stream_row_to_insight_event(row: StreamRow) -> InsightEvent {
    let stream_type = match row.stream_type.as_str() {
        "event_dispatched" => StreamType::EventDispatched,
        "effect_started" => StreamType::EffectStarted,
        "effect_completed" => StreamType::EffectCompleted,
        "effect_failed" => StreamType::EffectFailed,
        _ => StreamType::EventDispatched,
    };

    let event_type = if stream_type == StreamType::EventDispatched {
        row.payload
            .as_ref()
            .and_then(|p| p.get("event_type"))
            .and_then(|v| v.as_str())
            .map(|s| s.to_string())
    } else {
        None
    };

    InsightEvent {
        seq: row.seq,
        stream_type,
        correlation_id: row.correlation_id,
        event_id: row.event_id,
        effect_event_id: row.effect_event_id,
        effect_id: row.effect_id,
        event_type,
        status: row.status,
        error: row.error,
        payload: row.payload,
        created_at: row.created_at,
    }
}

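/// Recursively assembles `EventNode`s. On the root pass, an event is a root if it has
/// no parent or its parent is not part of this workflow's event set (orphaned parents
/// are promoted to roots); on recursive passes, children are matched by `parent_id`.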
fn build_event_tree(
    events: &[EventRow],
    effects: &[EffectTreeRow],
    parent_id: Option<Uuid>,
    event_ids: &HashSet<Uuid>,
    is_root_pass: bool,
) -> Vec<EventNode> {
    events
        .iter()
        .filter(|event| {
            if is_root_pass {
                event.parent_id.is_none()
                    || event
                        .parent_id
                        .map(|parent| !event_ids.contains(&parent))
                        .unwrap_or(false)
            } else {
                event.parent_id == parent_id
            }
        })
        .map(|event| {
            let event_effects: Vec<EffectNode> = effects
                .iter()
                .filter(|eff| eff.event_id == event.event_id)
                .map(|eff| EffectNode {
                    effect_id: eff.effect_id.clone(),
                    event_id: eff.event_id,
                    status: eff.status.clone(),
                    result: eff.result.clone(),
                    error: eff.error.clone(),
                    attempts: eff.attempts,
                    created_at: eff.created_at,
                    batch_id: eff.batch_id,
                    batch_index: eff.batch_index,
                    batch_size: eff.batch_size,
                })
                .collect();

            let children =
                build_event_tree(events, effects, Some(event.event_id), event_ids, false);

            EventNode {
                event_id: event.event_id,
                event_type: event.event_type.clone(),
                payload: event.payload.clone(),
                created_at: event.created_at,
                batch_id: event.batch_id,
                batch_index: event.batch_index,
                batch_size: event.batch_size,
                children,
                effects: event_effects,
            }
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Timelike};

    #[test]
    fn emitted_event_created_at_is_midnight_on_parent_day() {
        let parent = Utc
            .with_ymd_and_hms(2026, 2, 5, 18, 45, 12)
            .single()
            .expect("valid timestamp");

        let emitted = emitted_event_created_at(parent);

        assert_eq!(emitted.date_naive(), parent.date_naive());
        assert_eq!(emitted.hour(), 0);
        assert_eq!(emitted.minute(), 0);
        assert_eq!(emitted.second(), 0);
    }

    #[test]
    fn emitted_event_created_at_is_deterministic_for_same_parent_day() {
        let first_parent = Utc
            .with_ymd_and_hms(2026, 2, 5, 0, 1, 2)
            .single()
            .expect("valid timestamp");
        let second_parent = Utc
            .with_ymd_and_hms(2026, 2, 5, 23, 59, 59)
            .single()
            .expect("valid timestamp");

        let first_emitted = emitted_event_created_at(first_parent);
        let second_emitted = emitted_event_created_at(second_parent);

        assert_eq!(first_emitted, second_emitted);
    }

    #[test]
    fn effect_retry_delay_seconds_uses_exponential_backoff() {
        assert_eq!(effect_retry_delay_seconds(1), 1);
        assert_eq!(effect_retry_delay_seconds(2), 2);
        assert_eq!(effect_retry_delay_seconds(3), 4);
        assert_eq!(effect_retry_delay_seconds(4), 8);
    }

    #[test]
    fn effect_retry_delay_seconds_is_capped() {
        assert_eq!(effect_retry_delay_seconds(9), 256);
        assert_eq!(effect_retry_delay_seconds(50), 256);
    }

    #[test]
    fn build_event_tree_treats_orphan_parent_as_root() {
        let correlation_id = Uuid::new_v4();
        let event_id = Uuid::new_v4();
        let missing_parent = Uuid::new_v4();
        let now = Utc::now();

        let events = vec![EventRow {
            id: 1,
            event_id,
            parent_id: Some(missing_parent),
            correlation_id,
            event_type: "OrphanEvent".to_string(),
            payload: serde_json::json!({"ok": true}),
            hops: 1,
            batch_id: None,
            batch_index: None,
            batch_size: None,
            created_at: now,
        }];

        let effects: Vec<EffectTreeRow> = Vec::new();
        let event_ids: HashSet<Uuid> = events.iter().map(|event| event.event_id).collect();

        let roots = build_event_tree(&events, &effects, None, &event_ids, true);

        assert_eq!(roots.len(), 1);
        assert_eq!(roots[0].event_id, event_id);
    }
}