1use anyhow::Result;
5use async_trait::async_trait;
6use chrono::{DateTime, Duration, Utc};
7use seesaw_core::{
8 insight::*, EmittedEvent, QueuedEffectExecution, QueuedEvent, Store, NAMESPACE_SEESAW,
9};
10use serde::{Deserialize, Serialize};
11use sqlx::{FromRow, PgPool};
12use uuid::Uuid;
13
/// Number of seconds `poll_next` adds to `NOW()` when it sets `locked_until` on a
/// claimed event. Until that moment passes, `poll_next` will not return the row again.
const EVENT_CLAIM_SECONDS: i64 = 30;
15
16fn emitted_event_created_at(parent_created_at: DateTime<Utc>) -> DateTime<Utc> {
17 parent_created_at
18 .date_naive()
19 .and_hms_opt(0, 0, 0)
20 .expect("midnight should always be a valid UTC timestamp")
21 .and_utc()
22}
23
/// Returns the retry delay, in seconds, after the given number of attempts.
///
/// The delay doubles with every attempt (1, 2, 4, ...) and is capped at 2^8 = 256
/// seconds. Attempt counts below 1 are treated like the first attempt.
fn effect_retry_delay_seconds(attempts: i32) -> i64 {
    const MAX_EXPONENT: i32 = 8;
    let exponent = attempts.saturating_sub(1).clamp(0, MAX_EXPONENT);
    // `exponent` is within 0..=8 here, so the cast is lossless.
    2_i64.pow(exponent as u32)
}
28
29pub struct PostgresStore {
31 pool: PgPool,
32}
33
34impl PostgresStore {
35 pub fn new(pool: PgPool) -> Self {
36 Self { pool }
37 }
38
39 pub fn pool(&self) -> &PgPool {
40 &self.pool
41 }
42}
43
44impl Clone for PostgresStore {
45 fn clone(&self) -> Self {
46 Self {
47 pool: self.pool.clone(),
48 }
49 }
50}
51
/// Row shape decoded from `seesaw_events` queries; field names match the selected columns.
#[derive(FromRow)]
struct EventRow {
    /// Numeric row id; used as the ordering tie-breaker and as the key for `ack`/`nack`.
    id: i64,
    /// Event identifier (UUID).
    event_id: Uuid,
    /// `event_id` of the event this one was emitted from, if any.
    parent_id: Option<Uuid>,
    /// Identifier shared by all events, effects and state of one workflow.
    correlation_id: Uuid,
    /// Event type name.
    event_type: String,
    /// JSON payload of the event.
    payload: serde_json::Value,
    /// Hop counter; events emitted by an effect get the parent's value plus one.
    hops: i32,
    /// Timestamp used as the primary ordering key for events.
    created_at: DateTime<Utc>,
}
63
/// Row shape decoded from `seesaw_state` by `load_state`.
#[derive(FromRow)]
struct StateRow {
    /// Serialized workflow state as JSON.
    state: serde_json::Value,
    /// Version counter that `save_state` compares against before overwriting.
    version: i32,
}
69
/// Row shape decoded from `seesaw_effect_executions`; field names match the selected columns.
#[derive(FromRow)]
struct EffectRow {
    /// Event the effect belongs to; together with `effect_id` it identifies the row.
    event_id: Uuid,
    /// Identifier of the effect within its event.
    effect_id: String,
    /// Workflow correlation id copied onto the execution row.
    correlation_id: Uuid,
    /// Type name of the triggering event.
    event_type: String,
    /// JSON payload of the triggering event.
    event_payload: serde_json::Value,
    /// Parent of the triggering event, if any.
    parent_event_id: Option<Uuid>,
    /// Earliest time at which `poll_next_effect` may claim the row.
    execute_at: DateTime<Utc>,
    /// Timeout value stored with the execution; not interpreted in this file.
    timeout_seconds: i32,
    /// Failed rows are only re-claimed while `attempts < max_attempts`.
    max_attempts: i32,
    /// Claim order: `poll_next_effect` sorts by ascending priority first.
    priority: i32,
    /// Number of claims so far; incremented each time `poll_next_effect` claims the row.
    attempts: i32,
}
84
/// The two columns of a parent event that `complete_effect_with_events` needs
/// in order to derive the hop count and timestamp of emitted events.
#[derive(FromRow)]
struct ParentEventRow {
    /// Hop counter of the parent event.
    hops: i32,
    /// Creation timestamp of the parent event.
    created_at: DateTime<Utc>,
}
90
91#[async_trait]
92impl Store for PostgresStore {
93 async fn publish(&self, event: QueuedEvent) -> Result<()> {
94 let mut tx = self.pool.begin().await?;
95
96 let inserted: Option<Uuid> = sqlx::query_scalar(
99 "INSERT INTO seesaw_processed (event_id, correlation_id, created_at)
100 VALUES ($1, $2, $3)
101 ON CONFLICT (event_id) DO NOTHING
102 RETURNING event_id",
103 )
104 .bind(event.event_id)
105 .bind(event.correlation_id)
106 .bind(event.created_at)
107 .fetch_optional(&mut *tx)
108 .await?;
109
110 if inserted.is_none() {
111 tx.commit().await?;
112 return Ok(());
113 }
114
115 sqlx::query(
116 "INSERT INTO seesaw_events (
117 event_id, parent_id, correlation_id, event_type, payload, hops, created_at
118 )
119 VALUES ($1, $2, $3, $4, $5, $6, $7)",
120 )
121 .bind(event.event_id)
122 .bind(event.parent_id)
123 .bind(event.correlation_id)
124 .bind(event.event_type)
125 .bind(event.payload)
126 .bind(event.hops)
127 .bind(event.created_at)
128 .execute(&mut *tx)
129 .await?;
130
131 tx.commit().await?;
132
133 Ok(())
134 }
135
136 async fn poll_next(&self) -> Result<Option<QueuedEvent>> {
137 let row: Option<EventRow> = sqlx::query_as(
138 "WITH next_event AS (
139 SELECT e.id
140 FROM seesaw_events e
141 WHERE e.processed_at IS NULL
142 AND (e.locked_until IS NULL OR e.locked_until < NOW())
143 AND NOT EXISTS (
144 SELECT 1
145 FROM seesaw_events older
146 WHERE older.correlation_id = e.correlation_id
147 AND older.processed_at IS NULL
148 AND (
149 older.created_at < e.created_at
150 OR (older.created_at = e.created_at AND older.id < e.id)
151 )
152 )
153 ORDER BY e.created_at ASC, e.id ASC
154 LIMIT 1
155 FOR UPDATE SKIP LOCKED
156 )
157 UPDATE seesaw_events e
158 SET locked_until = NOW() + ($1 * INTERVAL '1 second')
159 FROM next_event
160 WHERE e.id = next_event.id
161 RETURNING e.id, e.event_id, e.parent_id, e.correlation_id, e.event_type, e.payload, e.hops, e.created_at",
162 )
163 .bind(EVENT_CLAIM_SECONDS)
164 .fetch_optional(&self.pool)
165 .await?;
166
167 Ok(row.map(|r| QueuedEvent {
168 id: r.id,
169 event_id: r.event_id,
170 parent_id: r.parent_id,
171 correlation_id: r.correlation_id,
172 event_type: r.event_type,
173 payload: r.payload,
174 hops: r.hops,
175 created_at: r.created_at,
176 }))
177 }
178
179 async fn ack(&self, id: i64) -> Result<()> {
180 sqlx::query(
181 "UPDATE seesaw_events SET processed_at = NOW(), locked_until = NULL WHERE id = $1",
182 )
183 .bind(id)
184 .execute(&self.pool)
185 .await?;
186 Ok(())
187 }
188
189 async fn nack(&self, id: i64, retry_after_secs: u64) -> Result<()> {
190 let locked_until = Utc::now() + Duration::seconds(retry_after_secs as i64);
191 sqlx::query(
192 "UPDATE seesaw_events
193 SET retry_count = retry_count + 1,
194 locked_until = $2
195 WHERE id = $1",
196 )
197 .bind(id)
198 .bind(locked_until)
199 .execute(&self.pool)
200 .await?;
201 Ok(())
202 }
203
204 async fn load_state<S>(&self, correlation_id: Uuid) -> Result<Option<(S, i32)>>
205 where
206 S: for<'de> Deserialize<'de> + Send,
207 {
208 let row: Option<StateRow> =
209 sqlx::query_as("SELECT state, version FROM seesaw_state WHERE correlation_id = $1")
210 .bind(correlation_id)
211 .fetch_optional(&self.pool)
212 .await?;
213
214 match row {
215 Some(r) => {
216 let state: S = serde_json::from_value(r.state)?;
217 Ok(Some((state, r.version)))
218 }
219 None => Ok(None),
220 }
221 }
222
223 async fn save_state<S>(
224 &self,
225 correlation_id: Uuid,
226 state: &S,
227 expected_version: i32,
228 ) -> Result<i32>
229 where
230 S: Serialize + Send + Sync,
231 {
232 let state_json = serde_json::to_value(state)?;
233 let new_version = expected_version + 1;
234
235 let result = sqlx::query(
236 "INSERT INTO seesaw_state (correlation_id, state, version, updated_at)
237 VALUES ($1, $2, $3, NOW())
238 ON CONFLICT (correlation_id) DO UPDATE
239 SET state = $2,
240 version = $3,
241 updated_at = NOW()
242 WHERE seesaw_state.version = $4",
243 )
244 .bind(correlation_id)
245 .bind(&state_json)
246 .bind(new_version)
247 .bind(expected_version)
248 .execute(&self.pool)
249 .await?;
250
251 if result.rows_affected() == 0 {
252 anyhow::bail!("Version conflict: state was modified concurrently");
253 }
254
255 Ok(new_version)
256 }
257
258 async fn insert_effect_intent(
259 &self,
260 event_id: Uuid,
261 effect_id: String,
262 correlation_id: Uuid,
263 event_type: String,
264 event_payload: serde_json::Value,
265 parent_event_id: Option<Uuid>,
266 execute_at: DateTime<Utc>,
267 timeout_seconds: i32,
268 max_attempts: i32,
269 priority: i32,
270 ) -> Result<()> {
271 sqlx::query(
272 "INSERT INTO seesaw_effect_executions (
273 event_id, effect_id, correlation_id, status,
274 event_type, event_payload, parent_event_id,
275 execute_at, timeout_seconds, max_attempts, priority
276 )
277 VALUES ($1, $2, $3, 'pending', $4, $5, $6, $7, $8, $9, $10)",
278 )
279 .bind(event_id)
280 .bind(effect_id)
281 .bind(correlation_id)
282 .bind(event_type)
283 .bind(event_payload)
284 .bind(parent_event_id)
285 .bind(execute_at)
286 .bind(timeout_seconds)
287 .bind(max_attempts)
288 .bind(priority)
289 .execute(&self.pool)
290 .await?;
291
292 Ok(())
293 }
294
295 async fn poll_next_effect(&self) -> Result<Option<QueuedEffectExecution>> {
296 let row: Option<EffectRow> = sqlx::query_as(
297 "WITH next_effect AS (
298 SELECT event_id, effect_id
299 FROM seesaw_effect_executions
300 WHERE (
301 status = 'pending'
302 OR (status = 'failed' AND attempts < max_attempts)
303 )
304 AND execute_at <= NOW()
305 ORDER BY priority ASC, execute_at ASC, event_id ASC, effect_id ASC
306 LIMIT 1
307 FOR UPDATE SKIP LOCKED
308 )
309 UPDATE seesaw_effect_executions e
310 SET status = 'executing',
311 claimed_at = NOW(),
312 last_attempted_at = NOW(),
313 attempts = e.attempts + 1
314 FROM next_effect
315 WHERE e.event_id = next_effect.event_id
316 AND e.effect_id = next_effect.effect_id
317 RETURNING
318 e.event_id, e.effect_id, e.correlation_id, e.event_type, e.event_payload, e.parent_event_id,
319 e.execute_at, e.timeout_seconds, e.max_attempts, e.priority, e.attempts",
320 )
321 .fetch_optional(&self.pool)
322 .await?;
323
324 if let Some(r) = row {
325 Ok(Some(QueuedEffectExecution {
326 event_id: r.event_id,
327 effect_id: r.effect_id,
328 correlation_id: r.correlation_id,
329 event_type: r.event_type,
330 event_payload: r.event_payload,
331 parent_event_id: r.parent_event_id,
332 execute_at: r.execute_at,
333 timeout_seconds: r.timeout_seconds,
334 max_attempts: r.max_attempts,
335 priority: r.priority,
336 attempts: r.attempts,
337 }))
338 } else {
339 Ok(None)
340 }
341 }
342
343 async fn complete_effect(
344 &self,
345 event_id: Uuid,
346 effect_id: String,
347 result: serde_json::Value,
348 ) -> Result<()> {
349 sqlx::query(
350 "UPDATE seesaw_effect_executions
351 SET status = 'completed',
352 result = $3,
353 completed_at = NOW()
354 WHERE event_id = $1 AND effect_id = $2",
355 )
356 .bind(event_id)
357 .bind(effect_id)
358 .bind(result)
359 .execute(&self.pool)
360 .await?;
361
362 Ok(())
363 }
364
365 async fn complete_effect_with_events(
366 &self,
367 event_id: Uuid,
368 effect_id: String,
369 result: serde_json::Value,
370 emitted_events: Vec<EmittedEvent>,
371 ) -> Result<()> {
372 let effect: EffectRow = sqlx::query_as(
374 "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
375 execute_at, timeout_seconds, max_attempts, priority, attempts
376 FROM seesaw_effect_executions
377 WHERE event_id = $1 AND effect_id = $2",
378 )
379 .bind(event_id)
380 .bind(&effect_id)
381 .fetch_one(&self.pool)
382 .await?;
383
384 let parent: ParentEventRow = sqlx::query_as(
386 "SELECT hops, created_at
387 FROM seesaw_events
388 WHERE event_id = $1
389 ORDER BY created_at ASC, id ASC
390 LIMIT 1",
391 )
392 .bind(event_id)
393 .fetch_one(&self.pool)
394 .await?;
395
396 let mut tx = self.pool.begin().await?;
398
399 for emitted in emitted_events {
401 let deterministic_id = Uuid::new_v5(
403 &NAMESPACE_SEESAW,
404 format!("{}-{}-{}", event_id, effect_id, emitted.event_type).as_bytes(),
405 );
406
407 let deterministic_timestamp = emitted_event_created_at(parent.created_at);
410
411 sqlx::query(
413 "INSERT INTO seesaw_events (
414 event_id, parent_id, correlation_id, event_type, payload, hops, created_at
415 )
416 VALUES ($1, $2, $3, $4, $5, $6, $7)
417 ON CONFLICT (event_id, created_at) DO NOTHING",
418 )
419 .bind(deterministic_id)
420 .bind(Some(event_id))
421 .bind(effect.correlation_id)
422 .bind(&emitted.event_type)
423 .bind(emitted.payload)
424 .bind(parent.hops + 1)
425 .bind(deterministic_timestamp)
426 .execute(&mut *tx)
427 .await?;
428 }
429
430 sqlx::query(
432 "UPDATE seesaw_effect_executions
433 SET status = 'completed',
434 result = $3,
435 completed_at = NOW()
436 WHERE event_id = $1 AND effect_id = $2",
437 )
438 .bind(event_id)
439 .bind(effect_id)
440 .bind(result)
441 .execute(&mut *tx)
442 .await?;
443
444 tx.commit().await?;
446
447 Ok(())
448 }
449
450 async fn fail_effect(
451 &self,
452 event_id: Uuid,
453 effect_id: String,
454 error: String,
455 attempts: i32,
456 ) -> Result<()> {
457 let retry_at = Utc::now() + Duration::seconds(effect_retry_delay_seconds(attempts));
458 sqlx::query(
459 "UPDATE seesaw_effect_executions
460 SET status = 'failed',
461 error = $3,
462 execute_at = $5,
463 claimed_at = NULL
464 WHERE event_id = $1 AND effect_id = $2 AND attempts >= $4",
465 )
466 .bind(event_id)
467 .bind(effect_id)
468 .bind(error)
469 .bind(attempts)
470 .bind(retry_at)
471 .execute(&self.pool)
472 .await?;
473
474 Ok(())
475 }
476
477 async fn dlq_effect(
478 &self,
479 event_id: Uuid,
480 effect_id: String,
481 error: String,
482 reason: String,
483 attempts: i32,
484 ) -> Result<()> {
485 let effect: EffectRow = sqlx::query_as(
487 "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
488 execute_at, timeout_seconds, max_attempts, priority, attempts
489 FROM seesaw_effect_executions
490 WHERE event_id = $1 AND effect_id = $2",
491 )
492 .bind(event_id)
493 .bind(&effect_id)
494 .fetch_one(&self.pool)
495 .await?;
496
497 sqlx::query(
499 "INSERT INTO seesaw_dlq (
500 event_id, effect_id, correlation_id, error, event_type, event_payload, reason, attempts
501 )
502 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
503 )
504 .bind(event_id)
505 .bind(&effect_id)
506 .bind(effect.correlation_id)
507 .bind(error)
508 .bind(effect.event_type)
509 .bind(effect.event_payload)
510 .bind(reason)
511 .bind(attempts)
512 .execute(&self.pool)
513 .await?;
514
515 sqlx::query("DELETE FROM seesaw_effect_executions WHERE event_id = $1 AND effect_id = $2")
517 .bind(event_id)
518 .bind(effect_id)
519 .execute(&self.pool)
520 .await?;
521
522 Ok(())
523 }
524
/// Subscribes to events of one workflow via Postgres `LISTEN`.
///
/// Listens on the channel `seesaw_workflow_<correlation_id>`. For every
/// notification the JSON payload is parsed for an `event_id`, the event is
/// looked up in `seesaw_events`, and the row is yielded as a `WorkflowEvent`.
///
/// Notifications that fail to parse, lookups that fail or find no row, and
/// listener errors are all dropped silently: the stream yields nothing for them.
///
/// # Errors
/// Returns an error only if connecting the listener or issuing `LISTEN` fails.
async fn subscribe_workflow_events(
    &self,
    correlation_id: Uuid,
) -> Result<Box<dyn futures::Stream<Item = seesaw_core::WorkflowEvent> + Send + Unpin>> {
    use futures::stream::StreamExt;
    use sqlx::postgres::PgListener;

    let channel = format!("seesaw_workflow_{}", correlation_id);

    let mut listener = PgListener::connect_with(&self.pool).await?;
    listener.listen(&channel).await?;

    // The closure below is `move`, so it needs its own handle to the pool.
    let pool = self.pool.clone();

    let stream = listener.into_stream().filter_map(move |result| {
        // Each notification's future gets a separate pool handle.
        let pool = pool.clone();
        // The future is boxed and pinned; the returned stream is declared `Unpin`.
        Box::pin(async move {
            match result {
                Ok(notification) => {
                    // Only `event_id` is read below. `correlation_id` and `event_type`
                    // are still required fields, so a payload without them fails to
                    // parse and the notification is dropped.
                    #[derive(serde::Deserialize)]
                    struct NotificationMeta {
                        event_id: Uuid,
                        correlation_id: Uuid,
                        event_type: String,
                    }

                    // `.ok()?` turns a parse failure into `None` (item filtered out).
                    let meta = serde_json::from_str::<NotificationMeta>(notification.payload()).ok()?;

                    // Fetch the full event; query errors and missing rows both yield `None`.
                    sqlx::query_as::<_, (Uuid, Uuid, String, serde_json::Value)>(
                        "SELECT event_id, correlation_id, event_type, payload
                         FROM seesaw_events
                         WHERE event_id = $1"
                    )
                    .bind(meta.event_id)
                    .fetch_optional(&pool)
                    .await
                    .ok()?
                    .map(|(event_id, correlation_id, event_type, payload)| {
                        seesaw_core::WorkflowEvent {
                            event_id,
                            correlation_id,
                            event_type,
                            payload,
                        }
                    })
                }
                // Listener errors are skipped rather than ending the stream here.
                Err(_) => None,
            }
        })
    });

    Ok(Box::new(stream))
}
582
583 async fn get_workflow_status(
584 &self,
585 correlation_id: Uuid,
586 ) -> Result<seesaw_core::WorkflowStatus> {
587 let state = sqlx::query_as::<_, (serde_json::Value,)>(
588 "SELECT state FROM seesaw_state WHERE correlation_id = $1",
589 )
590 .bind(correlation_id)
591 .fetch_optional(&self.pool)
592 .await?
593 .map(|r| r.0);
594
595 let pending_effects = sqlx::query_as::<_, (i64,)>(
596 "SELECT COUNT(*) FROM seesaw_effect_executions
597 WHERE correlation_id = $1 AND status IN ('pending', 'executing', 'failed')",
598 )
599 .bind(correlation_id)
600 .fetch_one(&self.pool)
601 .await?
602 .0;
603
604 let last_event = sqlx::query_as::<_, (String,)>(
605 "SELECT event_type FROM seesaw_events
606 WHERE correlation_id = $1
607 ORDER BY created_at DESC, id DESC
608 LIMIT 1",
609 )
610 .bind(correlation_id)
611 .fetch_optional(&self.pool)
612 .await?
613 .map(|r| r.0);
614
615 Ok(seesaw_core::WorkflowStatus {
616 correlation_id,
617 state,
618 pending_effects,
619 is_settled: pending_effects == 0,
620 last_event,
621 })
622 }
623}
624
/// Row shape decoded from `seesaw_stream`; converted to `InsightEvent` by
/// `stream_row_to_insight_event`.
#[derive(FromRow)]
struct StreamRow {
    /// Sequence number; used for ordering and as the pagination cursor.
    seq: i64,
    /// Textual entry kind, mapped to `StreamType` during conversion.
    stream_type: String,
    /// Workflow the entry belongs to.
    correlation_id: Uuid,
    /// Event id, when the entry has one.
    event_id: Option<Uuid>,
    /// Event id of the related effect, when the entry has one.
    effect_event_id: Option<Uuid>,
    /// Effect id, when the entry has one.
    effect_id: Option<String>,
    /// Status text, when present.
    status: Option<String>,
    /// Error text, when present.
    error: Option<String>,
    /// Optional JSON payload; its `"event_type"` key is read for dispatched events.
    payload: Option<serde_json::Value>,
    /// Creation timestamp of the stream entry.
    created_at: DateTime<Utc>,
}
638
639#[async_trait]
640impl InsightStore for PostgresStore {
/// Subscribes to the global insight stream via Postgres `LISTEN`.
///
/// Listens on the `seesaw_stream` channel. For every notification the row
/// with the highest `seq` in `seesaw_stream` is loaded and yielded as an
/// `InsightEvent`. Query failures and listener errors yield nothing.
///
/// NOTE(review): the notification payload is ignored and the latest row is
/// re-queried each time. If several rows are inserted before a notification
/// is handled, it looks like older rows would be skipped and the newest one
/// yielded more than once — confirm whether that is acceptable.
///
/// # Errors
/// Returns an error only if connecting the listener or issuing `LISTEN` fails.
async fn subscribe_events(
    &self,
) -> Result<Box<dyn futures::Stream<Item = InsightEvent> + Send + Unpin>> {
    use futures::stream::StreamExt;
    use sqlx::postgres::PgListener;

    let mut listener = PgListener::connect_with(&self.pool).await?;
    listener.listen("seesaw_stream").await?;

    // The `move` closure below needs its own handle to the pool.
    let pool = self.pool.clone();
    let stream = listener.into_stream().filter_map(move |result| {
        let pool = pool.clone();
        // The future is boxed and pinned; the returned stream is declared `Unpin`.
        Box::pin(async move {
            match result {
                Ok(_notification) => {
                    // Load the newest stream row; any query error drops this item.
                    if let Ok(row) = sqlx::query_as::<_, StreamRow>(
                        "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
                                effect_id, status, error, payload, created_at
                         FROM seesaw_stream
                         ORDER BY seq DESC
                         LIMIT 1",
                    )
                    .fetch_one(&pool)
                    .await
                    {
                        Some(stream_row_to_insight_event(row))
                    } else {
                        None
                    }
                }
                // Listener errors are skipped.
                Err(_) => None,
            }
        })
    });

    Ok(Box::new(stream))
}
682
683 async fn get_workflow_tree(&self, correlation_id: Uuid) -> Result<WorkflowTree> {
684 let events = sqlx::query_as::<_, EventRow>(
686 "SELECT id, event_id, parent_id, correlation_id, event_type, payload, hops, created_at
687 FROM seesaw_events
688 WHERE correlation_id = $1
689 ORDER BY created_at ASC",
690 )
691 .bind(correlation_id)
692 .fetch_all(&self.pool)
693 .await?;
694
695 let effects = sqlx::query_as::<_, EffectTreeRow>(
697 "SELECT event_id, effect_id, status, result, error, attempts, created_at
698 FROM seesaw_effect_executions
699 WHERE correlation_id = $1
700 ORDER BY created_at ASC",
701 )
702 .bind(correlation_id)
703 .fetch_all(&self.pool)
704 .await?;
705
706 let roots = build_event_tree(&events, &effects, None);
708
709 let state = sqlx::query_as::<_, (serde_json::Value,)>(
711 "SELECT state FROM seesaw_state WHERE correlation_id = $1",
712 )
713 .bind(correlation_id)
714 .fetch_optional(&self.pool)
715 .await?
716 .map(|r| r.0);
717
718 Ok(WorkflowTree {
719 correlation_id,
720 roots,
721 state,
722 event_count: events.len(),
723 effect_count: effects.len(),
724 })
725 }
726
727 async fn get_stats(&self) -> Result<InsightStats> {
728 let total_events = sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM seesaw_events")
729 .fetch_one(&self.pool)
730 .await?
731 .0;
732
733 let active_effects = sqlx::query_as::<_, (i64,)>(
734 "SELECT COUNT(*) FROM seesaw_effect_executions
735 WHERE status IN ('pending', 'executing')",
736 )
737 .fetch_one(&self.pool)
738 .await?
739 .0;
740
741 let completed_effects = sqlx::query_as::<_, (i64,)>(
742 "SELECT COUNT(*) FROM seesaw_effect_executions WHERE status = 'completed'",
743 )
744 .fetch_one(&self.pool)
745 .await?
746 .0;
747
748 let failed_effects = sqlx::query_as::<_, (i64,)>(
749 "SELECT COUNT(*) FROM seesaw_effect_executions WHERE status = 'failed'",
750 )
751 .fetch_one(&self.pool)
752 .await?
753 .0;
754
755 Ok(InsightStats {
756 total_events,
757 active_effects,
758 completed_effects,
759 failed_effects,
760 })
761 }
762
763 async fn get_recent_events(
764 &self,
765 cursor: Option<i64>,
766 limit: usize,
767 ) -> Result<Vec<InsightEvent>> {
768 let rows = if let Some(cursor_seq) = cursor {
769 sqlx::query_as::<_, StreamRow>(
770 "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
771 effect_id, status, error, payload, created_at
772 FROM seesaw_stream
773 WHERE seq > $1
774 ORDER BY seq ASC
775 LIMIT $2",
776 )
777 .bind(cursor_seq)
778 .bind(limit as i64)
779 .fetch_all(&self.pool)
780 .await?
781 } else {
782 sqlx::query_as::<_, StreamRow>(
783 "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
784 effect_id, status, error, payload, created_at
785 FROM seesaw_stream
786 ORDER BY seq DESC
787 LIMIT $1",
788 )
789 .bind(limit as i64)
790 .fetch_all(&self.pool)
791 .await?
792 };
793
794 Ok(rows.into_iter().map(stream_row_to_insight_event).collect())
795 }
796}
797
/// Subset of `seesaw_effect_executions` columns loaded by `get_workflow_tree`
/// and copied into `EffectNode` by `build_event_tree`.
#[derive(FromRow)]
struct EffectTreeRow {
    /// Event the effect belongs to; used to attach the effect to its event node.
    event_id: Uuid,
    /// Identifier of the effect within its event.
    effect_id: String,
    /// Execution status text as stored in the table.
    status: String,
    /// Stored result, if any.
    result: Option<serde_json::Value>,
    /// Stored error text, if any.
    error: Option<String>,
    /// Number of attempts recorded for the execution.
    attempts: i32,
    /// Creation timestamp of the execution row.
    created_at: DateTime<Utc>,
}
808
809fn stream_row_to_insight_event(row: StreamRow) -> InsightEvent {
810 let stream_type = match row.stream_type.as_str() {
811 "event_dispatched" => StreamType::EventDispatched,
812 "effect_started" => StreamType::EffectStarted,
813 "effect_completed" => StreamType::EffectCompleted,
814 "effect_failed" => StreamType::EffectFailed,
815 _ => StreamType::EventDispatched, };
817
818 let event_type = if stream_type == StreamType::EventDispatched {
820 row.payload
821 .as_ref()
822 .and_then(|p| p.get("event_type"))
823 .and_then(|v| v.as_str())
824 .map(|s| s.to_string())
825 } else {
826 None
827 };
828
829 InsightEvent {
830 seq: row.seq,
831 stream_type,
832 correlation_id: row.correlation_id,
833 event_id: row.event_id,
834 effect_event_id: row.effect_event_id,
835 effect_id: row.effect_id,
836 event_type,
837 status: row.status,
838 error: row.error,
839 payload: row.payload,
840 created_at: row.created_at,
841 }
842}
843
844fn build_event_tree(
845 events: &[EventRow],
846 effects: &[EffectTreeRow],
847 parent_id: Option<Uuid>,
848) -> Vec<EventNode> {
849 events
850 .iter()
851 .filter(|e| e.parent_id == parent_id)
852 .map(|event| {
853 let event_effects: Vec<EffectNode> = effects
855 .iter()
856 .filter(|eff| eff.event_id == event.event_id)
857 .map(|eff| EffectNode {
858 effect_id: eff.effect_id.clone(),
859 event_id: eff.event_id,
860 status: eff.status.clone(),
861 result: eff.result.clone(),
862 error: eff.error.clone(),
863 attempts: eff.attempts,
864 created_at: eff.created_at,
865 })
866 .collect();
867
868 let children = build_event_tree(events, effects, Some(event.event_id));
870
871 EventNode {
872 event_id: event.event_id,
873 event_type: event.event_type.clone(),
874 payload: event.payload.clone(),
875 created_at: event.created_at,
876 children,
877 effects: event_effects,
878 }
879 })
880 .collect()
881}
882
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Timelike};

    /// Builds a UTC timestamp on 2026-02-05 at the given time of day.
    fn on_test_day(hour: u32, minute: u32, second: u32) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 2, 5, hour, minute, second)
            .single()
            .expect("valid timestamp")
    }

    /// The emitted timestamp stays on the parent's day and is exactly midnight.
    #[test]
    fn emitted_event_created_at_is_midnight_on_parent_day() {
        let parent = on_test_day(18, 45, 12);

        let emitted = emitted_event_created_at(parent);

        assert_eq!(emitted.date_naive(), parent.date_naive());
        assert_eq!(emitted.hour(), 0);
        assert_eq!(emitted.minute(), 0);
        assert_eq!(emitted.second(), 0);
    }

    /// Two parents on the same UTC day produce the same emitted timestamp.
    #[test]
    fn emitted_event_created_at_is_deterministic_for_same_parent_day() {
        let early = emitted_event_created_at(on_test_day(0, 1, 2));
        let late = emitted_event_created_at(on_test_day(23, 59, 59));

        assert_eq!(early, late);
    }

    /// The delay doubles with every attempt.
    #[test]
    fn effect_retry_delay_seconds_uses_exponential_backoff() {
        for &(attempts, expected) in [(1, 1), (2, 2), (3, 4), (4, 8)].iter() {
            assert_eq!(effect_retry_delay_seconds(attempts), expected);
        }
    }

    /// From the ninth attempt on the delay stays at 256 seconds.
    #[test]
    fn effect_retry_delay_seconds_is_capped() {
        for &attempts in [9, 50].iter() {
            assert_eq!(effect_retry_delay_seconds(attempts), 256);
        }
    }
}