1use anyhow::Result;
5use async_trait::async_trait;
6use chrono::{DateTime, Duration, Utc};
7use seesaw_core::{
8 insight::*, EmittedEvent, QueuedEffectExecution, QueuedEvent, Store, NAMESPACE_SEESAW,
9};
10use serde::{Deserialize, Serialize};
11use sqlx::{FromRow, PgPool};
12use uuid::Uuid;
13
/// Number of seconds a claimed event stays locked.
///
/// `poll_next` sets `locked_until = NOW() + EVENT_CLAIM_SECONDS` on the row it
/// claims; the row becomes pollable again once that time has passed.
const EVENT_CLAIM_SECONDS: i64 = 30;
15
16fn emitted_event_created_at(parent_created_at: DateTime<Utc>) -> DateTime<Utc> {
17 parent_created_at
18 .date_naive()
19 .and_hms_opt(0, 0, 0)
20 .expect("midnight should always be a valid UTC timestamp")
21 .and_utc()
22}
23
/// Backoff delay, in seconds, for an effect that has been attempted `attempts`
/// times.
///
/// The delay doubles per attempt (1, 2, 4, ...) and is capped at 256 seconds
/// (eight doublings). Attempt counts below 1 yield a delay of 1 second.
fn effect_retry_delay_seconds(attempts: i32) -> i64 {
    const MAX_DOUBLINGS: i32 = 8;

    let doublings = attempts.saturating_sub(1).max(0).min(MAX_DOUBLINGS);
    2_i64.pow(doublings as u32)
}
28
/// Store implementation that runs every operation against a Postgres
/// connection pool.
pub struct PostgresStore {
    /// Connection pool used for all queries issued by this store.
    pool: PgPool,
}
33
34impl PostgresStore {
35 pub fn new(pool: PgPool) -> Self {
36 Self { pool }
37 }
38
39 pub fn pool(&self) -> &PgPool {
40 &self.pool
41 }
42}
43
44impl Clone for PostgresStore {
45 fn clone(&self) -> Self {
46 Self {
47 pool: self.pool.clone(),
48 }
49 }
50}
51
/// Row shape read from `seesaw_events` by `poll_next` and `get_workflow_tree`.
#[derive(FromRow)]
struct EventRow {
    /// Table-local row id; used by `ack`/`nack` and as an ordering tie-breaker.
    id: i64,
    /// Identifier of the event itself.
    event_id: Uuid,
    /// Identifier of the event this one was emitted from, if any.
    parent_id: Option<Uuid>,
    /// Identifier shared by all events of one workflow.
    correlation_id: Uuid,
    /// Name of the event type.
    event_type: String,
    /// JSON payload of the event.
    payload: serde_json::Value,
    /// Hop counter stored with the event (emitted events get parent hops + 1).
    hops: i32,
    /// Creation timestamp stored with the event.
    created_at: DateTime<Utc>,
}
63
/// Row shape read from `seesaw_state` by `load_state`.
#[derive(FromRow)]
struct StateRow {
    /// Serialized workflow state as JSON.
    state: serde_json::Value,
    /// Version counter used by `save_state` for its conflict check.
    version: i32,
}
69
/// Row shape read from `seesaw_effect_executions` when claiming, completing or
/// dead-lettering an effect.
#[derive(FromRow)]
struct EffectRow {
    /// Event the effect belongs to; part of the (event_id, effect_id) key used in lookups.
    event_id: Uuid,
    /// Effect identifier; the other half of the lookup key.
    effect_id: String,
    /// Workflow the effect belongs to.
    correlation_id: Uuid,
    /// Type name of the triggering event.
    event_type: String,
    /// JSON payload of the triggering event.
    event_payload: serde_json::Value,
    /// Parent of the triggering event, if any.
    parent_event_id: Option<Uuid>,
    /// Earliest time at which the effect may be claimed.
    execute_at: DateTime<Utc>,
    /// Timeout stored with the effect, in seconds.
    timeout_seconds: i32,
    /// Attempt limit; failed effects are only re-claimed while `attempts < max_attempts`.
    max_attempts: i32,
    /// Claim priority; lower values are claimed first (`ORDER BY priority ASC`).
    priority: i32,
    /// Number of times the effect has been claimed so far.
    attempts: i32,
}
84
/// Subset of a `seesaw_events` row needed to derive emitted child events.
#[derive(FromRow)]
struct ParentEventRow {
    /// Hop counter of the parent; children are stored with `hops + 1`.
    hops: i32,
    /// Creation time of the parent; input to `emitted_event_created_at`.
    created_at: DateTime<Utc>,
}
90
91#[async_trait]
92impl Store for PostgresStore {
93 async fn publish(&self, event: QueuedEvent) -> Result<()> {
94 let mut tx = self.pool.begin().await?;
95
96 let inserted: Option<Uuid> = sqlx::query_scalar(
99 "INSERT INTO seesaw_processed (event_id, correlation_id, created_at)
100 VALUES ($1, $2, $3)
101 ON CONFLICT (event_id) DO NOTHING
102 RETURNING event_id",
103 )
104 .bind(event.event_id)
105 .bind(event.correlation_id)
106 .bind(event.created_at)
107 .fetch_optional(&mut *tx)
108 .await?;
109
110 if inserted.is_none() {
111 tx.commit().await?;
112 return Ok(());
113 }
114
115 sqlx::query(
116 "INSERT INTO seesaw_events (
117 event_id, parent_id, correlation_id, event_type, payload, hops, created_at
118 )
119 VALUES ($1, $2, $3, $4, $5, $6, $7)",
120 )
121 .bind(event.event_id)
122 .bind(event.parent_id)
123 .bind(event.correlation_id)
124 .bind(event.event_type)
125 .bind(event.payload)
126 .bind(event.hops)
127 .bind(event.created_at)
128 .execute(&mut *tx)
129 .await?;
130
131 tx.commit().await?;
132
133 Ok(())
134 }
135
136 async fn poll_next(&self) -> Result<Option<QueuedEvent>> {
137 let row: Option<EventRow> = sqlx::query_as(
138 "WITH next_event AS (
139 SELECT e.id
140 FROM seesaw_events e
141 WHERE e.processed_at IS NULL
142 AND (e.locked_until IS NULL OR e.locked_until < NOW())
143 AND NOT EXISTS (
144 SELECT 1
145 FROM seesaw_events older
146 WHERE older.correlation_id = e.correlation_id
147 AND older.processed_at IS NULL
148 AND (
149 older.created_at < e.created_at
150 OR (older.created_at = e.created_at AND older.id < e.id)
151 )
152 )
153 ORDER BY e.created_at ASC, e.id ASC
154 LIMIT 1
155 FOR UPDATE SKIP LOCKED
156 )
157 UPDATE seesaw_events e
158 SET locked_until = NOW() + ($1 * INTERVAL '1 second')
159 FROM next_event
160 WHERE e.id = next_event.id
161 RETURNING e.id, e.event_id, e.parent_id, e.correlation_id, e.event_type, e.payload, e.hops, e.created_at",
162 )
163 .bind(EVENT_CLAIM_SECONDS)
164 .fetch_optional(&self.pool)
165 .await?;
166
167 Ok(row.map(|r| QueuedEvent {
168 id: r.id,
169 event_id: r.event_id,
170 parent_id: r.parent_id,
171 correlation_id: r.correlation_id,
172 event_type: r.event_type,
173 payload: r.payload,
174 hops: r.hops,
175 created_at: r.created_at,
176 }))
177 }
178
179 async fn ack(&self, id: i64) -> Result<()> {
180 sqlx::query(
181 "UPDATE seesaw_events SET processed_at = NOW(), locked_until = NULL WHERE id = $1",
182 )
183 .bind(id)
184 .execute(&self.pool)
185 .await?;
186 Ok(())
187 }
188
189 async fn nack(&self, id: i64, retry_after_secs: u64) -> Result<()> {
190 let locked_until = Utc::now() + Duration::seconds(retry_after_secs as i64);
191 sqlx::query(
192 "UPDATE seesaw_events
193 SET retry_count = retry_count + 1,
194 locked_until = $2
195 WHERE id = $1",
196 )
197 .bind(id)
198 .bind(locked_until)
199 .execute(&self.pool)
200 .await?;
201 Ok(())
202 }
203
204 async fn load_state<S>(&self, correlation_id: Uuid) -> Result<Option<(S, i32)>>
205 where
206 S: for<'de> Deserialize<'de> + Send,
207 {
208 let row: Option<StateRow> =
209 sqlx::query_as("SELECT state, version FROM seesaw_state WHERE correlation_id = $1")
210 .bind(correlation_id)
211 .fetch_optional(&self.pool)
212 .await?;
213
214 match row {
215 Some(r) => {
216 let state: S = serde_json::from_value(r.state)?;
217 Ok(Some((state, r.version)))
218 }
219 None => Ok(None),
220 }
221 }
222
223 async fn save_state<S>(
224 &self,
225 correlation_id: Uuid,
226 state: &S,
227 expected_version: i32,
228 ) -> Result<i32>
229 where
230 S: Serialize + Send + Sync,
231 {
232 let state_json = serde_json::to_value(state)?;
233 let new_version = expected_version + 1;
234
235 let result = sqlx::query(
236 "INSERT INTO seesaw_state (correlation_id, state, version, updated_at)
237 VALUES ($1, $2, $3, NOW())
238 ON CONFLICT (correlation_id) DO UPDATE
239 SET state = $2,
240 version = $3,
241 updated_at = NOW()
242 WHERE seesaw_state.version = $4",
243 )
244 .bind(correlation_id)
245 .bind(&state_json)
246 .bind(new_version)
247 .bind(expected_version)
248 .execute(&self.pool)
249 .await?;
250
251 if result.rows_affected() == 0 {
252 anyhow::bail!("Version conflict: state was modified concurrently");
253 }
254
255 Ok(new_version)
256 }
257
258 async fn insert_effect_intent(
259 &self,
260 event_id: Uuid,
261 effect_id: String,
262 correlation_id: Uuid,
263 event_type: String,
264 event_payload: serde_json::Value,
265 parent_event_id: Option<Uuid>,
266 execute_at: DateTime<Utc>,
267 timeout_seconds: i32,
268 max_attempts: i32,
269 priority: i32,
270 ) -> Result<()> {
271 sqlx::query(
272 "INSERT INTO seesaw_effect_executions (
273 event_id, effect_id, correlation_id, status,
274 event_type, event_payload, parent_event_id,
275 execute_at, timeout_seconds, max_attempts, priority
276 )
277 VALUES ($1, $2, $3, 'pending', $4, $5, $6, $7, $8, $9, $10)",
278 )
279 .bind(event_id)
280 .bind(effect_id)
281 .bind(correlation_id)
282 .bind(event_type)
283 .bind(event_payload)
284 .bind(parent_event_id)
285 .bind(execute_at)
286 .bind(timeout_seconds)
287 .bind(max_attempts)
288 .bind(priority)
289 .execute(&self.pool)
290 .await?;
291
292 Ok(())
293 }
294
295 async fn poll_next_effect(&self) -> Result<Option<QueuedEffectExecution>> {
296 let row: Option<EffectRow> = sqlx::query_as(
297 "WITH next_effect AS (
298 SELECT event_id, effect_id
299 FROM seesaw_effect_executions
300 WHERE (
301 status = 'pending'
302 OR (status = 'failed' AND attempts < max_attempts)
303 )
304 AND execute_at <= NOW()
305 ORDER BY priority ASC, execute_at ASC, event_id ASC, effect_id ASC
306 LIMIT 1
307 FOR UPDATE SKIP LOCKED
308 )
309 UPDATE seesaw_effect_executions e
310 SET status = 'executing',
311 claimed_at = NOW(),
312 last_attempted_at = NOW(),
313 attempts = e.attempts + 1
314 FROM next_effect
315 WHERE e.event_id = next_effect.event_id
316 AND e.effect_id = next_effect.effect_id
317 RETURNING
318 e.event_id, e.effect_id, e.correlation_id, e.event_type, e.event_payload, e.parent_event_id,
319 e.execute_at, e.timeout_seconds, e.max_attempts, e.priority, e.attempts",
320 )
321 .fetch_optional(&self.pool)
322 .await?;
323
324 if let Some(r) = row {
325 Ok(Some(QueuedEffectExecution {
326 event_id: r.event_id,
327 effect_id: r.effect_id,
328 correlation_id: r.correlation_id,
329 event_type: r.event_type,
330 event_payload: r.event_payload,
331 parent_event_id: r.parent_event_id,
332 execute_at: r.execute_at,
333 timeout_seconds: r.timeout_seconds,
334 max_attempts: r.max_attempts,
335 priority: r.priority,
336 attempts: r.attempts,
337 }))
338 } else {
339 Ok(None)
340 }
341 }
342
343 async fn complete_effect(
344 &self,
345 event_id: Uuid,
346 effect_id: String,
347 result: serde_json::Value,
348 ) -> Result<()> {
349 sqlx::query(
350 "UPDATE seesaw_effect_executions
351 SET status = 'completed',
352 result = $3,
353 completed_at = NOW()
354 WHERE event_id = $1 AND effect_id = $2",
355 )
356 .bind(event_id)
357 .bind(effect_id)
358 .bind(result)
359 .execute(&self.pool)
360 .await?;
361
362 Ok(())
363 }
364
    /// Completes an effect execution and stores the events it emitted, in one
    /// transaction.
    ///
    /// The effect row and the parent event row are read first (outside the
    /// transaction); both must exist or `fetch_one` returns an error. Every
    /// emitted event is then inserted as a child of `event_id` with
    /// `hops = parent.hops + 1`, and the effect is marked `'completed'` with
    /// `result`.
    ///
    /// Emitted events get a v5 UUID derived from
    /// `"{event_id}-{effect_id}-{event_type}"` and a timestamp derived only
    /// from the parent's day, so repeating the call produces the same rows and
    /// `ON CONFLICT ... DO NOTHING` skips them.
    ///
    /// NOTE(review): because the id depends only on event id, effect id and
    /// event type, two emitted events of the same type from one effect map to
    /// the same (event_id, created_at) and the second one is skipped by the
    /// conflict clause. Confirm this is intended.
    async fn complete_effect_with_events(
        &self,
        event_id: Uuid,
        effect_id: String,
        result: serde_json::Value,
        emitted_events: Vec<EmittedEvent>,
    ) -> Result<()> {
        // Only `correlation_id` of this row is used below.
        let effect: EffectRow = sqlx::query_as(
            "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
                    execute_at, timeout_seconds, max_attempts, priority, attempts
             FROM seesaw_effect_executions
             WHERE event_id = $1 AND effect_id = $2",
        )
        .bind(event_id)
        .bind(&effect_id)
        .fetch_one(&self.pool)
        .await?;

        // Parent event supplies the hop count and the day used for child timestamps.
        let parent: ParentEventRow = sqlx::query_as(
            "SELECT hops, created_at
             FROM seesaw_events
             WHERE event_id = $1
             ORDER BY created_at ASC, id ASC
             LIMIT 1",
        )
        .bind(event_id)
        .fetch_one(&self.pool)
        .await?;

        // Child inserts and the status update commit or roll back together.
        let mut tx = self.pool.begin().await?;

        for emitted in emitted_events {
            // Same inputs always hash to the same id, which makes retries idempotent.
            let deterministic_id = Uuid::new_v5(
                &NAMESPACE_SEESAW,
                format!("{}-{}-{}", event_id, effect_id, emitted.event_type).as_bytes(),
            );

            // Midnight UTC of the parent's day; stable across retries.
            let deterministic_timestamp = emitted_event_created_at(parent.created_at);

            sqlx::query(
                "INSERT INTO seesaw_events (
                    event_id, parent_id, correlation_id, event_type, payload, hops, created_at
                 )
                 VALUES ($1, $2, $3, $4, $5, $6, $7)
                 ON CONFLICT (event_id, created_at) DO NOTHING",
            )
            .bind(deterministic_id)
            .bind(Some(event_id))
            .bind(effect.correlation_id)
            .bind(&emitted.event_type)
            .bind(emitted.payload)
            .bind(parent.hops + 1)
            .bind(deterministic_timestamp)
            .execute(&mut *tx)
            .await?;
        }

        sqlx::query(
            "UPDATE seesaw_effect_executions
             SET status = 'completed',
                 result = $3,
                 completed_at = NOW()
             WHERE event_id = $1 AND effect_id = $2",
        )
        .bind(event_id)
        .bind(effect_id)
        .bind(result)
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;

        Ok(())
    }
449
450 async fn fail_effect(
451 &self,
452 event_id: Uuid,
453 effect_id: String,
454 error: String,
455 attempts: i32,
456 ) -> Result<()> {
457 let retry_at = Utc::now() + Duration::seconds(effect_retry_delay_seconds(attempts));
458 sqlx::query(
459 "UPDATE seesaw_effect_executions
460 SET status = 'failed',
461 error = $3,
462 execute_at = $5,
463 claimed_at = NULL
464 WHERE event_id = $1 AND effect_id = $2 AND attempts >= $4",
465 )
466 .bind(event_id)
467 .bind(effect_id)
468 .bind(error)
469 .bind(attempts)
470 .bind(retry_at)
471 .execute(&self.pool)
472 .await?;
473
474 Ok(())
475 }
476
477 async fn dlq_effect(
478 &self,
479 event_id: Uuid,
480 effect_id: String,
481 error: String,
482 reason: String,
483 attempts: i32,
484 ) -> Result<()> {
485 let effect: EffectRow = sqlx::query_as(
487 "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
488 execute_at, timeout_seconds, max_attempts, priority, attempts
489 FROM seesaw_effect_executions
490 WHERE event_id = $1 AND effect_id = $2",
491 )
492 .bind(event_id)
493 .bind(&effect_id)
494 .fetch_one(&self.pool)
495 .await?;
496
497 sqlx::query(
499 "INSERT INTO seesaw_dlq (
500 event_id, effect_id, correlation_id, error, event_type, event_payload, reason, attempts
501 )
502 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
503 )
504 .bind(event_id)
505 .bind(&effect_id)
506 .bind(effect.correlation_id)
507 .bind(error)
508 .bind(effect.event_type)
509 .bind(effect.event_payload)
510 .bind(reason)
511 .bind(attempts)
512 .execute(&self.pool)
513 .await?;
514
515 sqlx::query("DELETE FROM seesaw_effect_executions WHERE event_id = $1 AND effect_id = $2")
517 .bind(event_id)
518 .bind(effect_id)
519 .execute(&self.pool)
520 .await?;
521
522 Ok(())
523 }
524
525 async fn subscribe_workflow_events(
526 &self,
527 correlation_id: Uuid,
528 ) -> Result<Box<dyn futures::Stream<Item = seesaw_core::WorkflowEvent> + Send + Unpin>> {
529 use futures::stream::StreamExt;
530 use sqlx::postgres::PgListener;
531
532 let channel = format!("seesaw_workflow_{}", correlation_id);
533
534 let mut listener = PgListener::connect_with(&self.pool).await?;
536 listener.listen(&channel).await?;
537
538 let stream = listener.into_stream().filter_map(|result| {
540 Box::pin(async move {
541 match result {
542 Ok(notification) => {
543 if let Ok(event) =
545 serde_json::from_str::<seesaw_core::WorkflowEvent>(notification.payload())
546 {
547 Some(event)
548 } else {
549 None
550 }
551 }
552 Err(_) => None,
553 }
554 })
555 });
556
557 Ok(Box::new(stream))
558 }
559
560 async fn get_workflow_status(
561 &self,
562 correlation_id: Uuid,
563 ) -> Result<seesaw_core::WorkflowStatus> {
564 let state = sqlx::query_as::<_, (serde_json::Value,)>(
565 "SELECT state FROM seesaw_state WHERE correlation_id = $1",
566 )
567 .bind(correlation_id)
568 .fetch_optional(&self.pool)
569 .await?
570 .map(|r| r.0);
571
572 let pending_effects = sqlx::query_as::<_, (i64,)>(
573 "SELECT COUNT(*) FROM seesaw_effect_executions
574 WHERE correlation_id = $1 AND status IN ('pending', 'executing', 'failed')",
575 )
576 .bind(correlation_id)
577 .fetch_one(&self.pool)
578 .await?
579 .0;
580
581 let last_event = sqlx::query_as::<_, (String,)>(
582 "SELECT event_type FROM seesaw_events
583 WHERE correlation_id = $1
584 ORDER BY created_at DESC, id DESC
585 LIMIT 1",
586 )
587 .bind(correlation_id)
588 .fetch_optional(&self.pool)
589 .await?
590 .map(|r| r.0);
591
592 Ok(seesaw_core::WorkflowStatus {
593 correlation_id,
594 state,
595 pending_effects,
596 is_settled: pending_effects == 0,
597 last_event,
598 })
599 }
600}
601
/// Row shape read from `seesaw_stream`; converted to an `InsightEvent` by
/// `stream_row_to_insight_event`.
#[derive(FromRow)]
struct StreamRow {
    /// Sequence number of the entry; used for ordering and as the paging cursor.
    seq: i64,
    /// Entry kind as text, e.g. `"event_dispatched"` or `"effect_failed"`.
    stream_type: String,
    /// Workflow the entry belongs to.
    correlation_id: Uuid,
    /// Event id column of the entry, if set.
    event_id: Option<Uuid>,
    /// Effect event id column of the entry, if set.
    effect_event_id: Option<Uuid>,
    /// Effect id column of the entry, if set.
    effect_id: Option<String>,
    /// Status text, if set.
    status: Option<String>,
    /// Error text, if set.
    error: Option<String>,
    /// JSON payload, if set; for dispatched events its `"event_type"` key is read.
    payload: Option<serde_json::Value>,
    /// Creation timestamp of the entry.
    created_at: DateTime<Utc>,
}
615
616#[async_trait]
617impl InsightStore for PostgresStore {
618 async fn subscribe_events(
619 &self,
620 ) -> Result<Box<dyn futures::Stream<Item = InsightEvent> + Send + Unpin>> {
621 use futures::stream::StreamExt;
622 use sqlx::postgres::PgListener;
623
624 let mut listener = PgListener::connect_with(&self.pool).await?;
626 listener.listen("seesaw_stream").await?;
627
628 let pool = self.pool.clone();
630 let stream = listener.into_stream().filter_map(move |result| {
631 let pool = pool.clone();
632 Box::pin(async move {
633 match result {
634 Ok(_notification) => {
635 if let Ok(row) = sqlx::query_as::<_, StreamRow>(
638 "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
639 effect_id, status, error, payload, created_at
640 FROM seesaw_stream
641 ORDER BY seq DESC
642 LIMIT 1",
643 )
644 .fetch_one(&pool)
645 .await
646 {
647 Some(stream_row_to_insight_event(row))
648 } else {
649 None
650 }
651 }
652 Err(_) => None,
653 }
654 })
655 });
656
657 Ok(Box::new(stream))
658 }
659
660 async fn get_workflow_tree(&self, correlation_id: Uuid) -> Result<WorkflowTree> {
661 let events = sqlx::query_as::<_, EventRow>(
663 "SELECT id, event_id, parent_id, correlation_id, event_type, payload, hops, created_at
664 FROM seesaw_events
665 WHERE correlation_id = $1
666 ORDER BY created_at ASC",
667 )
668 .bind(correlation_id)
669 .fetch_all(&self.pool)
670 .await?;
671
672 let effects = sqlx::query_as::<_, EffectTreeRow>(
674 "SELECT event_id, effect_id, status, result, error, attempts, created_at
675 FROM seesaw_effect_executions
676 WHERE correlation_id = $1
677 ORDER BY created_at ASC",
678 )
679 .bind(correlation_id)
680 .fetch_all(&self.pool)
681 .await?;
682
683 let roots = build_event_tree(&events, &effects, None);
685
686 let state = sqlx::query_as::<_, (serde_json::Value,)>(
688 "SELECT state FROM seesaw_state WHERE correlation_id = $1",
689 )
690 .bind(correlation_id)
691 .fetch_optional(&self.pool)
692 .await?
693 .map(|r| r.0);
694
695 Ok(WorkflowTree {
696 correlation_id,
697 roots,
698 state,
699 event_count: events.len(),
700 effect_count: effects.len(),
701 })
702 }
703
704 async fn get_stats(&self) -> Result<InsightStats> {
705 let total_events = sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM seesaw_events")
706 .fetch_one(&self.pool)
707 .await?
708 .0;
709
710 let active_effects = sqlx::query_as::<_, (i64,)>(
711 "SELECT COUNT(*) FROM seesaw_effect_executions
712 WHERE status IN ('pending', 'executing')",
713 )
714 .fetch_one(&self.pool)
715 .await?
716 .0;
717
718 let completed_effects = sqlx::query_as::<_, (i64,)>(
719 "SELECT COUNT(*) FROM seesaw_effect_executions WHERE status = 'completed'",
720 )
721 .fetch_one(&self.pool)
722 .await?
723 .0;
724
725 let failed_effects = sqlx::query_as::<_, (i64,)>(
726 "SELECT COUNT(*) FROM seesaw_effect_executions WHERE status = 'failed'",
727 )
728 .fetch_one(&self.pool)
729 .await?
730 .0;
731
732 Ok(InsightStats {
733 total_events,
734 active_effects,
735 completed_effects,
736 failed_effects,
737 })
738 }
739
740 async fn get_recent_events(
741 &self,
742 cursor: Option<i64>,
743 limit: usize,
744 ) -> Result<Vec<InsightEvent>> {
745 let rows = if let Some(cursor_seq) = cursor {
746 sqlx::query_as::<_, StreamRow>(
747 "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
748 effect_id, status, error, payload, created_at
749 FROM seesaw_stream
750 WHERE seq > $1
751 ORDER BY seq ASC
752 LIMIT $2",
753 )
754 .bind(cursor_seq)
755 .bind(limit as i64)
756 .fetch_all(&self.pool)
757 .await?
758 } else {
759 sqlx::query_as::<_, StreamRow>(
760 "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
761 effect_id, status, error, payload, created_at
762 FROM seesaw_stream
763 ORDER BY seq DESC
764 LIMIT $1",
765 )
766 .bind(limit as i64)
767 .fetch_all(&self.pool)
768 .await?
769 };
770
771 Ok(rows.into_iter().map(stream_row_to_insight_event).collect())
772 }
773}
774
/// Row shape read from `seesaw_effect_executions` by `get_workflow_tree`;
/// copied field by field into an `EffectNode`.
#[derive(FromRow)]
struct EffectTreeRow {
    /// Event the effect is attached to in the tree.
    event_id: Uuid,
    /// Identifier of the effect.
    effect_id: String,
    /// Status text of the execution.
    status: String,
    /// Stored result, if any.
    result: Option<serde_json::Value>,
    /// Stored error text, if any.
    error: Option<String>,
    /// Attempt counter of the execution.
    attempts: i32,
    /// Creation timestamp of the execution row.
    created_at: DateTime<Utc>,
}
785
786fn stream_row_to_insight_event(row: StreamRow) -> InsightEvent {
787 let stream_type = match row.stream_type.as_str() {
788 "event_dispatched" => StreamType::EventDispatched,
789 "effect_started" => StreamType::EffectStarted,
790 "effect_completed" => StreamType::EffectCompleted,
791 "effect_failed" => StreamType::EffectFailed,
792 _ => StreamType::EventDispatched, };
794
795 let event_type = if stream_type == StreamType::EventDispatched {
797 row.payload
798 .as_ref()
799 .and_then(|p| p.get("event_type"))
800 .and_then(|v| v.as_str())
801 .map(|s| s.to_string())
802 } else {
803 None
804 };
805
806 InsightEvent {
807 seq: row.seq,
808 stream_type,
809 correlation_id: row.correlation_id,
810 event_id: row.event_id,
811 effect_event_id: row.effect_event_id,
812 effect_id: row.effect_id,
813 event_type,
814 status: row.status,
815 error: row.error,
816 payload: row.payload,
817 created_at: row.created_at,
818 }
819}
820
821fn build_event_tree(
822 events: &[EventRow],
823 effects: &[EffectTreeRow],
824 parent_id: Option<Uuid>,
825) -> Vec<EventNode> {
826 events
827 .iter()
828 .filter(|e| e.parent_id == parent_id)
829 .map(|event| {
830 let event_effects: Vec<EffectNode> = effects
832 .iter()
833 .filter(|eff| eff.event_id == event.event_id)
834 .map(|eff| EffectNode {
835 effect_id: eff.effect_id.clone(),
836 event_id: eff.event_id,
837 status: eff.status.clone(),
838 result: eff.result.clone(),
839 error: eff.error.clone(),
840 attempts: eff.attempts,
841 created_at: eff.created_at,
842 })
843 .collect();
844
845 let children = build_event_tree(events, effects, Some(event.event_id));
847
848 EventNode {
849 event_id: event.event_id,
850 event_type: event.event_type.clone(),
851 payload: event.payload.clone(),
852 created_at: event.created_at,
853 children,
854 effects: event_effects,
855 }
856 })
857 .collect()
858}
859
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Timelike};

    /// UTC timestamp on 2026-02-05 at the given time of day.
    fn feb_5_2026_at(hour: u32, minute: u32, second: u32) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 2, 5, hour, minute, second)
            .single()
            .expect("valid timestamp")
    }

    /// The emitted timestamp keeps the parent's date and has a zero time of day.
    #[test]
    fn emitted_event_created_at_is_midnight_on_parent_day() {
        let parent = feb_5_2026_at(18, 45, 12);

        let emitted = emitted_event_created_at(parent);

        assert_eq!(emitted.date_naive(), parent.date_naive());
        assert_eq!(emitted.hour(), 0);
        assert_eq!(emitted.minute(), 0);
        assert_eq!(emitted.second(), 0);
    }

    /// Parents early and late on the same day map to the same timestamp.
    #[test]
    fn emitted_event_created_at_is_deterministic_for_same_parent_day() {
        let early_parent = feb_5_2026_at(0, 1, 2);
        let late_parent = feb_5_2026_at(23, 59, 59);

        assert_eq!(
            emitted_event_created_at(early_parent),
            emitted_event_created_at(late_parent)
        );
    }

    /// The delay doubles with every attempt.
    #[test]
    fn effect_retry_delay_seconds_uses_exponential_backoff() {
        for (attempts, expected) in [(1, 1), (2, 2), (3, 4), (4, 8)] {
            assert_eq!(effect_retry_delay_seconds(attempts), expected);
        }
    }

    /// The delay stops growing at 256 seconds.
    #[test]
    fn effect_retry_delay_seconds_is_capped() {
        for attempts in [9, 50] {
            assert_eq!(effect_retry_delay_seconds(attempts), 256);
        }
    }
}