// seesaw_postgres — PostgreSQL-backed stores for Seesaw (crate root: lib.rs).

1pub mod event_store;
2
3use anyhow::Result;
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use seesaw_core::insight::*;
7use sqlx::{FromRow, PgPool};
8use std::collections::HashSet;
9use uuid::Uuid;
10
/// PostgreSQL store for Seesaw.
///
/// Provides InsightStore implementation for observability and the ES
/// event store (via the `event_store` module).
///
/// Cloning is cheap: see the manual `Clone` impl below, which just clones
/// the internally reference-counted `PgPool` handle.
pub struct PostgresStore {
    // Shared sqlx connection pool; all queries in this file run against it.
    pool: PgPool,
}
18
19impl PostgresStore {
20    pub fn new(pool: PgPool) -> Self {
21        Self { pool }
22    }
23
24    pub fn pool(&self) -> &PgPool {
25        &self.pool
26    }
27
28    /// Check if an event has already been processed (for idempotency).
29    pub async fn is_processed(&self, event_id: Uuid) -> Result<bool> {
30        let result: bool = sqlx::query_scalar(
31            "SELECT EXISTS(SELECT 1 FROM seesaw_processed WHERE event_id = $1)",
32        )
33        .bind(event_id)
34        .fetch_one(&self.pool)
35        .await?;
36
37        Ok(result)
38    }
39}
40
41impl Clone for PostgresStore {
42    fn clone(&self) -> Self {
43        Self {
44            pool: self.pool.clone(),
45        }
46    }
47}
48
49// ============================================================================
50// Row types for InsightStore queries
51// ============================================================================
52
/// Row shape for `seesaw_events` queries used when building workflow trees.
#[derive(FromRow)]
struct EventRow {
    /// Surrogate key from the table; selected but unused downstream.
    #[allow(dead_code)]
    id: i64,
    event_id: Uuid,
    /// Emitting event's id; `None` for root events of a workflow.
    parent_id: Option<Uuid>,
    #[allow(dead_code)]
    correlation_id: Uuid,
    event_type: String,
    payload: serde_json::Value,
    #[allow(dead_code)]
    hops: i32,
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
    created_at: DateTime<Utc>,
}
70
/// Row shape for `seesaw_handler_executions` when building a workflow tree;
/// mapped field-for-field onto `HandlerNode` in `build_event_tree`.
#[derive(FromRow)]
struct EffectTreeRow {
    event_id: Uuid,
    handler_id: String,
    status: String,
    result: Option<serde_json::Value>,
    error: Option<String>,
    attempts: i32,
    created_at: DateTime<Utc>,
    batch_id: Option<Uuid>,
    batch_index: Option<i32>,
    batch_size: Option<i32>,
}
84
/// Row shape for the `seesaw_stream` table; converted to `InsightEvent`
/// by `stream_row_to_insight_event`. Most columns are nullable — which are
/// populated presumably depends on `stream_type` (event vs effect entries).
#[derive(FromRow)]
struct StreamRow {
    /// Monotonic sequence number; also used as the pagination cursor in
    /// `get_recent_events`.
    seq: i64,
    stream_type: String,
    correlation_id: Uuid,
    event_id: Option<Uuid>,
    effect_event_id: Option<Uuid>,
    handler_id: Option<String>,
    status: Option<String>,
    error: Option<String>,
    payload: Option<serde_json::Value>,
    created_at: DateTime<Utc>,
}
98
/// Row shape for effect-log queries over `seesaw_handler_executions`,
/// including the timing columns used to derive `duration_ms`.
#[derive(FromRow)]
struct EffectLogRow {
    correlation_id: Uuid,
    event_id: Uuid,
    handler_id: String,
    status: String,
    attempts: i32,
    event_type: String,
    result: Option<serde_json::Value>,
    error: Option<String>,
    created_at: DateTime<Utc>,
    execute_at: DateTime<Utc>,
    /// When a worker claimed the effect; preferred start time for duration.
    claimed_at: Option<DateTime<Utc>>,
    last_attempted_at: Option<DateTime<Utc>>,
    completed_at: Option<DateTime<Utc>>,
}
115
/// Row shape for the `seesaw_dlq` dead-letter table; mapped field-for-field
/// onto `DeadLetterEntry` in `get_dead_letters`.
#[derive(FromRow)]
struct DeadLetterRow {
    correlation_id: Uuid,
    event_id: Uuid,
    handler_id: String,
    event_type: String,
    event_payload: serde_json::Value,
    error: String,
    reason: String,
    attempts: i32,
    failed_at: DateTime<Utc>,
    /// `None` while the dead letter is still unresolved.
    resolved_at: Option<DateTime<Utc>>,
}
129
/// Per-correlation failure aggregates produced by the CTE query in
/// `get_failed_workflows` (effect counts joined with unresolved DLQ counts).
#[derive(FromRow)]
struct FailedWorkflowRow {
    correlation_id: Uuid,
    failed_effects: i64,
    active_effects: i64,
    dead_letters: i64,
    /// Greatest of the last failed-effect attempt and last unresolved DLQ
    /// entry; `None` when neither side has a timestamp.
    last_failed_at: Option<DateTime<Utc>>,
    last_error: Option<String>,
}
139
140// ============================================================================
141// InsightStore implementation
142// ============================================================================
143
#[async_trait]
impl InsightStore for PostgresStore {
    /// Stream live insight events via Postgres LISTEN/NOTIFY on the
    /// `seesaw_stream` channel. Each notification is treated purely as a
    /// wake-up signal: its payload is ignored and the newest stream row is
    /// fetched instead.
    ///
    /// NOTE(review): fetching only the single latest row means that under a
    /// burst of notifications the same row can be emitted twice and
    /// intermediate rows can be skipped; a `seq` cursor would make this
    /// lossless — confirm whether consumers tolerate gaps/duplicates.
    async fn subscribe_events(
        &self,
    ) -> Result<Box<dyn futures::Stream<Item = InsightEvent> + Send + Unpin>> {
        use futures::stream::StreamExt;
        use sqlx::postgres::PgListener;

        // Create a new listener connection (dedicated connection, separate
        // from the pool's ordinary checkouts).
        let mut listener = PgListener::connect_with(&self.pool).await?;
        listener.listen("seesaw_stream").await?;

        // Convert listener into a stream of InsightEvent
        let pool = self.pool.clone();
        let stream = listener.into_stream().filter_map(move |result| {
            let pool = pool.clone();
            Box::pin(async move {
                match result {
                    Ok(_notification) => {
                        // Fetch latest entry from stream table
                        // (notification payload is just correlation_id for wake-up)
                        // NOTE(review): a query error here is silently dropped
                        // along with the notification — presumably intentional
                        // best-effort; confirm.
                        if let Ok(row) = sqlx::query_as::<_, StreamRow>(
                            "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
                                    handler_id, status, error, payload, created_at
                             FROM seesaw_stream
                             ORDER BY seq DESC
                             LIMIT 1",
                        )
                        .fetch_one(&pool)
                        .await
                        {
                            Some(stream_row_to_insight_event(row))
                        } else {
                            None
                        }
                    }
                    // Listener errors (e.g. dropped connection) are skipped;
                    // the stream simply yields nothing for them.
                    Err(_) => None,
                }
            })
        });

        Ok(Box::new(stream))
    }

    /// Build the full event/effect tree for one correlation id.
    ///
    /// Loads every event and every handler execution for the correlation,
    /// then assembles them into parent/child `EventNode`s via
    /// `build_event_tree` (orphaned events — whose parent is outside this
    /// correlation — are promoted to roots).
    async fn get_workflow_tree(&self, correlation_id: Uuid) -> Result<seesaw_core::WorkflowTree> {
        // Get all events for this correlation
        let events = sqlx::query_as::<_, EventRow>(
            "SELECT id, event_id, parent_id, correlation_id, event_type, payload, hops,
                    batch_id, batch_index, batch_size, created_at
             FROM seesaw_events
             WHERE correlation_id = $1
             ORDER BY created_at ASC",
        )
        .bind(correlation_id)
        .fetch_all(&self.pool)
        .await?;

        // Get all effects for this correlation
        let effects = sqlx::query_as::<_, EffectTreeRow>(
            "SELECT event_id, handler_id, status, result, error, attempts, created_at,
                    batch_id, batch_index, batch_size
             FROM seesaw_handler_executions
             WHERE correlation_id = $1
             ORDER BY created_at ASC",
        )
        .bind(correlation_id)
        .fetch_all(&self.pool)
        .await?;

        // Build tree structure: the id set lets the root pass detect events
        // whose parent is not part of this correlation.
        let event_ids: HashSet<Uuid> = events.iter().map(|event| event.event_id).collect();
        let roots = build_event_tree(&events, &effects, None, &event_ids, true);

        Ok(seesaw_core::WorkflowTree {
            correlation_id,
            roots,
            event_count: events.len(),
            effect_count: effects.len(),
        })
    }

    /// Global counters: total events plus active/completed/failed effects.
    ///
    /// Runs four independent COUNT queries; "active" means status is
    /// 'pending' or 'executing'. Counts are not taken in one snapshot, so
    /// they can be slightly inconsistent with each other under load.
    async fn get_stats(&self) -> Result<seesaw_core::InsightStats> {
        let total_events = sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM seesaw_events")
            .fetch_one(&self.pool)
            .await?
            .0;

        let active_effects = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM seesaw_handler_executions
             WHERE status IN ('pending', 'executing')",
        )
        .fetch_one(&self.pool)
        .await?
        .0;

        let completed_effects = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM seesaw_handler_executions WHERE status = 'completed'",
        )
        .fetch_one(&self.pool)
        .await?
        .0;

        let failed_effects = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM seesaw_handler_executions WHERE status = 'failed'",
        )
        .fetch_one(&self.pool)
        .await?
        .0;

        Ok(seesaw_core::InsightStats {
            total_events,
            active_effects,
            completed_effects,
            failed_effects,
        })
    }

    /// Page through the insight stream.
    ///
    /// With a cursor: rows strictly after `cursor` in ASCENDING seq order
    /// (forward pagination). Without a cursor: the newest `limit` rows in
    /// DESCENDING order.
    ///
    /// NOTE(review): the two branches return opposite orderings — looks
    /// intentional (latest-first bootstrap, then tail forward), but confirm
    /// consumers handle both.
    async fn get_recent_events(
        &self,
        cursor: Option<i64>,
        limit: usize,
    ) -> Result<Vec<InsightEvent>> {
        let rows = if let Some(cursor_seq) = cursor {
            sqlx::query_as::<_, StreamRow>(
                "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
                        handler_id, status, error, payload, created_at
                 FROM seesaw_stream
                 WHERE seq > $1
                 ORDER BY seq ASC
                 LIMIT $2",
            )
            .bind(cursor_seq)
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await?
        } else {
            sqlx::query_as::<_, StreamRow>(
                "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
                        handler_id, status, error, payload, created_at
                 FROM seesaw_stream
                 ORDER BY seq DESC
                 LIMIT $1",
            )
            .bind(limit as i64)
            .fetch_all(&self.pool)
            .await?
        };

        Ok(rows.into_iter().map(stream_row_to_insight_event).collect())
    }

    /// Fetch handler-execution logs, optionally filtered to one correlation,
    /// most recently attempted first.
    ///
    /// `duration_ms` is derived as completed_at minus (claimed_at, falling
    /// back to last_attempted_at), clamped to >= 0; it is `None` unless both
    /// a start and completion timestamp exist.
    async fn get_effect_logs(
        &self,
        correlation_id: Option<Uuid>,
        limit: usize,
    ) -> Result<Vec<EffectExecutionLog>> {
        // `$1 IS NULL OR correlation_id = $1` makes the filter optional in
        // a single prepared statement.
        let rows = sqlx::query_as::<_, EffectLogRow>(
            "SELECT
                correlation_id,
                event_id,
                handler_id,
                status,
                attempts,
                event_type,
                result,
                error,
                created_at,
                execute_at,
                claimed_at,
                last_attempted_at,
                completed_at
             FROM seesaw_handler_executions
             WHERE ($1::uuid IS NULL OR correlation_id = $1)
             ORDER BY COALESCE(last_attempted_at, created_at) DESC, event_id DESC
             LIMIT $2",
        )
        .bind(correlation_id)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| {
                let started_at = row.claimed_at.or(row.last_attempted_at);
                let duration_ms = match (started_at, row.completed_at) {
                    // .max(0) guards against clock skew producing a negative
                    // duration.
                    (Some(start), Some(end)) => Some((end - start).num_milliseconds().max(0)),
                    _ => None,
                };

                EffectExecutionLog {
                    correlation_id: row.correlation_id,
                    event_id: row.event_id,
                    handler_id: row.handler_id,
                    status: row.status,
                    attempts: row.attempts,
                    event_type: Some(row.event_type),
                    result: row.result,
                    error: row.error,
                    created_at: row.created_at,
                    execute_at: Some(row.execute_at),
                    claimed_at: row.claimed_at,
                    last_attempted_at: row.last_attempted_at,
                    completed_at: row.completed_at,
                    duration_ms,
                }
            })
            .collect())
    }

    /// Fetch dead-letter entries, newest failures first.
    ///
    /// When `unresolved_only` is true, rows with a non-NULL `resolved_at`
    /// are excluded (`NOT $1 OR resolved_at IS NULL`).
    async fn get_dead_letters(
        &self,
        unresolved_only: bool,
        limit: usize,
    ) -> Result<Vec<DeadLetterEntry>> {
        let rows = sqlx::query_as::<_, DeadLetterRow>(
            "SELECT
                correlation_id,
                event_id,
                handler_id,
                event_type,
                event_payload,
                error,
                reason,
                attempts,
                failed_at,
                resolved_at
             FROM seesaw_dlq
             WHERE (NOT $1 OR resolved_at IS NULL)
             ORDER BY failed_at DESC
             LIMIT $2",
        )
        .bind(unresolved_only)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| DeadLetterEntry {
                correlation_id: row.correlation_id,
                event_id: row.event_id,
                handler_id: row.handler_id,
                event_type: row.event_type,
                event_payload: row.event_payload,
                error: row.error,
                reason: row.reason,
                attempts: row.attempts,
                failed_at: row.failed_at,
                resolved_at: row.resolved_at,
            })
            .collect())
    }

    /// List correlations that currently look unhealthy: at least one failed
    /// effect or at least one unresolved dead letter.
    ///
    /// The query aggregates per-correlation failure stats from handler
    /// executions and from the DLQ, FULL OUTER JOINs the two (a correlation
    /// may appear in only one side), and orders by the most recent failure
    /// timestamp.
    ///
    /// NOTE(review): `MAX(error)` picks the lexicographically greatest error
    /// string, not the most recent one — presumably "any representative
    /// error" is acceptable here; confirm.
    async fn get_failed_workflows(&self, limit: usize) -> Result<Vec<FailedWorkflow>> {
        let rows = sqlx::query_as::<_, FailedWorkflowRow>(
            "WITH effect_agg AS (
                SELECT
                    correlation_id,
                    COUNT(*) FILTER (WHERE status = 'failed')::BIGINT AS failed_effects,
                    COUNT(*) FILTER (WHERE status IN ('pending', 'executing'))::BIGINT AS active_effects,
                    MAX(last_attempted_at) FILTER (WHERE status = 'failed') AS last_failed_at,
                    MAX(error) FILTER (WHERE status = 'failed') AS last_error
                FROM seesaw_handler_executions
                GROUP BY correlation_id
             ),
             dlq_agg AS (
                SELECT
                    correlation_id,
                    COUNT(*) FILTER (WHERE resolved_at IS NULL)::BIGINT AS dead_letters,
                    MAX(failed_at) FILTER (WHERE resolved_at IS NULL) AS last_dlq_at,
                    MAX(error) FILTER (WHERE resolved_at IS NULL) AS last_dlq_error
                FROM seesaw_dlq
                GROUP BY correlation_id
             )
             SELECT
                COALESCE(e.correlation_id, d.correlation_id) AS correlation_id,
                COALESCE(e.failed_effects, 0) AS failed_effects,
                COALESCE(e.active_effects, 0) AS active_effects,
                COALESCE(d.dead_letters, 0) AS dead_letters,
                GREATEST(e.last_failed_at, d.last_dlq_at) AS last_failed_at,
                COALESCE(d.last_dlq_error, e.last_error) AS last_error
             FROM effect_agg e
             FULL OUTER JOIN dlq_agg d ON d.correlation_id = e.correlation_id
             WHERE COALESCE(e.failed_effects, 0) > 0 OR COALESCE(d.dead_letters, 0) > 0
             ORDER BY last_failed_at DESC NULLS LAST
             LIMIT $1",
        )
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| FailedWorkflow {
                correlation_id: row.correlation_id,
                failed_effects: row.failed_effects,
                active_effects: row.active_effects,
                dead_letters: row.dead_letters,
                last_failed_at: row.last_failed_at,
                last_error: row.last_error,
            })
            .collect())
    }
}
449
450// ============================================================================
451// Helper functions
452// ============================================================================
453
454fn stream_row_to_insight_event(row: StreamRow) -> InsightEvent {
455    let stream_type = match row.stream_type.as_str() {
456        "event_dispatched" => StreamType::EventDispatched,
457        "effect_started" => StreamType::EffectStarted,
458        "effect_completed" => StreamType::EffectCompleted,
459        "effect_failed" => StreamType::EffectFailed,
460        _ => StreamType::EventDispatched, // Default fallback
461    };
462
463    // Extract event_type from payload if it's an event
464    let event_type = if stream_type == StreamType::EventDispatched {
465        row.payload
466            .as_ref()
467            .and_then(|p| p.get("event_type"))
468            .and_then(|v| v.as_str())
469            .map(|s| s.to_string())
470    } else {
471        None
472    };
473
474    InsightEvent {
475        seq: row.seq,
476        stream_type,
477        correlation_id: row.correlation_id,
478        event_id: row.event_id,
479        effect_event_id: row.effect_event_id,
480        handler_id: row.handler_id,
481        event_type,
482        status: row.status,
483        error: row.error,
484        payload: row.payload,
485        created_at: row.created_at,
486    }
487}
488
489fn build_event_tree(
490    events: &[EventRow],
491    effects: &[EffectTreeRow],
492    parent_id: Option<Uuid>,
493    event_ids: &HashSet<Uuid>,
494    is_root_pass: bool,
495) -> Vec<EventNode> {
496    events
497        .iter()
498        .filter(|event| {
499            if is_root_pass {
500                event.parent_id.is_none()
501                    || event
502                        .parent_id
503                        .map(|parent| !event_ids.contains(&parent))
504                        .unwrap_or(false)
505            } else {
506                event.parent_id == parent_id
507            }
508        })
509        .map(|event| {
510            // Get effects for this event
511            let event_effects: Vec<HandlerNode> = effects
512                .iter()
513                .filter(|eff| eff.event_id == event.event_id)
514                .map(|eff| HandlerNode {
515                    handler_id: eff.handler_id.clone(),
516                    event_id: eff.event_id,
517                    status: eff.status.clone(),
518                    result: eff.result.clone(),
519                    error: eff.error.clone(),
520                    attempts: eff.attempts,
521                    created_at: eff.created_at,
522                    batch_id: eff.batch_id,
523                    batch_index: eff.batch_index,
524                    batch_size: eff.batch_size,
525                })
526                .collect();
527
528            // Recursively build children
529            let children =
530                build_event_tree(events, effects, Some(event.event_id), event_ids, false);
531
532            EventNode {
533                event_id: event.event_id,
534                event_type: event.event_type.clone(),
535                payload: event.payload.clone(),
536                created_at: event.created_at,
537                batch_id: event.batch_id,
538                batch_index: event.batch_index,
539                batch_size: event.batch_size,
540                children,
541                effects: event_effects,
542            }
543        })
544        .collect()
545}
546
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// An event whose parent is not part of the correlation must still show
    /// up as a root node.
    #[test]
    fn build_event_tree_treats_orphan_parent_as_root() {
        let correlation_id = Uuid::new_v4();
        let orphan_id = Uuid::new_v4();
        let absent_parent = Uuid::new_v4();
        let timestamp = Utc::now();

        let events = vec![EventRow {
            id: 1,
            event_id: orphan_id,
            parent_id: Some(absent_parent),
            correlation_id,
            event_type: String::from("OrphanEvent"),
            payload: serde_json::json!({"ok": true}),
            hops: 1,
            batch_id: None,
            batch_index: None,
            batch_size: None,
            created_at: timestamp,
        }];

        let no_effects: Vec<EffectTreeRow> = Vec::new();
        let known_ids: HashSet<Uuid> = events.iter().map(|e| e.event_id).collect();

        let roots = build_event_tree(&events, &no_effects, None, &known_ids, true);

        assert_eq!(roots.len(), 1);
        assert_eq!(roots[0].event_id, orphan_id);
    }
}
581}