1pub mod event_store;
2
3use anyhow::Result;
7use async_trait::async_trait;
8use chrono::{DateTime, Duration, Utc};
9use seesaw_core::{
10 insight::*, EmittedEvent, EventProcessingCommit, ExpiredJoinWindow, JoinEntry, QueuedEvent,
11 QueuedHandlerExecution, Store, NAMESPACE_SEESAW,
12 DeadLetter as CoreDeadLetter, DlqStats, DlqStatus,
13};
14use serde_json::Value;
15use sqlx::{FromRow, PgPool};
16use std::collections::HashSet;
17use uuid::Uuid;
18
/// Lease duration for a claimed event row: `poll_next` sets `locked_until`
/// this many seconds into the future, after which other workers may re-claim.
const EVENT_CLAIM_SECONDS: i64 = 30;
20
21fn emitted_event_created_at(parent_created_at: DateTime<Utc>) -> DateTime<Utc> {
22 parent_created_at
23 .date_naive()
24 .and_hms_opt(0, 0, 0)
25 .expect("midnight should always be a valid UTC timestamp")
26 .and_utc()
27}
28
/// Exponential backoff for failed handler executions: 1, 2, 4, … seconds,
/// doubling per prior attempt and capped at 2^8 = 256 seconds.
///
/// `attempts` is the number of attempts made so far; values ≤ 1 (including
/// zero or negative) yield the minimum delay of one second.
fn handler_retry_delay_seconds(attempts: i32) -> i64 {
    let prior_attempts = if attempts > 1 { attempts - 1 } else { 0 };
    let exponent = prior_attempts.min(8) as u32;
    2_i64.pow(exponent)
}
33
/// Postgres-backed implementation of the seesaw [`Store`] trait.
/// A thin handle around a shared `sqlx` connection pool; cheap to clone.
pub struct PostgresStore {
    // Shared connection pool; all queries in this module run against it.
    pool: PgPool,
}
38
impl PostgresStore {
    /// Wraps an existing connection pool.
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }

    /// Borrows the underlying pool, e.g. for migrations or ad-hoc queries.
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }

    /// Returns `true` if `event_id` already exists in `seesaw_processed`,
    /// the publish-side dedup ledger (see [`Store::publish`]).
    pub async fn is_processed(&self, event_id: Uuid) -> Result<bool> {
        let result: bool = sqlx::query_scalar(
            "SELECT EXISTS(SELECT 1 FROM seesaw_processed WHERE event_id = $1)"
        )
        .bind(event_id)
        .fetch_one(&self.pool)
        .await?;

        Ok(result)
    }
}
63
64impl Clone for PostgresStore {
65 fn clone(&self) -> Self {
66 Self {
67 pool: self.pool.clone(),
68 }
69 }
70}
71
/// Projection of a `seesaw_events` queue row, as claimed by `poll_next`.
#[derive(FromRow)]
struct EventRow {
    // Surrogate primary key; doubles as the ack/nack handle.
    id: i64,
    event_id: Uuid,
    // Event that caused this one to be emitted, when any.
    parent_id: Option<Uuid>,
    correlation_id: Uuid,
    event_type: String,
    payload: serde_json::Value,
    // Emission depth: child events carry parent hops + 1 (see emit paths).
    hops: i32,
    retry_count: i32,
    // Present only for events belonging to a fan-out batch.
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
    created_at: DateTime<Utc>,
}
87
/// Projection of a `seesaw_handler_executions` row ("effect"), as claimed by
/// `poll_next_effect` or re-read during completion/DLQ handling.
#[derive(FromRow)]
struct HandlerRow {
    event_id: Uuid,
    handler_id: String,
    correlation_id: Uuid,
    event_type: String,
    event_payload: serde_json::Value,
    parent_event_id: Option<Uuid>,
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
    // Earliest time at which the execution may be claimed (backoff target).
    execute_at: DateTime<Utc>,
    timeout_seconds: i32,
    max_attempts: i32,
    priority: i32,
    attempts: i32,
    join_window_timeout_seconds: Option<i32>,
}
106
/// Minimal parent-event lookup used when deriving emitted-event rows
/// (hops + deterministic timestamp) in `complete_effect_with_events`.
#[derive(FromRow)]
struct ParentEventRow {
    hops: i32,
    created_at: DateTime<Utc>,
}
112
/// Source-event metadata consulted when dead-lettering an effect; supplies
/// batch bookkeeping for synthetic "terminal" events.
#[derive(FromRow)]
struct DlqSourceEventRow {
    correlation_id: Uuid,
    event_type: String,
    payload: serde_json::Value,
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
    hops: i32,
    created_at: DateTime<Utc>,
}
124
/// Page row for the workflow-event subscription stream.
#[derive(FromRow)]
struct WorkflowEventRow {
    // (created_at, id) together form the keyset-pagination cursor.
    id: i64,
    event_id: Uuid,
    correlation_id: Uuid,
    event_type: String,
    payload: serde_json::Value,
    created_at: DateTime<Utc>,
}
134
135#[async_trait]
136impl Store for PostgresStore {
    /// Publishes `event` with exactly-once semantics.
    ///
    /// First claims the event id in the `seesaw_processed` dedup ledger;
    /// if another publisher already claimed it, the call is a silent no-op.
    /// Otherwise the queue row is inserted. Both writes share a transaction.
    async fn publish(&self, event: QueuedEvent) -> Result<()> {
        let mut tx = self.pool.begin().await?;

        // RETURNING only yields a row when *this* statement performed the
        // insert; `None` means the id was already in the ledger.
        let inserted: Option<Uuid> = sqlx::query_scalar(
            "INSERT INTO seesaw_processed (event_id, correlation_id, created_at)
             VALUES ($1, $2, $3)
             ON CONFLICT (event_id) DO NOTHING
             RETURNING event_id",
        )
        .bind(event.event_id)
        .bind(event.correlation_id)
        .bind(event.created_at)
        .fetch_optional(&mut *tx)
        .await?;

        if inserted.is_none() {
            // Duplicate publish — nothing more to write.
            tx.commit().await?;
            return Ok(());
        }

        sqlx::query(
            "INSERT INTO seesaw_events (
                event_id, parent_id, correlation_id, event_type, payload, hops, retry_count,
                batch_id, batch_index, batch_size, created_at
            )
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)",
        )
        .bind(event.event_id)
        .bind(event.parent_id)
        .bind(event.correlation_id)
        .bind(event.event_type)
        .bind(event.payload)
        .bind(event.hops)
        .bind(event.retry_count)
        .bind(event.batch_id)
        .bind(event.batch_index)
        .bind(event.batch_size)
        .bind(event.created_at)
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;

        Ok(())
    }
184
    /// Claims the next processable event, if any.
    ///
    /// Events within one correlation are strictly serialized: a row is only
    /// eligible when no *older* unprocessed row (ordered by `created_at`,
    /// then `id`) exists for the same correlation. The winner is leased for
    /// `EVENT_CLAIM_SECONDS` via `locked_until`, and `FOR UPDATE SKIP LOCKED`
    /// keeps concurrent pollers from claiming the same row.
    async fn poll_next(&self) -> Result<Option<QueuedEvent>> {
        let row: Option<EventRow> = sqlx::query_as(
            "WITH next_event AS (
                SELECT e.id
                FROM seesaw_events e
                WHERE e.processed_at IS NULL
                AND (e.locked_until IS NULL OR e.locked_until < NOW())
                AND NOT EXISTS (
                    SELECT 1
                    FROM seesaw_events older
                    WHERE older.correlation_id = e.correlation_id
                    AND older.processed_at IS NULL
                    AND (
                        older.created_at < e.created_at
                        OR (older.created_at = e.created_at AND older.id < e.id)
                    )
                )
                ORDER BY e.created_at ASC, e.id ASC
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            )
            UPDATE seesaw_events e
            SET locked_until = NOW() + ($1 * INTERVAL '1 second')
            FROM next_event
            WHERE e.id = next_event.id
            RETURNING e.id, e.event_id, e.parent_id, e.correlation_id, e.event_type, e.payload,
                      e.hops, e.retry_count, e.batch_id, e.batch_index, e.batch_size, e.created_at",
        )
        .bind(EVENT_CLAIM_SECONDS)
        .fetch_optional(&self.pool)
        .await?;

        Ok(row.map(|r| QueuedEvent {
            id: r.id,
            event_id: r.event_id,
            parent_id: r.parent_id,
            correlation_id: r.correlation_id,
            event_type: r.event_type,
            payload: r.payload,
            hops: r.hops,
            retry_count: r.retry_count,
            batch_id: r.batch_id,
            batch_index: r.batch_index,
            batch_size: r.batch_size,
            created_at: r.created_at,
        }))
    }
232
    /// Marks event row `id` as processed and releases its claim lease.
    async fn ack(&self, id: i64) -> Result<()> {
        sqlx::query(
            "UPDATE seesaw_events SET processed_at = NOW(), locked_until = NULL WHERE id = $1",
        )
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
242
    /// Returns event row `id` to the queue after a processing failure:
    /// increments `retry_count` and extends `locked_until` so the row is not
    /// re-polled for another `retry_after_secs` seconds.
    async fn nack(&self, id: i64, retry_after_secs: u64) -> Result<()> {
        let locked_until = Utc::now() + Duration::seconds(retry_after_secs as i64);
        sqlx::query(
            "UPDATE seesaw_events
             SET retry_count = retry_count + 1,
                 locked_until = $2
             WHERE id = $1",
        )
        .bind(id)
        .bind(locked_until)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
257
    /// Atomically commits every outcome of processing one event: queued
    /// handler intents, re-emitted events, inline-failure DLQ entries (with
    /// synthetic batch-terminal events), and the ack of the source row.
    /// Everything happens in a single transaction, so a crash mid-way leaves
    /// the source event unacked and the whole commit is retried.
    async fn commit_event_processing(&self, commit: EventProcessingCommit) -> Result<()> {
        let EventProcessingCommit {
            event_row_id,
            event_id,
            correlation_id,
            event_type,
            event_payload,
            queued_effect_intents,
            inline_effect_failures,
            emitted_events,
        } = commit;

        let mut tx = self.pool.begin().await?;

        // 1) Queue handler executions; (event_id, handler_id) conflicts are
        //    ignored so re-processing the event never duplicates work.
        for intent in queued_effect_intents {
            sqlx::query(
                "INSERT INTO seesaw_handler_executions (
                    event_id, handler_id, correlation_id, status,
                    event_type, event_payload, parent_event_id,
                    batch_id, batch_index, batch_size,
                    execute_at, timeout_seconds, max_attempts, priority,
                    join_window_timeout_seconds
                )
                VALUES ($1, $2, $3, 'pending', $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
                ON CONFLICT (event_id, handler_id) DO NOTHING",
            )
            .bind(event_id)
            .bind(intent.handler_id)
            .bind(correlation_id)
            .bind(&event_type)
            .bind(&event_payload)
            .bind(intent.parent_event_id)
            .bind(intent.batch_id)
            .bind(intent.batch_index)
            .bind(intent.batch_size)
            .bind(intent.execute_at)
            .bind(intent.timeout_seconds)
            .bind(intent.max_attempts)
            .bind(intent.priority)
            .bind(intent.join_window_timeout_seconds)
            .execute(&mut *tx)
            .await?;
        }

        // 2) Persist emitted events with the same dedup-ledger gate as
        //    `publish`: losing the ledger insert skips the queue insert.
        for event in emitted_events {
            let inserted: Option<Uuid> = sqlx::query_scalar(
                "INSERT INTO seesaw_processed (event_id, correlation_id, created_at)
                 VALUES ($1, $2, $3)
                 ON CONFLICT (event_id) DO NOTHING
                 RETURNING event_id",
            )
            .bind(event.event_id)
            .bind(event.correlation_id)
            .bind(event.created_at)
            .fetch_optional(&mut *tx)
            .await?;

            if inserted.is_none() {
                continue;
            }

            sqlx::query(
                "INSERT INTO seesaw_events (
                    event_id, parent_id, correlation_id, event_type, payload, hops, retry_count,
                    batch_id, batch_index, batch_size, created_at
                )
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)",
            )
            .bind(event.event_id)
            .bind(event.parent_id)
            .bind(event.correlation_id)
            .bind(event.event_type)
            .bind(event.payload)
            .bind(event.hops)
            .bind(event.retry_count)
            .bind(event.batch_id)
            .bind(event.batch_index)
            .bind(event.batch_size)
            .bind(event.created_at)
            .execute(&mut *tx)
            .await?;
        }

        // 3) Source-event metadata is only needed for DLQ bookkeeping, so
        //    skip the lookup when there were no inline failures.
        let source_event: Option<DlqSourceEventRow> = if inline_effect_failures.is_empty() {
            None
        } else {
            sqlx::query_as(
                "SELECT correlation_id, event_type, payload, batch_id, batch_index, batch_size, hops, created_at
                 FROM seesaw_events
                 WHERE event_id = $1
                 ORDER BY created_at ASC, id ASC
                 LIMIT 1",
            )
            .bind(event_id)
            .fetch_optional(&mut *tx)
            .await?
        };

        // 4) Dead-letter each inline failure; when the source event belongs
        //    to a batch, also insert a deterministic synthetic "terminal"
        //    event so batch accounting is not left one slot short.
        for failure in inline_effect_failures {
            sqlx::query(
                "INSERT INTO seesaw_dlq (
                    event_id, handler_id, correlation_id, error, event_type, event_payload, reason, attempts
                )
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
            )
            .bind(event_id)
            .bind(&failure.handler_id)
            .bind(correlation_id)
            .bind(&failure.error)
            .bind(&event_type)
            .bind(&event_payload)
            .bind(&failure.reason)
            .bind(failure.attempts)
            .execute(&mut *tx)
            .await?;

            if let Some(source) = source_event.as_ref() {
                if let (Some(batch_id), Some(batch_index), Some(batch_size)) =
                    (source.batch_id, source.batch_index, source.batch_size)
                {
                    // UUIDv5 over (event, handler) makes the terminal event
                    // id stable across retries; the ON CONFLICT below dedups.
                    let synthetic_event_id = Uuid::new_v5(
                        &NAMESPACE_SEESAW,
                        format!("{}-{}-dlq-terminal", event_id, failure.handler_id).as_bytes(),
                    );
                    let synthetic_created_at = emitted_event_created_at(source.created_at);

                    sqlx::query(
                        "INSERT INTO seesaw_events (
                            event_id, parent_id, correlation_id, event_type, payload, hops,
                            batch_id, batch_index, batch_size, created_at
                        )
                        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                        ON CONFLICT (event_id, created_at) DO NOTHING",
                    )
                    .bind(synthetic_event_id)
                    .bind(Some(event_id))
                    .bind(correlation_id)
                    .bind(&event_type)
                    .bind(&event_payload)
                    .bind(source.hops + 1)
                    .bind(Some(batch_id))
                    .bind(Some(batch_index))
                    .bind(Some(batch_size))
                    .bind(synthetic_created_at)
                    .execute(&mut *tx)
                    .await?;
                }
            }
        }

        // 5) Ack the source row inside the same transaction; if the row is
        //    gone (or was acked elsewhere) abort the whole commit.
        let ack_result = sqlx::query(
            "UPDATE seesaw_events SET processed_at = NOW(), locked_until = NULL WHERE id = $1",
        )
        .bind(event_row_id)
        .execute(&mut *tx)
        .await?;
        if ack_result.rows_affected() != 1 {
            anyhow::bail!(
                "atomic event commit failed to ack source event row {}",
                event_row_id
            );
        }

        tx.commit().await?;
        Ok(())
    }
424
    /// Queues a single handler execution ("effect intent") for `event_id`.
    ///
    /// Idempotent: a `(event_id, handler_id)` conflict is ignored, so
    /// re-dispatching the same event never duplicates work. The row starts
    /// in status `'pending'` and becomes claimable once `execute_at` passes.
    async fn insert_effect_intent(
        &self,
        event_id: Uuid,
        handler_id: String,
        correlation_id: Uuid,
        event_type: String,
        event_payload: serde_json::Value,
        parent_event_id: Option<Uuid>,
        batch_id: Option<Uuid>,
        batch_index: Option<i32>,
        batch_size: Option<i32>,
        execute_at: DateTime<Utc>,
        timeout_seconds: i32,
        max_attempts: i32,
        priority: i32,
        join_window_timeout_seconds: Option<i32>,
    ) -> Result<()> {
        sqlx::query(
            "INSERT INTO seesaw_handler_executions (
                event_id, handler_id, correlation_id, status,
                event_type, event_payload, parent_event_id,
                batch_id, batch_index, batch_size,
                execute_at, timeout_seconds, max_attempts, priority,
                join_window_timeout_seconds
            )
            VALUES ($1, $2, $3, 'pending', $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
            ON CONFLICT (event_id, handler_id) DO NOTHING",
        )
        .bind(event_id)
        .bind(handler_id)
        .bind(correlation_id)
        .bind(event_type)
        .bind(event_payload)
        .bind(parent_event_id)
        .bind(batch_id)
        .bind(batch_index)
        .bind(batch_size)
        .bind(execute_at)
        .bind(timeout_seconds)
        .bind(max_attempts)
        .bind(priority)
        .bind(join_window_timeout_seconds)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
472
473 async fn poll_next_effect(&self) -> Result<Option<QueuedHandlerExecution>> {
474 let row: Option<HandlerRow> = sqlx::query_as(
475 "WITH next_effect AS (
476 SELECT event_id, handler_id
477 FROM seesaw_handler_executions
478 WHERE (
479 status = 'pending'
480 OR (status = 'failed' AND attempts < max_attempts)
481 )
482 AND execute_at <= NOW()
483 ORDER BY priority ASC, execute_at ASC, event_id ASC, handler_id ASC
484 LIMIT 1
485 FOR UPDATE SKIP LOCKED
486 )
487 UPDATE seesaw_handler_executions e
488 SET status = 'executing',
489 claimed_at = NOW(),
490 last_attempted_at = NOW(),
491 attempts = e.attempts + 1
492 FROM next_effect
493 WHERE e.event_id = next_effect.event_id
494 AND e.handler_id = next_effect.handler_id
495 RETURNING
496 e.event_id, e.handler_id, e.correlation_id, e.event_type, e.event_payload, e.parent_event_id,
497 e.batch_id, e.batch_index, e.batch_size,
498 e.execute_at, e.timeout_seconds, e.max_attempts, e.priority, e.attempts,
499 e.join_window_timeout_seconds",
500 )
501 .fetch_optional(&self.pool)
502 .await?;
503
504 if let Some(r) = row {
505 Ok(Some(QueuedHandlerExecution {
506 event_id: r.event_id,
507 handler_id: r.handler_id,
508 correlation_id: r.correlation_id,
509 event_type: r.event_type,
510 event_payload: r.event_payload,
511 parent_event_id: r.parent_event_id,
512 batch_id: r.batch_id,
513 batch_index: r.batch_index,
514 batch_size: r.batch_size,
515 execute_at: r.execute_at,
516 timeout_seconds: r.timeout_seconds,
517 max_attempts: r.max_attempts,
518 priority: r.priority,
519 attempts: r.attempts,
520 join_window_timeout_seconds: r.join_window_timeout_seconds,
521 }))
522 } else {
523 Ok(None)
524 }
525 }
526
527 async fn complete_effect(
528 &self,
529 event_id: Uuid,
530 handler_id: String,
531 result: serde_json::Value,
532 ) -> Result<()> {
533 sqlx::query(
534 "UPDATE seesaw_handler_executions
535 SET status = 'completed',
536 result = $3,
537 completed_at = NOW()
538 WHERE event_id = $1 AND handler_id = $2",
539 )
540 .bind(event_id)
541 .bind(handler_id)
542 .bind(result)
543 .execute(&self.pool)
544 .await?;
545
546 Ok(())
547 }
548
    /// Completes a handler execution and persists any events it emitted.
    ///
    /// Emitted events receive deterministic ids (UUIDv5 over source event,
    /// handler, event type and emission index) and a `created_at` truncated
    /// to the parent's day, so re-running a handler after a crash re-derives
    /// identical rows and `ON CONFLICT … DO NOTHING` dedups them. Inserts
    /// and the completion update share one transaction.
    ///
    /// NOTE(review): the effect/parent lookups run on the pool, outside that
    /// transaction — presumably safe because both rows are immutable by this
    /// point; confirm.
    async fn complete_effect_with_events(
        &self,
        event_id: Uuid,
        handler_id: String,
        result: serde_json::Value,
        emitted_events: Vec<EmittedEvent>,
    ) -> Result<()> {
        let effect: HandlerRow = sqlx::query_as(
            "SELECT event_id, handler_id, correlation_id, event_type, event_payload, parent_event_id,
                    batch_id, batch_index, batch_size,
                    execute_at, timeout_seconds, max_attempts, priority, attempts, join_window_timeout_seconds
             FROM seesaw_handler_executions
             WHERE event_id = $1 AND handler_id = $2",
        )
        .bind(event_id)
        .bind(&handler_id)
        .fetch_one(&self.pool)
        .await?;

        let parent: ParentEventRow = sqlx::query_as(
            "SELECT hops, created_at
             FROM seesaw_events
             WHERE event_id = $1
             ORDER BY created_at ASC, id ASC
             LIMIT 1",
        )
        .bind(event_id)
        .fetch_one(&self.pool)
        .await?;

        let mut tx = self.pool.begin().await?;

        for (emitted_index, emitted) in emitted_events.into_iter().enumerate() {
            // Stable across retries: same (event, handler, type, index)
            // always yields the same UUID.
            let deterministic_id = Uuid::new_v5(
                &NAMESPACE_SEESAW,
                format!(
                    "{}-{}-{}-{}",
                    event_id, handler_id, emitted.event_type, emitted_index
                )
                .as_bytes(),
            );

            let deterministic_timestamp = emitted_event_created_at(parent.created_at);

            sqlx::query(
                "INSERT INTO seesaw_events (
                    event_id, parent_id, correlation_id, event_type, payload, hops,
                    batch_id, batch_index, batch_size, created_at
                )
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                ON CONFLICT (event_id, created_at) DO NOTHING",
            )
            .bind(deterministic_id)
            .bind(Some(event_id))
            .bind(effect.correlation_id)
            .bind(&emitted.event_type)
            .bind(emitted.payload)
            .bind(parent.hops + 1)
            .bind(emitted.batch_id)
            .bind(emitted.batch_index)
            .bind(emitted.batch_size)
            .bind(deterministic_timestamp)
            .execute(&mut *tx)
            .await?;
        }

        sqlx::query(
            "UPDATE seesaw_handler_executions
             SET status = 'completed',
                 result = $3,
                 completed_at = NOW()
             WHERE event_id = $1 AND handler_id = $2",
        )
        .bind(event_id)
        .bind(handler_id)
        .bind(result)
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;

        Ok(())
    }
643
    /// Records a failed attempt: flips the row to `'failed'`, stores the
    /// error, clears the claim, and reschedules `execute_at` using
    /// exponential backoff (`handler_retry_delay_seconds(attempts)`).
    ///
    /// NOTE(review): the `attempts >= $4` predicate makes the update apply
    /// only when the stored attempt counter has reached the reported value —
    /// presumably a stale-failure guard; confirm against the worker's
    /// attempt bookkeeping.
    async fn fail_effect(
        &self,
        event_id: Uuid,
        handler_id: String,
        error: String,
        attempts: i32,
    ) -> Result<()> {
        let retry_at = Utc::now() + Duration::seconds(handler_retry_delay_seconds(attempts));
        sqlx::query(
            "UPDATE seesaw_handler_executions
             SET status = 'failed',
                 error = $3,
                 execute_at = $5,
                 claimed_at = NULL
             WHERE event_id = $1 AND handler_id = $2 AND attempts >= $4",
        )
        .bind(event_id)
        .bind(handler_id)
        .bind(error)
        .bind(attempts)
        .bind(retry_at)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
670
671 async fn dlq_effect(
672 &self,
673 event_id: Uuid,
674 handler_id: String,
675 error: String,
676 reason: String,
677 attempts: i32,
678 ) -> Result<()> {
679 self.dlq_effect_with_events(event_id, handler_id, error, reason, attempts, Vec::new())
680 .await
681 }
682
    /// Moves a handler execution to the dead-letter queue, optionally
    /// publishing synthetic "terminal" events so batch accounting and joins
    /// downstream still settle.
    ///
    /// Source metadata comes from the execution row when it still exists,
    /// falling back to the originating `seesaw_events` row; if neither
    /// exists the call fails.
    ///
    /// Behavior matrix:
    /// - no emitted events, `reason == "accumulate_timeout"`: DLQ + delete
    ///   the execution row only (no terminal event).
    /// - no emitted events, any other reason: additionally insert one
    ///   synthetic terminal event carrying the source batch slot, when the
    ///   source had batch metadata.
    /// - with emitted events: insert each as a deterministic terminal event,
    ///   inheriting the source batch metadata unless suppressed by the
    ///   accumulate-timeout case.
    async fn dlq_effect_with_events(
        &self,
        event_id: Uuid,
        handler_id: String,
        error: String,
        reason: String,
        attempts: i32,
        emitted_events: Vec<EmittedEvent>,
    ) -> Result<()> {
        let effect = sqlx::query_as::<_, HandlerRow>(
            "SELECT event_id, handler_id, correlation_id, event_type, event_payload, parent_event_id,
                    batch_id, batch_index, batch_size,
                    execute_at, timeout_seconds, max_attempts, priority, attempts, join_window_timeout_seconds
             FROM seesaw_handler_executions
             WHERE event_id = $1 AND handler_id = $2",
        )
        .bind(event_id)
        .bind(&handler_id)
        .fetch_optional(&self.pool)
        .await?;

        let source_event = sqlx::query_as::<_, DlqSourceEventRow>(
            "SELECT correlation_id, event_type, payload, batch_id, batch_index, batch_size, hops, created_at
             FROM seesaw_events
             WHERE event_id = $1
             ORDER BY created_at ASC, id ASC
             LIMIT 1",
        )
        .bind(event_id)
        .fetch_optional(&self.pool)
        .await?;

        // Prefer the execution row's snapshot of the event; fall back to the
        // event row itself; refuse to DLQ with no provenance at all.
        let (
            source_correlation_id,
            source_event_type,
            source_event_payload,
            source_batch_id,
            source_batch_index,
            source_batch_size,
        ) = if let Some(effect) = effect {
            (
                effect.correlation_id,
                effect.event_type,
                effect.event_payload,
                effect.batch_id,
                effect.batch_index,
                effect.batch_size,
            )
        } else if let Some(source) = source_event.as_ref() {
            (
                source.correlation_id,
                source.event_type.clone(),
                source.payload.clone(),
                source.batch_id,
                source.batch_index,
                source.batch_size,
            )
        } else {
            anyhow::bail!(
                "cannot DLQ unknown effect {} for missing event {}",
                handler_id,
                event_id
            );
        };

        let mut tx = self.pool.begin().await?;

        sqlx::query(
            "INSERT INTO seesaw_dlq (
                event_id, handler_id, correlation_id, error, event_type, event_payload, reason, attempts
            )
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
        )
        .bind(event_id)
        .bind(&handler_id)
        .bind(source_correlation_id)
        .bind(&error)
        .bind(&source_event_type)
        .bind(&source_event_payload)
        .bind(&reason)
        .bind(attempts)
        .execute(&mut *tx)
        .await?;
        // Accumulate-timeout DLQs do not propagate the source batch slot.
        let preserve_batch_terminal = reason != "accumulate_timeout";

        // Deterministic timestamp/hops for synthetic events; fall back to
        // "now"/0 when the source event row is gone.
        let synthetic_created_at = source_event
            .as_ref()
            .map(|row| emitted_event_created_at(row.created_at))
            .unwrap_or_else(Utc::now);
        let synthetic_hops = source_event.as_ref().map(|row| row.hops + 1).unwrap_or(0);

        if emitted_events.is_empty() {
            if !preserve_batch_terminal {
                // accumulate_timeout: clean up the execution row and stop.
                sqlx::query(
                    "DELETE FROM seesaw_handler_executions WHERE event_id = $1 AND handler_id = $2",
                )
                .bind(event_id)
                .bind(&handler_id)
                .execute(&mut *tx)
                .await?;
                tx.commit().await?;
                return Ok(());
            }
            if let (Some(batch_id), Some(batch_index), Some(batch_size)) =
                (source_batch_id, source_batch_index, source_batch_size)
            {
                // Stable UUIDv5 id so retried DLQs dedup via ON CONFLICT.
                let synthetic_event_id = Uuid::new_v5(
                    &NAMESPACE_SEESAW,
                    format!("{}-{}-dlq-terminal", event_id, handler_id).as_bytes(),
                );

                sqlx::query(
                    "INSERT INTO seesaw_events (
                        event_id, parent_id, correlation_id, event_type, payload, hops,
                        batch_id, batch_index, batch_size, created_at
                    )
                    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                    ON CONFLICT (event_id, created_at) DO NOTHING",
                )
                .bind(synthetic_event_id)
                .bind(Some(event_id))
                .bind(source_correlation_id)
                .bind(&source_event_type)
                .bind(&source_event_payload)
                .bind(synthetic_hops)
                .bind(Some(batch_id))
                .bind(Some(batch_index))
                .bind(Some(batch_size))
                .bind(synthetic_created_at)
                .execute(&mut *tx)
                .await?;
            }
        } else {
            for (emitted_index, emitted) in emitted_events.into_iter().enumerate() {
                let synthetic_event_id = Uuid::new_v5(
                    &NAMESPACE_SEESAW,
                    format!(
                        "{}-{}-dlq-terminal-{}-{}",
                        event_id, handler_id, emitted.event_type, emitted_index
                    )
                    .as_bytes(),
                );

                sqlx::query(
                    "INSERT INTO seesaw_events (
                        event_id, parent_id, correlation_id, event_type, payload, hops,
                        batch_id, batch_index, batch_size, created_at
                    )
                    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                    ON CONFLICT (event_id, created_at) DO NOTHING",
                )
                .bind(synthetic_event_id)
                .bind(Some(event_id))
                .bind(source_correlation_id)
                .bind(&emitted.event_type)
                .bind(emitted.payload)
                .bind(synthetic_hops)
                // The emitted event's own batch slot wins; otherwise inherit
                // the source's, unless suppressed (accumulate_timeout).
                .bind(emitted.batch_id.or(if preserve_batch_terminal {
                    source_batch_id
                } else {
                    None
                }))
                .bind(emitted.batch_index.or(if preserve_batch_terminal {
                    source_batch_index
                } else {
                    None
                }))
                .bind(emitted.batch_size.or(if preserve_batch_terminal {
                    source_batch_size
                } else {
                    None
                }))
                .bind(synthetic_created_at)
                .execute(&mut *tx)
                .await?;
            }
        }

        // The execution row is removed (not marked failed): the DLQ entry is
        // now the record of this handler's outcome.
        sqlx::query(
            "DELETE FROM seesaw_handler_executions WHERE event_id = $1 AND handler_id = $2",
        )
        .bind(event_id)
        .bind(&handler_id)
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;

        Ok(())
    }
877
    /// Streams events for `correlation_id` as a live feed.
    ///
    /// LISTEN/NOTIFY is used only as a wake-up: actual rows always come from
    /// keyset-paginated polls of `seesaw_events` (cursor = last delivered
    /// `(created_at, id)`), with a 500 ms timeout poll as a safety net for
    /// missed notifications. The initial cursor is the newest existing row,
    /// so only events created *after* subscription time are delivered.
    /// The background task ends when the listener errors, a query errors, or
    /// the receiver side of the channel is dropped.
    async fn subscribe_workflow_events(
        &self,
        correlation_id: Uuid,
    ) -> Result<Box<dyn futures::Stream<Item = seesaw_core::WorkflowEvent> + Send + Unpin>> {
        use sqlx::postgres::PgListener;

        let channel = format!("seesaw_workflow_{}", correlation_id);
        const PAGE_SIZE: i64 = 256;
        const CATCH_UP_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500);

        // Newest existing row for this correlation, if any: everything at or
        // before this cursor is considered history and is not replayed.
        let initial_cursor: Option<(DateTime<Utc>, i64)> = sqlx::query_as(
            "SELECT created_at, id
             FROM seesaw_events
             WHERE correlation_id = $1
             ORDER BY created_at DESC, id DESC
             LIMIT 1",
        )
        .bind(correlation_id)
        .fetch_optional(&self.pool)
        .await?;

        let mut listener = PgListener::connect_with(&self.pool).await?;
        listener.listen(&channel).await?;

        let pool = self.pool.clone();
        let (tx, rx) = futures::channel::mpsc::unbounded::<seesaw_core::WorkflowEvent>();

        tokio::spawn(async move {
            let mut cursor = initial_cursor;
            // Drain once immediately in case rows landed between the cursor
            // query and the LISTEN registration.
            let mut drain_pending = true;

            loop {
                if !drain_pending {
                    // Wait for a notification, or time out and poll anyway.
                    match tokio::time::timeout(CATCH_UP_INTERVAL, listener.recv()).await {
                        Ok(Ok(_notification)) => {}
                        Ok(Err(error)) => {
                            tracing::warn!(
                                "workflow listener recv failed for {}: {}",
                                correlation_id,
                                error
                            );
                            return;
                        }
                        Err(_) => {}
                    }
                }
                drain_pending = false;

                // Page forward from the cursor until no rows remain.
                loop {
                    let rows_result: std::result::Result<Vec<WorkflowEventRow>, sqlx::Error> =
                        if let Some((created_at, id)) = cursor {
                            sqlx::query_as(
                                "SELECT id, event_id, correlation_id, event_type, payload, created_at
                                 FROM seesaw_events
                                 WHERE correlation_id = $1
                                 AND (
                                     created_at > $2
                                     OR (created_at = $2 AND id > $3)
                                 )
                                 ORDER BY created_at ASC, id ASC
                                 LIMIT $4",
                            )
                            .bind(correlation_id)
                            .bind(created_at)
                            .bind(id)
                            .bind(PAGE_SIZE)
                            .fetch_all(&pool)
                            .await
                        } else {
                            sqlx::query_as(
                                "SELECT id, event_id, correlation_id, event_type, payload, created_at
                                 FROM seesaw_events
                                 WHERE correlation_id = $1
                                 ORDER BY created_at ASC, id ASC
                                 LIMIT $2",
                            )
                            .bind(correlation_id)
                            .bind(PAGE_SIZE)
                            .fetch_all(&pool)
                            .await
                        };

                    let rows = match rows_result {
                        Ok(rows) => rows,
                        Err(error) => {
                            tracing::warn!(
                                "workflow event query failed for {}: {}",
                                correlation_id,
                                error
                            );
                            return;
                        }
                    };

                    if rows.is_empty() {
                        break;
                    }

                    for row in rows {
                        // Advance the cursor before sending so a dropped
                        // receiver never causes a re-send on restart paths.
                        cursor = Some((row.created_at, row.id));
                        if tx
                            .unbounded_send(seesaw_core::WorkflowEvent {
                                event_id: row.event_id,
                                correlation_id: row.correlation_id,
                                event_type: row.event_type,
                                payload: row.payload,
                            })
                            .is_err()
                        {
                            // Subscriber went away; stop the task.
                            return;
                        }
                    }
                }
            }
        });

        Ok(Box::new(rx))
    }
999
    /// Snapshot of a workflow's progress: the number of outstanding handler
    /// executions and the most recent event type for the correlation.
    /// `is_settled` means no pending/executing/failed effects remain.
    async fn get_workflow_status(
        &self,
        correlation_id: Uuid,
    ) -> Result<seesaw_core::WorkflowStatus> {
        let pending_effects = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM seesaw_handler_executions
             WHERE correlation_id = $1 AND status IN ('pending', 'executing', 'failed')",
        )
        .bind(correlation_id)
        .fetch_one(&self.pool)
        .await?
        .0;

        let last_event = sqlx::query_as::<_, (String,)>(
            "SELECT event_type FROM seesaw_events
             WHERE correlation_id = $1
             ORDER BY created_at DESC, id DESC
             LIMIT 1",
        )
        .bind(correlation_id)
        .fetch_optional(&self.pool)
        .await?
        .map(|r| r.0);

        Ok(seesaw_core::WorkflowStatus {
            correlation_id,
            pending_effects,
            is_settled: pending_effects == 0,
            last_event,
        })
    }
1031
    /// Appends one batch member to a same-batch join window and, if the
    /// window just became full, atomically claims it for processing.
    ///
    /// Steps (single transaction):
    /// 1. insert the entry idempotently;
    /// 2. create the window row if absent ('open', optional expiry);
    /// 3. keep `target_count` in sync with the latest reported batch size;
    /// 4. flip 'open' → 'processing' only if the window is unexpired and its
    ///    entry count has reached `target_count` — exactly one caller wins;
    /// 5. the winner gets all entries back in `batch_index` order.
    ///
    /// Returns `None` when the window is not yet full (or another worker
    /// claimed it); `Some(entries)` for the single claiming caller.
    async fn join_same_batch_append_and_maybe_claim(
        &self,
        join_handler_id: String,
        correlation_id: Uuid,
        source_event_id: Uuid,
        source_event_type: String,
        source_payload: serde_json::Value,
        source_created_at: DateTime<Utc>,
        batch_id: Uuid,
        batch_index: i32,
        batch_size: i32,
        join_window_timeout_seconds: Option<i32>,
    ) -> Result<Option<Vec<JoinEntry>>> {
        let mut tx = self.pool.begin().await?;

        // 1) Record this member; duplicates (redeliveries) are ignored.
        sqlx::query(
            "INSERT INTO seesaw_join_entries (
                join_handler_id, correlation_id, source_event_id, source_event_type, source_payload,
                source_created_at, batch_id, batch_index, batch_size
            )
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
            ON CONFLICT (join_handler_id, correlation_id, source_event_id) DO NOTHING",
        )
        .bind(&join_handler_id)
        .bind(correlation_id)
        .bind(source_event_id)
        .bind(&source_event_type)
        .bind(source_payload)
        .bind(batch_id)
        .bind(batch_index)
        .bind(batch_size)
        .execute(&mut *tx)
        .await?;

        // 2) Ensure the window exists; expiry is NULL when no timeout given.
        sqlx::query(
            "INSERT INTO seesaw_join_windows (
                join_handler_id, correlation_id, mode, batch_id, target_count, status,
                window_timeout_seconds, expires_at
            )
            VALUES (
                $1,
                $2,
                'same_batch',
                $3,
                $4,
                'open',
                $5,
                CASE
                    WHEN $5 IS NULL THEN NULL
                    ELSE NOW() + ($5::int * INTERVAL '1 second')
                END
            )
            ON CONFLICT (join_handler_id, correlation_id, batch_id) DO NOTHING",
        )
        .bind(&join_handler_id)
        .bind(correlation_id)
        .bind(batch_id)
        .bind(batch_size)
        .bind(join_window_timeout_seconds)
        .execute(&mut *tx)
        .await?;

        // 3) If a later member reports a different batch size, adopt it.
        sqlx::query(
            "UPDATE seesaw_join_windows
             SET target_count = $4,
                 updated_at = NOW()
             WHERE join_handler_id = $1
             AND correlation_id = $2
             AND batch_id = $3
             AND target_count <> $4",
        )
        .bind(&join_handler_id)
        .bind(correlation_id)
        .bind(batch_id)
        .bind(batch_size)
        .execute(&mut *tx)
        .await?;

        // 4) Try to claim: only an 'open', unexpired window whose entry
        //    count has reached target_count flips to 'processing'.
        let claimed: Option<(String,)> = sqlx::query_as(
            "UPDATE seesaw_join_windows w
             SET status = 'processing',
                 sealed_at = COALESCE(w.sealed_at, NOW()),
                 processing_started_at = NOW(),
                 updated_at = NOW(),
                 last_error = NULL
             WHERE w.join_handler_id = $1
             AND w.correlation_id = $2
             AND w.batch_id = $3
             AND w.status = 'open'
             AND (w.expires_at IS NULL OR w.expires_at > NOW())
             AND (
                 SELECT COUNT(*)::int
                 FROM seesaw_join_entries e
                 WHERE e.join_handler_id = w.join_handler_id
                 AND e.correlation_id = w.correlation_id
                 AND e.batch_id = w.batch_id
             ) >= w.target_count
             RETURNING w.join_handler_id",
        )
        .bind(&join_handler_id)
        .bind(correlation_id)
        .bind(batch_id)
        .fetch_optional(&mut *tx)
        .await?;

        if claimed.is_none() {
            // Not full yet (or someone else claimed): keep the appended
            // entry and window changes, report "no claim".
            tx.commit().await?;
            return Ok(None);
        }

        // 5) Winner: load every member in deterministic order.
        let rows = sqlx::query_as::<_, (Uuid, String, serde_json::Value, Uuid, i32, i32, DateTime<Utc>)>(
            "SELECT source_event_id, source_event_type, source_payload, batch_id, batch_index, batch_size, source_created_at
             FROM seesaw_join_entries
             WHERE join_handler_id = $1
             AND correlation_id = $2
             AND batch_id = $3
             ORDER BY batch_index ASC, source_created_at ASC, source_event_id ASC",
        )
        .bind(&join_handler_id)
        .bind(correlation_id)
        .bind(batch_id)
        .fetch_all(&mut *tx)
        .await?;

        let entries = rows
            .into_iter()
            .map(
                |(
                    source_event_id,
                    event_type,
                    payload,
                    batch_id,
                    batch_index,
                    batch_size,
                    created_at,
                )| JoinEntry {
                    source_event_id,
                    event_type,
                    payload,
                    batch_id,
                    batch_index,
                    batch_size,
                    created_at,
                },
            )
            .collect::<Vec<_>>();

        tx.commit().await?;
        Ok(Some(entries))
    }
1183
    /// Finalizes a claimed join window: marks it completed, then removes its
    /// entries and the window row itself, all in one transaction.
    ///
    /// NOTE(review): the 'completed' UPDATE is immediately followed by a
    /// DELETE of the same window row, so the status change is visible only
    /// to triggers/replication — confirm it is intentional and not leftover.
    async fn join_same_batch_complete(
        &self,
        join_handler_id: String,
        correlation_id: Uuid,
        batch_id: Uuid,
    ) -> Result<()> {
        let mut tx = self.pool.begin().await?;

        sqlx::query(
            "UPDATE seesaw_join_windows
             SET status = 'completed',
                 completed_at = NOW(),
                 updated_at = NOW()
             WHERE join_handler_id = $1
             AND correlation_id = $2
             AND batch_id = $3",
        )
        .bind(&join_handler_id)
        .bind(correlation_id)
        .bind(batch_id)
        .execute(&mut *tx)
        .await?;

        sqlx::query(
            "DELETE FROM seesaw_join_entries
             WHERE join_handler_id = $1
             AND correlation_id = $2
             AND batch_id = $3",
        )
        .bind(&join_handler_id)
        .bind(correlation_id)
        .bind(batch_id)
        .execute(&mut *tx)
        .await?;

        sqlx::query(
            "DELETE FROM seesaw_join_windows
             WHERE join_handler_id = $1
             AND correlation_id = $2
             AND batch_id = $3",
        )
        .bind(&join_handler_id)
        .bind(correlation_id)
        .bind(batch_id)
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;
        Ok(())
    }
1234
    /// Returns a claimed ('processing') window to 'open' after a failed join
    /// handler run, recording the error so the window can be claimed again.
    /// A window in any other status is left untouched.
    async fn join_same_batch_release(
        &self,
        join_handler_id: String,
        correlation_id: Uuid,
        batch_id: Uuid,
        error: String,
    ) -> Result<()> {
        sqlx::query(
            "UPDATE seesaw_join_windows
             SET status = 'open',
                 processing_started_at = NULL,
                 last_error = $4,
                 updated_at = NOW()
             WHERE join_handler_id = $1
             AND correlation_id = $2
             AND batch_id = $3
             AND status = 'processing'",
        )
        .bind(&join_handler_id)
        .bind(correlation_id)
        .bind(batch_id)
        .bind(error)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
1262
    /// Garbage-collects join windows whose deadline has passed.
    ///
    /// Selects still-'open' windows with `expires_at <= now` (row-locked with
    /// `FOR UPDATE SKIP LOCKED` so concurrent sweepers partition the work),
    /// captures the member source event ids, deletes the entries and the
    /// window, and returns what was dropped so the caller can emit timeout
    /// notifications.
    async fn expire_same_batch_windows(
        &self,
        now: DateTime<Utc>,
    ) -> Result<Vec<ExpiredJoinWindow>> {
        let mut tx = self.pool.begin().await?;

        let windows = sqlx::query_as::<_, (String, Uuid, Uuid)>(
            "SELECT join_handler_id, correlation_id, batch_id
             FROM seesaw_join_windows
             WHERE status = 'open'
             AND expires_at IS NOT NULL
             AND expires_at <= $1
             FOR UPDATE SKIP LOCKED",
        )
        .bind(now)
        .fetch_all(&mut *tx)
        .await?;

        let mut expired = Vec::with_capacity(windows.len());
        for (join_handler_id, correlation_id, batch_id) in windows {
            // Remember which members were lost, in deterministic order.
            let source_event_ids = sqlx::query_as::<_, (Uuid,)>(
                "SELECT source_event_id
                 FROM seesaw_join_entries
                 WHERE join_handler_id = $1
                 AND correlation_id = $2
                 AND batch_id = $3
                 ORDER BY batch_index ASC, source_created_at ASC, source_event_id ASC",
            )
            .bind(&join_handler_id)
            .bind(correlation_id)
            .bind(batch_id)
            .fetch_all(&mut *tx)
            .await?
            .into_iter()
            .map(|row| row.0)
            .collect::<Vec<_>>();

            sqlx::query(
                "DELETE FROM seesaw_join_entries
                 WHERE join_handler_id = $1
                 AND correlation_id = $2
                 AND batch_id = $3",
            )
            .bind(&join_handler_id)
            .bind(correlation_id)
            .bind(batch_id)
            .execute(&mut *tx)
            .await?;

            sqlx::query(
                "DELETE FROM seesaw_join_windows
                 WHERE join_handler_id = $1
                 AND correlation_id = $2
                 AND batch_id = $3",
            )
            .bind(&join_handler_id)
            .bind(correlation_id)
            .bind(batch_id)
            .execute(&mut *tx)
            .await?;

            expired.push(ExpiredJoinWindow {
                join_handler_id,
                correlation_id,
                batch_id,
                source_event_ids,
            });
        }

        tx.commit().await?;
        Ok(expired)
    }
1335}
1336
/// One row of the `seesaw_stream` feed table; converted to the public
/// `InsightEvent` shape by `stream_row_to_insight_event`.
#[derive(FromRow)]
struct StreamRow {
    /// Monotonic sequence number; used as the paging cursor.
    seq: i64,
    /// Discriminator string, e.g. "event_dispatched" / "effect_started";
    /// unknown values fall back to `EventDispatched` during conversion.
    stream_type: String,
    correlation_id: Uuid,
    event_id: Option<Uuid>,
    effect_event_id: Option<Uuid>,
    handler_id: Option<String>,
    status: Option<String>,
    error: Option<String>,
    payload: Option<serde_json::Value>,
    created_at: DateTime<Utc>,
}
1350
/// Projection of `seesaw_handler_executions` used by `get_effect_logs`.
#[derive(FromRow)]
struct EffectLogRow {
    correlation_id: Uuid,
    event_id: Uuid,
    handler_id: String,
    status: String,
    attempts: i32,
    event_type: String,
    result: Option<serde_json::Value>,
    error: Option<String>,
    created_at: DateTime<Utc>,
    // Lifecycle timestamps; `duration_ms` is derived from
    // claimed_at (or last_attempted_at) through completed_at.
    execute_at: DateTime<Utc>,
    claimed_at: Option<DateTime<Utc>>,
    last_attempted_at: Option<DateTime<Utc>>,
    completed_at: Option<DateTime<Utc>>,
}
1367
/// Projection of `seesaw_dlq` used by `get_dead_letters`.
#[derive(FromRow)]
struct DeadLetterRow {
    correlation_id: Uuid,
    event_id: Uuid,
    handler_id: String,
    event_type: String,
    event_payload: serde_json::Value,
    error: String,
    reason: String,
    attempts: i32,
    failed_at: DateTime<Utc>,
    /// NULL while the dead letter is still unresolved.
    resolved_at: Option<DateTime<Utc>>,
}
1381
/// Aggregated failure counters per workflow, produced by the CTE query in
/// `get_failed_workflows`.
#[derive(FromRow)]
struct FailedWorkflowRow {
    correlation_id: Uuid,
    failed_effects: i64,
    active_effects: i64,
    /// Unresolved DLQ entries only.
    dead_letters: i64,
    last_failed_at: Option<DateTime<Utc>>,
    last_error: Option<String>,
}
1391
1392#[async_trait]
1393impl InsightStore for PostgresStore {
1394 async fn subscribe_events(
1395 &self,
1396 ) -> Result<Box<dyn futures::Stream<Item = InsightEvent> + Send + Unpin>> {
1397 use futures::stream::StreamExt;
1398 use sqlx::postgres::PgListener;
1399
1400 let mut listener = PgListener::connect_with(&self.pool).await?;
1402 listener.listen("seesaw_stream").await?;
1403
1404 let pool = self.pool.clone();
1406 let stream = listener.into_stream().filter_map(move |result| {
1407 let pool = pool.clone();
1408 Box::pin(async move {
1409 match result {
1410 Ok(_notification) => {
1411 if let Ok(row) = sqlx::query_as::<_, StreamRow>(
1414 "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
1415 handler_id, status, error, payload, created_at
1416 FROM seesaw_stream
1417 ORDER BY seq DESC
1418 LIMIT 1",
1419 )
1420 .fetch_one(&pool)
1421 .await
1422 {
1423 Some(stream_row_to_insight_event(row))
1424 } else {
1425 None
1426 }
1427 }
1428 Err(_) => None,
1429 }
1430 })
1431 });
1432
1433 Ok(Box::new(stream))
1434 }
1435
1436 async fn get_workflow_tree(&self, correlation_id: Uuid) -> Result<seesaw_core::WorkflowTree> {
1437 let events = sqlx::query_as::<_, EventRow>(
1439 "SELECT id, event_id, parent_id, correlation_id, event_type, payload, hops,
1440 batch_id, batch_index, batch_size, created_at
1441 FROM seesaw_events
1442 WHERE correlation_id = $1
1443 ORDER BY created_at ASC",
1444 )
1445 .bind(correlation_id)
1446 .fetch_all(&self.pool)
1447 .await?;
1448
1449 let effects = sqlx::query_as::<_, EffectTreeRow>(
1451 "SELECT event_id, handler_id, status, result, error, attempts, created_at,
1452 batch_id, batch_index, batch_size
1453 FROM seesaw_handler_executions
1454 WHERE correlation_id = $1
1455 ORDER BY created_at ASC",
1456 )
1457 .bind(correlation_id)
1458 .fetch_all(&self.pool)
1459 .await?;
1460
1461 let event_ids: HashSet<Uuid> = events.iter().map(|event| event.event_id).collect();
1463 let roots = build_event_tree(&events, &effects, None, &event_ids, true);
1464
1465 Ok(seesaw_core::WorkflowTree {
1466 correlation_id,
1467 roots,
1468 event_count: events.len(),
1469 effect_count: effects.len(),
1470 })
1471 }
1472
1473 async fn get_stats(&self) -> Result<seesaw_core::InsightStats> {
1474 let total_events = sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM seesaw_events")
1475 .fetch_one(&self.pool)
1476 .await?
1477 .0;
1478
1479 let active_effects = sqlx::query_as::<_, (i64,)>(
1480 "SELECT COUNT(*) FROM seesaw_handler_executions
1481 WHERE status IN ('pending', 'executing')",
1482 )
1483 .fetch_one(&self.pool)
1484 .await?
1485 .0;
1486
1487 let completed_effects = sqlx::query_as::<_, (i64,)>(
1488 "SELECT COUNT(*) FROM seesaw_handler_executions WHERE status = 'completed'",
1489 )
1490 .fetch_one(&self.pool)
1491 .await?
1492 .0;
1493
1494 let failed_effects = sqlx::query_as::<_, (i64,)>(
1495 "SELECT COUNT(*) FROM seesaw_handler_executions WHERE status = 'failed'",
1496 )
1497 .fetch_one(&self.pool)
1498 .await?
1499 .0;
1500
1501 Ok(seesaw_core::InsightStats {
1502 total_events,
1503 active_effects,
1504 completed_effects,
1505 failed_effects,
1506 })
1507 }
1508
1509 async fn get_recent_events(
1510 &self,
1511 cursor: Option<i64>,
1512 limit: usize,
1513 ) -> Result<Vec<InsightEvent>> {
1514 let rows = if let Some(cursor_seq) = cursor {
1515 sqlx::query_as::<_, StreamRow>(
1516 "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
1517 handler_id, status, error, payload, created_at
1518 FROM seesaw_stream
1519 WHERE seq > $1
1520 ORDER BY seq ASC
1521 LIMIT $2",
1522 )
1523 .bind(cursor_seq)
1524 .bind(limit as i64)
1525 .fetch_all(&self.pool)
1526 .await?
1527 } else {
1528 sqlx::query_as::<_, StreamRow>(
1529 "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
1530 handler_id, status, error, payload, created_at
1531 FROM seesaw_stream
1532 ORDER BY seq DESC
1533 LIMIT $1",
1534 )
1535 .bind(limit as i64)
1536 .fetch_all(&self.pool)
1537 .await?
1538 };
1539
1540 Ok(rows.into_iter().map(stream_row_to_insight_event).collect())
1541 }
1542
1543 async fn get_effect_logs(
1544 &self,
1545 correlation_id: Option<Uuid>,
1546 limit: usize,
1547 ) -> Result<Vec<EffectExecutionLog>> {
1548 let rows = sqlx::query_as::<_, EffectLogRow>(
1549 "SELECT
1550 correlation_id,
1551 event_id,
1552 handler_id,
1553 status,
1554 attempts,
1555 event_type,
1556 result,
1557 error,
1558 created_at,
1559 execute_at,
1560 claimed_at,
1561 last_attempted_at,
1562 completed_at
1563 FROM seesaw_handler_executions
1564 WHERE ($1::uuid IS NULL OR correlation_id = $1)
1565 ORDER BY COALESCE(last_attempted_at, created_at) DESC, event_id DESC
1566 LIMIT $2",
1567 )
1568 .bind(correlation_id)
1569 .bind(limit as i64)
1570 .fetch_all(&self.pool)
1571 .await?;
1572
1573 Ok(rows
1574 .into_iter()
1575 .map(|row| {
1576 let started_at = row.claimed_at.or(row.last_attempted_at);
1577 let duration_ms = match (started_at, row.completed_at) {
1578 (Some(start), Some(end)) => Some((end - start).num_milliseconds().max(0)),
1579 _ => None,
1580 };
1581
1582 EffectExecutionLog {
1583 correlation_id: row.correlation_id,
1584 event_id: row.event_id,
1585 handler_id: row.handler_id,
1586 status: row.status,
1587 attempts: row.attempts,
1588 event_type: Some(row.event_type),
1589 result: row.result,
1590 error: row.error,
1591 created_at: row.created_at,
1592 execute_at: Some(row.execute_at),
1593 claimed_at: row.claimed_at,
1594 last_attempted_at: row.last_attempted_at,
1595 completed_at: row.completed_at,
1596 duration_ms,
1597 }
1598 })
1599 .collect())
1600 }
1601
1602 async fn get_dead_letters(
1603 &self,
1604 unresolved_only: bool,
1605 limit: usize,
1606 ) -> Result<Vec<DeadLetterEntry>> {
1607 let rows = sqlx::query_as::<_, DeadLetterRow>(
1608 "SELECT
1609 correlation_id,
1610 event_id,
1611 handler_id,
1612 event_type,
1613 event_payload,
1614 error,
1615 reason,
1616 attempts,
1617 failed_at,
1618 resolved_at
1619 FROM seesaw_dlq
1620 WHERE (NOT $1 OR resolved_at IS NULL)
1621 ORDER BY failed_at DESC
1622 LIMIT $2",
1623 )
1624 .bind(unresolved_only)
1625 .bind(limit as i64)
1626 .fetch_all(&self.pool)
1627 .await?;
1628
1629 Ok(rows
1630 .into_iter()
1631 .map(|row| DeadLetterEntry {
1632 correlation_id: row.correlation_id,
1633 event_id: row.event_id,
1634 handler_id: row.handler_id,
1635 event_type: row.event_type,
1636 event_payload: row.event_payload,
1637 error: row.error,
1638 reason: row.reason,
1639 attempts: row.attempts,
1640 failed_at: row.failed_at,
1641 resolved_at: row.resolved_at,
1642 })
1643 .collect())
1644 }
1645
1646 async fn get_failed_workflows(&self, limit: usize) -> Result<Vec<FailedWorkflow>> {
1647 let rows = sqlx::query_as::<_, FailedWorkflowRow>(
1648 "WITH effect_agg AS (
1649 SELECT
1650 correlation_id,
1651 COUNT(*) FILTER (WHERE status = 'failed')::BIGINT AS failed_effects,
1652 COUNT(*) FILTER (WHERE status IN ('pending', 'executing'))::BIGINT AS active_effects,
1653 MAX(last_attempted_at) FILTER (WHERE status = 'failed') AS last_failed_at,
1654 MAX(error) FILTER (WHERE status = 'failed') AS last_error
1655 FROM seesaw_handler_executions
1656 GROUP BY correlation_id
1657 ),
1658 dlq_agg AS (
1659 SELECT
1660 correlation_id,
1661 COUNT(*) FILTER (WHERE resolved_at IS NULL)::BIGINT AS dead_letters,
1662 MAX(failed_at) FILTER (WHERE resolved_at IS NULL) AS last_dlq_at,
1663 MAX(error) FILTER (WHERE resolved_at IS NULL) AS last_dlq_error
1664 FROM seesaw_dlq
1665 GROUP BY correlation_id
1666 )
1667 SELECT
1668 COALESCE(e.correlation_id, d.correlation_id) AS correlation_id,
1669 COALESCE(e.failed_effects, 0) AS failed_effects,
1670 COALESCE(e.active_effects, 0) AS active_effects,
1671 COALESCE(d.dead_letters, 0) AS dead_letters,
1672 GREATEST(e.last_failed_at, d.last_dlq_at) AS last_failed_at,
1673 COALESCE(d.last_dlq_error, e.last_error) AS last_error
1674 FROM effect_agg e
1675 FULL OUTER JOIN dlq_agg d ON d.correlation_id = e.correlation_id
1676 WHERE COALESCE(e.failed_effects, 0) > 0 OR COALESCE(d.dead_letters, 0) > 0
1677 ORDER BY last_failed_at DESC NULLS LAST
1678 LIMIT $1",
1679 )
1680 .bind(limit as i64)
1681 .fetch_all(&self.pool)
1682 .await?;
1683
1684 Ok(rows
1685 .into_iter()
1686 .map(|row| FailedWorkflow {
1687 correlation_id: row.correlation_id,
1688 failed_effects: row.failed_effects,
1689 active_effects: row.active_effects,
1690 dead_letters: row.dead_letters,
1691 last_failed_at: row.last_failed_at,
1692 last_error: row.last_error,
1693 })
1694 .collect())
1695 }
1696}
1697
/// Handler-execution projection used when assembling the workflow tree.
#[derive(FromRow)]
struct EffectTreeRow {
    /// Event this execution belongs to (joins against `EventRow.event_id`).
    event_id: Uuid,
    handler_id: String,
    status: String,
    result: Option<serde_json::Value>,
    error: Option<String>,
    attempts: i32,
    created_at: DateTime<Utc>,
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
}
1711
1712fn stream_row_to_insight_event(row: StreamRow) -> InsightEvent {
1713 let stream_type = match row.stream_type.as_str() {
1714 "event_dispatched" => StreamType::EventDispatched,
1715 "effect_started" => StreamType::EffectStarted,
1716 "effect_completed" => StreamType::EffectCompleted,
1717 "effect_failed" => StreamType::EffectFailed,
1718 _ => StreamType::EventDispatched, };
1720
1721 let event_type = if stream_type == StreamType::EventDispatched {
1723 row.payload
1724 .as_ref()
1725 .and_then(|p| p.get("event_type"))
1726 .and_then(|v| v.as_str())
1727 .map(|s| s.to_string())
1728 } else {
1729 None
1730 };
1731
1732 InsightEvent {
1733 seq: row.seq,
1734 stream_type,
1735 correlation_id: row.correlation_id,
1736 event_id: row.event_id,
1737 effect_event_id: row.effect_event_id,
1738 handler_id: row.handler_id,
1739 event_type,
1740 status: row.status,
1741 error: row.error,
1742 payload: row.payload,
1743 created_at: row.created_at,
1744 }
1745}
1746
1747fn build_event_tree(
1748 events: &[EventRow],
1749 effects: &[EffectTreeRow],
1750 parent_id: Option<Uuid>,
1751 event_ids: &HashSet<Uuid>,
1752 is_root_pass: bool,
1753) -> Vec<EventNode> {
1754 events
1755 .iter()
1756 .filter(|event| {
1757 if is_root_pass {
1758 event.parent_id.is_none()
1759 || event
1760 .parent_id
1761 .map(|parent| !event_ids.contains(&parent))
1762 .unwrap_or(false)
1763 } else {
1764 event.parent_id == parent_id
1765 }
1766 })
1767 .map(|event| {
1768 let event_effects: Vec<HandlerNode> = effects
1770 .iter()
1771 .filter(|eff| eff.event_id == event.event_id)
1772 .map(|eff| HandlerNode {
1773 handler_id: eff.handler_id.clone(),
1774 event_id: eff.event_id,
1775 status: eff.status.clone(),
1776 result: eff.result.clone(),
1777 error: eff.error.clone(),
1778 attempts: eff.attempts,
1779 created_at: eff.created_at,
1780 batch_id: eff.batch_id,
1781 batch_index: eff.batch_index,
1782 batch_size: eff.batch_size,
1783 })
1784 .collect();
1785
1786 let children =
1788 build_event_tree(events, effects, Some(event.event_id), event_ids, false);
1789
1790 EventNode {
1791 event_id: event.event_id,
1792 event_type: event.event_type.clone(),
1793 payload: event.payload.clone(),
1794 created_at: event.created_at,
1795 batch_id: event.batch_id,
1796 batch_index: event.batch_index,
1797 batch_size: event.batch_size,
1798 children,
1799 effects: event_effects,
1800 }
1801 })
1802 .collect()
1803}
1804
1805use seesaw_core::backend::{Backend, BackendServeConfig, DispatchedEvent};
1810use seesaw_core::backend::capability::*;
1811use seesaw_core::backend::job_executor::JobExecutor;
1812use seesaw_core::DirectRunner;
1813use std::sync::Arc;
1814use tokio_util::sync::CancellationToken;
1815
// Process-wide `DirectRunner` instance shared by all worker tasks when
// invoking the job executor.
static DIRECT_RUNNER: DirectRunner = DirectRunner;
1817
/// `Backend` implementation backed by Postgres; all state lives in the
/// wrapped `PostgresStore`.
pub struct PostgresBackend {
    store: PostgresStore,
}
1824
1825impl PostgresBackend {
1826 pub fn new(pool: PgPool) -> Self {
1828 Self {
1829 store: PostgresStore::new(pool),
1830 }
1831 }
1832
1833 pub fn pool(&self) -> &PgPool {
1835 self.store.pool()
1836 }
1837}
1838
1839impl Clone for PostgresBackend {
1840 fn clone(&self) -> Self {
1841 Self {
1842 store: self.store.clone(),
1843 }
1844 }
1845}
1846
#[async_trait]
impl Backend for PostgresBackend {
    /// Backend identifier used in logs and diagnostics.
    fn name(&self) -> &'static str {
        "postgres"
    }

    /// Persists a dispatched event into the Postgres-backed queue.
    async fn publish(&self, event: DispatchedEvent) -> Result<()> {
        let queued_event = QueuedEvent {
            // Placeholder; the real row id is assigned on insert by the
            // database — presumably the store ignores this value (confirm).
            id: 0,
            event_id: event.event_id,
            parent_id: event.parent_id,
            correlation_id: event.correlation_id,
            event_type: event.event_type,
            payload: event.payload,
            hops: event.hops,
            retry_count: event.retry_count,
            batch_id: event.batch_id,
            batch_index: event.batch_index,
            batch_size: event.batch_size,
            created_at: event.created_at,
        };

        self.store.publish(queued_event).await
    }

    /// Runs the backend's worker loops until `shutdown` is cancelled:
    /// `config.event_workers` tasks poll and process queued events, and
    /// `config.handler_workers` tasks poll and execute queued handler
    /// effects. On shutdown, workers are drained for up to
    /// `config.graceful_shutdown_timeout`.
    async fn serve<D>(
        &self,
        executor: Arc<JobExecutor<D>>,
        config: BackendServeConfig,
        shutdown: CancellationToken,
    ) -> Result<()>
    where
        D: Send + Sync + 'static,
    {
        use seesaw_core::backend::job_executor::HandlerStatus;
        use tokio::time::sleep;

        let mut handles = Vec::new();

        // --- Event workers: drain the event queue and commit results. ---
        for i in 0..config.event_workers {
            let store = self.store.clone();
            let executor = executor.clone();
            let config = config.event_worker.clone();
            let shutdown = shutdown.clone();

            let handle: tokio::task::JoinHandle<Result<()>> = tokio::spawn(async move {
                tracing::info!("Event worker {} started", i);

                while !shutdown.is_cancelled() {
                    match store.poll_next().await {
                        Ok(Some(event)) => {
                            match executor.execute_event(&event, &config, &DIRECT_RUNNER).await {
                                Ok(commit) => {
                                    // Translate the executor's commit into the
                                    // store-level commit type.
                                    let store_commit = EventProcessingCommit {
                                        event_row_id: commit.event_row_id,
                                        event_id: commit.event_id,
                                        correlation_id: commit.correlation_id,
                                        event_type: commit.event_type,
                                        event_payload: commit.event_payload,
                                        queued_effect_intents: commit.queued_effect_intents,
                                        inline_effect_failures: commit.inline_effect_failures.into_iter().map(|f| {
                                            seesaw_core::InlineHandlerFailure {
                                                handler_id: f.handler_id,
                                                error: f.error,
                                                reason: f.reason,
                                                attempts: f.attempts,
                                            }
                                        }).collect(),
                                        emitted_events: commit.emitted_events,
                                    };

                                    // Commit failures are logged but not retried here.
                                    if let Err(e) = store.commit_event_processing(store_commit).await {
                                        tracing::error!("Failed to commit event processing: {}", e);
                                    }
                                }
                                Err(e) => {
                                    // NOTE(review): nack always reports retry count 1
                                    // regardless of event.retry_count — confirm intended.
                                    tracing::warn!("Event processing failed: {}, nacking event", e);
                                    let _ = store.nack(event.id, 1).await;
                                }
                            }
                        }
                        Ok(None) => {
                            // Queue empty: back off before polling again.
                            sleep(config.poll_interval).await;
                        }
                        Err(e) => {
                            tracing::error!("Error polling events: {}", e);
                            sleep(config.poll_interval).await;
                        }
                    }
                }

                tracing::info!("Event worker {} stopped", i);
                Ok(())
            });

            handles.push(handle);
        }

        // --- Handler workers: execute queued effects and record outcomes. ---
        for i in 0..config.handler_workers {
            let store = self.store.clone();
            let executor = executor.clone();
            let config = config.handler_worker.clone();
            let shutdown = shutdown.clone();

            let handle = tokio::spawn(async move {
                tracing::info!("Handler worker {} started", i);

                while !shutdown.is_cancelled() {
                    match store.poll_next_effect().await {
                        Ok(Some(execution)) => {
                            match executor.execute_handler(execution.clone(), &config, &DIRECT_RUNNER).await {
                                Ok(result) => {
                                    match result.status {
                                        HandlerStatus::Success => {
                                            // Success: persist the result, plus any
                                            // follow-up events the handler emitted.
                                            if result.emitted_events.is_empty() {
                                                if let Err(e) = store.complete_effect(
                                                    execution.event_id,
                                                    execution.handler_id,
                                                    result.result,
                                                ).await {
                                                    tracing::error!("Failed to complete effect: {}", e);
                                                }
                                            } else {
                                                if let Err(e) = store.complete_effect_with_events(
                                                    execution.event_id,
                                                    execution.handler_id,
                                                    result.result,
                                                    result.emitted_events,
                                                ).await {
                                                    tracing::error!("Failed to complete effect with events: {}", e);
                                                }
                                            }
                                        }
                                        // Failed and Retry take the same path: DLQ once
                                        // attempts are exhausted, otherwise mark failed
                                        // so the effect can be retried later.
                                        HandlerStatus::Failed { error, attempts } | HandlerStatus::Retry { error, attempts } => {
                                            if attempts >= execution.max_attempts {
                                                if let Err(e) = store.dlq_effect(
                                                    execution.event_id,
                                                    execution.handler_id,
                                                    error,
                                                    "max_retries_exceeded".to_string(),
                                                    attempts,
                                                ).await {
                                                    tracing::error!("Failed to DLQ effect: {}", e);
                                                }
                                            } else {
                                                if let Err(e) = store.fail_effect(
                                                    execution.event_id,
                                                    execution.handler_id,
                                                    error,
                                                    attempts,
                                                ).await {
                                                    tracing::error!("Failed to mark effect as failed: {}", e);
                                                }
                                            }
                                        }
                                        HandlerStatus::Timeout => {
                                            // Timeouts use the execution's own attempt
                                            // counter rather than one from the executor.
                                            tracing::warn!("Handler timed out: {}", execution.handler_id);
                                            if execution.attempts >= execution.max_attempts {
                                                if let Err(e) = store.dlq_effect(
                                                    execution.event_id,
                                                    execution.handler_id,
                                                    "Handler execution timed out".to_string(),
                                                    "timeout".to_string(),
                                                    execution.attempts,
                                                ).await {
                                                    tracing::error!("Failed to DLQ timed out effect: {}", e);
                                                }
                                            } else {
                                                if let Err(e) = store.fail_effect(
                                                    execution.event_id,
                                                    execution.handler_id,
                                                    "Handler execution timed out".to_string(),
                                                    execution.attempts,
                                                ).await {
                                                    tracing::error!("Failed to mark timed out effect as failed: {}", e);
                                                }
                                            }
                                        }
                                        HandlerStatus::JoinWaiting => {
                                            // Handler is parked in a join window; the
                                            // effect itself is recorded as complete.
                                            if let Err(e) = store.complete_effect(
                                                execution.event_id,
                                                execution.handler_id,
                                                result.result,
                                            ).await {
                                                tracing::error!("Failed to complete join_waiting effect: {}", e);
                                            }
                                        }
                                    }
                                }
                                Err(e) => {
                                    // Executor-level failure (no handler status):
                                    // same DLQ-or-retry decision as above.
                                    tracing::error!("Handler execution failed: {}", e);
                                    if execution.attempts >= execution.max_attempts {
                                        if let Err(e) = store.dlq_effect(
                                            execution.event_id,
                                            execution.handler_id,
                                            format!("Handler execution error: {}", e),
                                            "execution_error".to_string(),
                                            execution.attempts,
                                        ).await {
                                            tracing::error!("Failed to DLQ effect: {}", e);
                                        }
                                    } else {
                                        if let Err(e) = store.fail_effect(
                                            execution.event_id,
                                            execution.handler_id,
                                            format!("Handler execution error: {}", e),
                                            execution.attempts,
                                        ).await {
                                            tracing::error!("Failed to mark effect as failed: {}", e);
                                        }
                                    }
                                }
                            }
                        }
                        Ok(None) => {
                            sleep(config.poll_interval).await;
                        }
                        Err(e) => {
                            tracing::error!("Error polling effects: {}", e);
                            sleep(config.poll_interval).await;
                        }
                    }
                }

                tracing::info!("Handler worker {} stopped", i);
                Ok(())
            });

            handles.push(handle);
        }

        // Block until shutdown is requested, then give workers a bounded
        // window to finish their current iteration before returning.
        shutdown.cancelled().await;
        tracing::info!("Shutdown signal received, draining workers...");

        let drain_timeout = config.graceful_shutdown_timeout;
        tokio::time::timeout(drain_timeout, async {
            for handle in handles {
                let _ = handle.await;
            }
        })
        .await
        .ok();

        tracing::info!("PostgresBackend shutdown complete");
        Ok(())
    }
}
2104
#[async_trait]
impl WorkflowStatusBackend for PostgresBackend {
    /// Delegates workflow status lookup to the underlying `PostgresStore`.
    async fn get_workflow_status(&self, correlation_id: Uuid) -> Result<seesaw_core::WorkflowStatus> {
        self.store.get_workflow_status(correlation_id).await
    }
}
2113
#[async_trait]
impl WorkflowSubscriptionBackend for PostgresBackend {
    /// Delegates workflow event subscription to the underlying
    /// `PostgresStore`.
    async fn subscribe_workflow_events(
        &self,
        correlation_id: Uuid,
    ) -> Result<Box<dyn futures::Stream<Item = seesaw_core::WorkflowEvent> + Send + Unpin>> {
        self.store.subscribe_workflow_events(correlation_id).await
    }
}
2123
#[async_trait]
impl DeadLetterQueueBackend for PostgresBackend {
    /// NOTE(review): stub — filters are ignored and no rows are returned.
    /// Presumably this should query the DLQ tables (see `DeadLetterQueue`
    /// below); confirm before relying on this capability.
    async fn list_dlq(&self, _filters: DlqFilters) -> Result<Vec<DeadLetter>> {
        Ok(Vec::new())
    }

    /// NOTE(review): stub — reports success without retrying anything.
    async fn retry_dlq(&self, _event_id: Uuid, _handler_id: String) -> Result<()> {
        Ok(())
    }
}
2137
#[async_trait]
impl InsightBackend for PostgresBackend {
    /// NOTE(review): stub — returns an empty capability-level tree. A full
    /// implementation exists on `PostgresStore`
    /// (`InsightStore::get_workflow_tree`) but produces a different tree
    /// type; confirm whether this should adapt that result.
    async fn get_workflow_tree(
        &self,
        correlation_id: Uuid,
    ) -> Result<seesaw_core::backend::capability::WorkflowTree> {
        Ok(seesaw_core::backend::capability::WorkflowTree {
            correlation_id,
            nodes: Vec::new(),
        })
    }

    /// NOTE(review): stub — every counter is hard-coded to zero.
    async fn get_insight_stats(&self) -> Result<seesaw_core::backend::capability::InsightStats> {
        Ok(seesaw_core::backend::capability::InsightStats {
            total_workflows: 0,
            active_workflows: 0,
            settled_workflows: 0,
            total_events: 0,
            total_handlers_executed: 0,
            total_dlq_entries: 0,
        })
    }
}
2165
/// Row of `seesaw_dead_letter_queue` (the standalone DLQ table managed by
/// `DeadLetterQueue`; distinct from `seesaw_dlq` used by the insight API).
#[derive(Debug, Clone, sqlx::FromRow)]
struct DlqRow {
    id: Uuid,
    event_id: Uuid,
    handler_id: String,
    /// Conflict key for upserts in `DeadLetterQueue::insert`.
    intent_id: Uuid,
    error_message: String,
    error_details: Option<Value>,
    retry_count: i32,
    first_failed_at: DateTime<Utc>,
    last_failed_at: DateTime<Utc>,
    event_payload: Value,
    /// One of 'open' / 'retrying' / 'replayed' / 'resolved'.
    status: String,
    retry_attempts: i32,
    last_retry_at: Option<DateTime<Utc>>,
    resolved_at: Option<DateTime<Utc>>,
    resolution_note: Option<String>,
    created_at: DateTime<Utc>,
}
2190
2191impl From<DlqRow> for CoreDeadLetter {
2192 fn from(row: DlqRow) -> Self {
2193 let status = match row.status.as_str() {
2194 "open" => DlqStatus::Open,
2195 "retrying" => DlqStatus::Retrying,
2196 "replayed" => DlqStatus::Replayed,
2197 "resolved" => DlqStatus::Resolved,
2198 _ => DlqStatus::Open, };
2200
2201 CoreDeadLetter {
2202 id: row.id,
2203 event_id: row.event_id,
2204 handler_id: row.handler_id,
2205 intent_id: row.intent_id,
2206 error_message: row.error_message,
2207 error_details: row.error_details,
2208 retry_count: row.retry_count,
2209 first_failed_at: row.first_failed_at,
2210 last_failed_at: row.last_failed_at,
2211 event_payload: row.event_payload,
2212 status,
2213 retry_attempts: row.retry_attempts,
2214 last_retry_at: row.last_retry_at,
2215 resolved_at: row.resolved_at,
2216 resolution_note: row.resolution_note,
2217 created_at: row.created_at,
2218 }
2219 }
2220}
2221
/// Standalone API over the `seesaw_dead_letter_queue` table: insertion,
/// listing, retry bookkeeping, and resolution.
#[derive(Clone)]
pub struct DeadLetterQueue {
    pool: PgPool,
}
2227
impl DeadLetterQueue {
    /// Creates a DLQ handle over an existing connection pool.
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }

    /// Inserts (or refreshes) a dead letter keyed by `intent_id`.
    ///
    /// On conflict, the existing row is updated with the newest error and
    /// failure time and re-opened, so repeated failures of the same intent
    /// collapse into a single row. Returns the row id.
    pub async fn insert(
        &self,
        event_id: Uuid,
        handler_id: &str,
        intent_id: Uuid,
        error_message: &str,
        error_details: Option<Value>,
        retry_count: i32,
        first_failed_at: DateTime<Utc>,
        last_failed_at: DateTime<Utc>,
        event_payload: Value,
    ) -> Result<Uuid> {
        let id = sqlx::query_scalar::<_, Uuid>(
            "INSERT INTO seesaw_dead_letter_queue
                (event_id, handler_id, intent_id, error_message, error_details,
                 retry_count, first_failed_at, last_failed_at, event_payload, status)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, 'open')
             ON CONFLICT (intent_id) DO UPDATE SET
                error_message = EXCLUDED.error_message,
                error_details = EXCLUDED.error_details,
                retry_count = EXCLUDED.retry_count,
                last_failed_at = EXCLUDED.last_failed_at,
                status = 'open'
             RETURNING id",
        )
        .bind(event_id)
        .bind(handler_id)
        .bind(intent_id)
        .bind(error_message)
        .bind(error_details)
        .bind(retry_count)
        .bind(first_failed_at)
        .bind(last_failed_at)
        .bind(event_payload)
        .fetch_one(&self.pool)
        .await?;

        Ok(id)
    }

    /// Lists unresolved ('open' or 'retrying') entries, newest first.
    pub async fn list(&self, limit: i64) -> Result<Vec<CoreDeadLetter>> {
        let entries = sqlx::query_as::<_, DlqRow>(
            "SELECT * FROM seesaw_dead_letter_queue
             WHERE status IN ('open', 'retrying')
             ORDER BY created_at DESC
             LIMIT $1",
        )
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;

        Ok(entries.into_iter().map(Into::into).collect())
    }

    /// Same as `list`, restricted to a single handler id.
    pub async fn list_by_handler(&self, handler_id: &str, limit: i64) -> Result<Vec<CoreDeadLetter>> {
        let entries = sqlx::query_as::<_, DlqRow>(
            "SELECT * FROM seesaw_dead_letter_queue
             WHERE handler_id = $1 AND status IN ('open', 'retrying')
             ORDER BY created_at DESC
             LIMIT $2",
        )
        .bind(handler_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;

        Ok(entries.into_iter().map(Into::into).collect())
    }

    /// Fetches a single entry by id; errors if it does not exist.
    pub async fn get(&self, dlq_id: Uuid) -> Result<CoreDeadLetter> {
        let entry =
            sqlx::query_as::<_, DlqRow>("SELECT * FROM seesaw_dead_letter_queue WHERE id = $1")
                .bind(dlq_id)
                .fetch_one(&self.pool)
                .await?;

        Ok(entry.into())
    }

    /// Transitions an entry to 'retrying' (incrementing its retry counter)
    /// and returns it.
    ///
    /// NOTE(review): the returned value is the row as read BEFORE the status
    /// update (still 'open', old `retry_attempts`) — confirm callers expect
    /// the pre-transition snapshot.
    pub async fn start_retry(&self, dlq_id: Uuid) -> Result<CoreDeadLetter> {
        let mut tx = self.pool.begin().await?;

        // Row-lock the entry so concurrent retries serialize.
        let entry = sqlx::query_as::<_, DlqRow>(
            "SELECT * FROM seesaw_dead_letter_queue
             WHERE id = $1
             FOR UPDATE",
        )
        .bind(dlq_id)
        .fetch_one(&mut *tx)
        .await?;

        sqlx::query(
            "UPDATE seesaw_dead_letter_queue
             SET status = 'retrying',
                 retry_attempts = retry_attempts + 1,
                 last_retry_at = NOW()
             WHERE id = $1",
        )
        .bind(dlq_id)
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;

        Ok(entry.into())
    }

    /// Marks an entry as successfully replayed, recording when and why.
    pub async fn mark_replayed(&self, dlq_id: Uuid, resolution_note: Option<&str>) -> Result<()> {
        sqlx::query(
            "UPDATE seesaw_dead_letter_queue
             SET status = 'replayed',
                 resolved_at = NOW(),
                 resolution_note = $2
             WHERE id = $1",
        )
        .bind(dlq_id)
        .bind(resolution_note)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Re-opens an entry after a failed retry, storing the latest error.
    pub async fn mark_retry_failed(
        &self,
        dlq_id: Uuid,
        error_message: &str,
        error_details: Option<Value>,
    ) -> Result<()> {
        sqlx::query(
            "UPDATE seesaw_dead_letter_queue
             SET status = 'open',
                 last_failed_at = NOW(),
                 error_message = $2,
                 error_details = $3
             WHERE id = $1",
        )
        .bind(dlq_id)
        .bind(error_message)
        .bind(error_details)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Marks an entry as manually resolved (without replaying it).
    pub async fn mark_resolved(&self, dlq_id: Uuid, resolution_note: &str) -> Result<()> {
        sqlx::query(
            "UPDATE seesaw_dead_letter_queue
             SET status = 'resolved',
                 resolved_at = NOW(),
                 resolution_note = $2
             WHERE id = $1",
        )
        .bind(dlq_id)
        .bind(resolution_note)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Per-status counts over the whole DLQ table, computed in one scan.
    pub async fn stats(&self) -> Result<DlqStats> {
        #[derive(sqlx::FromRow)]
        struct StatsRow {
            open: Option<i64>,
            retrying: Option<i64>,
            replayed: Option<i64>,
            resolved: Option<i64>,
            total: Option<i64>,
        }

        let row: StatsRow = sqlx::query_as(
            "SELECT
                COUNT(*) FILTER (WHERE status = 'open') as open,
                COUNT(*) FILTER (WHERE status = 'retrying') as retrying,
                COUNT(*) FILTER (WHERE status = 'replayed') as replayed,
                COUNT(*) FILTER (WHERE status = 'resolved') as resolved,
                COUNT(*) as total
             FROM seesaw_dead_letter_queue",
        )
        .fetch_one(&self.pool)
        .await?;

        // Columns are decoded as nullable; default defensively to zero.
        Ok(DlqStats {
            open: row.open.unwrap_or(0) as usize,
            retrying: row.retrying.unwrap_or(0) as usize,
            replayed: row.replayed.unwrap_or(0) as usize,
            resolved: row.resolved.unwrap_or(0) as usize,
            total: row.total.unwrap_or(0) as usize,
        })
    }
}
2437
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Timelike};

    /// Convenience constructor for a concrete UTC timestamp.
    fn ts(y: i32, mo: u32, d: u32, h: u32, mi: u32, s: u32) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(y, mo, d, h, mi, s)
            .single()
            .expect("valid timestamp")
    }

    #[test]
    fn emitted_event_created_at_is_midnight_on_parent_day() {
        let parent = ts(2026, 2, 5, 18, 45, 12);

        let emitted = emitted_event_created_at(parent);

        assert_eq!(emitted.date_naive(), parent.date_naive());
        assert_eq!(emitted.hour(), 0);
        assert_eq!(emitted.minute(), 0);
        assert_eq!(emitted.second(), 0);
    }

    #[test]
    fn emitted_event_created_at_is_deterministic_for_same_parent_day() {
        let early = ts(2026, 2, 5, 0, 1, 2);
        let late = ts(2026, 2, 5, 23, 59, 59);

        assert_eq!(
            emitted_event_created_at(early),
            emitted_event_created_at(late)
        );
    }

    #[test]
    fn handler_retry_delay_seconds_uses_exponential_backoff() {
        let expectations = [(1, 1), (2, 2), (3, 4), (4, 8)];
        for (attempts, delay) in expectations {
            assert_eq!(handler_retry_delay_seconds(attempts), delay);
        }
    }

    #[test]
    fn handler_retry_delay_seconds_is_capped() {
        // 2^8 = 256 is the backoff ceiling.
        for attempts in [9, 50] {
            assert_eq!(handler_retry_delay_seconds(attempts), 256);
        }
    }

    #[test]
    fn build_event_tree_treats_orphan_parent_as_root() {
        let workflow = Uuid::new_v4();
        let orphan = Uuid::new_v4();
        let unknown_parent = Uuid::new_v4();
        let now = Utc::now();

        let events = vec![EventRow {
            id: 1,
            event_id: orphan,
            parent_id: Some(unknown_parent),
            correlation_id: workflow,
            event_type: "OrphanEvent".to_string(),
            payload: serde_json::json!({"ok": true}),
            hops: 1,
            retry_count: 0,
            batch_id: None,
            batch_index: None,
            batch_size: None,
            created_at: now,
        }];
        let effects: Vec<EffectTreeRow> = Vec::new();
        let known_ids: HashSet<Uuid> = events.iter().map(|event| event.event_id).collect();

        let roots = build_event_tree(&events, &effects, None, &known_ids, true);

        assert_eq!(roots.len(), 1);
        assert_eq!(roots[0].event_id, orphan);
    }
}