1use anyhow::Result;
5use async_trait::async_trait;
6use chrono::{DateTime, Duration, Utc};
7use seesaw_core::{
8 insight::*, EmittedEvent, JoinEntry, QueuedEffectExecution, QueuedEvent, Store,
9 NAMESPACE_SEESAW,
10};
11use serde::{Deserialize, Serialize};
12use sqlx::{FromRow, PgPool};
13use std::collections::HashSet;
14use uuid::Uuid;
15
// Lease duration (seconds) for a claimed event: `poll_next` sets
// `locked_until = NOW() + EVENT_CLAIM_SECONDS` so an unacked event becomes
// eligible for re-delivery after this window.
const EVENT_CLAIM_SECONDS: i64 = 30;
17
18fn emitted_event_created_at(parent_created_at: DateTime<Utc>) -> DateTime<Utc> {
19 parent_created_at
20 .date_naive()
21 .and_hms_opt(0, 0, 0)
22 .expect("midnight should always be a valid UTC timestamp")
23 .and_utc()
24}
25
/// Exponential backoff for failed effects: 1, 2, 4, ... seconds, doubling per
/// attempt and capped at 2^8 = 256 seconds. `attempts` values below 1 are
/// treated as the first attempt (delay of 1 second).
fn effect_retry_delay_seconds(attempts: i32) -> i64 {
    let doublings = attempts.saturating_sub(1).clamp(0, 8) as u32;
    2_i64.pow(doublings)
}
30
/// Postgres-backed persistence layer for seesaw: implements the `Store` and
/// `InsightStore` traits on top of an sqlx connection pool.
pub struct PostgresStore {
    pool: PgPool,
}
35
impl PostgresStore {
    /// Build a store on top of an existing connection pool.
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }

    /// Access the underlying connection pool.
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }
}
45
46impl Clone for PostgresStore {
47 fn clone(&self) -> Self {
48 Self {
49 pool: self.pool.clone(),
50 }
51 }
52}
53
/// Row shape for SELECT/RETURNING queries against `seesaw_events`.
#[derive(FromRow)]
struct EventRow {
    id: i64,
    event_id: Uuid,
    parent_id: Option<Uuid>,
    correlation_id: Uuid,
    event_type: String,
    payload: serde_json::Value,
    hops: i32,
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
    created_at: DateTime<Utc>,
}
68
/// Row shape for `seesaw_state` lookups (JSON state blob + optimistic-lock version).
#[derive(FromRow)]
struct StateRow {
    state: serde_json::Value,
    version: i32,
}
74
/// Row shape for `seesaw_effect_executions` queries.
#[derive(FromRow)]
struct EffectRow {
    event_id: Uuid,
    effect_id: String,
    correlation_id: Uuid,
    event_type: String,
    event_payload: serde_json::Value,
    parent_event_id: Option<Uuid>,
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
    execute_at: DateTime<Utc>,
    timeout_seconds: i32,
    max_attempts: i32,
    priority: i32,
    attempts: i32,
}
92
/// Minimal parent-event projection used when deriving child events
/// (hop count and the timestamp the deterministic child `created_at` is based on).
#[derive(FromRow)]
struct ParentEventRow {
    hops: i32,
    created_at: DateTime<Utc>,
}
98
/// Row shape for the workflow-event pagination queries in
/// `subscribe_workflow_events`.
#[derive(FromRow)]
struct WorkflowEventRow {
    id: i64,
    event_id: Uuid,
    correlation_id: Uuid,
    event_type: String,
    payload: serde_json::Value,
    created_at: DateTime<Utc>,
}
108
#[async_trait]
impl Store for PostgresStore {
    /// Durably enqueue an event, exactly once per `event_id`.
    ///
    /// `seesaw_processed` acts as the dedupe gate: only the first insert for a
    /// given `event_id` returns a row; duplicates commit an effectively empty
    /// transaction and return `Ok(())` quietly.
    async fn publish(&self, event: QueuedEvent) -> Result<()> {
        let mut tx = self.pool.begin().await?;

        // Idempotency gate: RETURNING yields a row only when this transaction
        // actually inserted the dedupe record.
        let inserted: Option<Uuid> = sqlx::query_scalar(
            "INSERT INTO seesaw_processed (event_id, correlation_id, created_at)
             VALUES ($1, $2, $3)
             ON CONFLICT (event_id) DO NOTHING
             RETURNING event_id",
        )
        .bind(event.event_id)
        .bind(event.correlation_id)
        .bind(event.created_at)
        .fetch_optional(&mut *tx)
        .await?;

        // Already published: nothing more to do.
        if inserted.is_none() {
            tx.commit().await?;
            return Ok(());
        }

        sqlx::query(
            "INSERT INTO seesaw_events (
                event_id, parent_id, correlation_id, event_type, payload, hops,
                batch_id, batch_index, batch_size, created_at
            )
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
        )
        .bind(event.event_id)
        .bind(event.parent_id)
        .bind(event.correlation_id)
        .bind(event.event_type)
        .bind(event.payload)
        .bind(event.hops)
        .bind(event.batch_id)
        .bind(event.batch_index)
        .bind(event.batch_size)
        .bind(event.created_at)
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;

        Ok(())
    }

    /// Claim the next processable event, if any.
    ///
    /// Eligibility: unprocessed, lease expired (or never leased), and — to
    /// preserve per-correlation ordering — no OLDER unprocessed event (by
    /// `created_at`, then `id`) exists for the same `correlation_id`.
    /// `FOR UPDATE SKIP LOCKED` lets concurrent pollers pass over a row another
    /// worker is claiming in the same instant; the claim itself is a time lease
    /// (`locked_until`), not a permanent lock, so crashed workers release
    /// events automatically after `EVENT_CLAIM_SECONDS`.
    async fn poll_next(&self) -> Result<Option<QueuedEvent>> {
        let row: Option<EventRow> = sqlx::query_as(
            "WITH next_event AS (
                SELECT e.id
                FROM seesaw_events e
                WHERE e.processed_at IS NULL
                  AND (e.locked_until IS NULL OR e.locked_until < NOW())
                  AND NOT EXISTS (
                      SELECT 1
                      FROM seesaw_events older
                      WHERE older.correlation_id = e.correlation_id
                        AND older.processed_at IS NULL
                        AND (
                            older.created_at < e.created_at
                            OR (older.created_at = e.created_at AND older.id < e.id)
                        )
                  )
                ORDER BY e.created_at ASC, e.id ASC
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            )
            UPDATE seesaw_events e
            SET locked_until = NOW() + ($1 * INTERVAL '1 second')
            FROM next_event
            WHERE e.id = next_event.id
            RETURNING e.id, e.event_id, e.parent_id, e.correlation_id, e.event_type, e.payload,
                e.hops, e.batch_id, e.batch_index, e.batch_size, e.created_at",
        )
        .bind(EVENT_CLAIM_SECONDS)
        .fetch_optional(&self.pool)
        .await?;

        Ok(row.map(|r| QueuedEvent {
            id: r.id,
            event_id: r.event_id,
            parent_id: r.parent_id,
            correlation_id: r.correlation_id,
            event_type: r.event_type,
            payload: r.payload,
            hops: r.hops,
            batch_id: r.batch_id,
            batch_index: r.batch_index,
            batch_size: r.batch_size,
            created_at: r.created_at,
        }))
    }

    /// Mark an event processed and release its lease.
    async fn ack(&self, id: i64) -> Result<()> {
        sqlx::query(
            "UPDATE seesaw_events SET processed_at = NOW(), locked_until = NULL WHERE id = $1",
        )
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Record a failed processing attempt and defer redelivery until
    /// `retry_after_secs` from now.
    async fn nack(&self, id: i64, retry_after_secs: u64) -> Result<()> {
        // NOTE(review): `as i64` silently wraps for values > i64::MAX; callers
        // presumably pass small delays, but a saturating conversion would be
        // safer — confirm against call sites.
        let locked_until = Utc::now() + Duration::seconds(retry_after_secs as i64);
        sqlx::query(
            "UPDATE seesaw_events
             SET retry_count = retry_count + 1,
                 locked_until = $2
             WHERE id = $1",
        )
        .bind(id)
        .bind(locked_until)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Load a workflow's JSON state and its optimistic-concurrency version.
    /// Returns `None` when no state has been saved for this correlation.
    async fn load_state<S>(&self, correlation_id: Uuid) -> Result<Option<(S, i32)>>
    where
        S: for<'de> Deserialize<'de> + Send,
    {
        let row: Option<StateRow> =
            sqlx::query_as("SELECT state, version FROM seesaw_state WHERE correlation_id = $1")
                .bind(correlation_id)
                .fetch_optional(&self.pool)
                .await?;

        match row {
            Some(r) => {
                let state: S = serde_json::from_value(r.state)?;
                Ok(Some((state, r.version)))
            }
            None => Ok(None),
        }
    }

    /// Save workflow state with optimistic concurrency control.
    ///
    /// The upsert only applies when the stored version still equals
    /// `expected_version`; a concurrent writer makes `rows_affected() == 0`,
    /// which is surfaced as a version-conflict error. Returns the new version.
    async fn save_state<S>(
        &self,
        correlation_id: Uuid,
        state: &S,
        expected_version: i32,
    ) -> Result<i32>
    where
        S: Serialize + Send + Sync,
    {
        let state_json = serde_json::to_value(state)?;
        let new_version = expected_version + 1;

        let result = sqlx::query(
            "INSERT INTO seesaw_state (correlation_id, state, version, updated_at)
             VALUES ($1, $2, $3, NOW())
             ON CONFLICT (correlation_id) DO UPDATE
             SET state = $2,
                 version = $3,
                 updated_at = NOW()
             WHERE seesaw_state.version = $4",
        )
        .bind(correlation_id)
        .bind(&state_json)
        .bind(new_version)
        .bind(expected_version)
        .execute(&self.pool)
        .await?;

        // The conditional DO UPDATE did not fire: someone else moved the version.
        if result.rows_affected() == 0 {
            anyhow::bail!("Version conflict: state was modified concurrently");
        }

        Ok(new_version)
    }

    /// Record an intent to execute an effect; it starts in the 'pending'
    /// status and becomes visible to `poll_next_effect` once `execute_at`
    /// has passed.
    async fn insert_effect_intent(
        &self,
        event_id: Uuid,
        effect_id: String,
        correlation_id: Uuid,
        event_type: String,
        event_payload: serde_json::Value,
        parent_event_id: Option<Uuid>,
        batch_id: Option<Uuid>,
        batch_index: Option<i32>,
        batch_size: Option<i32>,
        execute_at: DateTime<Utc>,
        timeout_seconds: i32,
        max_attempts: i32,
        priority: i32,
    ) -> Result<()> {
        sqlx::query(
            "INSERT INTO seesaw_effect_executions (
                event_id, effect_id, correlation_id, status,
                event_type, event_payload, parent_event_id,
                batch_id, batch_index, batch_size,
                execute_at, timeout_seconds, max_attempts, priority
            )
            VALUES ($1, $2, $3, 'pending', $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
        )
        .bind(event_id)
        .bind(effect_id)
        .bind(correlation_id)
        .bind(event_type)
        .bind(event_payload)
        .bind(parent_event_id)
        .bind(batch_id)
        .bind(batch_size)
        .bind(execute_at)
        .bind(timeout_seconds)
        .bind(max_attempts)
        .bind(priority)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Claim the next due effect (pending, or failed with attempts remaining).
    ///
    /// Ordering: priority first (lower = more urgent), then due time, then a
    /// stable id tiebreak. Claiming flips the row to 'executing' and increments
    /// `attempts` atomically; `FOR UPDATE SKIP LOCKED` avoids double-claims
    /// between concurrent workers.
    async fn poll_next_effect(&self) -> Result<Option<QueuedEffectExecution>> {
        let row: Option<EffectRow> = sqlx::query_as(
            "WITH next_effect AS (
                SELECT event_id, effect_id
                FROM seesaw_effect_executions
                WHERE (
                    status = 'pending'
                    OR (status = 'failed' AND attempts < max_attempts)
                )
                AND execute_at <= NOW()
                ORDER BY priority ASC, execute_at ASC, event_id ASC, effect_id ASC
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            )
            UPDATE seesaw_effect_executions e
            SET status = 'executing',
                claimed_at = NOW(),
                last_attempted_at = NOW(),
                attempts = e.attempts + 1
            FROM next_effect
            WHERE e.event_id = next_effect.event_id
              AND e.effect_id = next_effect.effect_id
            RETURNING
                e.event_id, e.effect_id, e.correlation_id, e.event_type, e.event_payload, e.parent_event_id,
                e.batch_id, e.batch_index, e.batch_size,
                e.execute_at, e.timeout_seconds, e.max_attempts, e.priority, e.attempts",
        )
        .fetch_optional(&self.pool)
        .await?;

        if let Some(r) = row {
            Ok(Some(QueuedEffectExecution {
                event_id: r.event_id,
                effect_id: r.effect_id,
                correlation_id: r.correlation_id,
                event_type: r.event_type,
                event_payload: r.event_payload,
                parent_event_id: r.parent_event_id,
                batch_id: r.batch_id,
                batch_index: r.batch_index,
                batch_size: r.batch_size,
                execute_at: r.execute_at,
                timeout_seconds: r.timeout_seconds,
                max_attempts: r.max_attempts,
                priority: r.priority,
                attempts: r.attempts,
            }))
        } else {
            Ok(None)
        }
    }

    /// Mark an effect completed, storing its JSON result.
    async fn complete_effect(
        &self,
        event_id: Uuid,
        effect_id: String,
        result: serde_json::Value,
    ) -> Result<()> {
        sqlx::query(
            "UPDATE seesaw_effect_executions
             SET status = 'completed',
                 result = $3,
                 completed_at = NOW()
             WHERE event_id = $1 AND effect_id = $2",
        )
        .bind(event_id)
        .bind(effect_id)
        .bind(result)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Complete an effect and, in the same transaction, insert the events it
    /// emitted.
    ///
    /// Emitted events get fully deterministic identities so that a retried
    /// completion re-inserts the exact same rows and is absorbed by
    /// `ON CONFLICT ... DO NOTHING`:
    /// - event_id: UUIDv5 of (parent event, effect, event type, emit index)
    /// - created_at: midnight of the parent event's day (see
    ///   `emitted_event_created_at`)
    /// Children also inherit the parent's correlation and `hops + 1`.
    async fn complete_effect_with_events(
        &self,
        event_id: Uuid,
        effect_id: String,
        result: serde_json::Value,
        emitted_events: Vec<EmittedEvent>,
    ) -> Result<()> {
        // Re-read the effect row for its correlation id.
        let effect: EffectRow = sqlx::query_as(
            "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
                batch_id, batch_index, batch_size,
                execute_at, timeout_seconds, max_attempts, priority, attempts
             FROM seesaw_effect_executions
             WHERE event_id = $1 AND effect_id = $2",
        )
        .bind(event_id)
        .bind(&effect_id)
        .fetch_one(&self.pool)
        .await?;

        // Parent event supplies the hop count and the base timestamp for
        // deterministic child timestamps.
        let parent: ParentEventRow = sqlx::query_as(
            "SELECT hops, created_at
             FROM seesaw_events
             WHERE event_id = $1
             ORDER BY created_at ASC, id ASC
             LIMIT 1",
        )
        .bind(event_id)
        .fetch_one(&self.pool)
        .await?;

        let mut tx = self.pool.begin().await?;

        for (emitted_index, emitted) in emitted_events.into_iter().enumerate() {
            // Name-based UUID: identical inputs on retry yield the same id.
            let deterministic_id = Uuid::new_v5(
                &NAMESPACE_SEESAW,
                format!(
                    "{}-{}-{}-{}",
                    event_id, effect_id, emitted.event_type, emitted_index
                )
                .as_bytes(),
            );

            let deterministic_timestamp = emitted_event_created_at(parent.created_at);

            sqlx::query(
                "INSERT INTO seesaw_events (
                    event_id, parent_id, correlation_id, event_type, payload, hops,
                    batch_id, batch_index, batch_size, created_at
                )
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                ON CONFLICT (event_id, created_at) DO NOTHING",
            )
            .bind(deterministic_id)
            .bind(Some(event_id))
            .bind(effect.correlation_id)
            .bind(&emitted.event_type)
            .bind(emitted.payload)
            .bind(parent.hops + 1)
            .bind(emitted.batch_id)
            .bind(emitted.batch_index)
            .bind(emitted.batch_size)
            .bind(deterministic_timestamp)
            .execute(&mut *tx)
            .await?;
        }

        sqlx::query(
            "UPDATE seesaw_effect_executions
             SET status = 'completed',
                 result = $3,
                 completed_at = NOW()
             WHERE event_id = $1 AND effect_id = $2",
        )
        .bind(event_id)
        .bind(effect_id)
        .bind(result)
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;

        Ok(())
    }

    /// Record an effect failure and schedule its retry with exponential
    /// backoff; clearing `claimed_at` releases the claim.
    async fn fail_effect(
        &self,
        event_id: Uuid,
        effect_id: String,
        error: String,
        attempts: i32,
    ) -> Result<()> {
        let retry_at = Utc::now() + Duration::seconds(effect_retry_delay_seconds(attempts));
        // NOTE(review): the `attempts >= $4` predicate makes this update a no-op
        // when the stored attempt counter is behind the caller's — presumably a
        // guard against a stale worker failing an already re-claimed effect.
        // Confirm against the worker's retry protocol.
        sqlx::query(
            "UPDATE seesaw_effect_executions
             SET status = 'failed',
                 error = $3,
                 execute_at = $5,
                 claimed_at = NULL
             WHERE event_id = $1 AND effect_id = $2 AND attempts >= $4",
        )
        .bind(event_id)
        .bind(effect_id)
        .bind(error)
        .bind(attempts)
        .bind(retry_at)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Move a permanently failed effect to the dead-letter queue.
    ///
    /// In one transaction: insert a `seesaw_dlq` row, optionally insert a
    /// synthetic terminal event (only for batch members, so batch joins can
    /// still count this slot), and delete the effect-execution row.
    async fn dlq_effect(
        &self,
        event_id: Uuid,
        effect_id: String,
        error: String,
        reason: String,
        attempts: i32,
    ) -> Result<()> {
        let effect: EffectRow = sqlx::query_as(
            "SELECT event_id, effect_id, correlation_id, event_type, event_payload, parent_event_id,
                batch_id, batch_index, batch_size,
                execute_at, timeout_seconds, max_attempts, priority, attempts
             FROM seesaw_effect_executions
             WHERE event_id = $1 AND effect_id = $2",
        )
        .bind(event_id)
        .bind(&effect_id)
        .fetch_one(&self.pool)
        .await?;

        // Parent may be missing here (fetch_optional): the synthetic event then
        // falls back to hops = 0 and the current time.
        let parent = sqlx::query_as::<_, ParentEventRow>(
            "SELECT hops, created_at
             FROM seesaw_events
             WHERE event_id = $1
             ORDER BY created_at ASC, id ASC
             LIMIT 1",
        )
        .bind(event_id)
        .fetch_optional(&self.pool)
        .await?;

        let mut tx = self.pool.begin().await?;

        sqlx::query(
            "INSERT INTO seesaw_dlq (
                event_id, effect_id, correlation_id, error, event_type, event_payload, reason, attempts
            )
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
        )
        .bind(event_id)
        .bind(&effect_id)
        .bind(effect.correlation_id)
        .bind(&error)
        .bind(&effect.event_type)
        .bind(&effect.event_payload)
        .bind(&reason)
        .bind(attempts)
        .execute(&mut *tx)
        .await?;

        // Batch member: emit a deterministic terminal event so the batch's join
        // window can still reach its target count despite this failure.
        if let (Some(batch_id), Some(batch_index), Some(batch_size)) =
            (effect.batch_id, effect.batch_index, effect.batch_size)
        {
            let synthetic_event_id = Uuid::new_v5(
                &NAMESPACE_SEESAW,
                format!("{}-{}-dlq-terminal", event_id, effect_id).as_bytes(),
            );
            let synthetic_created_at = parent
                .as_ref()
                .map(|row| emitted_event_created_at(row.created_at))
                .unwrap_or_else(Utc::now);
            let synthetic_hops = parent.as_ref().map(|row| row.hops + 1).unwrap_or(0);

            sqlx::query(
                "INSERT INTO seesaw_events (
                    event_id, parent_id, correlation_id, event_type, payload, hops,
                    batch_id, batch_index, batch_size, created_at
                )
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                ON CONFLICT (event_id, created_at) DO NOTHING",
            )
            .bind(synthetic_event_id)
            .bind(Some(event_id))
            .bind(effect.correlation_id)
            .bind(&effect.event_type)
            .bind(&effect.event_payload)
            .bind(synthetic_hops)
            .bind(Some(batch_id))
            .bind(Some(batch_index))
            .bind(Some(batch_size))
            .bind(synthetic_created_at)
            .execute(&mut *tx)
            .await?;
        }

        sqlx::query("DELETE FROM seesaw_effect_executions WHERE event_id = $1 AND effect_id = $2")
            .bind(event_id)
            .bind(&effect_id)
            .execute(&mut *tx)
            .await?;

        tx.commit().await?;

        Ok(())
    }

    /// Stream a correlation's events in (created_at, id) order.
    ///
    /// Implementation: a background task combines LISTEN/NOTIFY wakeups with a
    /// periodic catch-up poll (so events are not missed if a notification is
    /// dropped), paging forward from a cursor. The cursor starts at the latest
    /// existing event, so only events published AFTER subscribing are streamed.
    /// The task ends when the listener errors, a query fails, or the receiver
    /// is dropped.
    async fn subscribe_workflow_events(
        &self,
        correlation_id: Uuid,
    ) -> Result<Box<dyn futures::Stream<Item = seesaw_core::WorkflowEvent> + Send + Unpin>> {
        use sqlx::postgres::PgListener;

        let channel = format!("seesaw_workflow_{}", correlation_id);
        const PAGE_SIZE: i64 = 256;
        const CATCH_UP_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500);

        // Start the cursor at the newest existing event (skip history).
        let initial_cursor: Option<(DateTime<Utc>, i64)> = sqlx::query_as(
            "SELECT created_at, id
             FROM seesaw_events
             WHERE correlation_id = $1
             ORDER BY created_at DESC, id DESC
             LIMIT 1",
        )
        .bind(correlation_id)
        .fetch_optional(&self.pool)
        .await?;

        let mut listener = PgListener::connect_with(&self.pool).await?;
        listener.listen(&channel).await?;

        let pool = self.pool.clone();
        let (tx, rx) = futures::channel::mpsc::unbounded::<seesaw_core::WorkflowEvent>();

        tokio::spawn(async move {
            let mut cursor = initial_cursor;
            // Drain once immediately in case events landed between the cursor
            // snapshot and the LISTEN taking effect.
            let mut drain_pending = true;

            loop {
                if !drain_pending {
                    // Wake on NOTIFY, or time out to do a catch-up poll.
                    match tokio::time::timeout(CATCH_UP_INTERVAL, listener.recv()).await {
                        Ok(Ok(_notification)) => {}
                        Ok(Err(error)) => {
                            tracing::warn!(
                                "workflow listener recv failed for {}: {}",
                                correlation_id,
                                error
                            );
                            return;
                        }
                        Err(_) => {}
                    }
                }
                drain_pending = false;

                // Page through everything past the cursor.
                loop {
                    let rows_result: std::result::Result<Vec<WorkflowEventRow>, sqlx::Error> =
                        if let Some((created_at, id)) = cursor {
                            sqlx::query_as(
                                "SELECT id, event_id, correlation_id, event_type, payload, created_at
                                 FROM seesaw_events
                                 WHERE correlation_id = $1
                                   AND (
                                       created_at > $2
                                       OR (created_at = $2 AND id > $3)
                                   )
                                 ORDER BY created_at ASC, id ASC
                                 LIMIT $4",
                            )
                            .bind(correlation_id)
                            .bind(created_at)
                            .bind(id)
                            .bind(PAGE_SIZE)
                            .fetch_all(&pool)
                            .await
                        } else {
                            sqlx::query_as(
                                "SELECT id, event_id, correlation_id, event_type, payload, created_at
                                 FROM seesaw_events
                                 WHERE correlation_id = $1
                                 ORDER BY created_at ASC, id ASC
                                 LIMIT $2",
                            )
                            .bind(correlation_id)
                            .bind(PAGE_SIZE)
                            .fetch_all(&pool)
                            .await
                        };

                    let rows = match rows_result {
                        Ok(rows) => rows,
                        Err(error) => {
                            tracing::warn!(
                                "workflow event query failed for {}: {}",
                                correlation_id,
                                error
                            );
                            return;
                        }
                    };

                    if rows.is_empty() {
                        break;
                    }

                    for row in rows {
                        cursor = Some((row.created_at, row.id));
                        // Receiver dropped: stop streaming.
                        if tx
                            .unbounded_send(seesaw_core::WorkflowEvent {
                                event_id: row.event_id,
                                correlation_id: row.correlation_id,
                                event_type: row.event_type,
                                payload: row.payload,
                            })
                            .is_err()
                        {
                            return;
                        }
                    }
                }
            }
        });

        Ok(Box::new(rx))
    }

    /// Snapshot a workflow's status: current state (if any), count of
    /// unfinished effects, and the most recent event type. A workflow is
    /// considered settled when no effects are pending/executing/failed.
    async fn get_workflow_status(
        &self,
        correlation_id: Uuid,
    ) -> Result<seesaw_core::WorkflowStatus> {
        let state = sqlx::query_as::<_, (serde_json::Value,)>(
            "SELECT state FROM seesaw_state WHERE correlation_id = $1",
        )
        .bind(correlation_id)
        .fetch_optional(&self.pool)
        .await?
        .map(|r| r.0);

        let pending_effects = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM seesaw_effect_executions
             WHERE correlation_id = $1 AND status IN ('pending', 'executing', 'failed')",
        )
        .bind(correlation_id)
        .fetch_one(&self.pool)
        .await?
        .0;

        let last_event = sqlx::query_as::<_, (String,)>(
            "SELECT event_type FROM seesaw_events
             WHERE correlation_id = $1
             ORDER BY created_at DESC, id DESC
             LIMIT 1",
        )
        .bind(correlation_id)
        .fetch_optional(&self.pool)
        .await?
        .map(|r| r.0);

        Ok(seesaw_core::WorkflowStatus {
            correlation_id,
            state,
            pending_effects,
            is_settled: pending_effects == 0,
            last_event,
        })
    }

    /// Append a batch member to a same-batch join window and, if this append
    /// completes the window, atomically claim it for processing.
    ///
    /// Returns `Some(entries)` (all members, ordered by batch_index) for
    /// exactly one caller — the one whose UPDATE flips the window from 'open'
    /// to 'processing' — and `None` for everyone else. All steps run in one
    /// transaction; entry and window inserts are idempotent via ON CONFLICT.
    async fn join_same_batch_append_and_maybe_claim(
        &self,
        join_effect_id: String,
        correlation_id: Uuid,
        source_event_id: Uuid,
        source_event_type: String,
        source_payload: serde_json::Value,
        source_created_at: DateTime<Utc>,
        batch_id: Uuid,
        batch_index: i32,
        batch_size: i32,
    ) -> Result<Option<Vec<JoinEntry>>> {
        let mut tx = self.pool.begin().await?;

        // Record this member (idempotent per source event).
        sqlx::query(
            "INSERT INTO seesaw_join_entries (
                join_effect_id, correlation_id, source_event_id, source_event_type, source_payload,
                source_created_at, batch_id, batch_index, batch_size
            )
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
            ON CONFLICT (join_effect_id, correlation_id, source_event_id) DO NOTHING",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(source_event_id)
        .bind(&source_event_type)
        .bind(source_payload)
        .bind(source_created_at)
        .bind(batch_id)
        .bind(batch_index)
        .bind(batch_size)
        .execute(&mut *tx)
        .await?;

        // Ensure the window row exists (first member creates it).
        sqlx::query(
            "INSERT INTO seesaw_join_windows (
                join_effect_id, correlation_id, mode, batch_id, target_count, status
            )
            VALUES ($1, $2, 'same_batch', $3, $4, 'open')
            ON CONFLICT (join_effect_id, correlation_id, batch_id) DO NOTHING",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .bind(batch_size)
        .execute(&mut *tx)
        .await?;

        // Keep target_count in sync if this event reports a different batch size.
        sqlx::query(
            "UPDATE seesaw_join_windows
             SET target_count = $4,
                 updated_at = NOW()
             WHERE join_effect_id = $1
               AND correlation_id = $2
               AND batch_id = $3
               AND target_count <> $4",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .bind(batch_size)
        .execute(&mut *tx)
        .await?;

        // Claim attempt: succeeds only for an 'open' window whose entry count
        // has reached target_count — at most one transaction wins.
        let claimed: Option<(String,)> = sqlx::query_as(
            "UPDATE seesaw_join_windows w
             SET status = 'processing',
                 sealed_at = COALESCE(w.sealed_at, NOW()),
                 processing_started_at = NOW(),
                 updated_at = NOW(),
                 last_error = NULL
             WHERE w.join_effect_id = $1
               AND w.correlation_id = $2
               AND w.batch_id = $3
               AND w.status = 'open'
               AND (
                   SELECT COUNT(*)::int
                   FROM seesaw_join_entries e
                   WHERE e.join_effect_id = w.join_effect_id
                     AND e.correlation_id = w.correlation_id
                     AND e.batch_id = w.batch_id
               ) >= w.target_count
             RETURNING w.join_effect_id",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .fetch_optional(&mut *tx)
        .await?;

        // Window not complete (or claimed by someone else): commit the append.
        if claimed.is_none() {
            tx.commit().await?;
            return Ok(None);
        }

        let rows = sqlx::query_as::<_, (Uuid, String, serde_json::Value, Uuid, i32, i32, DateTime<Utc>)>(
            "SELECT source_event_id, source_event_type, source_payload, batch_id, batch_index, batch_size, source_created_at
             FROM seesaw_join_entries
             WHERE join_effect_id = $1
               AND correlation_id = $2
               AND batch_id = $3
             ORDER BY batch_index ASC, source_created_at ASC, source_event_id ASC",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .fetch_all(&mut *tx)
        .await?;

        let entries = rows
            .into_iter()
            .map(
                |(source_event_id, event_type, payload, batch_id, batch_index, batch_size, created_at)| JoinEntry {
                    source_event_id,
                    event_type,
                    payload,
                    batch_id,
                    batch_index,
                    batch_size,
                    created_at,
                },
            )
            .collect::<Vec<_>>();

        tx.commit().await?;
        Ok(Some(entries))
    }

    /// Finish a claimed join window: mark it completed, then delete the
    /// entries and the window row itself in one transaction.
    async fn join_same_batch_complete(
        &self,
        join_effect_id: String,
        correlation_id: Uuid,
        batch_id: Uuid,
    ) -> Result<()> {
        let mut tx = self.pool.begin().await?;

        // NOTE(review): this status update is immediately followed by deletion
        // of the same row below — presumably kept for triggers/audit; confirm.
        sqlx::query(
            "UPDATE seesaw_join_windows
             SET status = 'completed',
                 completed_at = NOW(),
                 updated_at = NOW()
             WHERE join_effect_id = $1
               AND correlation_id = $2
               AND batch_id = $3",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .execute(&mut *tx)
        .await?;

        sqlx::query(
            "DELETE FROM seesaw_join_entries
             WHERE join_effect_id = $1
               AND correlation_id = $2
               AND batch_id = $3",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .execute(&mut *tx)
        .await?;

        sqlx::query(
            "DELETE FROM seesaw_join_windows
             WHERE join_effect_id = $1
               AND correlation_id = $2
               AND batch_id = $3",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;
        Ok(())
    }

    /// Release a claimed join window after a processing failure: reopen it
    /// (status 'processing' -> 'open') and record the error so another worker
    /// can claim it again.
    async fn join_same_batch_release(
        &self,
        join_effect_id: String,
        correlation_id: Uuid,
        batch_id: Uuid,
        error: String,
    ) -> Result<()> {
        sqlx::query(
            "UPDATE seesaw_join_windows
             SET status = 'open',
                 processing_started_at = NULL,
                 last_error = $4,
                 updated_at = NOW()
             WHERE join_effect_id = $1
               AND correlation_id = $2
               AND batch_id = $3
               AND status = 'processing'",
        )
        .bind(&join_effect_id)
        .bind(correlation_id)
        .bind(batch_id)
        .bind(error)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
}
992
/// Row shape for the `seesaw_stream` activity feed; optional columns vary by
/// `stream_type`.
#[derive(FromRow)]
struct StreamRow {
    seq: i64,
    stream_type: String,
    correlation_id: Uuid,
    event_id: Option<Uuid>,
    effect_event_id: Option<Uuid>,
    effect_id: Option<String>,
    status: Option<String>,
    error: Option<String>,
    payload: Option<serde_json::Value>,
    created_at: DateTime<Utc>,
}
1006
/// Row shape for effect-execution log queries (`get_effect_logs`).
#[derive(FromRow)]
struct EffectLogRow {
    correlation_id: Uuid,
    event_id: Uuid,
    effect_id: String,
    status: String,
    attempts: i32,
    event_type: String,
    result: Option<serde_json::Value>,
    error: Option<String>,
    created_at: DateTime<Utc>,
    execute_at: DateTime<Utc>,
    claimed_at: Option<DateTime<Utc>>,
    last_attempted_at: Option<DateTime<Utc>>,
    completed_at: Option<DateTime<Utc>>,
}
1023
/// Row shape for `seesaw_dlq` queries (`get_dead_letters`).
#[derive(FromRow)]
struct DeadLetterRow {
    correlation_id: Uuid,
    event_id: Uuid,
    effect_id: String,
    event_type: String,
    event_payload: serde_json::Value,
    error: String,
    reason: String,
    attempts: i32,
    failed_at: DateTime<Utc>,
    resolved_at: Option<DateTime<Utc>>,
}
1037
/// Aggregated per-correlation failure summary row (`get_failed_workflows`).
#[derive(FromRow)]
struct FailedWorkflowRow {
    correlation_id: Uuid,
    failed_effects: i64,
    active_effects: i64,
    dead_letters: i64,
    last_failed_at: Option<DateTime<Utc>>,
    last_error: Option<String>,
}
1047
1048#[async_trait]
1049impl InsightStore for PostgresStore {
    /// Live stream of insight events driven by LISTEN/NOTIFY on the
    /// `seesaw_stream` channel.
    ///
    /// On each notification the single newest `seesaw_stream` row is fetched
    /// and converted. NOTE(review): because only the latest row is read (no
    /// cursor), rows can be skipped under bursts of notifications, and
    /// listener/query errors are silently dropped — presumably acceptable for
    /// a best-effort dashboard feed; confirm.
    async fn subscribe_events(
        &self,
    ) -> Result<Box<dyn futures::Stream<Item = InsightEvent> + Send + Unpin>> {
        use futures::stream::StreamExt;
        use sqlx::postgres::PgListener;

        let mut listener = PgListener::connect_with(&self.pool).await?;
        listener.listen("seesaw_stream").await?;

        let pool = self.pool.clone();
        let stream = listener.into_stream().filter_map(move |result| {
            let pool = pool.clone();
            Box::pin(async move {
                match result {
                    Ok(_notification) => {
                        // Fetch the most recent stream row as the payload for
                        // this notification.
                        if let Ok(row) = sqlx::query_as::<_, StreamRow>(
                            "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
                                effect_id, status, error, payload, created_at
                             FROM seesaw_stream
                             ORDER BY seq DESC
                             LIMIT 1",
                        )
                        .fetch_one(&pool)
                        .await
                        {
                            Some(stream_row_to_insight_event(row))
                        } else {
                            None
                        }
                    }
                    Err(_) => None,
                }
            })
        });

        Ok(Box::new(stream))
    }
1091
    /// Build the full event/effect tree for one workflow.
    ///
    /// Loads all events and effect executions for the correlation, then
    /// assembles parent/child structure via `build_event_tree` (defined
    /// elsewhere in this file). Events whose `parent_id` is absent from the
    /// loaded set are treated as roots (the `event_ids` set backs that check).
    async fn get_workflow_tree(&self, correlation_id: Uuid) -> Result<WorkflowTree> {
        let events = sqlx::query_as::<_, EventRow>(
            "SELECT id, event_id, parent_id, correlation_id, event_type, payload, hops,
                batch_id, batch_index, batch_size, created_at
             FROM seesaw_events
             WHERE correlation_id = $1
             ORDER BY created_at ASC",
        )
        .bind(correlation_id)
        .fetch_all(&self.pool)
        .await?;

        let effects = sqlx::query_as::<_, EffectTreeRow>(
            "SELECT event_id, effect_id, status, result, error, attempts, created_at,
                batch_id, batch_index, batch_size
             FROM seesaw_effect_executions
             WHERE correlation_id = $1
             ORDER BY created_at ASC",
        )
        .bind(correlation_id)
        .fetch_all(&self.pool)
        .await?;

        let event_ids: HashSet<Uuid> = events.iter().map(|event| event.event_id).collect();
        let roots = build_event_tree(&events, &effects, None, &event_ids, true);

        let state = sqlx::query_as::<_, (serde_json::Value,)>(
            "SELECT state FROM seesaw_state WHERE correlation_id = $1",
        )
        .bind(correlation_id)
        .fetch_optional(&self.pool)
        .await?
        .map(|r| r.0);

        Ok(WorkflowTree {
            correlation_id,
            roots,
            state,
            event_count: events.len(),
            effect_count: effects.len(),
        })
    }
1138
1139 async fn get_stats(&self) -> Result<InsightStats> {
1140 let total_events = sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM seesaw_events")
1141 .fetch_one(&self.pool)
1142 .await?
1143 .0;
1144
1145 let active_effects = sqlx::query_as::<_, (i64,)>(
1146 "SELECT COUNT(*) FROM seesaw_effect_executions
1147 WHERE status IN ('pending', 'executing')",
1148 )
1149 .fetch_one(&self.pool)
1150 .await?
1151 .0;
1152
1153 let completed_effects = sqlx::query_as::<_, (i64,)>(
1154 "SELECT COUNT(*) FROM seesaw_effect_executions WHERE status = 'completed'",
1155 )
1156 .fetch_one(&self.pool)
1157 .await?
1158 .0;
1159
1160 let failed_effects = sqlx::query_as::<_, (i64,)>(
1161 "SELECT COUNT(*) FROM seesaw_effect_executions WHERE status = 'failed'",
1162 )
1163 .fetch_one(&self.pool)
1164 .await?
1165 .0;
1166
1167 Ok(InsightStats {
1168 total_events,
1169 active_effects,
1170 completed_effects,
1171 failed_effects,
1172 })
1173 }
1174
    /// Page through the `seesaw_stream` feed.
    ///
    /// With a cursor: rows strictly after `cursor_seq`, ascending. Without one:
    /// the newest `limit` rows, descending. NOTE(review): the two branches
    /// return opposite orderings — presumably the no-cursor call is a "latest
    /// first" bootstrap; confirm callers expect this asymmetry.
    async fn get_recent_events(
        &self,
        cursor: Option<i64>,
        limit: usize,
    ) -> Result<Vec<InsightEvent>> {
        let rows = if let Some(cursor_seq) = cursor {
            sqlx::query_as::<_, StreamRow>(
                "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
                    effect_id, status, error, payload, created_at
                 FROM seesaw_stream
                 WHERE seq > $1
                 ORDER BY seq ASC
                 LIMIT $2",
            )
            .bind(cursor_seq)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await?
        } else {
            sqlx::query_as::<_, StreamRow>(
                "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
                    effect_id, status, error, payload, created_at
                 FROM seesaw_stream
                 ORDER BY seq DESC
                 LIMIT $1",
            )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await?
        };

        Ok(rows.into_iter().map(stream_row_to_insight_event).collect())
    }
1208
    /// List effect executions, most recently attempted first, optionally
    /// filtered by correlation (a NULL filter matches everything).
    ///
    /// `duration_ms` is derived client-side: claimed (or last-attempted) time
    /// to completion, floored at 0; `None` when either endpoint is missing.
    async fn get_effect_logs(
        &self,
        correlation_id: Option<Uuid>,
        limit: usize,
    ) -> Result<Vec<EffectExecutionLog>> {
        let rows = sqlx::query_as::<_, EffectLogRow>(
            "SELECT
                correlation_id,
                event_id,
                effect_id,
                status,
                attempts,
                event_type,
                result,
                error,
                created_at,
                execute_at,
                claimed_at,
                last_attempted_at,
                completed_at
             FROM seesaw_effect_executions
             WHERE ($1::uuid IS NULL OR correlation_id = $1)
             ORDER BY COALESCE(last_attempted_at, created_at) DESC, event_id DESC
             LIMIT $2",
        )
        .bind(correlation_id)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| {
                // Prefer the claim timestamp; fall back to the last attempt.
                let started_at = row.claimed_at.or(row.last_attempted_at);
                let duration_ms = match (started_at, row.completed_at) {
                    (Some(start), Some(end)) => Some((end - start).num_milliseconds().max(0)),
                    _ => None,
                };

                EffectExecutionLog {
                    correlation_id: row.correlation_id,
                    event_id: row.event_id,
                    effect_id: row.effect_id,
                    status: row.status,
                    attempts: row.attempts,
                    event_type: Some(row.event_type),
                    result: row.result,
                    error: row.error,
                    created_at: row.created_at,
                    execute_at: Some(row.execute_at),
                    claimed_at: row.claimed_at,
                    last_attempted_at: row.last_attempted_at,
                    completed_at: row.completed_at,
                    duration_ms,
                }
            })
            .collect())
    }
1267
    /// List dead-letter entries, newest failure first; when `unresolved_only`
    /// is set, rows with a `resolved_at` timestamp are excluded.
    async fn get_dead_letters(
        &self,
        unresolved_only: bool,
        limit: usize,
    ) -> Result<Vec<DeadLetterEntry>> {
        let rows = sqlx::query_as::<_, DeadLetterRow>(
            "SELECT
                correlation_id,
                event_id,
                effect_id,
                event_type,
                event_payload,
                error,
                reason,
                attempts,
                failed_at,
                resolved_at
             FROM seesaw_dlq
             WHERE (NOT $1 OR resolved_at IS NULL)
             ORDER BY failed_at DESC
             LIMIT $2",
        )
        .bind(unresolved_only)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| DeadLetterEntry {
                correlation_id: row.correlation_id,
                event_id: row.event_id,
                effect_id: row.effect_id,
                event_type: row.event_type,
                event_payload: row.event_payload,
                error: row.error,
                reason: row.reason,
                attempts: row.attempts,
                failed_at: row.failed_at,
                resolved_at: row.resolved_at,
            })
            .collect())
    }
1311
1312 async fn get_failed_workflows(&self, limit: usize) -> Result<Vec<FailedWorkflow>> {
1313 let rows = sqlx::query_as::<_, FailedWorkflowRow>(
1314 "WITH effect_agg AS (
1315 SELECT
1316 correlation_id,
1317 COUNT(*) FILTER (WHERE status = 'failed')::BIGINT AS failed_effects,
1318 COUNT(*) FILTER (WHERE status IN ('pending', 'executing'))::BIGINT AS active_effects,
1319 MAX(last_attempted_at) FILTER (WHERE status = 'failed') AS last_failed_at,
1320 MAX(error) FILTER (WHERE status = 'failed') AS last_error
1321 FROM seesaw_effect_executions
1322 GROUP BY correlation_id
1323 ),
1324 dlq_agg AS (
1325 SELECT
1326 correlation_id,
1327 COUNT(*) FILTER (WHERE resolved_at IS NULL)::BIGINT AS dead_letters,
1328 MAX(failed_at) FILTER (WHERE resolved_at IS NULL) AS last_dlq_at,
1329 MAX(error) FILTER (WHERE resolved_at IS NULL) AS last_dlq_error
1330 FROM seesaw_dlq
1331 GROUP BY correlation_id
1332 )
1333 SELECT
1334 COALESCE(e.correlation_id, d.correlation_id) AS correlation_id,
1335 COALESCE(e.failed_effects, 0) AS failed_effects,
1336 COALESCE(e.active_effects, 0) AS active_effects,
1337 COALESCE(d.dead_letters, 0) AS dead_letters,
1338 GREATEST(e.last_failed_at, d.last_dlq_at) AS last_failed_at,
1339 COALESCE(d.last_dlq_error, e.last_error) AS last_error
1340 FROM effect_agg e
1341 FULL OUTER JOIN dlq_agg d ON d.correlation_id = e.correlation_id
1342 WHERE COALESCE(e.failed_effects, 0) > 0 OR COALESCE(d.dead_letters, 0) > 0
1343 ORDER BY last_failed_at DESC NULLS LAST
1344 LIMIT $1",
1345 )
1346 .bind(limit as i64)
1347 .fetch_all(&self.pool)
1348 .await?;
1349
1350 Ok(rows
1351 .into_iter()
1352 .map(|row| FailedWorkflow {
1353 correlation_id: row.correlation_id,
1354 failed_effects: row.failed_effects,
1355 active_effects: row.active_effects,
1356 dead_letters: row.dead_letters,
1357 last_failed_at: row.last_failed_at,
1358 last_error: row.last_error,
1359 })
1360 .collect())
1361 }
1362}
1363
/// Row shape for the effect executions joined into a per-correlation event
/// tree; its fields mirror what `build_event_tree` copies into `EffectNode`.
#[derive(FromRow)]
struct EffectTreeRow {
    // Event this effect execution belongs to (join key in `build_event_tree`).
    event_id: Uuid,
    effect_id: String,
    status: String,
    // Present only once an execution produced a result / an error.
    result: Option<serde_json::Value>,
    error: Option<String>,
    attempts: i32,
    created_at: DateTime<Utc>,
    // Batch metadata — populated only for batched effects, per the optional
    // batch_* columns used elsewhere in this store.
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
}
1377
1378fn stream_row_to_insight_event(row: StreamRow) -> InsightEvent {
1379 let stream_type = match row.stream_type.as_str() {
1380 "event_dispatched" => StreamType::EventDispatched,
1381 "effect_started" => StreamType::EffectStarted,
1382 "effect_completed" => StreamType::EffectCompleted,
1383 "effect_failed" => StreamType::EffectFailed,
1384 _ => StreamType::EventDispatched, };
1386
1387 let event_type = if stream_type == StreamType::EventDispatched {
1389 row.payload
1390 .as_ref()
1391 .and_then(|p| p.get("event_type"))
1392 .and_then(|v| v.as_str())
1393 .map(|s| s.to_string())
1394 } else {
1395 None
1396 };
1397
1398 InsightEvent {
1399 seq: row.seq,
1400 stream_type,
1401 correlation_id: row.correlation_id,
1402 event_id: row.event_id,
1403 effect_event_id: row.effect_event_id,
1404 effect_id: row.effect_id,
1405 event_type,
1406 status: row.status,
1407 error: row.error,
1408 payload: row.payload,
1409 created_at: row.created_at,
1410 }
1411}
1412
1413fn build_event_tree(
1414 events: &[EventRow],
1415 effects: &[EffectTreeRow],
1416 parent_id: Option<Uuid>,
1417 event_ids: &HashSet<Uuid>,
1418 is_root_pass: bool,
1419) -> Vec<EventNode> {
1420 events
1421 .iter()
1422 .filter(|event| {
1423 if is_root_pass {
1424 event.parent_id.is_none()
1425 || event
1426 .parent_id
1427 .map(|parent| !event_ids.contains(&parent))
1428 .unwrap_or(false)
1429 } else {
1430 event.parent_id == parent_id
1431 }
1432 })
1433 .map(|event| {
1434 let event_effects: Vec<EffectNode> = effects
1436 .iter()
1437 .filter(|eff| eff.event_id == event.event_id)
1438 .map(|eff| EffectNode {
1439 effect_id: eff.effect_id.clone(),
1440 event_id: eff.event_id,
1441 status: eff.status.clone(),
1442 result: eff.result.clone(),
1443 error: eff.error.clone(),
1444 attempts: eff.attempts,
1445 created_at: eff.created_at,
1446 batch_id: eff.batch_id,
1447 batch_index: eff.batch_index,
1448 batch_size: eff.batch_size,
1449 })
1450 .collect();
1451
1452 let children =
1454 build_event_tree(events, effects, Some(event.event_id), event_ids, false);
1455
1456 EventNode {
1457 event_id: event.event_id,
1458 event_type: event.event_type.clone(),
1459 payload: event.payload.clone(),
1460 created_at: event.created_at,
1461 batch_id: event.batch_id,
1462 batch_index: event.batch_index,
1463 batch_size: event.batch_size,
1464 children,
1465 effects: event_effects,
1466 }
1467 })
1468 .collect()
1469}
1470
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Timelike};

    /// Convenience constructor for a concrete UTC timestamp in tests.
    fn utc(y: i32, mo: u32, d: u32, h: u32, mi: u32, s: u32) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(y, mo, d, h, mi, s)
            .single()
            .expect("valid timestamp")
    }

    #[test]
    fn emitted_event_created_at_is_midnight_on_parent_day() {
        let parent = utc(2026, 2, 5, 18, 45, 12);

        let emitted = emitted_event_created_at(parent);

        assert_eq!(emitted.date_naive(), parent.date_naive());
        assert_eq!(emitted.hour(), 0);
        assert_eq!(emitted.minute(), 0);
        assert_eq!(emitted.second(), 0);
    }

    #[test]
    fn emitted_event_created_at_is_deterministic_for_same_parent_day() {
        let morning = utc(2026, 2, 5, 0, 1, 2);
        let night = utc(2026, 2, 5, 23, 59, 59);

        assert_eq!(
            emitted_event_created_at(morning),
            emitted_event_created_at(night)
        );
    }

    #[test]
    fn effect_retry_delay_seconds_uses_exponential_backoff() {
        for (attempts, expected) in [(1, 1), (2, 2), (3, 4), (4, 8)] {
            assert_eq!(effect_retry_delay_seconds(attempts), expected);
        }
    }

    #[test]
    fn effect_retry_delay_seconds_is_capped() {
        // The exponent is clamped at 8, so 2^8 = 256 is the ceiling.
        assert_eq!(effect_retry_delay_seconds(9), 256);
        assert_eq!(effect_retry_delay_seconds(50), 256);
    }

    #[test]
    fn build_event_tree_treats_orphan_parent_as_root() {
        let orphan_id = Uuid::new_v4();
        let events = vec![EventRow {
            id: 1,
            event_id: orphan_id,
            // Parent id that is NOT part of the queried event set.
            parent_id: Some(Uuid::new_v4()),
            correlation_id: Uuid::new_v4(),
            event_type: "OrphanEvent".to_string(),
            payload: serde_json::json!({"ok": true}),
            hops: 1,
            batch_id: None,
            batch_index: None,
            batch_size: None,
            created_at: Utc::now(),
        }];
        let known_ids: HashSet<Uuid> = events.iter().map(|e| e.event_id).collect();

        let roots = build_event_tree(&events, &[], None, &known_ids, true);

        assert_eq!(roots.len(), 1);
        assert_eq!(roots[0].event_id, orphan_id);
    }
}