Skip to main content

rivet/
journal.rs

1//! **Layer: Observability**
2// Query methods and event fields are intentionally defined for future consumers
3// (storage, CLI inspection, notifications).  Suppress dead_code for this module.
4#![allow(dead_code)]
5//!
6//! `RunJournal` is the canonical in-memory record of everything that happened
7//! during a pipeline run.  It accumulates typed, timestamped events and can
8//! answer the four observability questions from the Epic 10 DoD:
9//!
//! | Question               | Method               |
//! |------------------------|----------------------|
//! | What was planned?      | `plan_snapshot()`    |
//! | What happened?         | `files()`, `retries()`, `chunk_events()`, `longest_chunk_ms()` |
//! | What degraded?         | `quality_issues()`, `schema_changes()`, `warnings()` |
//! | What was the outcome?  | `final_outcome()`    |
16//!
17//! `RunJournal` is currently embedded in `RunSummary` so that all pipeline
18//! modules — which already hold `&mut RunSummary` — can record events without
19//! signature changes.  A future epic will invert the relationship so that
20//! `RunSummary` is derived from `RunJournal`.
21
22//!
23//! This module is the canonical home for journal types. It deliberately has no
24//! dependencies on `plan`, `state`, or `pipeline` so that storage (state) and
25//! orchestration (pipeline) can both depend on it without creating a cycle.
26//! The `From<&ResolvedRunPlan>` conversion lives in `pipeline/summary.rs`
27//! beside the call site that needs it.
28
29use chrono::{DateTime, Utc};
30use serde::{Deserialize, Serialize};
31
32// ─── Plan snapshot ───────────────────────────────────────────────────────────
33
34/// Owned, serialisable snapshot of the resolved execution plan, captured at
35/// the moment a run starts.  Answers "what was planned?" without requiring the
36/// original `ResolvedRunPlan` to remain in scope.
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct PlanSnapshot {
39    pub export_name: String,
40    pub base_query: String,
41    pub strategy: String,
42    pub format: String,
43    pub compression: String,
44    pub destination_type: String,
45    pub tuning_profile: String,
46    pub batch_size: usize,
47    pub validate: bool,
48    pub reconcile: bool,
49    pub resume: bool,
50}
51
52// ─── Events ──────────────────────────────────────────────────────────────────
53
54/// A single typed event emitted during a pipeline run.
55///
56/// Variants are grouped by DoD question:
57/// - *Planned* — `PlanResolved`, `PlanWarning`
58/// - *Happened* — `FileWritten`, `ChunkStarted`, `ChunkCompleted`, `ChunkFailed`, `RetryAttempted`
59/// - *Degraded* — `QualityIssue`, `SchemaChanged`, `Warning`
60/// - *Succeeded* — `ValidationResult`, `ReconciliationResult`
61/// - *Outcome* — `RunCompleted`
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub enum RunEvent {
64    // ── Planned ──────────────────────────────────────────────
65    /// Emitted once at the start of a run with a snapshot of the resolved plan.
66    PlanResolved(PlanSnapshot),
67    /// A plan validation diagnostic at `Warning` or `Degraded` level.
68    PlanWarning { rule: String, message: String },
69
70    // ── Happened ─────────────────────────────────────────────
71    /// One output file was successfully written to the destination.
72    FileWritten {
73        file_name: String,
74        rows: i64,
75        bytes: u64,
76        part_index: usize,
77    },
78    /// A chunk task transitioned from `pending` to `running`.
79    ChunkStarted {
80        chunk_index: i64,
81        start_key: String,
82        end_key: String,
83    },
84    /// A chunk task completed successfully.
85    ChunkCompleted {
86        chunk_index: i64,
87        rows: i64,
88        file_name: Option<String>,
89    },
90    /// A chunk task failed (may be retried up to `max_chunk_attempts`).
91    ChunkFailed {
92        chunk_index: i64,
93        error: String,
94        attempt: i64,
95    },
96    /// The pipeline is about to retry after a transient error.
97    RetryAttempted {
98        attempt: u32,
99        reason: String,
100        backoff_ms: u64,
101    },
102
103    // ── Degraded ─────────────────────────────────────────────
104    /// One quality rule fired (severity: `"FAIL"` or `"WARN"`).
105    QualityIssue { severity: String, message: String },
106    /// The output schema differs from the previously stored snapshot.
107    SchemaChanged {
108        /// Columns added since the last run, formatted as `"name (type)"`.
109        added: Vec<String>,
110        /// Column names removed since the last run.
111        removed: Vec<String>,
112        /// `(column, old_type, new_type)` for columns whose type changed.
113        type_changed: Vec<(String, String, String)>,
114    },
115    /// A non-fatal warning that does not fit another variant.
116    Warning { context: String, message: String },
117    /// The OPT-2 concurrency governor changed the active parallelism in
118    /// response to source pressure. `from`/`to` are permit counts; `reason`
119    /// describes the trigger (e.g. `"pressure rising: backed off"`).
120    ParallelismAdjusted {
121        from: usize,
122        to: usize,
123        reason: String,
124    },
125
126    // ── Succeeded ────────────────────────────────────────────
127    /// Output file row-count validation completed.
128    ValidationResult { passed: bool },
129    /// Source COUNT(*) reconciliation completed.
130    ReconciliationResult {
131        source_count: i64,
132        exported_rows: i64,
133        matched: bool,
134    },
135
136    // ── Outcome ──────────────────────────────────────────────
137    /// Terminal event — the run has reached its final state.
138    RunCompleted {
139        status: String,
140        error_message: Option<String>,
141        duration_ms: i64,
142    },
143}
144
145// ─── Journal entry ───────────────────────────────────────────────────────────
146
147/// A timestamped wrapper around a `RunEvent`.
148#[derive(Debug, Clone, Serialize, Deserialize)]
149pub struct JournalEntry {
150    pub recorded_at: DateTime<Utc>,
151    pub event: RunEvent,
152}
153
154// ─── RunJournal ──────────────────────────────────────────────────────────────
155
156/// Canonical in-memory record of a pipeline run.
157///
158/// Accumulated during execution via `record()`.  Query methods let callers
159/// answer the four DoD questions without iterating `entries` directly.
160#[derive(Debug, Clone, Serialize, Deserialize)]
161pub struct RunJournal {
162    pub run_id: String,
163    pub export_name: String,
164    /// All events in insertion order.
165    pub entries: Vec<JournalEntry>,
166}
167
168impl RunJournal {
169    pub fn new(run_id: impl Into<String>, export_name: impl Into<String>) -> Self {
170        Self {
171            run_id: run_id.into(),
172            export_name: export_name.into(),
173            entries: Vec::new(),
174        }
175    }
176
177    /// Append an event with the current UTC timestamp.
178    pub fn record(&mut self, event: RunEvent) {
179        self.entries.push(JournalEntry {
180            recorded_at: Utc::now(),
181            event,
182        });
183    }
184
185    // ── What was planned? ─────────────────────────────────────
186
187    /// Returns the plan snapshot recorded at the start of the run.
188    pub fn plan_snapshot(&self) -> Option<&PlanSnapshot> {
189        self.entries.iter().find_map(|e| {
190            if let RunEvent::PlanResolved(s) = &e.event {
191                Some(s)
192            } else {
193                None
194            }
195        })
196    }
197
198    // ── What happened? ────────────────────────────────────────
199
200    /// All `FileWritten` entries, in the order files were committed.
201    pub fn files(&self) -> Vec<&JournalEntry> {
202        self.entries
203            .iter()
204            .filter(|e| matches!(e.event, RunEvent::FileWritten { .. }))
205            .collect()
206    }
207
208    /// All `RetryAttempted` entries.
209    pub fn retries(&self) -> Vec<&JournalEntry> {
210        self.entries
211            .iter()
212            .filter(|e| matches!(e.event, RunEvent::RetryAttempted { .. }))
213            .collect()
214    }
215
216    /// All chunk lifecycle entries (`ChunkStarted`, `ChunkCompleted`, `ChunkFailed`).
217    pub fn chunk_events(&self) -> Vec<&JournalEntry> {
218        self.entries
219            .iter()
220            .filter(|e| {
221                matches!(
222                    e.event,
223                    RunEvent::ChunkStarted { .. }
224                        | RunEvent::ChunkCompleted { .. }
225                        | RunEvent::ChunkFailed { .. }
226                )
227            })
228            .collect()
229    }
230
231    /// The longest single-chunk wall time in milliseconds, if derivable.
232    ///
233    /// Pairs each `ChunkStarted` with its `ChunkCompleted` by `chunk_index`
234    /// (robust to the interleaving the parallel runners produce) and returns the
235    /// largest gap. This is the #5 source-harm lever — how long one chunk query,
236    /// and the snapshot / locks it holds, stayed open — made measurable.
237    ///
238    /// `None` when no pair is found: a non-chunked run, or a parallel runner that
239    /// records `ChunkCompleted` in a single post-scope batch (so no real
240    /// per-chunk start time exists). The sequential and checkpoint paths
241    /// timestamp each event as it happens, so they yield true per-chunk timings.
242    pub fn longest_chunk_ms(&self) -> Option<i64> {
243        let mut started: std::collections::HashMap<i64, DateTime<Utc>> =
244            std::collections::HashMap::new();
245        let mut max_ms: Option<i64> = None;
246        for e in &self.entries {
247            match &e.event {
248                RunEvent::ChunkStarted { chunk_index, .. } => {
249                    started.insert(*chunk_index, e.recorded_at);
250                }
251                RunEvent::ChunkCompleted { chunk_index, .. } => {
252                    if let Some(start) = started.get(chunk_index) {
253                        let ms = (e.recorded_at - *start).num_milliseconds();
254                        if ms >= 0 {
255                            max_ms = Some(max_ms.map_or(ms, |m| m.max(ms)));
256                        }
257                    }
258                }
259                _ => {}
260            }
261        }
262        max_ms
263    }
264
265    // ── What degraded? ────────────────────────────────────────
266
267    /// All `QualityIssue` entries (both FAIL and WARN severity).
268    pub fn quality_issues(&self) -> Vec<&JournalEntry> {
269        self.entries
270            .iter()
271            .filter(|e| matches!(e.event, RunEvent::QualityIssue { .. }))
272            .collect()
273    }
274
275    /// All `SchemaChanged` entries.
276    pub fn schema_changes(&self) -> Vec<&JournalEntry> {
277        self.entries
278            .iter()
279            .filter(|e| matches!(e.event, RunEvent::SchemaChanged { .. }))
280            .collect()
281    }
282
283    /// All `Warning` and `PlanWarning` entries.
284    pub fn warnings(&self) -> Vec<&JournalEntry> {
285        self.entries
286            .iter()
287            .filter(|e| {
288                matches!(
289                    e.event,
290                    RunEvent::Warning { .. } | RunEvent::PlanWarning { .. }
291                )
292            })
293            .collect()
294    }
295
296    // ── What was the final outcome? ───────────────────────────
297
298    /// The last `RunCompleted` entry, or `None` if the run has not yet finished.
299    pub fn final_outcome(&self) -> Option<&JournalEntry> {
300        self.entries
301            .iter()
302            .rev()
303            .find(|e| matches!(e.event, RunEvent::RunCompleted { .. }))
304    }
305}
306
#[cfg(test)]
impl RunJournal {
    /// Test-only helper: push a `ChunkStarted` / `ChunkCompleted` pair for
    /// `chunk_index` whose timestamps are exactly `dur_ms` apart.
    ///
    /// `record()` always stamps `Utc::now()`, so a test cannot control the
    /// spacing between two recorded events. This helper stamps both entries
    /// explicitly. That makes `longest_chunk_ms()` deterministic without the
    /// caller pushing onto `entries` by hand.
    pub(crate) fn push_test_chunk_span(&mut self, chunk_index: i64, dur_ms: i64) {
        let opened_at = Utc::now();
        let started = RunEvent::ChunkStarted {
            chunk_index,
            start_key: "0".into(),
            end_key: "1".into(),
        };
        self.entries.push(JournalEntry {
            recorded_at: opened_at,
            event: started,
        });

        let closed_at = opened_at + chrono::Duration::milliseconds(dur_ms);
        let completed = RunEvent::ChunkCompleted {
            chunk_index,
            rows: 1,
            file_name: None,
        };
        self.entries.push(JournalEntry {
            recorded_at: closed_at,
            event: completed,
        });
    }
}
334
#[cfg(test)]
mod tests {
    use super::*;

    fn journal() -> RunJournal {
        RunJournal::new("run-1", "orders")
    }

    fn snap() -> PlanSnapshot {
        PlanSnapshot {
            export_name: "orders".into(),
            base_query: "SELECT 1".into(),
            strategy: "snapshot".into(),
            format: "parquet".into(),
            compression: "zstd".into(),
            destination_type: "local".into(),
            tuning_profile: "balanced".into(),
            batch_size: 1000,
            validate: false,
            reconcile: false,
            resume: false,
        }
    }

    // ── construction ────────────────────────────────────────────────────────

    #[test]
    fn new_journal_is_empty() {
        let j = journal();
        assert_eq!(j.run_id, "run-1");
        assert_eq!(j.export_name, "orders");
        assert!(j.entries.is_empty());
    }

    // ── record ───────────────────────────────────────────────────────────────

    #[test]
    fn record_appends_entry() {
        let mut j = journal();
        j.record(RunEvent::Warning {
            context: "test".into(),
            message: "w".into(),
        });
        assert_eq!(j.entries.len(), 1);
    }

    #[test]
    fn record_multiple_entries_in_order() {
        let mut j = journal();
        j.record(RunEvent::Warning {
            context: "a".into(),
            message: "1".into(),
        });
        j.record(RunEvent::Warning {
            context: "b".into(),
            message: "2".into(),
        });
        assert_eq!(j.entries.len(), 2);
        // The test name promises insertion order, so check it explicitly.
        let contexts: Vec<&str> = j
            .entries
            .iter()
            .map(|e| match &e.event {
                RunEvent::Warning { context, .. } => context.as_str(),
                other => panic!("unexpected event: {:?}", other),
            })
            .collect();
        assert_eq!(contexts, ["a", "b"]);
    }

    // ── plan_snapshot ────────────────────────────────────────────────────────

    #[test]
    fn plan_snapshot_none_when_empty() {
        assert!(journal().plan_snapshot().is_none());
    }

    #[test]
    fn plan_snapshot_returns_first_resolved() {
        let mut j = journal();
        j.record(RunEvent::PlanResolved(snap()));
        // A second snapshot must not shadow the first one.
        j.record(RunEvent::PlanResolved(PlanSnapshot {
            export_name: "later".into(),
            batch_size: 1,
            ..snap()
        }));
        let s = j.plan_snapshot().unwrap();
        assert_eq!(s.export_name, "orders");
        assert_eq!(s.batch_size, 1000);
    }

    // ── files ────────────────────────────────────────────────────────────────

    #[test]
    fn files_empty_when_no_file_written() {
        let mut j = journal();
        j.record(RunEvent::Warning {
            context: "x".into(),
            message: "y".into(),
        });
        assert!(j.files().is_empty());
    }

    #[test]
    fn files_returns_file_written_entries() {
        let mut j = journal();
        j.record(RunEvent::FileWritten {
            file_name: "f.parquet".into(),
            rows: 100,
            bytes: 4096,
            part_index: 0,
        });
        j.record(RunEvent::Warning {
            context: "x".into(),
            message: "y".into(),
        });
        j.record(RunEvent::FileWritten {
            file_name: "g.parquet".into(),
            rows: 50,
            bytes: 2048,
            part_index: 1,
        });
        assert_eq!(j.files().len(), 2);
        let names: Vec<&str> = j
            .files()
            .into_iter()
            .map(|e| match &e.event {
                RunEvent::FileWritten { file_name, .. } => file_name.as_str(),
                other => panic!("files() returned a non-FileWritten event: {:?}", other),
            })
            .collect();
        assert_eq!(names, ["f.parquet", "g.parquet"]);
    }

    // ── retries ──────────────────────────────────────────────────────────────

    #[test]
    fn retries_empty_when_none_recorded() {
        assert!(journal().retries().is_empty());
    }

    #[test]
    fn retries_returns_retry_attempted_entries() {
        let mut j = journal();
        j.record(RunEvent::RetryAttempted {
            attempt: 1,
            reason: "timeout".into(),
            backoff_ms: 500,
        });
        j.record(RunEvent::RetryAttempted {
            attempt: 2,
            reason: "timeout".into(),
            backoff_ms: 1000,
        });
        assert_eq!(j.retries().len(), 2);
    }

    // ── chunk_events ─────────────────────────────────────────────────────────

    #[test]
    fn chunk_events_collects_all_three_variant_types() {
        let mut j = journal();
        j.record(RunEvent::ChunkStarted {
            chunk_index: 0,
            start_key: "0".into(),
            end_key: "100".into(),
        });
        j.record(RunEvent::ChunkCompleted {
            chunk_index: 0,
            rows: 100,
            file_name: None,
        });
        j.record(RunEvent::ChunkFailed {
            chunk_index: 1,
            error: "err".into(),
            attempt: 1,
        });
        j.record(RunEvent::Warning {
            context: "x".into(),
            message: "y".into(),
        });
        assert_eq!(j.chunk_events().len(), 3);
    }

    // ── longest_chunk_ms ──────────────────────────────────────────────────────

    #[test]
    fn longest_chunk_ms_pairs_started_and_completed_by_index() {
        use chrono::Duration;
        // Construct entries with explicit timestamps (record() stamps Utc::now,
        // which we can't control). chunk 0 = 200ms, chunk 1 = 800ms; starts and
        // completes interleave the way the parallel runner would order them.
        let base = Utc::now();
        let mut j = journal();
        let push = |j: &mut RunJournal, off_ms: i64, event: RunEvent| {
            j.entries.push(JournalEntry {
                recorded_at: base + Duration::milliseconds(off_ms),
                event,
            });
        };
        let started = |i: i64| RunEvent::ChunkStarted {
            chunk_index: i,
            start_key: "0".into(),
            end_key: "1".into(),
        };
        let done = |i: i64| RunEvent::ChunkCompleted {
            chunk_index: i,
            rows: 1,
            file_name: None,
        };
        push(&mut j, 0, started(0));
        push(&mut j, 50, started(1));
        push(&mut j, 200, done(0)); // chunk 0: 200ms
        push(&mut j, 850, done(1)); // chunk 1: 850 - 50 = 800ms (the max)
        assert_eq!(j.longest_chunk_ms(), Some(800));
    }

    #[test]
    fn longest_chunk_ms_times_only_the_latest_attempt() {
        use chrono::Duration;
        // A retried chunk re-emits ChunkStarted; its span is measured from the
        // retry's start (300ms), not the first attempt's (0ms).
        let base = Utc::now();
        let mut j = journal();
        let push = |j: &mut RunJournal, off_ms: i64, event: RunEvent| {
            j.entries.push(JournalEntry {
                recorded_at: base + Duration::milliseconds(off_ms),
                event,
            });
        };
        let started = || RunEvent::ChunkStarted {
            chunk_index: 0,
            start_key: "0".into(),
            end_key: "1".into(),
        };
        push(&mut j, 0, started());
        push(
            &mut j,
            100,
            RunEvent::ChunkFailed {
                chunk_index: 0,
                error: "timeout".into(),
                attempt: 1,
            },
        );
        push(&mut j, 300, started());
        push(
            &mut j,
            400,
            RunEvent::ChunkCompleted {
                chunk_index: 0,
                rows: 1,
                file_name: None,
            },
        );
        assert_eq!(j.longest_chunk_ms(), Some(100));
    }

    #[test]
    fn longest_chunk_ms_none_without_paired_start() {
        // The parallel post-scope batch shape: ChunkCompleted with no matching
        // ChunkStarted → no real per-chunk timing → None (honest, not a bogus 0).
        let mut j = journal();
        j.record(RunEvent::ChunkCompleted {
            chunk_index: 0,
            rows: 1,
            file_name: None,
        });
        assert!(j.longest_chunk_ms().is_none());
        assert!(
            journal().longest_chunk_ms().is_none(),
            "empty journal → None"
        );
    }

    #[test]
    fn push_test_chunk_span_yields_exact_spans() {
        let mut j = journal();
        j.push_test_chunk_span(0, 120);
        j.push_test_chunk_span(1, 40);
        assert_eq!(j.chunk_events().len(), 4);
        assert_eq!(j.longest_chunk_ms(), Some(120));
    }

    // ── quality_issues ───────────────────────────────────────────────────────

    #[test]
    fn quality_issues_filters_correctly() {
        let mut j = journal();
        j.record(RunEvent::QualityIssue {
            severity: "FAIL".into(),
            message: "null check".into(),
        });
        j.record(RunEvent::Warning {
            context: "x".into(),
            message: "y".into(),
        });
        assert_eq!(j.quality_issues().len(), 1);
    }

    // ── schema_changes ───────────────────────────────────────────────────────

    #[test]
    fn schema_changes_filters_correctly() {
        let mut j = journal();
        j.record(RunEvent::SchemaChanged {
            added: vec!["new_col (Int64)".into()],
            removed: vec![],
            type_changed: vec![],
        });
        assert_eq!(j.schema_changes().len(), 1);
    }

    // ── warnings ─────────────────────────────────────────────────────────────

    #[test]
    fn warnings_includes_both_warning_and_plan_warning() {
        let mut j = journal();
        j.record(RunEvent::Warning {
            context: "ctx".into(),
            message: "w1".into(),
        });
        j.record(RunEvent::PlanWarning {
            rule: "r".into(),
            message: "w2".into(),
        });
        j.record(RunEvent::QualityIssue {
            severity: "WARN".into(),
            message: "q".into(),
        });
        assert_eq!(j.warnings().len(), 2);
    }

    // ── final_outcome ─────────────────────────────────────────────────────────

    #[test]
    fn final_outcome_none_when_not_completed() {
        let mut j = journal();
        j.record(RunEvent::Warning {
            context: "x".into(),
            message: "y".into(),
        });
        assert!(j.final_outcome().is_none());
    }

    #[test]
    fn final_outcome_returns_last_run_completed() {
        let mut j = journal();
        j.record(RunEvent::RunCompleted {
            status: "success".into(),
            error_message: None,
            duration_ms: 1234,
        });
        j.record(RunEvent::Warning {
            context: "x".into(),
            message: "y".into(),
        });
        j.record(RunEvent::RunCompleted {
            status: "failed".into(),
            error_message: Some("err".into()),
            duration_ms: 5678,
        });
        let outcome = j.final_outcome().unwrap();
        if let RunEvent::RunCompleted { status, .. } = &outcome.event {
            assert_eq!(status, "failed");
        } else {
            panic!("expected RunCompleted");
        }
    }
}