1pub mod event_store;
2
3use anyhow::Result;
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use seesaw_core::insight::*;
7use sqlx::{FromRow, PgPool};
8use std::collections::HashSet;
9use uuid::Uuid;
10
11pub struct PostgresStore {
16 pool: PgPool,
17}
18
19impl PostgresStore {
20 pub fn new(pool: PgPool) -> Self {
21 Self { pool }
22 }
23
24 pub fn pool(&self) -> &PgPool {
25 &self.pool
26 }
27
28 pub async fn is_processed(&self, event_id: Uuid) -> Result<bool> {
30 let result: bool = sqlx::query_scalar(
31 "SELECT EXISTS(SELECT 1 FROM seesaw_processed WHERE event_id = $1)",
32 )
33 .bind(event_id)
34 .fetch_one(&self.pool)
35 .await?;
36
37 Ok(result)
38 }
39}
40
41impl Clone for PostgresStore {
42 fn clone(&self) -> Self {
43 Self {
44 pool: self.pool.clone(),
45 }
46 }
47}
48
49#[derive(FromRow)]
54struct EventRow {
55 #[allow(dead_code)]
56 id: i64,
57 event_id: Uuid,
58 parent_id: Option<Uuid>,
59 #[allow(dead_code)]
60 correlation_id: Uuid,
61 event_type: String,
62 payload: serde_json::Value,
63 #[allow(dead_code)]
64 hops: i32,
65 batch_id: Option<Uuid>,
66 batch_index: Option<i32>,
67 batch_size: Option<i32>,
68 created_at: DateTime<Utc>,
69}
70
71#[derive(FromRow)]
72struct EffectTreeRow {
73 event_id: Uuid,
74 handler_id: String,
75 status: String,
76 result: Option<serde_json::Value>,
77 error: Option<String>,
78 attempts: i32,
79 created_at: DateTime<Utc>,
80 batch_id: Option<Uuid>,
81 batch_index: Option<i32>,
82 batch_size: Option<i32>,
83}
84
85#[derive(FromRow)]
86struct StreamRow {
87 seq: i64,
88 stream_type: String,
89 correlation_id: Uuid,
90 event_id: Option<Uuid>,
91 effect_event_id: Option<Uuid>,
92 handler_id: Option<String>,
93 status: Option<String>,
94 error: Option<String>,
95 payload: Option<serde_json::Value>,
96 created_at: DateTime<Utc>,
97}
98
99#[derive(FromRow)]
100struct EffectLogRow {
101 correlation_id: Uuid,
102 event_id: Uuid,
103 handler_id: String,
104 status: String,
105 attempts: i32,
106 event_type: String,
107 result: Option<serde_json::Value>,
108 error: Option<String>,
109 created_at: DateTime<Utc>,
110 execute_at: DateTime<Utc>,
111 claimed_at: Option<DateTime<Utc>>,
112 last_attempted_at: Option<DateTime<Utc>>,
113 completed_at: Option<DateTime<Utc>>,
114}
115
116#[derive(FromRow)]
117struct DeadLetterRow {
118 correlation_id: Uuid,
119 event_id: Uuid,
120 handler_id: String,
121 event_type: String,
122 event_payload: serde_json::Value,
123 error: String,
124 reason: String,
125 attempts: i32,
126 failed_at: DateTime<Utc>,
127 resolved_at: Option<DateTime<Utc>>,
128}
129
130#[derive(FromRow)]
131struct FailedWorkflowRow {
132 correlation_id: Uuid,
133 failed_effects: i64,
134 active_effects: i64,
135 dead_letters: i64,
136 last_failed_at: Option<DateTime<Utc>>,
137 last_error: Option<String>,
138}
139
140#[async_trait]
145impl InsightStore for PostgresStore {
146 async fn subscribe_events(
147 &self,
148 ) -> Result<Box<dyn futures::Stream<Item = InsightEvent> + Send + Unpin>> {
149 use futures::stream::StreamExt;
150 use sqlx::postgres::PgListener;
151
152 let mut listener = PgListener::connect_with(&self.pool).await?;
154 listener.listen("seesaw_stream").await?;
155
156 let pool = self.pool.clone();
158 let stream = listener.into_stream().filter_map(move |result| {
159 let pool = pool.clone();
160 Box::pin(async move {
161 match result {
162 Ok(_notification) => {
163 if let Ok(row) = sqlx::query_as::<_, StreamRow>(
166 "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
167 handler_id, status, error, payload, created_at
168 FROM seesaw_stream
169 ORDER BY seq DESC
170 LIMIT 1",
171 )
172 .fetch_one(&pool)
173 .await
174 {
175 Some(stream_row_to_insight_event(row))
176 } else {
177 None
178 }
179 }
180 Err(_) => None,
181 }
182 })
183 });
184
185 Ok(Box::new(stream))
186 }
187
188 async fn get_workflow_tree(&self, correlation_id: Uuid) -> Result<seesaw_core::WorkflowTree> {
189 let events = sqlx::query_as::<_, EventRow>(
191 "SELECT id, event_id, parent_id, correlation_id, event_type, payload, hops,
192 batch_id, batch_index, batch_size, created_at
193 FROM seesaw_events
194 WHERE correlation_id = $1
195 ORDER BY created_at ASC",
196 )
197 .bind(correlation_id)
198 .fetch_all(&self.pool)
199 .await?;
200
201 let effects = sqlx::query_as::<_, EffectTreeRow>(
203 "SELECT event_id, handler_id, status, result, error, attempts, created_at,
204 batch_id, batch_index, batch_size
205 FROM seesaw_handler_executions
206 WHERE correlation_id = $1
207 ORDER BY created_at ASC",
208 )
209 .bind(correlation_id)
210 .fetch_all(&self.pool)
211 .await?;
212
213 let event_ids: HashSet<Uuid> = events.iter().map(|event| event.event_id).collect();
215 let roots = build_event_tree(&events, &effects, None, &event_ids, true);
216
217 Ok(seesaw_core::WorkflowTree {
218 correlation_id,
219 roots,
220 event_count: events.len(),
221 effect_count: effects.len(),
222 })
223 }
224
225 async fn get_stats(&self) -> Result<seesaw_core::InsightStats> {
226 let total_events = sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM seesaw_events")
227 .fetch_one(&self.pool)
228 .await?
229 .0;
230
231 let active_effects = sqlx::query_as::<_, (i64,)>(
232 "SELECT COUNT(*) FROM seesaw_handler_executions
233 WHERE status IN ('pending', 'executing')",
234 )
235 .fetch_one(&self.pool)
236 .await?
237 .0;
238
239 let completed_effects = sqlx::query_as::<_, (i64,)>(
240 "SELECT COUNT(*) FROM seesaw_handler_executions WHERE status = 'completed'",
241 )
242 .fetch_one(&self.pool)
243 .await?
244 .0;
245
246 let failed_effects = sqlx::query_as::<_, (i64,)>(
247 "SELECT COUNT(*) FROM seesaw_handler_executions WHERE status = 'failed'",
248 )
249 .fetch_one(&self.pool)
250 .await?
251 .0;
252
253 Ok(seesaw_core::InsightStats {
254 total_events,
255 active_effects,
256 completed_effects,
257 failed_effects,
258 })
259 }
260
261 async fn get_recent_events(
262 &self,
263 cursor: Option<i64>,
264 limit: usize,
265 ) -> Result<Vec<InsightEvent>> {
266 let rows = if let Some(cursor_seq) = cursor {
267 sqlx::query_as::<_, StreamRow>(
268 "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
269 handler_id, status, error, payload, created_at
270 FROM seesaw_stream
271 WHERE seq > $1
272 ORDER BY seq ASC
273 LIMIT $2",
274 )
275 .bind(cursor_seq)
276 .bind(limit as i64)
277 .fetch_all(&self.pool)
278 .await?
279 } else {
280 sqlx::query_as::<_, StreamRow>(
281 "SELECT seq, stream_type, correlation_id, event_id, effect_event_id,
282 handler_id, status, error, payload, created_at
283 FROM seesaw_stream
284 ORDER BY seq DESC
285 LIMIT $1",
286 )
287 .bind(limit as i64)
288 .fetch_all(&self.pool)
289 .await?
290 };
291
292 Ok(rows.into_iter().map(stream_row_to_insight_event).collect())
293 }
294
295 async fn get_effect_logs(
296 &self,
297 correlation_id: Option<Uuid>,
298 limit: usize,
299 ) -> Result<Vec<EffectExecutionLog>> {
300 let rows = sqlx::query_as::<_, EffectLogRow>(
301 "SELECT
302 correlation_id,
303 event_id,
304 handler_id,
305 status,
306 attempts,
307 event_type,
308 result,
309 error,
310 created_at,
311 execute_at,
312 claimed_at,
313 last_attempted_at,
314 completed_at
315 FROM seesaw_handler_executions
316 WHERE ($1::uuid IS NULL OR correlation_id = $1)
317 ORDER BY COALESCE(last_attempted_at, created_at) DESC, event_id DESC
318 LIMIT $2",
319 )
320 .bind(correlation_id)
321 .bind(limit as i64)
322 .fetch_all(&self.pool)
323 .await?;
324
325 Ok(rows
326 .into_iter()
327 .map(|row| {
328 let started_at = row.claimed_at.or(row.last_attempted_at);
329 let duration_ms = match (started_at, row.completed_at) {
330 (Some(start), Some(end)) => Some((end - start).num_milliseconds().max(0)),
331 _ => None,
332 };
333
334 EffectExecutionLog {
335 correlation_id: row.correlation_id,
336 event_id: row.event_id,
337 handler_id: row.handler_id,
338 status: row.status,
339 attempts: row.attempts,
340 event_type: Some(row.event_type),
341 result: row.result,
342 error: row.error,
343 created_at: row.created_at,
344 execute_at: Some(row.execute_at),
345 claimed_at: row.claimed_at,
346 last_attempted_at: row.last_attempted_at,
347 completed_at: row.completed_at,
348 duration_ms,
349 }
350 })
351 .collect())
352 }
353
354 async fn get_dead_letters(
355 &self,
356 unresolved_only: bool,
357 limit: usize,
358 ) -> Result<Vec<DeadLetterEntry>> {
359 let rows = sqlx::query_as::<_, DeadLetterRow>(
360 "SELECT
361 correlation_id,
362 event_id,
363 handler_id,
364 event_type,
365 event_payload,
366 error,
367 reason,
368 attempts,
369 failed_at,
370 resolved_at
371 FROM seesaw_dlq
372 WHERE (NOT $1 OR resolved_at IS NULL)
373 ORDER BY failed_at DESC
374 LIMIT $2",
375 )
376 .bind(unresolved_only)
377 .bind(limit as i64)
378 .fetch_all(&self.pool)
379 .await?;
380
381 Ok(rows
382 .into_iter()
383 .map(|row| DeadLetterEntry {
384 correlation_id: row.correlation_id,
385 event_id: row.event_id,
386 handler_id: row.handler_id,
387 event_type: row.event_type,
388 event_payload: row.event_payload,
389 error: row.error,
390 reason: row.reason,
391 attempts: row.attempts,
392 failed_at: row.failed_at,
393 resolved_at: row.resolved_at,
394 })
395 .collect())
396 }
397
398 async fn get_failed_workflows(&self, limit: usize) -> Result<Vec<FailedWorkflow>> {
399 let rows = sqlx::query_as::<_, FailedWorkflowRow>(
400 "WITH effect_agg AS (
401 SELECT
402 correlation_id,
403 COUNT(*) FILTER (WHERE status = 'failed')::BIGINT AS failed_effects,
404 COUNT(*) FILTER (WHERE status IN ('pending', 'executing'))::BIGINT AS active_effects,
405 MAX(last_attempted_at) FILTER (WHERE status = 'failed') AS last_failed_at,
406 MAX(error) FILTER (WHERE status = 'failed') AS last_error
407 FROM seesaw_handler_executions
408 GROUP BY correlation_id
409 ),
410 dlq_agg AS (
411 SELECT
412 correlation_id,
413 COUNT(*) FILTER (WHERE resolved_at IS NULL)::BIGINT AS dead_letters,
414 MAX(failed_at) FILTER (WHERE resolved_at IS NULL) AS last_dlq_at,
415 MAX(error) FILTER (WHERE resolved_at IS NULL) AS last_dlq_error
416 FROM seesaw_dlq
417 GROUP BY correlation_id
418 )
419 SELECT
420 COALESCE(e.correlation_id, d.correlation_id) AS correlation_id,
421 COALESCE(e.failed_effects, 0) AS failed_effects,
422 COALESCE(e.active_effects, 0) AS active_effects,
423 COALESCE(d.dead_letters, 0) AS dead_letters,
424 GREATEST(e.last_failed_at, d.last_dlq_at) AS last_failed_at,
425 COALESCE(d.last_dlq_error, e.last_error) AS last_error
426 FROM effect_agg e
427 FULL OUTER JOIN dlq_agg d ON d.correlation_id = e.correlation_id
428 WHERE COALESCE(e.failed_effects, 0) > 0 OR COALESCE(d.dead_letters, 0) > 0
429 ORDER BY last_failed_at DESC NULLS LAST
430 LIMIT $1",
431 )
432 .bind(limit as i64)
433 .fetch_all(&self.pool)
434 .await?;
435
436 Ok(rows
437 .into_iter()
438 .map(|row| FailedWorkflow {
439 correlation_id: row.correlation_id,
440 failed_effects: row.failed_effects,
441 active_effects: row.active_effects,
442 dead_letters: row.dead_letters,
443 last_failed_at: row.last_failed_at,
444 last_error: row.last_error,
445 })
446 .collect())
447 }
448}
449
450fn stream_row_to_insight_event(row: StreamRow) -> InsightEvent {
455 let stream_type = match row.stream_type.as_str() {
456 "event_dispatched" => StreamType::EventDispatched,
457 "effect_started" => StreamType::EffectStarted,
458 "effect_completed" => StreamType::EffectCompleted,
459 "effect_failed" => StreamType::EffectFailed,
460 _ => StreamType::EventDispatched, };
462
463 let event_type = if stream_type == StreamType::EventDispatched {
465 row.payload
466 .as_ref()
467 .and_then(|p| p.get("event_type"))
468 .and_then(|v| v.as_str())
469 .map(|s| s.to_string())
470 } else {
471 None
472 };
473
474 InsightEvent {
475 seq: row.seq,
476 stream_type,
477 correlation_id: row.correlation_id,
478 event_id: row.event_id,
479 effect_event_id: row.effect_event_id,
480 handler_id: row.handler_id,
481 event_type,
482 status: row.status,
483 error: row.error,
484 payload: row.payload,
485 created_at: row.created_at,
486 }
487}
488
489fn build_event_tree(
490 events: &[EventRow],
491 effects: &[EffectTreeRow],
492 parent_id: Option<Uuid>,
493 event_ids: &HashSet<Uuid>,
494 is_root_pass: bool,
495) -> Vec<EventNode> {
496 events
497 .iter()
498 .filter(|event| {
499 if is_root_pass {
500 event.parent_id.is_none()
501 || event
502 .parent_id
503 .map(|parent| !event_ids.contains(&parent))
504 .unwrap_or(false)
505 } else {
506 event.parent_id == parent_id
507 }
508 })
509 .map(|event| {
510 let event_effects: Vec<HandlerNode> = effects
512 .iter()
513 .filter(|eff| eff.event_id == event.event_id)
514 .map(|eff| HandlerNode {
515 handler_id: eff.handler_id.clone(),
516 event_id: eff.event_id,
517 status: eff.status.clone(),
518 result: eff.result.clone(),
519 error: eff.error.clone(),
520 attempts: eff.attempts,
521 created_at: eff.created_at,
522 batch_id: eff.batch_id,
523 batch_index: eff.batch_index,
524 batch_size: eff.batch_size,
525 })
526 .collect();
527
528 let children =
530 build_event_tree(events, effects, Some(event.event_id), event_ids, false);
531
532 EventNode {
533 event_id: event.event_id,
534 event_type: event.event_type.clone(),
535 payload: event.payload.clone(),
536 created_at: event.created_at,
537 batch_id: event.batch_id,
538 batch_index: event.batch_index,
539 batch_size: event.batch_size,
540 children,
541 effects: event_effects,
542 }
543 })
544 .collect()
545}
546
547#[cfg(test)]
548mod tests {
549 use super::*;
550 use std::collections::HashSet;
551
552 #[test]
553 fn build_event_tree_treats_orphan_parent_as_root() {
554 let correlation_id = Uuid::new_v4();
555 let event_id = Uuid::new_v4();
556 let missing_parent = Uuid::new_v4();
557 let now = Utc::now();
558
559 let events = vec![EventRow {
560 id: 1,
561 event_id,
562 parent_id: Some(missing_parent),
563 correlation_id,
564 event_type: "OrphanEvent".to_string(),
565 payload: serde_json::json!({"ok": true}),
566 hops: 1,
567 batch_id: None,
568 batch_index: None,
569 batch_size: None,
570 created_at: now,
571 }];
572
573 let effects: Vec<EffectTreeRow> = Vec::new();
574 let event_ids: HashSet<Uuid> = events.iter().map(|event| event.event_id).collect();
575
576 let roots = build_event_tree(&events, &effects, None, &event_ids, true);
577
578 assert_eq!(roots.len(), 1);
579 assert_eq!(roots[0].event_id, event_id);
580 }
581}