1use anyhow::Result;
5use async_trait::async_trait;
6use chrono::{DateTime, Duration, Utc};
7use seesaw_core::{
8 insight::*, EmittedEvent, JoinEntry, QueuedEffectExecution, QueuedEvent, Store,
9 NAMESPACE_SEESAW,
10};
11use serde::{Deserialize, Serialize};
12use sqlx::{FromRow, PgPool};
13use std::collections::HashSet;
14use uuid::Uuid;
15
/// How many seconds `poll_next` keeps an event claimed (via `locked_until`) before
/// another poller may pick it up again.
const EVENT_CLAIM_SECONDS: i64 = 30;
17
18fn emitted_event_created_at(parent_created_at: DateTime<Utc>) -> DateTime<Utc> {
19 parent_created_at
20 .date_naive()
21 .and_hms_opt(0, 0, 0)
22 .expect("midnight should always be a valid UTC timestamp")
23 .and_utc()
24}
25
/// Returns the back-off delay, in seconds, to wait before retrying an effect.
///
/// The delay is `2^(attempts - 1)` seconds, with the exponent clamped to `0..=8`, so the
/// result is always between 1 and 256 seconds.
fn effect_retry_delay_seconds(attempts: i32) -> i64 {
    let prior_attempts = attempts.saturating_sub(1);
    let doublings = prior_attempts.clamp(0, 8) as u32;
    2_i64.pow(doublings)
}
30
/// Postgres-backed implementation of the `Store` and `InsightStore` traits.
pub struct PostgresStore {
    // Connection pool every query, transaction and LISTEN connection in this file is built on.
    pool: PgPool,
}
35
36impl PostgresStore {
37 pub fn new(pool: PgPool) -> Self {
38 Self { pool }
39 }
40
41 pub fn pool(&self) -> &PgPool {
42 &self.pool
43 }
44}
45
46impl Clone for PostgresStore {
47 fn clone(&self) -> Self {
48 Self {
49 pool: self.pool.clone(),
50 }
51 }
52}
53
/// Row shape decoded from `seesaw_events` queries.
///
/// `poll_next` converts it field-for-field into a `QueuedEvent`. Because it derives
/// `FromRow`, every query decoding into it must select all of these columns by name.
#[derive(FromRow)]
struct EventRow {
    // Database row id; passed back to `ack` / `nack`.
    id: i64,
    event_id: Uuid,
    // Event that caused this one, if any.
    parent_id: Option<Uuid>,
    correlation_id: Uuid,
    event_type: String,
    payload: serde_json::Value,
    hops: i32,
    // Incremented by `nack`.
    retry_count: i32,
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
    created_at: DateTime<Utc>,
}
69
/// Row shape for `seesaw_state` reads in `load_state`: the JSON state plus its version.
#[derive(FromRow)]
struct StateRow {
    // Serialized workflow state; deserialized into the caller's type by `load_state`.
    state: serde_json::Value,
    // Version counter used by `save_state` for its optimistic concurrency check.
    version: i32,
}
75
/// Row shape decoded from `seesaw_effect_executions`.
///
/// Used when claiming the next effect (`poll_next_effect`) and when an effect is
/// completed or dead-lettered together with emitted events.
#[derive(FromRow)]
struct EffectRow {
    // (`event_id`, `effect_id`) is the pair every effect query in this file filters on.
    event_id: Uuid,
    effect_id: String,
    correlation_id: Uuid,
    event_type: String,
    event_payload: serde_json::Value,
    parent_event_id: Option<Uuid>,
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
    // Earliest time the effect may be claimed; `fail_effect` pushes it forward for retries.
    execute_at: DateTime<Utc>,
    timeout_seconds: i32,
    max_attempts: i32,
    priority: i32,
    // Incremented each time `poll_next_effect` claims the row.
    attempts: i32,
}
93
/// The two columns of a parent event needed to derive its children: hop count and timestamp.
#[derive(FromRow)]
struct ParentEventRow {
    // Children are inserted with `hops + 1`.
    hops: i32,
    // Fed to `emitted_event_created_at` to compute the children's `created_at`.
    created_at: DateTime<Utc>,
}
99
/// Row shape read by `subscribe_workflow_events` when paging through a workflow's events.
#[derive(FromRow)]
struct WorkflowEventRow {
    // `(created_at, id)` forms the paging cursor.
    id: i64,
    event_id: Uuid,
    correlation_id: Uuid,
    event_type: String,
    payload: serde_json::Value,
    created_at: DateTime<Utc>,
}
109
110#[async_trait]
111impl Store for PostgresStore {
112 async fn publish(&self, event: QueuedEvent) -> Result<()> {
113 let mut tx = self.pool.begin().await?;
114
115 let inserted: Option<Uuid> = sqlx::query_scalar(
118 "INSERT INTO seesaw_processed (event_id, correlation_id, created_at)
119 VALUES ($1, $2, $3)
120 ON CONFLICT (event_id) DO NOTHING
121 RETURNING event_id",
122 )
123 .bind(event.event_id)
124 .bind(event.correlation_id)
125 .bind(event.created_at)
126 .fetch_optional(&mut *tx)
127 .await?;
128
129 if inserted.is_none() {
130 tx.commit().await?;
131 return Ok(());
132 }
133
134 sqlx::query(
135 "INSERT INTO seesaw_events (
136 event_id, parent_id, correlation_id, event_type, payload, hops, retry_count,
137 batch_id, batch_index, batch_size, created_at
138 )
139 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)",
140 )
141 .bind(event.event_id)
142 .bind(event.parent_id)
143 .bind(event.correlation_id)
144 .bind(event.event_type)
145 .bind(event.payload)
146 .bind(event.hops)
147 .bind(event.retry_count)
148 .bind(event.batch_id)
149 .bind(event.batch_index)
150 .bind(event.batch_size)
151 .bind(event.created_at)
152 .execute(&mut *tx)
153 .await?;
154
155 tx.commit().await?;
156
157 Ok(())
158 }
159
160 async fn poll_next(&self) -> Result<Option<QueuedEvent>> {
161 let row: Option<EventRow> = sqlx::query_as(
162 "WITH next_event AS (
163 SELECT e.id
164 FROM seesaw_events e
165 WHERE e.processed_at IS NULL
166 AND (e.locked_until IS NULL OR e.locked_until < NOW())
167 AND NOT EXISTS (
168 SELECT 1
169 FROM seesaw_events older
170 WHERE older.correlation_id = e.correlation_id
171 AND older.processed_at IS NULL
172 AND (
173 older.created_at < e.created_at
174 OR (older.created_at = e.created_at AND older.id < e.id)
175 )
176 )
177 ORDER BY e.created_at ASC, e.id ASC
178 LIMIT 1
179 FOR UPDATE SKIP LOCKED
180 )
181 UPDATE seesaw_events e
182 SET locked_until = NOW() + ($1 * INTERVAL '1 second')
183 FROM next_event
184 WHERE e.id = next_event.id
185 RETURNING e.id, e.event_id, e.parent_id, e.correlation_id, e.event_type, e.payload,
186 e.hops, e.retry_count, e.batch_id, e.batch_index, e.batch_size, e.created_at",
187 )
188 .bind(EVENT_CLAIM_SECONDS)
189 .fetch_optional(&self.pool)
190 .await?;
191
192 Ok(row.map(|r| QueuedEvent {
193 id: r.id,
194 event_id: r.event_id,
195 parent_id: r.parent_id,
196 correlation_id: r.correlation_id,
197 event_type: r.event_type,
198 payload: r.payload,
199 hops: r.hops,
200 retry_count: r.retry_count,
201 batch_id: r.batch_id,
202 batch_index: r.batch_index,
203 batch_size: r.batch_size,
204 created_at: r.created_at,
205 }))
206 }
207
208 async fn ack(&self, id: i64) -> Result<()> {
209 sqlx::query(
210 "UPDATE seesaw_events SET processed_at = NOW(), locked_until = NULL WHERE id = $1",
211 )
212 .bind(id)
213 .execute(&self.pool)
214 .await?;
215 Ok(())
216 }
217
218 async fn nack(&self, id: i64, retry_after_secs: u64) -> Result<()> {
219 let locked_until = Utc::now() + Duration::seconds(retry_after_secs as i64);
220 sqlx::query(
221 "UPDATE seesaw_events
222 SET retry_count = retry_count + 1,
223 locked_until = $2
224 WHERE id = $1",
225 )
226 .bind(id)
227 .bind(locked_until)
228 .execute(&self.pool)
229 .await?;
230 Ok(())
231 }
232
233 async fn load_state<S>(&self, correlation_id: Uuid) -> Result<Option<(S, i32)>>
234 where
235 S: for<'de> Deserialize<'de> + Send,
236 {
237 let row: Option<StateRow> =
238 sqlx::query_as("SELECT state, version FROM seesaw_state WHERE correlation_id = $1")
239 .bind(correlation_id)
240 .fetch_optional(&self.pool)
241 .await?;
242
243 match row {
244 Some(r) => {
245 let state: S = serde_json::from_value(r.state)?;
246 Ok(Some((state, r.version)))
247 }
248 None => Ok(None),
249 }
250 }
251
252 async fn save_state<S>(
253 &self,
254 correlation_id: Uuid,
255 state: &S,
256 expected_version: i32,
257 ) -> Result<i32>
258 where
259 S: Serialize + Send + Sync,
260 {
261 let state_json = serde_json::to_value(state)?;
262 let new_version = expected_version + 1;
263
264 let result = sqlx::query(
265 "INSERT INTO seesaw_state (correlation_id, state, version, updated_at)
266 VALUES ($1, $2, $3, NOW())
267 ON CONFLICT (correlation_id) DO UPDATE
268 SET state = $2,
269 version = $3,
270 updated_at = NOW()
271 WHERE seesaw_state.version = $4",
272 )
273 .bind(correlation_id)
274 .bind(&state_json)
275 .bind(new_version)
276 .bind(expected_version)
277 .execute(&self.pool)
278 .await?;
279
280 if result.rows_affected() == 0 {
281 anyhow::bail!("Version conflict: state was modified concurrently");
282 }
283
284 Ok(new_version)
285 }
286
287 async fn insert_effect_intent(
288 &self,
289 event_id: Uuid,
290 effect_id: String,
291 correlation_id: Uuid,
292 event_type: String,
293 event_payload: serde_json::Value,
294 parent_event_id: Option<Uuid>,
295 batch_id: Option<Uuid>,
296 batch_index: Option<i32>,
297 batch_size: Option<i32>,
298 execute_at: DateTime<Utc>,
299 timeout_seconds: i32,
300 max_attempts: i32,
301 priority: i32,
302 ) -> Result<()> {
303 sqlx::query(
304 "INSERT INTO seesaw_effect_executions (
305 event_id, effect_id, correlation_id, status,
306 event_type, event_payload, parent_event_id,
307 batch_id, batch_index, batch_size,
308 execute_at, timeout_seconds, max_attempts, priority
309 )
310 VALUES ($1, $2, $3, 'pending', $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
311 )
312 .bind(event_id)
313 .bind(effect_id)
314 .bind(correlation_id)
315 .bind(event_type)
316 .bind(event_payload)
317 .bind(parent_event_id)
318 .bind(batch_id)
319 .bind(batch_index)
320 .bind(batch_size)
321 .bind(execute_at)
322 .bind(timeout_seconds)
323 .bind(max_attempts)
324 .bind(priority)
325 .execute(&self.pool)
326 .await?;
327
328 Ok(())
329 }
330
331 async fn poll_next_effect(&self) -> Result<Option<QueuedEffectExecution>> {
332 let row: Option<EffectRow> = sqlx::query_as(
333 "WITH next_effect AS (
334 SELECT event_id, effect_id
335 FROM seesaw_effect_executions
336 WHERE (
337 status = 'pending'
338 OR (status = 'failed' AND attempts < max_attempts)
339 )
340 AND execute_at <= NOW()
341 ORDER BY priority ASC, execute_at ASC, event_id ASC, effect_id ASC
342 LIMIT 1
343 FOR UPDATE SKIP LOCKED
344 )
345 UPDATE seesaw_effect_executions e
346 SET status = 'executing',
347 claimed_at = NOW(),
348 last_attempted_at = NOW(),
349 attempts = e.attempts + 1
350 FROM next_effect
351 WHERE e.event_id = next_effect.event_id
352 AND e.effect_id = next_effect.effect_id
353 RETURNING
354 e.event_id, e.effect_id, e.correlation_id, e.event_type, e.event_payload, e.parent_event_id,
355 e.batch_id, e.batch_index, e.batch_size,
356 e.execute_at, e.timeout_seconds, e.max_attempts, e.priority, e.attempts",
357 )
358 .fetch_optional(&self.pool)
359 .await?;
360
361 if let Some(r) = row {
362 Ok(Some(QueuedEffectExecution {
363 event_id: r.event_id,
364 effect_id: r.effect_id,
365 correlation_id: r.correlation_id,
366 event_type: r.event_type,
367 event_payload: r.event_payload,
368 parent_event_id: r.parent_event_id,
369 batch_id: r.batch_id,
370 batch_index: r.batch_index,
371 batch_size: r.batch_size,
372 execute_at: r.execute_at,
373 timeout_seconds: r.timeout_seconds,
374 max_attempts: r.max_attempts,
375 priority: r.priority,
376 attempts: r.attempts,
377 }))
378 } else {
379 Ok(None)
380 }
381 }
382
383 async fn complete_effect(
384 &self,
385 event_id: Uuid,
386 effect_id: String,
387 result: serde_json::Value,
388 ) -> Result<()> {
389 sqlx::query(
390 "UPDATE seesaw_effect_executions
391 SET status = 'completed',
392 result = $3,
393 completed_at = NOW()
394 WHERE event_id = $1 AND effect_id = $2",
395 )
396 .bind(event_id)
397 .bind(effect_id)
398 .bind(result)
399 .execute(&self.pool)
400 .await?;
401
402 Ok(())
403 }
404
405 async fn complete_effect_with_events(
406 &self,
407 event_id: Uuid,
408 effect_id: String,
409 result: serde_json::Value,
410 emitted_events: Vec<EmittedEvent>,
411 ) -> Result<()> {
412 let effect: EffectRow = sqlx::query_as(
414 "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
415 batch_id, batch_index, batch_size,
416 execute_at, timeout_seconds, max_attempts, priority, attempts
417 FROM seesaw_effect_executions
418 WHERE event_id = $1 AND effect_id = $2",
419 )
420 .bind(event_id)
421 .bind(&effect_id)
422 .fetch_one(&self.pool)
423 .await?;
424
425 let parent: ParentEventRow = sqlx::query_as(
427 "SELECT hops, created_at
428 FROM seesaw_events
429 WHERE event_id = $1
430 ORDER BY created_at ASC, id ASC
431 LIMIT 1",
432 )
433 .bind(event_id)
434 .fetch_one(&self.pool)
435 .await?;
436
437 let mut tx = self.pool.begin().await?;
439
440 for (emitted_index, emitted) in emitted_events.into_iter().enumerate() {
442 let deterministic_id = Uuid::new_v5(
445 &NAMESPACE_SEESAW,
446 format!(
447 "{}-{}-{}-{}",
448 event_id, effect_id, emitted.event_type, emitted_index
449 )
450 .as_bytes(),
451 );
452
453 let deterministic_timestamp = emitted_event_created_at(parent.created_at);
456
457 sqlx::query(
459 "INSERT INTO seesaw_events (
460 event_id, parent_id, correlation_id, event_type, payload, hops,
461 batch_id, batch_index, batch_size, created_at
462 )
463 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
464 ON CONFLICT (event_id, created_at) DO NOTHING",
465 )
466 .bind(deterministic_id)
467 .bind(Some(event_id))
468 .bind(effect.correlation_id)
469 .bind(&emitted.event_type)
470 .bind(emitted.payload)
471 .bind(parent.hops + 1)
472 .bind(emitted.batch_id)
473 .bind(emitted.batch_index)
474 .bind(emitted.batch_size)
475 .bind(deterministic_timestamp)
476 .execute(&mut *tx)
477 .await?;
478 }
479
480 sqlx::query(
482 "UPDATE seesaw_effect_executions
483 SET status = 'completed',
484 result = $3,
485 completed_at = NOW()
486 WHERE event_id = $1 AND effect_id = $2",
487 )
488 .bind(event_id)
489 .bind(effect_id)
490 .bind(result)
491 .execute(&mut *tx)
492 .await?;
493
494 tx.commit().await?;
496
497 Ok(())
498 }
499
500 async fn fail_effect(
501 &self,
502 event_id: Uuid,
503 effect_id: String,
504 error: String,
505 attempts: i32,
506 ) -> Result<()> {
507 let retry_at = Utc::now() + Duration::seconds(effect_retry_delay_seconds(attempts));
508 sqlx::query(
509 "UPDATE seesaw_effect_executions
510 SET status = 'failed',
511 error = $3,
512 execute_at = $5,
513 claimed_at = NULL
514 WHERE event_id = $1 AND effect_id = $2 AND attempts >= $4",
515 )
516 .bind(event_id)
517 .bind(effect_id)
518 .bind(error)
519 .bind(attempts)
520 .bind(retry_at)
521 .execute(&self.pool)
522 .await?;
523
524 Ok(())
525 }
526
527 async fn dlq_effect(
528 &self,
529 event_id: Uuid,
530 effect_id: String,
531 error: String,
532 reason: String,
533 attempts: i32,
534 ) -> Result<()> {
535 self.dlq_effect_with_events(event_id, effect_id, error, reason, attempts, Vec::new())
536 .await
537 }
538
    /// Moves an effect execution to the dead-letter table and publishes terminal events.
    ///
    /// In one transaction this:
    /// 1. inserts a `seesaw_dlq` row carrying the effect's event type/payload, `error`,
    ///    `reason` and `attempts`;
    /// 2. inserts synthetic events into `seesaw_events`: one per entry of `emitted_events`,
    ///    or, when that list is empty and the effect has complete batch metadata, a single
    ///    event that reuses the effect's own event type and payload;
    /// 3. deletes the row from `seesaw_effect_executions`.
    ///
    /// Synthetic event ids are UUID v5 values derived from the inputs, and the inserts use
    /// `ON CONFLICT (event_id, created_at) DO NOTHING`.
    ///
    /// # Errors
    /// Fails if the effect execution row does not exist or on any database error.
    async fn dlq_effect_with_events(
        &self,
        event_id: Uuid,
        effect_id: String,
        error: String,
        reason: String,
        attempts: i32,
        emitted_events: Vec<EmittedEvent>,
    ) -> Result<()> {
        // Load the effect being dead-lettered; `fetch_one` errors when it is missing.
        let effect: EffectRow = sqlx::query_as(
            "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
                    batch_id, batch_index, batch_size,
                    execute_at, timeout_seconds, max_attempts, priority, attempts
             FROM seesaw_effect_executions
             WHERE event_id = $1 AND effect_id = $2",
        )
        .bind(event_id)
        .bind(&effect_id)
        .fetch_one(&self.pool)
        .await?;

        // The source event is optional here; fallbacks are applied below when it is absent.
        let parent = sqlx::query_as::<_, ParentEventRow>(
            "SELECT hops, created_at
             FROM seesaw_events
             WHERE event_id = $1
             ORDER BY created_at ASC, id ASC
             LIMIT 1",
        )
        .bind(event_id)
        .fetch_optional(&self.pool)
        .await?;

        let mut tx = self.pool.begin().await?;

        sqlx::query(
            "INSERT INTO seesaw_dlq (
                event_id, effect_id, correlation_id, error, event_type, event_payload, reason, attempts
             )
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
        )
        .bind(event_id)
        .bind(&effect_id)
        .bind(effect.correlation_id)
        .bind(&error)
        .bind(&effect.event_type)
        .bind(&effect.event_payload)
        .bind(&reason)
        .bind(attempts)
        .execute(&mut *tx)
        .await?;

        // With a parent: midnight-truncated parent timestamp and parent hops + 1.
        // Without one: the current time and 0 hops.
        let synthetic_created_at = parent
            .as_ref()
            .map(|row| emitted_event_created_at(row.created_at))
            .unwrap_or_else(Utc::now);
        let synthetic_hops = parent.as_ref().map(|row| row.hops + 1).unwrap_or(0);

        if emitted_events.is_empty() {
            // No explicit events: emit a single terminal event, but only for effects that
            // carry all three batch fields.
            if let (Some(batch_id), Some(batch_index), Some(batch_size)) =
                (effect.batch_id, effect.batch_index, effect.batch_size)
            {
                let synthetic_event_id = Uuid::new_v5(
                    &NAMESPACE_SEESAW,
                    format!("{}-{}-dlq-terminal", event_id, effect_id).as_bytes(),
                );

                sqlx::query(
                    "INSERT INTO seesaw_events (
                        event_id, parent_id, correlation_id, event_type, payload, hops,
                        batch_id, batch_index, batch_size, created_at
                     )
                     VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                     ON CONFLICT (event_id, created_at) DO NOTHING",
                )
                .bind(synthetic_event_id)
                .bind(Some(event_id))
                .bind(effect.correlation_id)
                .bind(&effect.event_type)
                .bind(&effect.event_payload)
                .bind(synthetic_hops)
                .bind(Some(batch_id))
                .bind(Some(batch_index))
                .bind(Some(batch_size))
                .bind(synthetic_created_at)
                .execute(&mut *tx)
                .await?;
            }
        } else {
            for (emitted_index, emitted) in emitted_events.into_iter().enumerate() {
                // The id includes the event type and position so each emitted event gets
                // its own id.
                let synthetic_event_id = Uuid::new_v5(
                    &NAMESPACE_SEESAW,
                    format!(
                        "{}-{}-dlq-terminal-{}-{}",
                        event_id, effect_id, emitted.event_type, emitted_index
                    )
                    .as_bytes(),
                );

                sqlx::query(
                    "INSERT INTO seesaw_events (
                        event_id, parent_id, correlation_id, event_type, payload, hops,
                        batch_id, batch_index, batch_size, created_at
                     )
                     VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                     ON CONFLICT (event_id, created_at) DO NOTHING",
                )
                .bind(synthetic_event_id)
                .bind(Some(event_id))
                .bind(effect.correlation_id)
                .bind(&emitted.event_type)
                .bind(emitted.payload)
                .bind(synthetic_hops)
                // Batch fields fall back to the effect's values when the emitted event has none.
                .bind(emitted.batch_id.or(effect.batch_id))
                .bind(emitted.batch_index.or(effect.batch_index))
                .bind(emitted.batch_size.or(effect.batch_size))
                .bind(synthetic_created_at)
                .execute(&mut *tx)
                .await?;
            }
        }

        // The execution row is removed rather than marked; the DLQ row is what remains.
        sqlx::query("DELETE FROM seesaw_effect_executions WHERE event_id = $1 AND effect_id = $2")
            .bind(event_id)
            .bind(&effect_id)
            .execute(&mut *tx)
            .await?;

        tx.commit().await?;

        Ok(())
    }
673
674 async fn subscribe_workflow_events(
675 &self,
676 correlation_id: Uuid,
677 ) -> Result<Box<dyn futures::Stream<Item = seesaw_core::WorkflowEvent> + Send + Unpin>> {
678 use sqlx::postgres::PgListener;
679
680 let channel = format!("seesaw_workflow_{}", correlation_id);
681 const PAGE_SIZE: i64 = 256;
682 const CATCH_UP_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500);
683
684 let initial_cursor: Option<(DateTime<Utc>, i64)> = sqlx::query_as(
687 "SELECT created_at, id
688 FROM seesaw_events
689 WHERE correlation_id = $1
690 ORDER BY created_at DESC, id DESC
691 LIMIT 1",
692 )
693 .bind(correlation_id)
694 .fetch_optional(&self.pool)
695 .await?;
696
697 let mut listener = PgListener::connect_with(&self.pool).await?;
699 listener.listen(&channel).await?;
700
701 let pool = self.pool.clone();
702 let (tx, rx) = futures::channel::mpsc::unbounded::<seesaw_core::WorkflowEvent>();
703
704 tokio::spawn(async move {
705 let mut cursor = initial_cursor;
706 let mut drain_pending = true;
707
708 loop {
709 if !drain_pending {
710 match tokio::time::timeout(CATCH_UP_INTERVAL, listener.recv()).await {
711 Ok(Ok(_notification)) => {}
712 Ok(Err(error)) => {
713 tracing::warn!(
714 "workflow listener recv failed for {}: {}",
715 correlation_id,
716 error
717 );
718 return;
719 }
720 Err(_) => {}
721 }
722 }
723 drain_pending = false;
724
725 loop {
726 let rows_result: std::result::Result<Vec<WorkflowEventRow>, sqlx::Error> =
727 if let Some((created_at, id)) = cursor {
728 sqlx::query_as(
729 "SELECT id, event_id, correlation_id, event_type, payload, created_at
730 FROM seesaw_events
731 WHERE correlation_id = $1
732 AND (
733 created_at > $2
734 OR (created_at = $2 AND id > $3)
735 )
736 ORDER BY created_at ASC, id ASC
737 LIMIT $4",
738 )
739 .bind(correlation_id)
740 .bind(created_at)
741 .bind(id)
742 .bind(PAGE_SIZE)
743 .fetch_all(&pool)
744 .await
745 } else {
746 sqlx::query_as(
747 "SELECT id, event_id, correlation_id, event_type, payload, created_at
748 FROM seesaw_events
749 WHERE correlation_id = $1
750 ORDER BY created_at ASC, id ASC
751 LIMIT $2",
752 )
753 .bind(correlation_id)
754 .bind(PAGE_SIZE)
755 .fetch_all(&pool)
756 .await
757 };
758
759 let rows = match rows_result {
760 Ok(rows) => rows,
761 Err(error) => {
762 tracing::warn!(
763 "workflow event query failed for {}: {}",
764 correlation_id,
765 error
766 );
767 return;
768 }
769 };
770
771 if rows.is_empty() {
772 break;
773 }
774
775 for row in rows {
776 cursor = Some((row.created_at, row.id));
777 if tx
778 .unbounded_send(seesaw_core::WorkflowEvent {
779 event_id: row.event_id,
780 correlation_id: row.correlation_id,
781 event_type: row.event_type,
782 payload: row.payload,
783 })
784 .is_err()
785 {
786 return;
787 }
788 }
789 }
790 }
791 });
792
793 Ok(Box::new(rx))
794 }
795
796 async fn get_workflow_status(
797 &self,
798 correlation_id: Uuid,
799 ) -> Result<seesaw_core::WorkflowStatus> {
800 let state = sqlx::query_as::<_, (serde_json::Value,)>(
801 "SELECT state FROM seesaw_state WHERE correlation_id = $1",
802 )
803 .bind(correlation_id)
804 .fetch_optional(&self.pool)
805 .await?
806 .map(|r| r.0);
807
808 let pending_effects = sqlx::query_as::<_, (i64,)>(
809 "SELECT COUNT(*) FROM seesaw_effect_executions
810 WHERE correlation_id = $1 AND status IN ('pending', 'executing', 'failed')",
811 )
812 .bind(correlation_id)
813 .fetch_one(&self.pool)
814 .await?
815 .0;
816
817 let last_event = sqlx::query_as::<_, (String,)>(
818 "SELECT event_type FROM seesaw_events
819 WHERE correlation_id = $1
820 ORDER BY created_at DESC, id DESC
821 LIMIT 1",
822 )
823 .bind(correlation_id)
824 .fetch_optional(&self.pool)
825 .await?
826 .map(|r| r.0);
827
828 Ok(seesaw_core::WorkflowStatus {
829 correlation_id,
830 state,
831 pending_effects,
832 is_settled: pending_effects == 0,
833 last_event,
834 })
835 }
836
    /// Adds one source event to a same-batch join and claims the join once it is complete.
    ///
    /// In one transaction this:
    /// 1. inserts the entry into `seesaw_join_entries` (duplicates are ignored);
    /// 2. creates the `seesaw_join_windows` row for the batch if it does not exist yet;
    /// 3. updates the window's `target_count` to `batch_size` if it differs;
    /// 4. flips the window from `open` to `processing` if the number of stored entries has
    ///    reached `target_count`.
    ///
    /// Returns `Ok(Some(entries))`, ordered by `batch_index`, `source_created_at` and
    /// `source_event_id`, when this call claimed the window. Returns `Ok(None)` when the
    /// window is not yet complete or is no longer `open`.
    async fn join_same_batch_append_and_maybe_claim(
        &self,
        join_effect_id: String,
        correlation_id: Uuid,
        source_event_id: Uuid,
        source_event_type: String,
        source_payload: serde_json::Value,
        source_created_at: DateTime<Utc>,
        batch_id: Uuid,
        batch_index: i32,
        batch_size: i32,
    ) -> Result<Option<Vec<JoinEntry>>> {
        let mut tx = self.pool.begin().await?;

        // Step 1: record the entry; a repeat of the same source event is a no-op.
        sqlx::query(
            "INSERT INTO seesaw_join_entries (
                join_effect_id, correlation_id, source_event_id, source_event_type, source_payload,
                source_created_at, batch_id, batch_index, batch_size
             )
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
             ON CONFLICT (join_effect_id, correlation_id, source_event_id) DO NOTHING",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(source_event_id)
        .bind(&source_event_type)
        .bind(source_payload)
        .bind(source_created_at)
        .bind(batch_id)
        .bind(batch_index)
        .bind(batch_size)
        .execute(&mut *tx)
        .await?;

        // Step 2: make sure a window row exists for this batch.
        sqlx::query(
            "INSERT INTO seesaw_join_windows (
                join_effect_id, correlation_id, mode, batch_id, target_count, status
             )
             VALUES ($1, $2, 'same_batch', $3, $4, 'open')
             ON CONFLICT (join_effect_id, correlation_id, batch_id) DO NOTHING",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .bind(batch_size)
        .execute(&mut *tx)
        .await?;

        // Step 3: align the window's target with the batch size reported by this entry.
        sqlx::query(
            "UPDATE seesaw_join_windows
             SET target_count = $4,
                 updated_at = NOW()
             WHERE join_effect_id = $1
               AND correlation_id = $2
               AND batch_id = $3
               AND target_count <> $4",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .bind(batch_size)
        .execute(&mut *tx)
        .await?;

        // Step 4: claim the window only if it is still open and enough entries exist.
        // A returned row means this call performed the open -> processing transition.
        let claimed: Option<(String,)> = sqlx::query_as(
            "UPDATE seesaw_join_windows w
             SET status = 'processing',
                 sealed_at = COALESCE(w.sealed_at, NOW()),
                 processing_started_at = NOW(),
                 updated_at = NOW(),
                 last_error = NULL
             WHERE w.join_effect_id = $1
               AND w.correlation_id = $2
               AND w.batch_id = $3
               AND w.status = 'open'
               AND (
                    SELECT COUNT(*)::int
                    FROM seesaw_join_entries e
                    WHERE e.join_effect_id = w.join_effect_id
                      AND e.correlation_id = w.correlation_id
                      AND e.batch_id = w.batch_id
               ) >= w.target_count
             RETURNING w.join_effect_id",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .fetch_optional(&mut *tx)
        .await?;

        if claimed.is_none() {
            // Not claimed: still commit so the appended entry is persisted.
            tx.commit().await?;
            return Ok(None);
        }

        // Claimed: read back every entry of the batch inside the same transaction.
        let rows = sqlx::query_as::<_, (Uuid, String, serde_json::Value, Uuid, i32, i32, DateTime<Utc>)>(
            "SELECT source_event_id, source_event_type, source_payload, batch_id, batch_index, batch_size, source_created_at
             FROM seesaw_join_entries
             WHERE join_effect_id = $1
               AND correlation_id = $2
               AND batch_id = $3
             ORDER BY batch_index ASC, source_created_at ASC, source_event_id ASC",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .fetch_all(&mut *tx)
        .await?;

        let entries = rows
            .into_iter()
            .map(
                |(
                    source_event_id,
                    event_type,
                    payload,
                    batch_id,
                    batch_index,
                    batch_size,
                    created_at,
                )| JoinEntry {
                    source_event_id,
                    event_type,
                    payload,
                    batch_id,
                    batch_index,
                    batch_size,
                    created_at,
                },
            )
            .collect::<Vec<_>>();

        tx.commit().await?;
        Ok(Some(entries))
    }
972
973 async fn join_same_batch_complete(
974 &self,
975 join_effect_id: String,
976 correlation_id: Uuid,
977 batch_id: Uuid,
978 ) -> Result<()> {
979 let mut tx = self.pool.begin().await?;
980
981 sqlx::query(
982 "UPDATE seesaw_join_windows
983 SET status = 'completed',
984 completed_at = NOW(),
985 updated_at = NOW()
986 WHERE join_effect_id = $1
987 AND correlation_id = $2
988 AND batch_id = $3",
989 )
990 .bind(&join_effect_id)
991 .bind(correlation_id)
992 .bind(batch_id)
993 .execute(&mut *tx)
994 .await?;
995
996 sqlx::query(
997 "DELETE FROM seesaw_join_entries
998 WHERE join_effect_id = $1
999 AND correlation_id = $2
1000 AND batch_id = $3",
1001 )
1002 .bind(&join_effect_id)
1003 .bind(correlation_id)
1004 .bind(batch_id)
1005 .execute(&mut *tx)
1006 .await?;
1007
1008 sqlx::query(
1009 "DELETE FROM seesaw_join_windows
1010 WHERE join_effect_id = $1
1011 AND correlation_id = $2
1012 AND batch_id = $3",
1013 )
1014 .bind(&join_effect_id)
1015 .bind(correlation_id)
1016 .bind(batch_id)
1017 .execute(&mut *tx)
1018 .await?;
1019
1020 tx.commit().await?;
1021 Ok(())
1022 }
1023
1024 async fn join_same_batch_release(
1025 &self,
1026 join_effect_id: String,
1027 correlation_id: Uuid,
1028 batch_id: Uuid,
1029 error: String,
1030 ) -> Result<()> {
1031 sqlx::query(
1032 "UPDATE seesaw_join_windows
1033 SET status = 'open',
1034 processing_started_at = NULL,
1035 last_error = $4,
1036 updated_at = NOW()
1037 WHERE join_effect_id = $1
1038 AND correlation_id = $2
1039 AND batch_id = $3
1040 AND status = 'processing'",
1041 )
1042 .bind(&join_effect_id)
1043 .bind(correlation_id)
1044 .bind(batch_id)
1045 .bind(error)
1046 .execute(&self.pool)
1047 .await?;
1048
1049 Ok(())
1050 }
1051}
1052
/// Row shape of the `seesaw_stream` table as read by `subscribe_events` and
/// `get_recent_events`; converted to an `InsightEvent` by `stream_row_to_insight_event`.
#[derive(FromRow)]
struct StreamRow {
    // Sequence number; `get_recent_events` orders and pages by it.
    seq: i64,
    stream_type: String,
    correlation_id: Uuid,
    event_id: Option<Uuid>,
    effect_event_id: Option<Uuid>,
    effect_id: Option<String>,
    status: Option<String>,
    error: Option<String>,
    payload: Option<serde_json::Value>,
    created_at: DateTime<Utc>,
}
1066
/// Row shape read from `seesaw_effect_executions` by `get_effect_logs`.
#[derive(FromRow)]
struct EffectLogRow {
    correlation_id: Uuid,
    event_id: Uuid,
    effect_id: String,
    status: String,
    attempts: i32,
    event_type: String,
    result: Option<serde_json::Value>,
    error: Option<String>,
    created_at: DateTime<Utc>,
    execute_at: DateTime<Utc>,
    // `claimed_at` (or, failing that, `last_attempted_at`) is the start time
    // `get_effect_logs` uses when computing the duration up to `completed_at`.
    claimed_at: Option<DateTime<Utc>>,
    last_attempted_at: Option<DateTime<Utc>>,
    completed_at: Option<DateTime<Utc>>,
}
1083
/// Row shape read from `seesaw_dlq` by `get_dead_letters`.
#[derive(FromRow)]
struct DeadLetterRow {
    correlation_id: Uuid,
    event_id: Uuid,
    effect_id: String,
    event_type: String,
    event_payload: serde_json::Value,
    error: String,
    reason: String,
    attempts: i32,
    failed_at: DateTime<Utc>,
    // NULL while the entry is unresolved (see the `unresolved_only` filter).
    resolved_at: Option<DateTime<Utc>>,
}
1097
/// Per-workflow failure summary row.
// NOTE(review): presumably decoded by `get_failed_workflows`, whose query is not visible
// here; confirm the column names against that query.
#[derive(FromRow)]
struct FailedWorkflowRow {
    correlation_id: Uuid,
    failed_effects: i64,
    active_effects: i64,
    dead_letters: i64,
    last_failed_at: Option<DateTime<Utc>>,
    last_error: Option<String>,
}
1107
1108#[async_trait]
1109impl InsightStore for PostgresStore {
1110 async fn subscribe_events(
1111 &self,
1112 ) -> Result<Box<dyn futures::Stream<Item = InsightEvent> + Send + Unpin>> {
1113 use futures::stream::StreamExt;
1114 use sqlx::postgres::PgListener;
1115
1116 let mut listener = PgListener::connect_with(&self.pool).await?;
1118 listener.listen("seesaw_stream").await?;
1119
1120 let pool = self.pool.clone();
1122 let stream = listener.into_stream().filter_map(move |result| {
1123 let pool = pool.clone();
1124 Box::pin(async move {
1125 match result {
1126 Ok(_notification) => {
1127 if let Ok(row) = sqlx::query_as::<_, StreamRow>(
1130 "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
1131 effect_id, status, error, payload, created_at
1132 FROM seesaw_stream
1133 ORDER BY seq DESC
1134 LIMIT 1",
1135 )
1136 .fetch_one(&pool)
1137 .await
1138 {
1139 Some(stream_row_to_insight_event(row))
1140 } else {
1141 None
1142 }
1143 }
1144 Err(_) => None,
1145 }
1146 })
1147 });
1148
1149 Ok(Box::new(stream))
1150 }
1151
1152 async fn get_workflow_tree(&self, correlation_id: Uuid) -> Result<WorkflowTree> {
1153 let events = sqlx::query_as::<_, EventRow>(
1155 "SELECT id, event_id, parent_id, correlation_id, event_type, payload, hops,
1156 batch_id, batch_index, batch_size, created_at
1157 FROM seesaw_events
1158 WHERE correlation_id = $1
1159 ORDER BY created_at ASC",
1160 )
1161 .bind(correlation_id)
1162 .fetch_all(&self.pool)
1163 .await?;
1164
1165 let effects = sqlx::query_as::<_, EffectTreeRow>(
1167 "SELECT event_id, effect_id, status, result, error, attempts, created_at,
1168 batch_id, batch_index, batch_size
1169 FROM seesaw_effect_executions
1170 WHERE correlation_id = $1
1171 ORDER BY created_at ASC",
1172 )
1173 .bind(correlation_id)
1174 .fetch_all(&self.pool)
1175 .await?;
1176
1177 let event_ids: HashSet<Uuid> = events.iter().map(|event| event.event_id).collect();
1179 let roots = build_event_tree(&events, &effects, None, &event_ids, true);
1180
1181 let state = sqlx::query_as::<_, (serde_json::Value,)>(
1183 "SELECT state FROM seesaw_state WHERE correlation_id = $1",
1184 )
1185 .bind(correlation_id)
1186 .fetch_optional(&self.pool)
1187 .await?
1188 .map(|r| r.0);
1189
1190 Ok(WorkflowTree {
1191 correlation_id,
1192 roots,
1193 state,
1194 event_count: events.len(),
1195 effect_count: effects.len(),
1196 })
1197 }
1198
1199 async fn get_stats(&self) -> Result<InsightStats> {
1200 let total_events = sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM seesaw_events")
1201 .fetch_one(&self.pool)
1202 .await?
1203 .0;
1204
1205 let active_effects = sqlx::query_as::<_, (i64,)>(
1206 "SELECT COUNT(*) FROM seesaw_effect_executions
1207 WHERE status IN ('pending', 'executing')",
1208 )
1209 .fetch_one(&self.pool)
1210 .await?
1211 .0;
1212
1213 let completed_effects = sqlx::query_as::<_, (i64,)>(
1214 "SELECT COUNT(*) FROM seesaw_effect_executions WHERE status = 'completed'",
1215 )
1216 .fetch_one(&self.pool)
1217 .await?
1218 .0;
1219
1220 let failed_effects = sqlx::query_as::<_, (i64,)>(
1221 "SELECT COUNT(*) FROM seesaw_effect_executions WHERE status = 'failed'",
1222 )
1223 .fetch_one(&self.pool)
1224 .await?
1225 .0;
1226
1227 Ok(InsightStats {
1228 total_events,
1229 active_effects,
1230 completed_effects,
1231 failed_effects,
1232 })
1233 }
1234
1235 async fn get_recent_events(
1236 &self,
1237 cursor: Option<i64>,
1238 limit: usize,
1239 ) -> Result<Vec<InsightEvent>> {
1240 let rows = if let Some(cursor_seq) = cursor {
1241 sqlx::query_as::<_, StreamRow>(
1242 "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
1243 effect_id, status, error, payload, created_at
1244 FROM seesaw_stream
1245 WHERE seq > $1
1246 ORDER BY seq ASC
1247 LIMIT $2",
1248 )
1249 .bind(cursor_seq)
1250 .bind(limit as i64)
1251 .fetch_all(&self.pool)
1252 .await?
1253 } else {
1254 sqlx::query_as::<_, StreamRow>(
1255 "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
1256 effect_id, status, error, payload, created_at
1257 FROM seesaw_stream
1258 ORDER BY seq DESC
1259 LIMIT $1",
1260 )
1261 .bind(limit as i64)
1262 .fetch_all(&self.pool)
1263 .await?
1264 };
1265
1266 Ok(rows.into_iter().map(stream_row_to_insight_event).collect())
1267 }
1268
1269 async fn get_effect_logs(
1270 &self,
1271 correlation_id: Option<Uuid>,
1272 limit: usize,
1273 ) -> Result<Vec<EffectExecutionLog>> {
1274 let rows = sqlx::query_as::<_, EffectLogRow>(
1275 "SELECT
1276 correlation_id,
1277 event_id,
1278 effect_id,
1279 status,
1280 attempts,
1281 event_type,
1282 result,
1283 error,
1284 created_at,
1285 execute_at,
1286 claimed_at,
1287 last_attempted_at,
1288 completed_at
1289 FROM seesaw_effect_executions
1290 WHERE ($1::uuid IS NULL OR correlation_id = $1)
1291 ORDER BY COALESCE(last_attempted_at, created_at) DESC, event_id DESC
1292 LIMIT $2",
1293 )
1294 .bind(correlation_id)
1295 .bind(limit as i64)
1296 .fetch_all(&self.pool)
1297 .await?;
1298
1299 Ok(rows
1300 .into_iter()
1301 .map(|row| {
1302 let started_at = row.claimed_at.or(row.last_attempted_at);
1303 let duration_ms = match (started_at, row.completed_at) {
1304 (Some(start), Some(end)) => Some((end - start).num_milliseconds().max(0)),
1305 _ => None,
1306 };
1307
1308 EffectExecutionLog {
1309 correlation_id: row.correlation_id,
1310 event_id: row.event_id,
1311 effect_id: row.effect_id,
1312 status: row.status,
1313 attempts: row.attempts,
1314 event_type: Some(row.event_type),
1315 result: row.result,
1316 error: row.error,
1317 created_at: row.created_at,
1318 execute_at: Some(row.execute_at),
1319 claimed_at: row.claimed_at,
1320 last_attempted_at: row.last_attempted_at,
1321 completed_at: row.completed_at,
1322 duration_ms,
1323 }
1324 })
1325 .collect())
1326 }
1327
1328 async fn get_dead_letters(
1329 &self,
1330 unresolved_only: bool,
1331 limit: usize,
1332 ) -> Result<Vec<DeadLetterEntry>> {
1333 let rows = sqlx::query_as::<_, DeadLetterRow>(
1334 "SELECT
1335 correlation_id,
1336 event_id,
1337 effect_id,
1338 event_type,
1339 event_payload,
1340 error,
1341 reason,
1342 attempts,
1343 failed_at,
1344 resolved_at
1345 FROM seesaw_dlq
1346 WHERE (NOT $1 OR resolved_at IS NULL)
1347 ORDER BY failed_at DESC
1348 LIMIT $2",
1349 )
1350 .bind(unresolved_only)
1351 .bind(limit as i64)
1352 .fetch_all(&self.pool)
1353 .await?;
1354
1355 Ok(rows
1356 .into_iter()
1357 .map(|row| DeadLetterEntry {
1358 correlation_id: row.correlation_id,
1359 event_id: row.event_id,
1360 effect_id: row.effect_id,
1361 event_type: row.event_type,
1362 event_payload: row.event_payload,
1363 error: row.error,
1364 reason: row.reason,
1365 attempts: row.attempts,
1366 failed_at: row.failed_at,
1367 resolved_at: row.resolved_at,
1368 })
1369 .collect())
1370 }
1371
1372 async fn get_failed_workflows(&self, limit: usize) -> Result<Vec<FailedWorkflow>> {
1373 let rows = sqlx::query_as::<_, FailedWorkflowRow>(
1374 "WITH effect_agg AS (
1375 SELECT
1376 correlation_id,
1377 COUNT(*) FILTER (WHERE status = 'failed')::BIGINT AS failed_effects,
1378 COUNT(*) FILTER (WHERE status IN ('pending', 'executing'))::BIGINT AS active_effects,
1379 MAX(last_attempted_at) FILTER (WHERE status = 'failed') AS last_failed_at,
1380 MAX(error) FILTER (WHERE status = 'failed') AS last_error
1381 FROM seesaw_effect_executions
1382 GROUP BY correlation_id
1383 ),
1384 dlq_agg AS (
1385 SELECT
1386 correlation_id,
1387 COUNT(*) FILTER (WHERE resolved_at IS NULL)::BIGINT AS dead_letters,
1388 MAX(failed_at) FILTER (WHERE resolved_at IS NULL) AS last_dlq_at,
1389 MAX(error) FILTER (WHERE resolved_at IS NULL) AS last_dlq_error
1390 FROM seesaw_dlq
1391 GROUP BY correlation_id
1392 )
1393 SELECT
1394 COALESCE(e.correlation_id, d.correlation_id) AS correlation_id,
1395 COALESCE(e.failed_effects, 0) AS failed_effects,
1396 COALESCE(e.active_effects, 0) AS active_effects,
1397 COALESCE(d.dead_letters, 0) AS dead_letters,
1398 GREATEST(e.last_failed_at, d.last_dlq_at) AS last_failed_at,
1399 COALESCE(d.last_dlq_error, e.last_error) AS last_error
1400 FROM effect_agg e
1401 FULL OUTER JOIN dlq_agg d ON d.correlation_id = e.correlation_id
1402 WHERE COALESCE(e.failed_effects, 0) > 0 OR COALESCE(d.dead_letters, 0) > 0
1403 ORDER BY last_failed_at DESC NULLS LAST
1404 LIMIT $1",
1405 )
1406 .bind(limit as i64)
1407 .fetch_all(&self.pool)
1408 .await?;
1409
1410 Ok(rows
1411 .into_iter()
1412 .map(|row| FailedWorkflow {
1413 correlation_id: row.correlation_id,
1414 failed_effects: row.failed_effects,
1415 active_effects: row.active_effects,
1416 dead_letters: row.dead_letters,
1417 last_failed_at: row.last_failed_at,
1418 last_error: row.last_error,
1419 })
1420 .collect())
1421 }
1422}
1423
/// Database row describing one effect execution, as consumed by
/// `build_event_tree` when it attaches effects to their events.
///
/// Every field is copied verbatim into an `EffectNode`.
// NOTE(review): presumably selected from `seesaw_effect_executions`; the
// query that produces this row is outside this section — confirm there.
#[derive(FromRow)]
struct EffectTreeRow {
    /// Event this effect belongs to; matched against `EventRow::event_id`.
    event_id: Uuid,
    /// Identifier of the effect.
    effect_id: String,
    /// Execution status, kept as the raw string from the row.
    status: String,
    /// Optional JSON result of the effect.
    result: Option<serde_json::Value>,
    /// Optional error text.
    error: Option<String>,
    /// Attempt counter for this effect execution.
    attempts: i32,
    /// Creation timestamp of the row.
    created_at: DateTime<Utc>,
    /// Optional batch identifier.
    batch_id: Option<Uuid>,
    /// Optional index within the batch.
    batch_index: Option<i32>,
    /// Optional size of the batch.
    batch_size: Option<i32>,
}
1437
1438fn stream_row_to_insight_event(row: StreamRow) -> InsightEvent {
1439 let stream_type = match row.stream_type.as_str() {
1440 "event_dispatched" => StreamType::EventDispatched,
1441 "effect_started" => StreamType::EffectStarted,
1442 "effect_completed" => StreamType::EffectCompleted,
1443 "effect_failed" => StreamType::EffectFailed,
1444 _ => StreamType::EventDispatched, };
1446
1447 let event_type = if stream_type == StreamType::EventDispatched {
1449 row.payload
1450 .as_ref()
1451 .and_then(|p| p.get("event_type"))
1452 .and_then(|v| v.as_str())
1453 .map(|s| s.to_string())
1454 } else {
1455 None
1456 };
1457
1458 InsightEvent {
1459 seq: row.seq,
1460 stream_type,
1461 correlation_id: row.correlation_id,
1462 event_id: row.event_id,
1463 effect_event_id: row.effect_event_id,
1464 effect_id: row.effect_id,
1465 event_type,
1466 status: row.status,
1467 error: row.error,
1468 payload: row.payload,
1469 created_at: row.created_at,
1470 }
1471}
1472
1473fn build_event_tree(
1474 events: &[EventRow],
1475 effects: &[EffectTreeRow],
1476 parent_id: Option<Uuid>,
1477 event_ids: &HashSet<Uuid>,
1478 is_root_pass: bool,
1479) -> Vec<EventNode> {
1480 events
1481 .iter()
1482 .filter(|event| {
1483 if is_root_pass {
1484 event.parent_id.is_none()
1485 || event
1486 .parent_id
1487 .map(|parent| !event_ids.contains(&parent))
1488 .unwrap_or(false)
1489 } else {
1490 event.parent_id == parent_id
1491 }
1492 })
1493 .map(|event| {
1494 let event_effects: Vec<EffectNode> = effects
1496 .iter()
1497 .filter(|eff| eff.event_id == event.event_id)
1498 .map(|eff| EffectNode {
1499 effect_id: eff.effect_id.clone(),
1500 event_id: eff.event_id,
1501 status: eff.status.clone(),
1502 result: eff.result.clone(),
1503 error: eff.error.clone(),
1504 attempts: eff.attempts,
1505 created_at: eff.created_at,
1506 batch_id: eff.batch_id,
1507 batch_index: eff.batch_index,
1508 batch_size: eff.batch_size,
1509 })
1510 .collect();
1511
1512 let children =
1514 build_event_tree(events, effects, Some(event.event_id), event_ids, false);
1515
1516 EventNode {
1517 event_id: event.event_id,
1518 event_type: event.event_type.clone(),
1519 payload: event.payload.clone(),
1520 created_at: event.created_at,
1521 batch_id: event.batch_id,
1522 batch_index: event.batch_index,
1523 batch_size: event.batch_size,
1524 children,
1525 effects: event_effects,
1526 }
1527 })
1528 .collect()
1529}
1530
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Timelike};

    /// Builds a UTC timestamp on 2026-02-05 at the given time of day.
    fn on_feb_5_2026(hour: u32, minute: u32, second: u32) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 2, 5, hour, minute, second)
            .single()
            .expect("valid timestamp")
    }

    #[test]
    fn emitted_event_created_at_is_midnight_on_parent_day() {
        let parent = on_feb_5_2026(18, 45, 12);

        let emitted = emitted_event_created_at(parent);

        // Same calendar day, time of day reset to 00:00:00.
        assert_eq!(emitted.date_naive(), parent.date_naive());
        assert_eq!(emitted.hour(), 0);
        assert_eq!(emitted.minute(), 0);
        assert_eq!(emitted.second(), 0);
    }

    #[test]
    fn emitted_event_created_at_is_deterministic_for_same_parent_day() {
        // Two parents at opposite ends of the same day map to one timestamp.
        let early = emitted_event_created_at(on_feb_5_2026(0, 1, 2));
        let late = emitted_event_created_at(on_feb_5_2026(23, 59, 59));

        assert_eq!(early, late);
    }

    #[test]
    fn effect_retry_delay_seconds_uses_exponential_backoff() {
        let expectations = [(1, 1), (2, 2), (3, 4), (4, 8)];

        for (attempts, expected_delay) in expectations {
            assert_eq!(effect_retry_delay_seconds(attempts), expected_delay);
        }
    }

    #[test]
    fn effect_retry_delay_seconds_is_capped() {
        for attempts in [9, 50] {
            assert_eq!(effect_retry_delay_seconds(attempts), 256);
        }
    }

    #[test]
    fn build_event_tree_treats_orphan_parent_as_root() {
        let orphan_id = Uuid::new_v4();

        // The parent id points at an event that is not part of the set.
        let orphan = EventRow {
            id: 1,
            event_id: orphan_id,
            parent_id: Some(Uuid::new_v4()),
            correlation_id: Uuid::new_v4(),
            event_type: "OrphanEvent".to_string(),
            payload: serde_json::json!({"ok": true}),
            hops: 1,
            retry_count: 0,
            batch_id: None,
            batch_index: None,
            batch_size: None,
            created_at: Utc::now(),
        };
        let events = vec![orphan];
        let effects: Vec<EffectTreeRow> = Vec::new();
        let known_ids: HashSet<Uuid> = events.iter().map(|row| row.event_id).collect();

        let roots = build_event_tree(&events, &effects, None, &known_ids, true);

        assert_eq!(roots.len(), 1);
        assert_eq!(roots[0].event_id, orphan_id);
    }
}