//! PostgreSQL-backed implementation of the seesaw `Store` trait
//! (crate `seesaw_postgres`, file `lib.rs`).
pub mod event_store;

// Simplified PostgresStore without compile-time checked queries.
// Uses dynamic (runtime-checked) queries for easier testing.

use anyhow::Result;
use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc};
use seesaw_core::{
    insight::*, EmittedEvent, EventProcessingCommit, ExpiredJoinWindow, JoinEntry, QueuedEvent,
    QueuedHandlerExecution, Store, NAMESPACE_SEESAW,
    DeadLetter as CoreDeadLetter, DlqStats, DlqStatus,
};
use serde_json::Value;
use sqlx::{FromRow, PgPool};
use std::collections::HashSet;
use uuid::Uuid;

/// How long (in seconds) a claimed event stays locked before other pollers
/// may reclaim it; see `poll_next`.
const EVENT_CLAIM_SECONDS: i64 = 30;
20
21fn emitted_event_created_at(parent_created_at: DateTime<Utc>) -> DateTime<Utc> {
22    parent_created_at
23        .date_naive()
24        .and_hms_opt(0, 0, 0)
25        .expect("midnight should always be a valid UTC timestamp")
26        .and_utc()
27}
28
/// Exponential backoff delay for failed handler attempts: 1s, 2s, 4s, ...
/// capped at 256s. Attempts at or below 1 (including non-positive values)
/// map to a 1-second delay.
fn handler_retry_delay_seconds(attempts: i32) -> i64 {
    // First attempt is exponent 0; the exponent never exceeds 8 (256s cap).
    let exponent = attempts.saturating_sub(1).clamp(0, 8);
    2_i64.pow(exponent as u32)
}
33
/// PostgreSQL implementation of Store trait.
///
/// Thin wrapper around a `PgPool`; all durable state lives in the database,
/// so the struct itself holds only the connection handle.
pub struct PostgresStore {
    pool: PgPool,
}
38
39impl PostgresStore {
40    pub fn new(pool: PgPool) -> Self {
41        Self { pool }
42    }
43
44    pub fn pool(&self) -> &PgPool {
45        &self.pool
46    }
47
48    /// Check if an event has already been processed (for idempotency).
49    ///
50    /// This is used by the Kafka backend to prevent duplicate processing
51    /// when Kafka redelivers messages.
52    pub async fn is_processed(&self, event_id: Uuid) -> Result<bool> {
53        let result: bool = sqlx::query_scalar(
54            "SELECT EXISTS(SELECT 1 FROM seesaw_processed WHERE event_id = $1)"
55        )
56        .bind(event_id)
57        .fetch_one(&self.pool)
58        .await?;
59
60        Ok(result)
61    }
62}
63
// Manual Clone: only the pool handle is duplicated.
// NOTE(review): `#[derive(Clone)]` on PostgresStore would be equivalent —
// confirm nothing relies on this impl being hand-written.
impl Clone for PostgresStore {
    fn clone(&self) -> Self {
        Self {
            pool: self.pool.clone(),
        }
    }
}
71
/// Projection of a `seesaw_events` row as claimed by `poll_next`.
#[derive(FromRow)]
struct EventRow {
    id: i64, // surrogate row id; used as the ack/nack handle
    event_id: Uuid,
    parent_id: Option<Uuid>,
    correlation_id: Uuid,
    event_type: String,
    payload: serde_json::Value,
    hops: i32,
    retry_count: i32,
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
    created_at: DateTime<Utc>,
}
87
/// Projection of a `seesaw_handler_executions` row (queued handler work),
/// as claimed by `poll_next_effect` and read back by the DLQ/complete paths.
#[derive(FromRow)]
struct HandlerRow {
    event_id: Uuid,
    handler_id: String,
    correlation_id: Uuid,
    event_type: String,
    event_payload: serde_json::Value,
    parent_event_id: Option<Uuid>,
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
    execute_at: DateTime<Utc>,
    timeout_seconds: i32,
    max_attempts: i32,
    priority: i32,
    attempts: i32,
    join_window_timeout_seconds: Option<i32>,
}
106
/// Minimal parent-event metadata used when deriving child events
/// (hop increment and deterministic timestamp).
#[derive(FromRow)]
struct ParentEventRow {
    hops: i32,
    created_at: DateTime<Utc>,
}
112
/// Source-event fields needed when writing DLQ entries and synthetic
/// batch-terminal events.
#[derive(FromRow)]
struct DlqSourceEventRow {
    correlation_id: Uuid,
    event_type: String,
    payload: serde_json::Value,
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
    hops: i32,
    created_at: DateTime<Utc>,
}
124
/// Row shape read by the workflow-event subscription catch-up query
/// (see `subscribe_workflow_events`).
#[derive(FromRow)]
struct WorkflowEventRow {
    id: i64, // cursor component together with created_at
    event_id: Uuid,
    correlation_id: Uuid,
    event_type: String,
    payload: serde_json::Value,
    created_at: DateTime<Utc>,
}
134
135#[async_trait]
136impl Store for PostgresStore {
    /// Enqueue a new event, deduplicating on `event_id`.
    ///
    /// The insert into `seesaw_processed` acts as the idempotency guard: if
    /// the event id was already recorded, the event-row insert is skipped and
    /// the call succeeds as a no-op. Both inserts share one transaction.
    async fn publish(&self, event: QueuedEvent) -> Result<()> {
        let mut tx = self.pool.begin().await?;

        // Use the non-partitioned ledger as the idempotency guard. This keeps
        // webhook/process_with_id dedupe stable even when created_at differs.
        let inserted: Option<Uuid> = sqlx::query_scalar(
            "INSERT INTO seesaw_processed (event_id, correlation_id, created_at)
             VALUES ($1, $2, $3)
             ON CONFLICT (event_id) DO NOTHING
             RETURNING event_id",
        )
        .bind(event.event_id)
        .bind(event.correlation_id)
        .bind(event.created_at)
        .fetch_optional(&mut *tx)
        .await?;

        // RETURNING yields no row on conflict → duplicate; nothing to do.
        if inserted.is_none() {
            tx.commit().await?;
            return Ok(());
        }

        sqlx::query(
            "INSERT INTO seesaw_events (
                event_id, parent_id, correlation_id, event_type, payload, hops, retry_count,
                batch_id, batch_index, batch_size, created_at
             )
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)",
        )
        .bind(event.event_id)
        .bind(event.parent_id)
        .bind(event.correlation_id)
        .bind(event.event_type)
        .bind(event.payload)
        .bind(event.hops)
        .bind(event.retry_count)
        .bind(event.batch_id)
        .bind(event.batch_index)
        .bind(event.batch_size)
        .bind(event.created_at)
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;

        Ok(())
    }
184
    /// Claim the next processable event, enforcing per-correlation FIFO order.
    ///
    /// An event is eligible only when it is unprocessed, unclaimed (or its
    /// claim expired), and no older unprocessed event exists for the same
    /// correlation id (ordered by `created_at`, then `id`). Claiming sets
    /// `locked_until` `EVENT_CLAIM_SECONDS` into the future; `FOR UPDATE
    /// SKIP LOCKED` lets concurrent pollers pick disjoint rows.
    async fn poll_next(&self) -> Result<Option<QueuedEvent>> {
        let row: Option<EventRow> = sqlx::query_as(
            "WITH next_event AS (
                SELECT e.id
                FROM seesaw_events e
                WHERE e.processed_at IS NULL
                  AND (e.locked_until IS NULL OR e.locked_until < NOW())
                  AND NOT EXISTS (
                    SELECT 1
                    FROM seesaw_events older
                    WHERE older.correlation_id = e.correlation_id
                      AND older.processed_at IS NULL
                      AND (
                        older.created_at < e.created_at
                        OR (older.created_at = e.created_at AND older.id < e.id)
                      )
                  )
                ORDER BY e.created_at ASC, e.id ASC
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            )
            UPDATE seesaw_events e
            SET locked_until = NOW() + ($1 * INTERVAL '1 second')
            FROM next_event
            WHERE e.id = next_event.id
            RETURNING e.id, e.event_id, e.parent_id, e.correlation_id, e.event_type, e.payload,
                      e.hops, e.retry_count, e.batch_id, e.batch_index, e.batch_size, e.created_at",
        )
        .bind(EVENT_CLAIM_SECONDS)
        .fetch_optional(&self.pool)
        .await?;

        // None means no eligible event right now (empty queue or all claimed).
        Ok(row.map(|r| QueuedEvent {
            id: r.id,
            event_id: r.event_id,
            parent_id: r.parent_id,
            correlation_id: r.correlation_id,
            event_type: r.event_type,
            payload: r.payload,
            hops: r.hops,
            retry_count: r.retry_count,
            batch_id: r.batch_id,
            batch_index: r.batch_index,
            batch_size: r.batch_size,
            created_at: r.created_at,
        }))
    }
232
233    async fn ack(&self, id: i64) -> Result<()> {
234        sqlx::query(
235            "UPDATE seesaw_events SET processed_at = NOW(), locked_until = NULL WHERE id = $1",
236        )
237        .bind(id)
238        .execute(&self.pool)
239        .await?;
240        Ok(())
241    }
242
243    async fn nack(&self, id: i64, retry_after_secs: u64) -> Result<()> {
244        let locked_until = Utc::now() + Duration::seconds(retry_after_secs as i64);
245        sqlx::query(
246            "UPDATE seesaw_events
247             SET retry_count = retry_count + 1,
248                 locked_until = $2
249             WHERE id = $1",
250        )
251        .bind(id)
252        .bind(locked_until)
253        .execute(&self.pool)
254        .await?;
255        Ok(())
256    }
257
    /// Atomically persist everything produced by processing one event:
    /// queued handler intents, follow-up events, DLQ rows for inline effect
    /// failures (plus synthetic batch-terminal events), and the ack of the
    /// source event row — all in a single transaction.
    async fn commit_event_processing(&self, commit: EventProcessingCommit) -> Result<()> {
        let EventProcessingCommit {
            event_row_id,
            event_id,
            correlation_id,
            event_type,
            event_payload,
            queued_effect_intents,
            inline_effect_failures,
            emitted_events,
        } = commit;

        let mut tx = self.pool.begin().await?;

        // Queue one handler execution per intent; ON CONFLICT makes a retried
        // commit idempotent per (event_id, handler_id).
        for intent in queued_effect_intents {
            sqlx::query(
                "INSERT INTO seesaw_handler_executions (
                    event_id, handler_id, correlation_id, status,
                    event_type, event_payload, parent_event_id,
                    batch_id, batch_index, batch_size,
                    execute_at, timeout_seconds, max_attempts, priority,
                    join_window_timeout_seconds
                 )
                 VALUES ($1, $2, $3, 'pending', $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
                 ON CONFLICT (event_id, handler_id) DO NOTHING",
            )
            .bind(event_id)
            .bind(intent.handler_id)
            .bind(correlation_id)
            .bind(&event_type)
            .bind(&event_payload)
            .bind(intent.parent_event_id)
            .bind(intent.batch_id)
            .bind(intent.batch_index)
            .bind(intent.batch_size)
            .bind(intent.execute_at)
            .bind(intent.timeout_seconds)
            .bind(intent.max_attempts)
            .bind(intent.priority)
            .bind(intent.join_window_timeout_seconds)
            .execute(&mut *tx)
            .await?;
        }

        // Insert follow-up events, using seesaw_processed as the dedupe
        // ledger (same pattern as `publish`): skip ids we have seen before.
        for event in emitted_events {
            let inserted: Option<Uuid> = sqlx::query_scalar(
                "INSERT INTO seesaw_processed (event_id, correlation_id, created_at)
                 VALUES ($1, $2, $3)
                 ON CONFLICT (event_id) DO NOTHING
                 RETURNING event_id",
            )
            .bind(event.event_id)
            .bind(event.correlation_id)
            .bind(event.created_at)
            .fetch_optional(&mut *tx)
            .await?;

            if inserted.is_none() {
                continue;
            }

            sqlx::query(
                "INSERT INTO seesaw_events (
                    event_id, parent_id, correlation_id, event_type, payload, hops, retry_count,
                    batch_id, batch_index, batch_size, created_at
                 )
                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)",
            )
            .bind(event.event_id)
            .bind(event.parent_id)
            .bind(event.correlation_id)
            .bind(event.event_type)
            .bind(event.payload)
            .bind(event.hops)
            .bind(event.retry_count)
            .bind(event.batch_id)
            .bind(event.batch_index)
            .bind(event.batch_size)
            .bind(event.created_at)
            .execute(&mut *tx)
            .await?;
        }

        // Source-event metadata (batch info, timestamp, hops) is only needed
        // when recording inline failures, so fetch it lazily.
        let source_event: Option<DlqSourceEventRow> = if inline_effect_failures.is_empty() {
            None
        } else {
            sqlx::query_as(
                "SELECT correlation_id, event_type, payload, batch_id, batch_index, batch_size, hops, created_at
                 FROM seesaw_events
                 WHERE event_id = $1
                 ORDER BY created_at ASC, id ASC
                 LIMIT 1",
            )
            .bind(event_id)
            .fetch_optional(&mut *tx)
            .await?
        };

        for failure in inline_effect_failures {
            sqlx::query(
                "INSERT INTO seesaw_dlq (
                    event_id, handler_id, correlation_id, error, event_type, event_payload, reason, attempts
                 )
                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
            )
            .bind(event_id)
            .bind(&failure.handler_id)
            .bind(correlation_id)
            .bind(&failure.error)
            .bind(&event_type)
            .bind(&event_payload)
            .bind(&failure.reason)
            .bind(failure.attempts)
            .execute(&mut *tx)
            .await?;

            // For batched source events, also emit a synthetic "terminal"
            // event so batch accounting can still complete. The UUIDv5 id
            // derived from (event_id, handler_id) keeps retries idempotent
            // together with ON CONFLICT DO NOTHING.
            if let Some(source) = source_event.as_ref() {
                if let (Some(batch_id), Some(batch_index), Some(batch_size)) =
                    (source.batch_id, source.batch_index, source.batch_size)
                {
                    let synthetic_event_id = Uuid::new_v5(
                        &NAMESPACE_SEESAW,
                        format!("{}-{}-dlq-terminal", event_id, failure.handler_id).as_bytes(),
                    );
                    let synthetic_created_at = emitted_event_created_at(source.created_at);

                    sqlx::query(
                        "INSERT INTO seesaw_events (
                            event_id, parent_id, correlation_id, event_type, payload, hops,
                            batch_id, batch_index, batch_size, created_at
                         )
                         VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                         ON CONFLICT (event_id, created_at) DO NOTHING",
                    )
                    .bind(synthetic_event_id)
                    .bind(Some(event_id))
                    .bind(correlation_id)
                    .bind(&event_type)
                    .bind(&event_payload)
                    .bind(source.hops + 1)
                    .bind(Some(batch_id))
                    .bind(Some(batch_index))
                    .bind(Some(batch_size))
                    .bind(synthetic_created_at)
                    .execute(&mut *tx)
                    .await?;
                }
            }
        }

        // Ack the source row inside the same transaction; exactly one row
        // must match or the whole commit is rolled back.
        let ack_result = sqlx::query(
            "UPDATE seesaw_events SET processed_at = NOW(), locked_until = NULL WHERE id = $1",
        )
        .bind(event_row_id)
        .execute(&mut *tx)
        .await?;
        if ack_result.rows_affected() != 1 {
            anyhow::bail!(
                "atomic event commit failed to ack source event row {}",
                event_row_id
            );
        }

        tx.commit().await?;
        Ok(())
    }
424
    /// Queue a single handler execution ("effect intent") for an event.
    ///
    /// Idempotent per (event_id, handler_id) via `ON CONFLICT DO NOTHING`,
    /// so re-delivering the same intent is a no-op. The row starts in
    /// status 'pending' and becomes runnable once `execute_at` has passed.
    async fn insert_effect_intent(
        &self,
        event_id: Uuid,
        handler_id: String,
        correlation_id: Uuid,
        event_type: String,
        event_payload: serde_json::Value,
        parent_event_id: Option<Uuid>,
        batch_id: Option<Uuid>,
        batch_index: Option<i32>,
        batch_size: Option<i32>,
        execute_at: DateTime<Utc>,
        timeout_seconds: i32,
        max_attempts: i32,
        priority: i32,
        join_window_timeout_seconds: Option<i32>,
    ) -> Result<()> {
        // NOTE: bind order must track the column list above exactly.
        sqlx::query(
            "INSERT INTO seesaw_handler_executions (
                event_id, handler_id, correlation_id, status,
                event_type, event_payload, parent_event_id,
                batch_id, batch_index, batch_size,
                execute_at, timeout_seconds, max_attempts, priority,
                join_window_timeout_seconds
             )
             VALUES ($1, $2, $3, 'pending', $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
             ON CONFLICT (event_id, handler_id) DO NOTHING",
        )
        .bind(event_id)
        .bind(handler_id)
        .bind(correlation_id)
        .bind(event_type)
        .bind(event_payload)
        .bind(parent_event_id)
        .bind(batch_id)
        .bind(batch_index)
        .bind(batch_size)
        .bind(execute_at)
        .bind(timeout_seconds)
        .bind(max_attempts)
        .bind(priority)
        .bind(join_window_timeout_seconds)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
472
473    async fn poll_next_effect(&self) -> Result<Option<QueuedHandlerExecution>> {
474        let row: Option<HandlerRow> = sqlx::query_as(
475            "WITH next_effect AS (
476                SELECT event_id, handler_id
477                FROM seesaw_handler_executions
478                WHERE (
479                    status = 'pending'
480                    OR (status = 'failed' AND attempts < max_attempts)
481                )
482                  AND execute_at <= NOW()
483                ORDER BY priority ASC, execute_at ASC, event_id ASC, handler_id ASC
484                LIMIT 1
485                FOR UPDATE SKIP LOCKED
486            )
487            UPDATE seesaw_handler_executions e
488            SET status = 'executing',
489                claimed_at = NOW(),
490                last_attempted_at = NOW(),
491                attempts = e.attempts + 1
492            FROM next_effect
493            WHERE e.event_id = next_effect.event_id
494              AND e.handler_id = next_effect.handler_id
495            RETURNING
496                e.event_id, e.handler_id, e.correlation_id, e.event_type, e.event_payload, e.parent_event_id,
497                e.batch_id, e.batch_index, e.batch_size,
498                e.execute_at, e.timeout_seconds, e.max_attempts, e.priority, e.attempts,
499                e.join_window_timeout_seconds",
500        )
501        .fetch_optional(&self.pool)
502        .await?;
503
504        if let Some(r) = row {
505            Ok(Some(QueuedHandlerExecution {
506                event_id: r.event_id,
507                handler_id: r.handler_id,
508                correlation_id: r.correlation_id,
509                event_type: r.event_type,
510                event_payload: r.event_payload,
511                parent_event_id: r.parent_event_id,
512                batch_id: r.batch_id,
513                batch_index: r.batch_index,
514                batch_size: r.batch_size,
515                execute_at: r.execute_at,
516                timeout_seconds: r.timeout_seconds,
517                max_attempts: r.max_attempts,
518                priority: r.priority,
519                attempts: r.attempts,
520                join_window_timeout_seconds: r.join_window_timeout_seconds,
521            }))
522        } else {
523            Ok(None)
524        }
525    }
526
527    async fn complete_effect(
528        &self,
529        event_id: Uuid,
530        handler_id: String,
531        result: serde_json::Value,
532    ) -> Result<()> {
533        sqlx::query(
534            "UPDATE seesaw_handler_executions
535             SET status = 'completed',
536                 result = $3,
537                 completed_at = NOW()
538             WHERE event_id = $1 AND handler_id = $2",
539        )
540        .bind(event_id)
541        .bind(handler_id)
542        .bind(result)
543        .execute(&self.pool)
544        .await?;
545
546        Ok(())
547    }
548
    /// Mark an effect completed and persist its emitted events atomically.
    ///
    /// Emitted events get deterministic UUIDv5 ids (from parent event id,
    /// handler id, event type and emission index) and a deterministic
    /// timestamp (midnight of the parent's day), so retrying a completion
    /// re-inserts nothing thanks to `ON CONFLICT ... DO NOTHING`.
    async fn complete_effect_with_events(
        &self,
        event_id: Uuid,
        handler_id: String,
        result: serde_json::Value,
        emitted_events: Vec<EmittedEvent>,
    ) -> Result<()> {
        // Get correlation_id and hops for emitted events.
        // `fetch_one` errors out if the effect row is missing.
        let effect: HandlerRow = sqlx::query_as(
            "SELECT event_id, handler_id, correlation_id, event_type, event_payload, parent_event_id,
                    batch_id, batch_index, batch_size,
                    execute_at, timeout_seconds, max_attempts, priority, attempts, join_window_timeout_seconds
             FROM seesaw_handler_executions
             WHERE event_id = $1 AND handler_id = $2",
        )
        .bind(event_id)
        .bind(&handler_id)
        .fetch_one(&self.pool)
        .await?;

        // Read parent metadata for deterministic hop increment and timestamp.
        // NOTE(review): both reads above run outside the transaction —
        // presumably safe because these columns never change after insert;
        // confirm if that invariant ever weakens.
        let parent: ParentEventRow = sqlx::query_as(
            "SELECT hops, created_at
             FROM seesaw_events
             WHERE event_id = $1
             ORDER BY created_at ASC, id ASC
             LIMIT 1",
        )
        .bind(event_id)
        .fetch_one(&self.pool)
        .await?;

        // Start transaction for atomicity
        let mut tx = self.pool.begin().await?;

        // Insert emitted events with deterministic IDs
        for (emitted_index, emitted) in emitted_events.into_iter().enumerate() {
            // Generate deterministic event_id from
            // hash(parent_event_id, handler_id, event_type, emitted_index)
            let deterministic_id = Uuid::new_v5(
                &NAMESPACE_SEESAW,
                format!(
                    "{}-{}-{}-{}",
                    event_id, handler_id, emitted.event_type, emitted_index
                )
                .as_bytes(),
            );

            // Deterministic timestamp keeps retries idempotent while staying in
            // the same partition day as the parent event.
            let deterministic_timestamp = emitted_event_created_at(parent.created_at);

            // Insert event (idempotent via ON CONFLICT on (event_id, created_at))
            sqlx::query(
                "INSERT INTO seesaw_events (
                    event_id, parent_id, correlation_id, event_type, payload, hops,
                    batch_id, batch_index, batch_size, created_at
                 )
                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                 ON CONFLICT (event_id, created_at) DO NOTHING",
            )
            .bind(deterministic_id)
            .bind(Some(event_id))
            .bind(effect.correlation_id)
            .bind(&emitted.event_type)
            .bind(emitted.payload)
            .bind(parent.hops + 1)
            .bind(emitted.batch_id)
            .bind(emitted.batch_index)
            .bind(emitted.batch_size)
            .bind(deterministic_timestamp)
            .execute(&mut *tx)
            .await?;
        }

        // Mark effect as completed (same transaction)
        sqlx::query(
            "UPDATE seesaw_handler_executions
             SET status = 'completed',
                 result = $3,
                 completed_at = NOW()
             WHERE event_id = $1 AND handler_id = $2",
        )
        .bind(event_id)
        .bind(handler_id)
        .bind(result)
        .execute(&mut *tx)
        .await?;

        // Commit transaction - both succeed or both fail
        tx.commit().await?;

        Ok(())
    }
643
    /// Record a failed attempt and schedule the next retry with exponential
    /// backoff (see `handler_retry_delay_seconds`).
    ///
    /// The `attempts >= $4` guard means the update only applies while the
    /// row's stored attempt counter has reached the reported value.
    // NOTE(review): presumably `attempts` is the count observed at claim time,
    // so the guard rejects reports from rows that were reset or re-queued —
    // confirm against the effect runner.
    async fn fail_effect(
        &self,
        event_id: Uuid,
        handler_id: String,
        error: String,
        attempts: i32,
    ) -> Result<()> {
        let retry_at = Utc::now() + Duration::seconds(handler_retry_delay_seconds(attempts));
        sqlx::query(
            "UPDATE seesaw_handler_executions
             SET status = 'failed',
                 error = $3,
                 execute_at = $5,
                 claimed_at = NULL
             WHERE event_id = $1 AND handler_id = $2 AND attempts >= $4",
        )
        .bind(event_id)
        .bind(handler_id)
        .bind(error)
        .bind(attempts)
        .bind(retry_at)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
670
671    async fn dlq_effect(
672        &self,
673        event_id: Uuid,
674        handler_id: String,
675        error: String,
676        reason: String,
677        attempts: i32,
678    ) -> Result<()> {
679        self.dlq_effect_with_events(event_id, handler_id, error, reason, attempts, Vec::new())
680            .await
681    }
682
    /// Move a handler execution to the dead-letter queue.
    ///
    /// Looks up the effect row for metadata; when it is missing (inline or
    /// synthetic failures) falls back to the source event, and bails only if
    /// neither exists. In one transaction it writes the DLQ row, optionally
    /// inserts deterministic synthetic "terminal" events (so batch accounting
    /// can still complete), and deletes the execution row.
    async fn dlq_effect_with_events(
        &self,
        event_id: Uuid,
        handler_id: String,
        error: String,
        reason: String,
        attempts: i32,
        emitted_events: Vec<EmittedEvent>,
    ) -> Result<()> {
        // Effect details may be missing for inline/synthetic failures. Fall back to
        // parent event data so DLQ writes still succeed.
        let effect = sqlx::query_as::<_, HandlerRow>(
            "SELECT event_id, handler_id, correlation_id, event_type, event_payload, parent_event_id,
                    batch_id, batch_index, batch_size,
                    execute_at, timeout_seconds, max_attempts, priority, attempts, join_window_timeout_seconds
             FROM seesaw_handler_executions
             WHERE event_id = $1 AND handler_id = $2",
        )
        .bind(event_id)
        .bind(&handler_id)
        .fetch_optional(&self.pool)
        .await?;

        let source_event = sqlx::query_as::<_, DlqSourceEventRow>(
            "SELECT correlation_id, event_type, payload, batch_id, batch_index, batch_size, hops, created_at
             FROM seesaw_events
             WHERE event_id = $1
             ORDER BY created_at ASC, id ASC
             LIMIT 1",
        )
        .bind(event_id)
        .fetch_optional(&self.pool)
        .await?;

        // Prefer effect-row metadata; otherwise fall back to the source event.
        let (
            source_correlation_id,
            source_event_type,
            source_event_payload,
            source_batch_id,
            source_batch_index,
            source_batch_size,
        ) = if let Some(effect) = effect {
            (
                effect.correlation_id,
                effect.event_type,
                effect.event_payload,
                effect.batch_id,
                effect.batch_index,
                effect.batch_size,
            )
        } else if let Some(source) = source_event.as_ref() {
            (
                source.correlation_id,
                source.event_type.clone(),
                source.payload.clone(),
                source.batch_id,
                source.batch_index,
                source.batch_size,
            )
        } else {
            anyhow::bail!(
                "cannot DLQ unknown effect {} for missing event {}",
                handler_id,
                event_id
            );
        };

        let mut tx = self.pool.begin().await?;

        // Insert into DLQ
        sqlx::query(
            "INSERT INTO seesaw_dlq (
                event_id, handler_id, correlation_id, error, event_type, event_payload, reason, attempts
             )
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
        )
        .bind(event_id)
        .bind(&handler_id)
        .bind(source_correlation_id)
        .bind(&error)
        .bind(&source_event_type)
        .bind(&source_event_payload)
        .bind(&reason)
        .bind(attempts)
        .execute(&mut *tx)
        .await?;
        // Accumulate-timeout DLQs drop batch metadata; every other reason
        // keeps it so a terminal event can close out the batch.
        let preserve_batch_terminal = reason != "accumulate_timeout";

        // Deterministic timestamp/hops for synthetic events; fall back to
        // "now" / hop 0 when no source event row exists.
        let synthetic_created_at = source_event
            .as_ref()
            .map(|row| emitted_event_created_at(row.created_at))
            .unwrap_or_else(Utc::now);
        let synthetic_hops = source_event.as_ref().map(|row| row.hops + 1).unwrap_or(0);

        if emitted_events.is_empty() {
            if !preserve_batch_terminal {
                // accumulate_timeout with nothing emitted: just drop the
                // execution row and finish.
                sqlx::query(
                    "DELETE FROM seesaw_handler_executions WHERE event_id = $1 AND handler_id = $2",
                )
                .bind(event_id)
                .bind(&handler_id)
                .execute(&mut *tx)
                .await?;
                tx.commit().await?;
                return Ok(());
            }
            // Batched source: emit one deterministic terminal event so the
            // batch can still be considered complete.
            if let (Some(batch_id), Some(batch_index), Some(batch_size)) =
                (source_batch_id, source_batch_index, source_batch_size)
            {
                let synthetic_event_id = Uuid::new_v5(
                    &NAMESPACE_SEESAW,
                    format!("{}-{}-dlq-terminal", event_id, handler_id).as_bytes(),
                );

                sqlx::query(
                    "INSERT INTO seesaw_events (
                        event_id, parent_id, correlation_id, event_type, payload, hops,
                        batch_id, batch_index, batch_size, created_at
                     )
                     VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                     ON CONFLICT (event_id, created_at) DO NOTHING",
                )
                .bind(synthetic_event_id)
                .bind(Some(event_id))
                .bind(source_correlation_id)
                .bind(&source_event_type)
                .bind(&source_event_payload)
                .bind(synthetic_hops)
                .bind(Some(batch_id))
                .bind(Some(batch_index))
                .bind(Some(batch_size))
                .bind(synthetic_created_at)
                .execute(&mut *tx)
                .await?;
            }
        } else {
            // The handler produced events before being dead-lettered: insert
            // each with a deterministic id so retries cannot duplicate them.
            for (emitted_index, emitted) in emitted_events.into_iter().enumerate() {
                let synthetic_event_id = Uuid::new_v5(
                    &NAMESPACE_SEESAW,
                    format!(
                        "{}-{}-dlq-terminal-{}-{}",
                        event_id, handler_id, emitted.event_type, emitted_index
                    )
                    .as_bytes(),
                );

                sqlx::query(
                    "INSERT INTO seesaw_events (
                        event_id, parent_id, correlation_id, event_type, payload, hops,
                        batch_id, batch_index, batch_size, created_at
                     )
                     VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                     ON CONFLICT (event_id, created_at) DO NOTHING",
                )
                .bind(synthetic_event_id)
                .bind(Some(event_id))
                .bind(source_correlation_id)
                .bind(&emitted.event_type)
                .bind(emitted.payload)
                .bind(synthetic_hops)
                // Inherit source batch metadata unless this is an
                // accumulate_timeout DLQ (which intentionally drops it).
                .bind(emitted.batch_id.or(if preserve_batch_terminal {
                    source_batch_id
                } else {
                    None
                }))
                .bind(emitted.batch_index.or(if preserve_batch_terminal {
                    source_batch_index
                } else {
                    None
                }))
                .bind(emitted.batch_size.or(if preserve_batch_terminal {
                    source_batch_size
                } else {
                    None
                }))
                .bind(synthetic_created_at)
                .execute(&mut *tx)
                .await?;
            }
        }

        // Delete from executions table
        sqlx::query(
            "DELETE FROM seesaw_handler_executions WHERE event_id = $1 AND handler_id = $2",
        )
        .bind(event_id)
        .bind(&handler_id)
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;

        Ok(())
    }
877
    /// Subscribe to the workflow-event stream for a single correlation id.
    ///
    /// A cursor is captured at subscribe time (the newest `(created_at, id)`
    /// pair in `seesaw_events` for this correlation), so the stream only
    /// yields rows inserted after the subscription starts. A background task
    /// is spawned that waits on the Postgres LISTEN channel
    /// `seesaw_workflow_<correlation_id>` — with a 500 ms poll fallback so a
    /// missed NOTIFY cannot stall the stream — and pages new rows out in
    /// `(created_at, id)` order through an unbounded channel.
    ///
    /// The background task (and hence the stream) terminates when the
    /// listener connection errors, a catch-up query fails, or the returned
    /// stream is dropped by the caller.
    async fn subscribe_workflow_events(
        &self,
        correlation_id: Uuid,
    ) -> Result<Box<dyn futures::Stream<Item = seesaw_core::WorkflowEvent> + Send + Unpin>> {
        use sqlx::postgres::PgListener;

        let channel = format!("seesaw_workflow_{}", correlation_id);
        // Rows fetched per catch-up query; the inner loop keeps paging until
        // a query comes back empty.
        const PAGE_SIZE: i64 = 256;
        // Upper bound on how long we block on a NOTIFY before polling anyway.
        const CATCH_UP_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500);

        // Establish a cursor at subscribe time so callers only receive new
        // workflow events emitted after this subscription starts.
        let initial_cursor: Option<(DateTime<Utc>, i64)> = sqlx::query_as(
            "SELECT created_at, id
             FROM seesaw_events
             WHERE correlation_id = $1
             ORDER BY created_at DESC, id DESC
             LIMIT 1",
        )
        .bind(correlation_id)
        .fetch_optional(&self.pool)
        .await?;

        // Create a new listener connection
        let mut listener = PgListener::connect_with(&self.pool).await?;
        listener.listen(&channel).await?;

        let pool = self.pool.clone();
        let (tx, rx) = futures::channel::mpsc::unbounded::<seesaw_core::WorkflowEvent>();

        tokio::spawn(async move {
            let mut cursor = initial_cursor;
            // First iteration drains immediately (without waiting for a
            // notification) to pick up rows inserted between the cursor
            // snapshot above and LISTEN becoming active.
            let mut drain_pending = true;

            loop {
                if !drain_pending {
                    // Wake on NOTIFY or time out and poll anyway; the
                    // notification payload itself is ignored — it is only a
                    // wake-up signal.
                    match tokio::time::timeout(CATCH_UP_INTERVAL, listener.recv()).await {
                        Ok(Ok(_notification)) => {}
                        Ok(Err(error)) => {
                            // Listener connection failed: end the subscription.
                            tracing::warn!(
                                "workflow listener recv failed for {}: {}",
                                correlation_id,
                                error
                            );
                            return;
                        }
                        Err(_) => {}
                    }
                }
                drain_pending = false;

                // Page forward from the cursor until no new rows remain.
                loop {
                    let rows_result: std::result::Result<Vec<WorkflowEventRow>, sqlx::Error> =
                        if let Some((created_at, id)) = cursor {
                            // Keyset pagination on (created_at, id): strictly
                            // after the last row already delivered.
                            sqlx::query_as(
                                "SELECT id, event_id, correlation_id, event_type, payload, created_at
                                 FROM seesaw_events
                                 WHERE correlation_id = $1
                                   AND (
                                        created_at > $2
                                        OR (created_at = $2 AND id > $3)
                                   )
                                 ORDER BY created_at ASC, id ASC
                                 LIMIT $4",
                            )
                            .bind(correlation_id)
                            .bind(created_at)
                            .bind(id)
                            .bind(PAGE_SIZE)
                            .fetch_all(&pool)
                            .await
                        } else {
                            // No prior events existed at subscribe time:
                            // every row for this correlation is new.
                            sqlx::query_as(
                                "SELECT id, event_id, correlation_id, event_type, payload, created_at
                                 FROM seesaw_events
                                 WHERE correlation_id = $1
                                 ORDER BY created_at ASC, id ASC
                                 LIMIT $2",
                            )
                            .bind(correlation_id)
                            .bind(PAGE_SIZE)
                            .fetch_all(&pool)
                            .await
                        };

                    let rows = match rows_result {
                        Ok(rows) => rows,
                        Err(error) => {
                            // Query failure ends the subscription.
                            tracing::warn!(
                                "workflow event query failed for {}: {}",
                                correlation_id,
                                error
                            );
                            return;
                        }
                    };

                    if rows.is_empty() {
                        break;
                    }

                    for row in rows {
                        // Advance the cursor before sending so each row is
                        // delivered at most once.
                        cursor = Some((row.created_at, row.id));
                        if tx
                            .unbounded_send(seesaw_core::WorkflowEvent {
                                event_id: row.event_id,
                                correlation_id: row.correlation_id,
                                event_type: row.event_type,
                                payload: row.payload,
                            })
                            .is_err()
                        {
                            // Receiver dropped: stop the background task.
                            return;
                        }
                    }
                }
            }
        });

        Ok(Box::new(rx))
    }
999
1000    async fn get_workflow_status(
1001        &self,
1002        correlation_id: Uuid,
1003    ) -> Result<seesaw_core::WorkflowStatus> {
1004        let pending_effects = sqlx::query_as::<_, (i64,)>(
1005            "SELECT COUNT(*) FROM seesaw_handler_executions
1006             WHERE correlation_id = $1 AND status IN ('pending', 'executing', 'failed')",
1007        )
1008        .bind(correlation_id)
1009        .fetch_one(&self.pool)
1010        .await?
1011        .0;
1012
1013        let last_event = sqlx::query_as::<_, (String,)>(
1014            "SELECT event_type FROM seesaw_events
1015             WHERE correlation_id = $1
1016             ORDER BY created_at DESC, id DESC
1017             LIMIT 1",
1018        )
1019        .bind(correlation_id)
1020        .fetch_optional(&self.pool)
1021        .await?
1022        .map(|r| r.0);
1023
1024        Ok(seesaw_core::WorkflowStatus {
1025            correlation_id,
1026            pending_effects,
1027            is_settled: pending_effects == 0,
1028            last_event,
1029        })
1030    }
1031
    /// Append one batch member to a same-batch join and, if this member
    /// completes the batch, atomically claim the join window for processing.
    ///
    /// All steps run in a single transaction:
    /// 1. Insert the join entry — idempotent via `ON CONFLICT ... DO NOTHING`
    ///    on `(join_handler_id, correlation_id, source_event_id)`.
    /// 2. Ensure an 'open' window row exists for this batch, with an expiry
    ///    derived from `join_window_timeout_seconds` (NULL timeout = never
    ///    expires).
    /// 3. Reconcile the window's `target_count` with the latest observed
    ///    `batch_size`, in case an earlier member created the window with a
    ///    different size.
    /// 4. Try to flip the window 'open' -> 'processing'. This succeeds only
    ///    when the window is unexpired and the entry count has reached
    ///    `target_count`, so at most one caller wins the claim.
    ///
    /// Returns `Ok(None)` when the batch is still incomplete or another
    /// caller holds the claim; returns `Ok(Some(entries))` — ordered by
    /// `(batch_index, source_created_at, source_event_id)` — for the single
    /// winner, which must then complete or release the window.
    async fn join_same_batch_append_and_maybe_claim(
        &self,
        join_handler_id: String,
        correlation_id: Uuid,
        source_event_id: Uuid,
        source_event_type: String,
        source_payload: serde_json::Value,
        source_created_at: DateTime<Utc>,
        batch_id: Uuid,
        batch_index: i32,
        batch_size: i32,
        join_window_timeout_seconds: Option<i32>,
    ) -> Result<Option<Vec<JoinEntry>>> {
        let mut tx = self.pool.begin().await?;

        // Step 1: record this member (no-op on redelivery of the same event).
        sqlx::query(
            "INSERT INTO seesaw_join_entries (
                join_handler_id, correlation_id, source_event_id, source_event_type, source_payload,
                source_created_at, batch_id, batch_index, batch_size
             )
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
             ON CONFLICT (join_handler_id, correlation_id, source_event_id) DO NOTHING",
        )
        .bind(&join_handler_id)
        .bind(correlation_id)
        .bind(source_event_id)
        .bind(&source_event_type)
        .bind(source_payload)
        .bind(source_created_at)
        .bind(batch_id)
        .bind(batch_index)
        .bind(batch_size)
        .execute(&mut *tx)
        .await?;

        // Step 2: create the window on first contact; subsequent members
        // leave the existing window untouched.
        sqlx::query(
            "INSERT INTO seesaw_join_windows (
                join_handler_id, correlation_id, mode, batch_id, target_count, status,
                window_timeout_seconds, expires_at
             )
             VALUES (
                $1,
                $2,
                'same_batch',
                $3,
                $4,
                'open',
                $5,
                CASE
                    WHEN $5 IS NULL THEN NULL
                    ELSE NOW() + ($5::int * INTERVAL '1 second')
                END
             )
             ON CONFLICT (join_handler_id, correlation_id, batch_id) DO NOTHING",
        )
        .bind(&join_handler_id)
        .bind(correlation_id)
        .bind(batch_id)
        .bind(batch_size)
        .bind(join_window_timeout_seconds)
        .execute(&mut *tx)
        .await?;

        // Step 3: adopt the latest batch_size as target_count if it differs
        // from what the window was created with.
        sqlx::query(
            "UPDATE seesaw_join_windows
             SET target_count = $4,
                 updated_at = NOW()
             WHERE join_handler_id = $1
               AND correlation_id = $2
               AND batch_id = $3
               AND target_count <> $4",
        )
        .bind(&join_handler_id)
        .bind(correlation_id)
        .bind(batch_id)
        .bind(batch_size)
        .execute(&mut *tx)
        .await?;

        // Step 4: attempt the claim. The `status = 'open'` predicate plus the
        // row update make this a single-winner transition; the subquery gates
        // it on the batch being fully assembled.
        let claimed: Option<(String,)> = sqlx::query_as(
            "UPDATE seesaw_join_windows w
             SET status = 'processing',
                 sealed_at = COALESCE(w.sealed_at, NOW()),
                 processing_started_at = NOW(),
                 updated_at = NOW(),
                 last_error = NULL
             WHERE w.join_handler_id = $1
               AND w.correlation_id = $2
               AND w.batch_id = $3
               AND w.status = 'open'
               AND (w.expires_at IS NULL OR w.expires_at > NOW())
               AND (
                    SELECT COUNT(*)::int
                    FROM seesaw_join_entries e
                    WHERE e.join_handler_id = w.join_handler_id
                      AND e.correlation_id = w.correlation_id
                      AND e.batch_id = w.batch_id
               ) >= w.target_count
             RETURNING w.join_handler_id",
        )
        .bind(&join_handler_id)
        .bind(correlation_id)
        .bind(batch_id)
        .fetch_optional(&mut *tx)
        .await?;

        // Not claimed: commit the appended entry / window bookkeeping and
        // tell the caller the join is not ready (for this caller).
        if claimed.is_none() {
            tx.commit().await?;
            return Ok(None);
        }

        // Claimed: load every member of the batch in deterministic order.
        let rows = sqlx::query_as::<_, (Uuid, String, serde_json::Value, Uuid, i32, i32, DateTime<Utc>)>(
            "SELECT source_event_id, source_event_type, source_payload, batch_id, batch_index, batch_size, source_created_at
             FROM seesaw_join_entries
             WHERE join_handler_id = $1
               AND correlation_id = $2
               AND batch_id = $3
             ORDER BY batch_index ASC, source_created_at ASC, source_event_id ASC",
        )
        .bind(&join_handler_id)
        .bind(correlation_id)
        .bind(batch_id)
        .fetch_all(&mut *tx)
        .await?;

        let entries = rows
            .into_iter()
            .map(
                |(
                    source_event_id,
                    event_type,
                    payload,
                    batch_id,
                    batch_index,
                    batch_size,
                    created_at,
                )| JoinEntry {
                    source_event_id,
                    event_type,
                    payload,
                    batch_id,
                    batch_index,
                    batch_size,
                    created_at,
                },
            )
            .collect::<Vec<_>>();

        tx.commit().await?;
        Ok(Some(entries))
    }
1183
    /// Finish a claimed same-batch join: mark the window completed and clean
    /// up its entries and the window row itself, all in one transaction.
    ///
    /// NOTE(review): the window is set to 'completed' and then deleted inside
    /// the same transaction, so the status write is only observable to
    /// row-level triggers/CDC, if any exist — confirm whether the UPDATE is
    /// still needed or can be dropped.
    async fn join_same_batch_complete(
        &self,
        join_handler_id: String,
        correlation_id: Uuid,
        batch_id: Uuid,
    ) -> Result<()> {
        let mut tx = self.pool.begin().await?;

        // Mark the window completed (see NOTE above re: subsequent delete).
        sqlx::query(
            "UPDATE seesaw_join_windows
             SET status = 'completed',
                 completed_at = NOW(),
                 updated_at = NOW()
             WHERE join_handler_id = $1
               AND correlation_id = $2
               AND batch_id = $3",
        )
        .bind(&join_handler_id)
        .bind(correlation_id)
        .bind(batch_id)
        .execute(&mut *tx)
        .await?;

        // Remove the accumulated batch members.
        sqlx::query(
            "DELETE FROM seesaw_join_entries
             WHERE join_handler_id = $1
               AND correlation_id = $2
               AND batch_id = $3",
        )
        .bind(&join_handler_id)
        .bind(correlation_id)
        .bind(batch_id)
        .execute(&mut *tx)
        .await?;

        // Remove the window row last.
        sqlx::query(
            "DELETE FROM seesaw_join_windows
             WHERE join_handler_id = $1
               AND correlation_id = $2
               AND batch_id = $3",
        )
        .bind(&join_handler_id)
        .bind(correlation_id)
        .bind(batch_id)
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;
        Ok(())
    }
1234
1235    async fn join_same_batch_release(
1236        &self,
1237        join_handler_id: String,
1238        correlation_id: Uuid,
1239        batch_id: Uuid,
1240        error: String,
1241    ) -> Result<()> {
1242        sqlx::query(
1243            "UPDATE seesaw_join_windows
1244             SET status = 'open',
1245                 processing_started_at = NULL,
1246                 last_error = $4,
1247                 updated_at = NOW()
1248             WHERE join_handler_id = $1
1249               AND correlation_id = $2
1250               AND batch_id = $3
1251               AND status = 'processing'",
1252        )
1253        .bind(&join_handler_id)
1254        .bind(correlation_id)
1255        .bind(batch_id)
1256        .bind(error)
1257        .execute(&self.pool)
1258        .await?;
1259
1260        Ok(())
1261    }
1262
    /// Expire open same-batch join windows whose deadline has passed.
    ///
    /// Selects expired 'open' windows with `FOR UPDATE SKIP LOCKED`, so
    /// concurrent expirer tasks partition the work without blocking on each
    /// other. For each selected window the ordered member event ids are
    /// captured, then both the entries and the window row are deleted inside
    /// the same transaction. The returned `ExpiredJoinWindow` records let
    /// the caller run its timeout handling.
    ///
    /// `now` is the expiry reference time passed in by the caller (it is
    /// compared against `expires_at` rather than using NOW()).
    async fn expire_same_batch_windows(
        &self,
        now: DateTime<Utc>,
    ) -> Result<Vec<ExpiredJoinWindow>> {
        let mut tx = self.pool.begin().await?;

        // Lock the expired windows we will process; skip ones another
        // transaction already holds.
        let windows = sqlx::query_as::<_, (String, Uuid, Uuid)>(
            "SELECT join_handler_id, correlation_id, batch_id
             FROM seesaw_join_windows
             WHERE status = 'open'
               AND expires_at IS NOT NULL
               AND expires_at <= $1
             FOR UPDATE SKIP LOCKED",
        )
        .bind(now)
        .fetch_all(&mut *tx)
        .await?;

        let mut expired = Vec::with_capacity(windows.len());
        for (join_handler_id, correlation_id, batch_id) in windows {
            // Capture the member event ids (in deterministic order) before
            // deleting the entries.
            let source_event_ids = sqlx::query_as::<_, (Uuid,)>(
                "SELECT source_event_id
                 FROM seesaw_join_entries
                 WHERE join_handler_id = $1
                   AND correlation_id = $2
                   AND batch_id = $3
                 ORDER BY batch_index ASC, source_created_at ASC, source_event_id ASC",
            )
            .bind(&join_handler_id)
            .bind(correlation_id)
            .bind(batch_id)
            .fetch_all(&mut *tx)
            .await?
            .into_iter()
            .map(|row| row.0)
            .collect::<Vec<_>>();

            sqlx::query(
                "DELETE FROM seesaw_join_entries
                 WHERE join_handler_id = $1
                   AND correlation_id = $2
                   AND batch_id = $3",
            )
            .bind(&join_handler_id)
            .bind(correlation_id)
            .bind(batch_id)
            .execute(&mut *tx)
            .await?;

            sqlx::query(
                "DELETE FROM seesaw_join_windows
                 WHERE join_handler_id = $1
                   AND correlation_id = $2
                   AND batch_id = $3",
            )
            .bind(&join_handler_id)
            .bind(correlation_id)
            .bind(batch_id)
            .execute(&mut *tx)
            .await?;

            expired.push(ExpiredJoinWindow {
                join_handler_id,
                correlation_id,
                batch_id,
                source_event_ids,
            });
        }

        tx.commit().await?;
        Ok(expired)
    }
1335}
1336
/// Row projection of the `seesaw_stream` table. Converted into an
/// `InsightEvent` by `stream_row_to_insight_event`.
#[derive(FromRow)]
struct StreamRow {
    // Monotonic stream sequence; used as the pagination cursor in
    // `get_recent_events`.
    seq: i64,
    // One of "event_dispatched" / "effect_started" / "effect_completed" /
    // "effect_failed"; unknown values fall back to EventDispatched during
    // conversion.
    stream_type: String,
    correlation_id: Uuid,
    event_id: Option<Uuid>,
    effect_event_id: Option<Uuid>,
    handler_id: Option<String>,
    status: Option<String>,
    error: Option<String>,
    // JSON payload; for dispatched events its "event_type" key is surfaced
    // on the resulting InsightEvent.
    payload: Option<serde_json::Value>,
    created_at: DateTime<Utc>,
}
1350
/// Row projection of `seesaw_handler_executions` used by `get_effect_logs`.
#[derive(FromRow)]
struct EffectLogRow {
    correlation_id: Uuid,
    event_id: Uuid,
    handler_id: String,
    status: String,
    attempts: i32,
    event_type: String,
    result: Option<serde_json::Value>,
    error: Option<String>,
    created_at: DateTime<Utc>,
    execute_at: DateTime<Utc>,
    // claimed_at (falling back to last_attempted_at) and completed_at are
    // used to derive duration_ms in get_effect_logs.
    claimed_at: Option<DateTime<Utc>>,
    last_attempted_at: Option<DateTime<Utc>>,
    completed_at: Option<DateTime<Utc>>,
}
1367
/// Row projection of `seesaw_dlq` used by `get_dead_letters`.
#[derive(FromRow)]
struct DeadLetterRow {
    correlation_id: Uuid,
    event_id: Uuid,
    handler_id: String,
    event_type: String,
    event_payload: serde_json::Value,
    error: String,
    reason: String,
    attempts: i32,
    failed_at: DateTime<Utc>,
    // NULL while the dead letter is unresolved (see the `unresolved_only`
    // filter in get_dead_letters).
    resolved_at: Option<DateTime<Utc>>,
}
1381
/// Per-workflow failure aggregates produced by the CTE query in
/// `get_failed_workflows` (effect counts joined with unresolved DLQ counts).
#[derive(FromRow)]
struct FailedWorkflowRow {
    correlation_id: Uuid,
    failed_effects: i64,
    active_effects: i64,
    // Unresolved dead letters only (resolved_at IS NULL).
    dead_letters: i64,
    // Greatest of last failed-effect attempt and last unresolved DLQ entry.
    last_failed_at: Option<DateTime<Utc>>,
    last_error: Option<String>,
}
1391
#[async_trait]
impl InsightStore for PostgresStore {
    /// Subscribe to the global insight stream via LISTEN on "seesaw_stream".
    ///
    /// NOTE(review): each notification triggers a fetch of only the single
    /// newest `seesaw_stream` row, so bursts of writes between wake-ups can
    /// be skipped and the same row can be delivered more than once. If
    /// lossless delivery matters, this should page by `seq` the way
    /// `subscribe_workflow_events` pages by its cursor — confirm the
    /// intended semantics.
    async fn subscribe_events(
        &self,
    ) -> Result<Box<dyn futures::Stream<Item = InsightEvent> + Send + Unpin>> {
        use futures::stream::StreamExt;
        use sqlx::postgres::PgListener;

        // Create a new listener connection
        let mut listener = PgListener::connect_with(&self.pool).await?;
        listener.listen("seesaw_stream").await?;

        // Convert listener into a stream of InsightEvent
        let pool = self.pool.clone();
        let stream = listener.into_stream().filter_map(move |result| {
            let pool = pool.clone();
            Box::pin(async move {
                match result {
                    Ok(_notification) => {
                        // Fetch latest entry from stream table
                        // (notification payload is just correlation_id for wake-up)
                        if let Ok(row) = sqlx::query_as::<_, StreamRow>(
                            "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
                                    handler_id, status, error, payload, created_at
                             FROM seesaw_stream
                             ORDER BY seq DESC
                             LIMIT 1",
                        )
                        .fetch_one(&pool)
                        .await
                        {
                            Some(stream_row_to_insight_event(row))
                        } else {
                            // Query failure: drop this wake-up rather than
                            // ending the stream.
                            None
                        }
                    }
                    // Listener errors are swallowed; the stream simply emits
                    // nothing for them.
                    Err(_) => None,
                }
            })
        });

        Ok(Box::new(stream))
    }

    /// Assemble the full event/effect tree for one workflow: all events and
    /// handler executions for the correlation id, stitched together by
    /// `build_event_tree` (events whose parent is missing from the set are
    /// treated as roots).
    async fn get_workflow_tree(&self, correlation_id: Uuid) -> Result<seesaw_core::WorkflowTree> {
        // Get all events for this correlation
        let events = sqlx::query_as::<_, EventRow>(
            "SELECT id, event_id, parent_id, correlation_id, event_type, payload, hops,
                    batch_id, batch_index, batch_size, created_at
             FROM seesaw_events
             WHERE correlation_id = $1
             ORDER BY created_at ASC",
        )
        .bind(correlation_id)
        .fetch_all(&self.pool)
        .await?;

        // Get all effects for this correlation
        let effects = sqlx::query_as::<_, EffectTreeRow>(
            "SELECT event_id, handler_id, status, result, error, attempts, created_at,
                    batch_id, batch_index, batch_size
             FROM seesaw_handler_executions
             WHERE correlation_id = $1
             ORDER BY created_at ASC",
        )
        .bind(correlation_id)
        .fetch_all(&self.pool)
        .await?;

        // Build tree structure
        let event_ids: HashSet<Uuid> = events.iter().map(|event| event.event_id).collect();
        let roots = build_event_tree(&events, &effects, None, &event_ids, true);

        Ok(seesaw_core::WorkflowTree {
            correlation_id,
            roots,
            event_count: events.len(),
            effect_count: effects.len(),
        })
    }

    /// Global counters across all workflows. Note these come from four
    /// independent queries, so the numbers are not a single consistent
    /// snapshot.
    async fn get_stats(&self) -> Result<seesaw_core::InsightStats> {
        let total_events = sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM seesaw_events")
            .fetch_one(&self.pool)
            .await?
            .0;

        let active_effects = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM seesaw_handler_executions
             WHERE status IN ('pending', 'executing')",
        )
        .fetch_one(&self.pool)
        .await?
        .0;

        let completed_effects = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM seesaw_handler_executions WHERE status = 'completed'",
        )
        .fetch_one(&self.pool)
        .await?
        .0;

        let failed_effects = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM seesaw_handler_executions WHERE status = 'failed'",
        )
        .fetch_one(&self.pool)
        .await?
        .0;

        Ok(seesaw_core::InsightStats {
            total_events,
            active_effects,
            completed_effects,
            failed_effects,
        })
    }

    /// Page through the insight stream. With a cursor, returns rows with
    /// `seq` strictly greater than it, in ascending order; without one,
    /// returns the newest `limit` rows in DESCENDING order — callers should
    /// be aware the ordering differs between the two modes.
    async fn get_recent_events(
        &self,
        cursor: Option<i64>,
        limit: usize,
    ) -> Result<Vec<InsightEvent>> {
        let rows = if let Some(cursor_seq) = cursor {
            sqlx::query_as::<_, StreamRow>(
                "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
                        handler_id, status, error, payload, created_at
                 FROM seesaw_stream
                 WHERE seq > $1
                 ORDER BY seq ASC
                 LIMIT $2",
            )
            .bind(cursor_seq)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await?
        } else {
            sqlx::query_as::<_, StreamRow>(
                "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
                        handler_id, status, error, payload, created_at
                 FROM seesaw_stream
                 ORDER BY seq DESC
                 LIMIT $1",
            )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await?
        };

        Ok(rows.into_iter().map(stream_row_to_insight_event).collect())
    }

    /// Fetch handler execution logs, optionally filtered to one workflow
    /// (a NULL correlation_id matches everything), newest activity first.
    /// `duration_ms` is derived from claimed_at (or last_attempted_at) to
    /// completed_at, clamped at zero; it is None while either end is
    /// missing.
    async fn get_effect_logs(
        &self,
        correlation_id: Option<Uuid>,
        limit: usize,
    ) -> Result<Vec<EffectExecutionLog>> {
        let rows = sqlx::query_as::<_, EffectLogRow>(
            "SELECT
                correlation_id,
                event_id,
                handler_id,
                status,
                attempts,
                event_type,
                result,
                error,
                created_at,
                execute_at,
                claimed_at,
                last_attempted_at,
                completed_at
             FROM seesaw_handler_executions
             WHERE ($1::uuid IS NULL OR correlation_id = $1)
             ORDER BY COALESCE(last_attempted_at, created_at) DESC, event_id DESC
             LIMIT $2",
        )
        .bind(correlation_id)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| {
                let started_at = row.claimed_at.or(row.last_attempted_at);
                let duration_ms = match (started_at, row.completed_at) {
                    (Some(start), Some(end)) => Some((end - start).num_milliseconds().max(0)),
                    _ => None,
                };

                EffectExecutionLog {
                    correlation_id: row.correlation_id,
                    event_id: row.event_id,
                    handler_id: row.handler_id,
                    status: row.status,
                    attempts: row.attempts,
                    event_type: Some(row.event_type),
                    result: row.result,
                    error: row.error,
                    created_at: row.created_at,
                    execute_at: Some(row.execute_at),
                    claimed_at: row.claimed_at,
                    last_attempted_at: row.last_attempted_at,
                    completed_at: row.completed_at,
                    duration_ms,
                }
            })
            .collect())
    }

    /// List dead-letter entries, newest first. When `unresolved_only` is
    /// true, rows with a non-NULL resolved_at are excluded.
    async fn get_dead_letters(
        &self,
        unresolved_only: bool,
        limit: usize,
    ) -> Result<Vec<DeadLetterEntry>> {
        let rows = sqlx::query_as::<_, DeadLetterRow>(
            "SELECT
                correlation_id,
                event_id,
                handler_id,
                event_type,
                event_payload,
                error,
                reason,
                attempts,
                failed_at,
                resolved_at
             FROM seesaw_dlq
             WHERE (NOT $1 OR resolved_at IS NULL)
             ORDER BY failed_at DESC
             LIMIT $2",
        )
        .bind(unresolved_only)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| DeadLetterEntry {
                correlation_id: row.correlation_id,
                event_id: row.event_id,
                handler_id: row.handler_id,
                event_type: row.event_type,
                event_payload: row.event_payload,
                error: row.error,
                reason: row.reason,
                attempts: row.attempts,
                failed_at: row.failed_at,
                resolved_at: row.resolved_at,
            })
            .collect())
    }

    /// List workflows with at least one failed effect or one unresolved
    /// dead letter, most recently failed first.
    ///
    /// Effect aggregates and DLQ aggregates are computed per correlation id
    /// and FULL OUTER JOINed, so a workflow appears even if it has only DLQ
    /// entries (or only failed effects). `last_error` prefers the DLQ error
    /// over the effect error; note MAX(error) picks the lexicographically
    /// greatest error string, not necessarily the most recent one.
    async fn get_failed_workflows(&self, limit: usize) -> Result<Vec<FailedWorkflow>> {
        let rows = sqlx::query_as::<_, FailedWorkflowRow>(
            "WITH effect_agg AS (
                SELECT
                    correlation_id,
                    COUNT(*) FILTER (WHERE status = 'failed')::BIGINT AS failed_effects,
                    COUNT(*) FILTER (WHERE status IN ('pending', 'executing'))::BIGINT AS active_effects,
                    MAX(last_attempted_at) FILTER (WHERE status = 'failed') AS last_failed_at,
                    MAX(error) FILTER (WHERE status = 'failed') AS last_error
                FROM seesaw_handler_executions
                GROUP BY correlation_id
             ),
             dlq_agg AS (
                SELECT
                    correlation_id,
                    COUNT(*) FILTER (WHERE resolved_at IS NULL)::BIGINT AS dead_letters,
                    MAX(failed_at) FILTER (WHERE resolved_at IS NULL) AS last_dlq_at,
                    MAX(error) FILTER (WHERE resolved_at IS NULL) AS last_dlq_error
                FROM seesaw_dlq
                GROUP BY correlation_id
             )
             SELECT
                COALESCE(e.correlation_id, d.correlation_id) AS correlation_id,
                COALESCE(e.failed_effects, 0) AS failed_effects,
                COALESCE(e.active_effects, 0) AS active_effects,
                COALESCE(d.dead_letters, 0) AS dead_letters,
                GREATEST(e.last_failed_at, d.last_dlq_at) AS last_failed_at,
                COALESCE(d.last_dlq_error, e.last_error) AS last_error
             FROM effect_agg e
             FULL OUTER JOIN dlq_agg d ON d.correlation_id = e.correlation_id
             WHERE COALESCE(e.failed_effects, 0) > 0 OR COALESCE(d.dead_letters, 0) > 0
             ORDER BY last_failed_at DESC NULLS LAST
             LIMIT $1",
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| FailedWorkflow {
                correlation_id: row.correlation_id,
                failed_effects: row.failed_effects,
                active_effects: row.active_effects,
                dead_letters: row.dead_letters,
                last_failed_at: row.last_failed_at,
                last_error: row.last_error,
            })
            .collect())
    }
}
1697
/// Handler-execution row used when assembling a workflow tree
/// (`get_workflow_tree` / `build_event_tree`); one per (event, handler).
#[derive(FromRow)]
struct EffectTreeRow {
    // Event this execution belongs to; matched against EventRow.event_id
    // when attaching HandlerNodes to their event.
    event_id: Uuid,
    handler_id: String,
    status: String,
    result: Option<serde_json::Value>,
    error: Option<String>,
    attempts: i32,
    created_at: DateTime<Utc>,
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
}
1711
1712fn stream_row_to_insight_event(row: StreamRow) -> InsightEvent {
1713    let stream_type = match row.stream_type.as_str() {
1714        "event_dispatched" => StreamType::EventDispatched,
1715        "effect_started" => StreamType::EffectStarted,
1716        "effect_completed" => StreamType::EffectCompleted,
1717        "effect_failed" => StreamType::EffectFailed,
1718        _ => StreamType::EventDispatched, // Default fallback
1719    };
1720
1721    // Extract event_type from payload if it's an event
1722    let event_type = if stream_type == StreamType::EventDispatched {
1723        row.payload
1724            .as_ref()
1725            .and_then(|p| p.get("event_type"))
1726            .and_then(|v| v.as_str())
1727            .map(|s| s.to_string())
1728    } else {
1729        None
1730    };
1731
1732    InsightEvent {
1733        seq: row.seq,
1734        stream_type,
1735        correlation_id: row.correlation_id,
1736        event_id: row.event_id,
1737        effect_event_id: row.effect_event_id,
1738        handler_id: row.handler_id,
1739        event_type,
1740        status: row.status,
1741        error: row.error,
1742        payload: row.payload,
1743        created_at: row.created_at,
1744    }
1745}
1746
1747fn build_event_tree(
1748    events: &[EventRow],
1749    effects: &[EffectTreeRow],
1750    parent_id: Option<Uuid>,
1751    event_ids: &HashSet<Uuid>,
1752    is_root_pass: bool,
1753) -> Vec<EventNode> {
1754    events
1755        .iter()
1756        .filter(|event| {
1757            if is_root_pass {
1758                event.parent_id.is_none()
1759                    || event
1760                        .parent_id
1761                        .map(|parent| !event_ids.contains(&parent))
1762                        .unwrap_or(false)
1763            } else {
1764                event.parent_id == parent_id
1765            }
1766        })
1767        .map(|event| {
1768            // Get effects for this event
1769            let event_effects: Vec<HandlerNode> = effects
1770                .iter()
1771                .filter(|eff| eff.event_id == event.event_id)
1772                .map(|eff| HandlerNode {
1773                    handler_id: eff.handler_id.clone(),
1774                    event_id: eff.event_id,
1775                    status: eff.status.clone(),
1776                    result: eff.result.clone(),
1777                    error: eff.error.clone(),
1778                    attempts: eff.attempts,
1779                    created_at: eff.created_at,
1780                    batch_id: eff.batch_id,
1781                    batch_index: eff.batch_index,
1782                    batch_size: eff.batch_size,
1783                })
1784                .collect();
1785
1786            // Recursively build children
1787            let children =
1788                build_event_tree(events, effects, Some(event.event_id), event_ids, false);
1789
1790            EventNode {
1791                event_id: event.event_id,
1792                event_type: event.event_type.clone(),
1793                payload: event.payload.clone(),
1794                created_at: event.created_at,
1795                batch_id: event.batch_id,
1796                batch_index: event.batch_index,
1797                batch_size: event.batch_size,
1798                children,
1799                effects: event_effects,
1800            }
1801        })
1802        .collect()
1803}
1804
1805// ============================================================================
1806// PostgresBackend - v0.11.0 Backend trait implementation
1807// ============================================================================
1808
1809use seesaw_core::backend::{Backend, BackendServeConfig, DispatchedEvent};
1810use seesaw_core::backend::capability::*;
1811use seesaw_core::backend::job_executor::JobExecutor;
1812use seesaw_core::DirectRunner;
1813use std::sync::Arc;
1814use tokio_util::sync::CancellationToken;
1815
1816static DIRECT_RUNNER: DirectRunner = DirectRunner;
1817
/// PostgreSQL implementation of Backend trait (v0.11.0).
///
/// Wraps PostgresStore internally and implements the new Backend interface.
pub struct PostgresBackend {
    // All persistence is delegated to the wrapped store; the backend
    // itself carries no additional state.
    store: PostgresStore,
}
1824
1825impl PostgresBackend {
1826    /// Create new PostgresBackend from connection pool.
1827    pub fn new(pool: PgPool) -> Self {
1828        Self {
1829            store: PostgresStore::new(pool),
1830        }
1831    }
1832
1833    /// Get reference to underlying pool.
1834    pub fn pool(&self) -> &PgPool {
1835        self.store.pool()
1836    }
1837}
1838
1839impl Clone for PostgresBackend {
1840    fn clone(&self) -> Self {
1841        Self {
1842            store: self.store.clone(),
1843        }
1844    }
1845}
1846
#[async_trait]
impl Backend for PostgresBackend {
    /// Stable identifier for this backend implementation.
    fn name(&self) -> &'static str {
        "postgres"
    }

    /// Enqueue a dispatched event into the PostgreSQL-backed queue.
    ///
    /// Converts the backend-level `DispatchedEvent` into the store's
    /// `QueuedEvent` row shape and delegates persistence to the store.
    async fn publish(&self, event: DispatchedEvent) -> Result<()> {
        let queued_event = QueuedEvent {
            id: 0, // Will be assigned by database
            event_id: event.event_id,
            parent_id: event.parent_id,
            correlation_id: event.correlation_id,
            event_type: event.event_type,
            payload: event.payload,
            hops: event.hops,
            retry_count: event.retry_count,
            batch_id: event.batch_id,
            batch_index: event.batch_index,
            batch_size: event.batch_size,
            created_at: event.created_at,
        };

        self.store.publish(queued_event).await
    }

    /// Run the polling worker loops until `shutdown` is cancelled.
    ///
    /// Spawns `config.event_workers` tasks polling queued events and
    /// `config.handler_workers` tasks polling queued handler executions.
    /// Workers sleep for their configured `poll_interval` when the queue
    /// is empty or a poll errors. On shutdown, waits up to
    /// `config.graceful_shutdown_timeout` for workers to finish their
    /// current iteration, then returns regardless.
    async fn serve<D>(
        &self,
        executor: Arc<JobExecutor<D>>,
        config: BackendServeConfig,
        shutdown: CancellationToken,
    ) -> Result<()>
    where
        D: Send + Sync + 'static,
    {
        use seesaw_core::backend::job_executor::HandlerStatus;
        use tokio::time::sleep;

        let mut handles = Vec::new();

        // Spawn event workers
        for i in 0..config.event_workers {
            let store = self.store.clone();
            let executor = executor.clone();
            let config = config.event_worker.clone();
            let shutdown = shutdown.clone();

            let handle: tokio::task::JoinHandle<Result<()>> = tokio::spawn(async move {
                tracing::info!("Event worker {} started", i);

                // Cancellation is only observed between iterations, so an
                // in-flight event finishes processing before the worker exits.
                while !shutdown.is_cancelled() {
                    // Poll next event
                    match store.poll_next().await {
                        Ok(Some(event)) => {
                            // Execute using JobExecutor
                            match executor.execute_event(&event, &config, &DIRECT_RUNNER).await {
                                Ok(commit) => {
                                    // Convert JobCommit to Store's EventProcessingCommit
                                    let store_commit = EventProcessingCommit {
                                        event_row_id: commit.event_row_id,
                                        event_id: commit.event_id,
                                        correlation_id: commit.correlation_id,
                                        event_type: commit.event_type,
                                        event_payload: commit.event_payload,
                                        queued_effect_intents: commit.queued_effect_intents,
                                        inline_effect_failures: commit.inline_effect_failures.into_iter().map(|f| {
                                            seesaw_core::InlineHandlerFailure {
                                                handler_id: f.handler_id,
                                                error: f.error,
                                                reason: f.reason,
                                                attempts: f.attempts,
                                            }
                                        }).collect(),
                                        emitted_events: commit.emitted_events,
                                    };

                                    // NOTE(review): a failed commit is only logged; the
                                    // event stays claimed until its claim expires.
                                    if let Err(e) = store.commit_event_processing(store_commit).await {
                                        tracing::error!("Failed to commit event processing: {}", e);
                                    }
                                }
                                Err(e) => {
                                    // NOTE(review): nack is always called with retry
                                    // count 1, regardless of event.retry_count —
                                    // confirm the store increments retries itself.
                                    tracing::warn!("Event processing failed: {}, nacking event", e);
                                    let _ = store.nack(event.id, 1).await;
                                }
                            }
                        }
                        Ok(None) => {
                            // Queue empty: back off before polling again.
                            sleep(config.poll_interval).await;
                        }
                        Err(e) => {
                            tracing::error!("Error polling events: {}", e);
                            sleep(config.poll_interval).await;
                        }
                    }
                }

                tracing::info!("Event worker {} stopped", i);
                Ok(())
            });

            handles.push(handle);
        }

        // Spawn handler workers
        for i in 0..config.handler_workers {
            let store = self.store.clone();
            let executor = executor.clone();
            let config = config.handler_worker.clone();
            let shutdown = shutdown.clone();

            let handle = tokio::spawn(async move {
                tracing::info!("Handler worker {} started", i);

                while !shutdown.is_cancelled() {
                    // Poll next handler execution
                    match store.poll_next_effect().await {
                        Ok(Some(execution)) => {
                            // Execute using JobExecutor
                            match executor.execute_handler(execution.clone(), &config, &DIRECT_RUNNER).await {
                                Ok(result) => {
                                    match result.status {
                                        HandlerStatus::Success => {
                                            // Two completion paths: with or without
                                            // follow-up events emitted by the handler.
                                            if result.emitted_events.is_empty() {
                                                if let Err(e) = store.complete_effect(
                                                    execution.event_id,
                                                    execution.handler_id,
                                                    result.result,
                                                ).await {
                                                    tracing::error!("Failed to complete effect: {}", e);
                                                }
                                            } else {
                                                if let Err(e) = store.complete_effect_with_events(
                                                    execution.event_id,
                                                    execution.handler_id,
                                                    result.result,
                                                    result.emitted_events,
                                                ).await {
                                                    tracing::error!("Failed to complete effect with events: {}", e);
                                                }
                                            }
                                        }
                                        // Failed and Retry are handled identically:
                                        // DLQ once the attempt budget is exhausted,
                                        // otherwise mark failed for a later retry.
                                        HandlerStatus::Failed { error, attempts } | HandlerStatus::Retry { error, attempts } => {
                                            if attempts >= execution.max_attempts {
                                                if let Err(e) = store.dlq_effect(
                                                    execution.event_id,
                                                    execution.handler_id,
                                                    error,
                                                    "max_retries_exceeded".to_string(),
                                                    attempts,
                                                ).await {
                                                    tracing::error!("Failed to DLQ effect: {}", e);
                                                }
                                            } else {
                                                if let Err(e) = store.fail_effect(
                                                    execution.event_id,
                                                    execution.handler_id,
                                                    error,
                                                    attempts,
                                                ).await {
                                                    tracing::error!("Failed to mark effect as failed: {}", e);
                                                }
                                            }
                                        }
                                        HandlerStatus::Timeout => {
                                            // Timeouts carry no status-level attempt
                                            // count, so the execution's own counter
                                            // decides retry vs. DLQ.
                                            tracing::warn!("Handler timed out: {}", execution.handler_id);
                                            if execution.attempts >= execution.max_attempts {
                                                if let Err(e) = store.dlq_effect(
                                                    execution.event_id,
                                                    execution.handler_id,
                                                    "Handler execution timed out".to_string(),
                                                    "timeout".to_string(),
                                                    execution.attempts,
                                                ).await {
                                                    tracing::error!("Failed to DLQ timed out effect: {}", e);
                                                }
                                            } else {
                                                if let Err(e) = store.fail_effect(
                                                    execution.event_id,
                                                    execution.handler_id,
                                                    "Handler execution timed out".to_string(),
                                                    execution.attempts,
                                                ).await {
                                                    tracing::error!("Failed to mark timed out effect as failed: {}", e);
                                                }
                                            }
                                        }
                                        HandlerStatus::JoinWaiting => {
                                            // Complete with join_waiting status
                                            if let Err(e) = store.complete_effect(
                                                execution.event_id,
                                                execution.handler_id,
                                                result.result,
                                            ).await {
                                                tracing::error!("Failed to complete join_waiting effect: {}", e);
                                            }
                                        }
                                    }
                                }
                                Err(e) => {
                                    // Infrastructure-level execution error (as opposed
                                    // to a handler reporting failure): same retry/DLQ
                                    // decision, keyed on the execution's attempt count.
                                    tracing::error!("Handler execution failed: {}", e);
                                    if execution.attempts >= execution.max_attempts {
                                        if let Err(e) = store.dlq_effect(
                                            execution.event_id,
                                            execution.handler_id,
                                            format!("Handler execution error: {}", e),
                                            "execution_error".to_string(),
                                            execution.attempts,
                                        ).await {
                                            tracing::error!("Failed to DLQ effect: {}", e);
                                        }
                                    } else {
                                        if let Err(e) = store.fail_effect(
                                            execution.event_id,
                                            execution.handler_id,
                                            format!("Handler execution error: {}", e),
                                            execution.attempts,
                                        ).await {
                                            tracing::error!("Failed to mark effect as failed: {}", e);
                                        }
                                    }
                                }
                            }
                        }
                        Ok(None) => {
                            sleep(config.poll_interval).await;
                        }
                        Err(e) => {
                            tracing::error!("Error polling effects: {}", e);
                            sleep(config.poll_interval).await;
                        }
                    }
                }

                tracing::info!("Handler worker {} stopped", i);
                Ok(())
            });

            handles.push(handle);
        }

        // Wait for shutdown signal
        shutdown.cancelled().await;
        tracing::info!("Shutdown signal received, draining workers...");

        // Wait for workers to complete (with timeout)
        let drain_timeout = config.graceful_shutdown_timeout;
        tokio::time::timeout(drain_timeout, async {
            for handle in handles {
                let _ = handle.await;
            }
        })
        .await
        .ok();

        tracing::info!("PostgresBackend shutdown complete");
        Ok(())
    }
}
2104
// Implement capability traits

#[async_trait]
impl WorkflowStatusBackend for PostgresBackend {
    /// Fetch the status of the workflow identified by `correlation_id`;
    /// delegates directly to the underlying store.
    async fn get_workflow_status(&self, correlation_id: Uuid) -> Result<seesaw_core::WorkflowStatus> {
        self.store.get_workflow_status(correlation_id).await
    }
}
2113
#[async_trait]
impl WorkflowSubscriptionBackend for PostgresBackend {
    /// Open a stream of workflow events for `correlation_id`; delegates
    /// directly to the underlying store.
    async fn subscribe_workflow_events(
        &self,
        correlation_id: Uuid,
    ) -> Result<Box<dyn futures::Stream<Item = seesaw_core::WorkflowEvent> + Send + Unpin>> {
        self.store.subscribe_workflow_events(correlation_id).await
    }
}
2123
#[async_trait]
impl DeadLetterQueueBackend for PostgresBackend {
    /// List dead-lettered handler executions.
    ///
    /// NOTE(review): placeholder — filters are ignored and the result is
    /// always empty; see the `DeadLetterQueue` type below for the real
    /// query logic this should eventually delegate to.
    async fn list_dlq(&self, _filters: DlqFilters) -> Result<Vec<DeadLetter>> {
        // TODO: Implement with actual filters
        // For now, return empty list as placeholder
        Ok(Vec::new())
    }

    /// Retry a dead-lettered handler execution.
    ///
    /// NOTE(review): placeholder — this silently succeeds without
    /// retrying anything, which can mislead callers into thinking a
    /// replay happened.
    async fn retry_dlq(&self, _event_id: Uuid, _handler_id: String) -> Result<()> {
        // TODO: Implement DLQ retry from seesaw_dead_letters table
        Ok(())
    }
}
2137
#[async_trait]
impl InsightBackend for PostgresBackend {
    /// Return the event/effect tree for a workflow.
    ///
    /// NOTE(review): placeholder — always returns an empty node list;
    /// the real tree construction lives in `build_event_tree` above and
    /// is not yet wired into this capability.
    async fn get_workflow_tree(
        &self,
        correlation_id: Uuid,
    ) -> Result<seesaw_core::backend::capability::WorkflowTree> {
        // TODO: Implement proper workflow tree conversion
        // For now, return a minimal tree
        Ok(seesaw_core::backend::capability::WorkflowTree {
            correlation_id,
            nodes: Vec::new(),
        })
    }

    /// Return aggregate insight statistics.
    ///
    /// NOTE(review): placeholder — all counters are hard-coded to zero
    /// until the database queries are implemented.
    async fn get_insight_stats(&self) -> Result<seesaw_core::backend::capability::InsightStats> {
        // TODO: Query actual stats from database
        // For now, return zeros
        Ok(seesaw_core::backend::capability::InsightStats {
            total_workflows: 0,
            active_workflows: 0,
            settled_workflows: 0,
            total_events: 0,
            total_handlers_executed: 0,
            total_dlq_entries: 0,
        })
    }
}
2165
2166// ============================================================================
2167// DeadLetterQueue - PostgreSQL implementation
2168// ============================================================================
2169
2170/// PostgreSQL-specific row for seesaw_dead_letter_queue table
2171#[derive(Debug, Clone, sqlx::FromRow)]
2172struct DlqRow {
2173    id: Uuid,
2174    event_id: Uuid,
2175    handler_id: String,
2176    intent_id: Uuid,
2177    error_message: String,
2178    error_details: Option<Value>,
2179    retry_count: i32,
2180    first_failed_at: DateTime<Utc>,
2181    last_failed_at: DateTime<Utc>,
2182    event_payload: Value,
2183    status: String,
2184    retry_attempts: i32,
2185    last_retry_at: Option<DateTime<Utc>>,
2186    resolved_at: Option<DateTime<Utc>>,
2187    resolution_note: Option<String>,
2188    created_at: DateTime<Utc>,
2189}
2190
2191impl From<DlqRow> for CoreDeadLetter {
2192    fn from(row: DlqRow) -> Self {
2193        let status = match row.status.as_str() {
2194            "open" => DlqStatus::Open,
2195            "retrying" => DlqStatus::Retrying,
2196            "replayed" => DlqStatus::Replayed,
2197            "resolved" => DlqStatus::Resolved,
2198            _ => DlqStatus::Open, // Default fallback
2199        };
2200
2201        CoreDeadLetter {
2202            id: row.id,
2203            event_id: row.event_id,
2204            handler_id: row.handler_id,
2205            intent_id: row.intent_id,
2206            error_message: row.error_message,
2207            error_details: row.error_details,
2208            retry_count: row.retry_count,
2209            first_failed_at: row.first_failed_at,
2210            last_failed_at: row.last_failed_at,
2211            event_payload: row.event_payload,
2212            status,
2213            retry_attempts: row.retry_attempts,
2214            last_retry_at: row.last_retry_at,
2215            resolved_at: row.resolved_at,
2216            resolution_note: row.resolution_note,
2217            created_at: row.created_at,
2218        }
2219    }
2220}
2221
/// Dead Letter Queue operations for PostgreSQL
///
/// Thin wrapper over a connection pool; cloning is cheap (pool handle).
#[derive(Clone)]
pub struct DeadLetterQueue {
    pool: PgPool,
}
2227
impl DeadLetterQueue {
    /// Construct a DLQ facade over the given connection pool.
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }

    /// Insert a handler failure into the DLQ
    ///
    /// Upserts keyed on `intent_id`: a repeat failure for the same intent
    /// updates the error fields, retry count, and last-failure time and
    /// reopens the entry ('open') instead of creating a duplicate row.
    /// Returns the DLQ row id.
    pub async fn insert(
        &self,
        event_id: Uuid,
        handler_id: &str,
        intent_id: Uuid,
        error_message: &str,
        error_details: Option<Value>,
        retry_count: i32,
        first_failed_at: DateTime<Utc>,
        last_failed_at: DateTime<Utc>,
        event_payload: Value,
    ) -> Result<Uuid> {
        let id = sqlx::query_scalar::<_, Uuid>(
            "INSERT INTO seesaw_dead_letter_queue
             (event_id, handler_id, intent_id, error_message, error_details,
              retry_count, first_failed_at, last_failed_at, event_payload, status)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, 'open')
             ON CONFLICT (intent_id) DO UPDATE SET
               error_message = EXCLUDED.error_message,
               error_details = EXCLUDED.error_details,
               retry_count = EXCLUDED.retry_count,
               last_failed_at = EXCLUDED.last_failed_at,
               status = 'open'
             RETURNING id",
        )
        .bind(event_id)
        .bind(handler_id)
        .bind(intent_id)
        .bind(error_message)
        .bind(error_details)
        .bind(retry_count)
        .bind(first_failed_at)
        .bind(last_failed_at)
        .bind(event_payload)
        .fetch_one(&self.pool)
        .await?;

        Ok(id)
    }

    /// List open or retrying DLQ entries
    ///
    /// Newest entries first; resolved/replayed entries are excluded.
    pub async fn list(&self, limit: i64) -> Result<Vec<CoreDeadLetter>> {
        let entries = sqlx::query_as::<_, DlqRow>(
            "SELECT * FROM seesaw_dead_letter_queue
             WHERE status IN ('open', 'retrying')
             ORDER BY created_at DESC
             LIMIT $1",
        )
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;

        Ok(entries.into_iter().map(Into::into).collect())
    }

    /// List DLQ entries for a specific handler
    ///
    /// Same filtering and ordering as `list`, scoped to one handler id.
    pub async fn list_by_handler(&self, handler_id: &str, limit: i64) -> Result<Vec<CoreDeadLetter>> {
        let entries = sqlx::query_as::<_, DlqRow>(
            "SELECT * FROM seesaw_dead_letter_queue
             WHERE handler_id = $1 AND status IN ('open', 'retrying')
             ORDER BY created_at DESC
             LIMIT $2",
        )
        .bind(handler_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;

        Ok(entries.into_iter().map(Into::into).collect())
    }

    /// Get a single DLQ entry by ID
    ///
    /// Errors (sqlx `RowNotFound`) if no entry with that id exists.
    pub async fn get(&self, dlq_id: Uuid) -> Result<CoreDeadLetter> {
        let entry =
            sqlx::query_as::<_, DlqRow>("SELECT * FROM seesaw_dead_letter_queue WHERE id = $1")
                .bind(dlq_id)
                .fetch_one(&self.pool)
                .await?;

        Ok(entry.into())
    }

    /// Mark a DLQ entry as retrying and return it (with lock)
    ///
    /// Runs in a transaction with `SELECT ... FOR UPDATE` so only one
    /// operator can start a retry for the entry at a time.
    ///
    /// NOTE(review): the returned entry is the row as read BEFORE the
    /// update — its `status` / `retry_attempts` / `last_retry_at` do not
    /// reflect the 'retrying' transition. Confirm callers expect the
    /// pre-update snapshot.
    pub async fn start_retry(&self, dlq_id: Uuid) -> Result<CoreDeadLetter> {
        let mut tx = self.pool.begin().await?;

        // Lock row so only one operator retries this entry at a time
        let entry = sqlx::query_as::<_, DlqRow>(
            "SELECT * FROM seesaw_dead_letter_queue
             WHERE id = $1
             FOR UPDATE",
        )
        .bind(dlq_id)
        .fetch_one(&mut *tx)
        .await?;

        // Update status to retrying
        sqlx::query(
            "UPDATE seesaw_dead_letter_queue
             SET status = 'retrying',
                 retry_attempts = retry_attempts + 1,
                 last_retry_at = NOW()
             WHERE id = $1",
        )
        .bind(dlq_id)
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;

        Ok(entry.into())
    }

    /// Mark a DLQ entry as successfully replayed
    ///
    /// Sets `resolved_at` to now and records the optional note.
    pub async fn mark_replayed(&self, dlq_id: Uuid, resolution_note: Option<&str>) -> Result<()> {
        sqlx::query(
            "UPDATE seesaw_dead_letter_queue
             SET status = 'replayed',
                 resolved_at = NOW(),
                 resolution_note = $2
             WHERE id = $1",
        )
        .bind(dlq_id)
        .bind(resolution_note)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Mark a DLQ entry as failed (retry failed, back to open)
    ///
    /// Records the new error and bumps `last_failed_at`; the entry
    /// becomes eligible for another operator retry.
    pub async fn mark_retry_failed(
        &self,
        dlq_id: Uuid,
        error_message: &str,
        error_details: Option<Value>,
    ) -> Result<()> {
        sqlx::query(
            "UPDATE seesaw_dead_letter_queue
             SET status = 'open',
                 last_failed_at = NOW(),
                 error_message = $2,
                 error_details = $3
             WHERE id = $1",
        )
        .bind(dlq_id)
        .bind(error_message)
        .bind(error_details)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Mark a DLQ entry as resolved (won't retry, manual fix)
    pub async fn mark_resolved(&self, dlq_id: Uuid, resolution_note: &str) -> Result<()> {
        sqlx::query(
            "UPDATE seesaw_dead_letter_queue
             SET status = 'resolved',
                 resolved_at = NOW(),
                 resolution_note = $2
             WHERE id = $1",
        )
        .bind(dlq_id)
        .bind(resolution_note)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Get statistics about the DLQ
    ///
    /// Single aggregate query using `COUNT(*) FILTER (...)` per status.
    /// The counts are Options because SQL COUNT is nullable in sqlx's
    /// row mapping; they default to 0 on NULL.
    pub async fn stats(&self) -> Result<DlqStats> {
        #[derive(sqlx::FromRow)]
        struct StatsRow {
            open: Option<i64>,
            retrying: Option<i64>,
            replayed: Option<i64>,
            resolved: Option<i64>,
            total: Option<i64>,
        }

        let row: StatsRow = sqlx::query_as(
            "SELECT
                COUNT(*) FILTER (WHERE status = 'open') as open,
                COUNT(*) FILTER (WHERE status = 'retrying') as retrying,
                COUNT(*) FILTER (WHERE status = 'replayed') as replayed,
                COUNT(*) FILTER (WHERE status = 'resolved') as resolved,
                COUNT(*) as total
             FROM seesaw_dead_letter_queue",
        )
        .fetch_one(&self.pool)
        .await?;

        Ok(DlqStats {
            open: row.open.unwrap_or(0) as usize,
            retrying: row.retrying.unwrap_or(0) as usize,
            replayed: row.replayed.unwrap_or(0) as usize,
            resolved: row.resolved.unwrap_or(0) as usize,
            total: row.total.unwrap_or(0) as usize,
        })
    }
}
2437
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Timelike};

    // `emitted_event_created_at` truncates the parent's timestamp to
    // midnight (00:00:00 UTC) of the same calendar day.
    #[test]
    fn emitted_event_created_at_is_midnight_on_parent_day() {
        let parent = Utc
            .with_ymd_and_hms(2026, 2, 5, 18, 45, 12)
            .single()
            .expect("valid timestamp");

        let emitted = emitted_event_created_at(parent);

        assert_eq!(emitted.date_naive(), parent.date_naive());
        assert_eq!(emitted.hour(), 0);
        assert_eq!(emitted.minute(), 0);
        assert_eq!(emitted.second(), 0);
    }

    // Any two parent timestamps within the same UTC day map to the same
    // emitted timestamp, making the value deterministic per day.
    #[test]
    fn emitted_event_created_at_is_deterministic_for_same_parent_day() {
        let first_parent = Utc
            .with_ymd_and_hms(2026, 2, 5, 0, 1, 2)
            .single()
            .expect("valid timestamp");
        let second_parent = Utc
            .with_ymd_and_hms(2026, 2, 5, 23, 59, 59)
            .single()
            .expect("valid timestamp");

        let first_emitted = emitted_event_created_at(first_parent);
        let second_emitted = emitted_event_created_at(second_parent);

        assert_eq!(first_emitted, second_emitted);
    }

    // Delay doubles per attempt: 2^(attempts - 1) seconds.
    #[test]
    fn handler_retry_delay_seconds_uses_exponential_backoff() {
        assert_eq!(handler_retry_delay_seconds(1), 1);
        assert_eq!(handler_retry_delay_seconds(2), 2);
        assert_eq!(handler_retry_delay_seconds(3), 4);
        assert_eq!(handler_retry_delay_seconds(4), 8);
    }

    // The exponent is clamped at 8, so the delay never exceeds 256s.
    #[test]
    fn handler_retry_delay_seconds_is_capped() {
        assert_eq!(handler_retry_delay_seconds(9), 256);
        assert_eq!(handler_retry_delay_seconds(50), 256);
    }

    // An event whose parent_id references an event absent from the
    // correlation's id set must surface as a root, not be dropped.
    #[test]
    fn build_event_tree_treats_orphan_parent_as_root() {
        let correlation_id = Uuid::new_v4();
        let event_id = Uuid::new_v4();
        let missing_parent = Uuid::new_v4();
        let now = Utc::now();

        let events = vec![EventRow {
            id: 1,
            event_id,
            parent_id: Some(missing_parent),
            correlation_id,
            event_type: "OrphanEvent".to_string(),
            payload: serde_json::json!({"ok": true}),
            hops: 1,
            retry_count: 0,
            batch_id: None,
            batch_index: None,
            batch_size: None,
            created_at: now,
        }];

        let effects: Vec<EffectTreeRow> = Vec::new();
        let event_ids: HashSet<Uuid> = events.iter().map(|event| event.event_id).collect();

        let roots = build_event_tree(&events, &effects, None, &event_ids, true);

        assert_eq!(roots.len(), 1);
        assert_eq!(roots[0].event_id, event_id);
    }
}