Skip to main content

rivet/
journal.rs

//! **Layer: Observability**
// Query methods and event fields are intentionally defined for future consumers
// (storage, CLI inspection, notifications), so dead_code is suppressed for this module.
4#![allow(dead_code)]
//!
//! `RunJournal` is the canonical in-memory record of everything that happened
//! during a pipeline run.  It accumulates typed, timestamped events and can
//! answer the four observability questions from the Epic 10 DoD:
//!
//! | Question               | Method                                               |
//! |------------------------|------------------------------------------------------|
//! | What was planned?      | `plan_snapshot()`                                    |
//! | What happened?         | `files()`, `retries()`, `chunk_events()`             |
//! | What degraded?         | `quality_issues()`, `schema_changes()`, `warnings()` |
//! | What was the outcome?  | `final_outcome()`                                    |
//!
//! `RunJournal` is currently embedded in `RunSummary` so that all pipeline
//! modules — which already hold `&mut RunSummary` — can record events without
//! signature changes.  A future epic will invert the relationship so that
//! `RunSummary` is derived from `RunJournal`.
//!
//! This module is the canonical home for journal types. It deliberately has no
//! dependencies on `plan`, `state`, or `pipeline`, so that storage (state) and
//! orchestration (pipeline) can both depend on it without creating a cycle.
//! The `From<&ResolvedRunPlan>` conversion lives in `pipeline/summary.rs`,
//! beside the call site that needs it.
28
29use chrono::{DateTime, Utc};
30use serde::{Deserialize, Serialize};
31
32// ─── Plan snapshot ───────────────────────────────────────────────────────────
33
34/// Owned, serialisable snapshot of the resolved execution plan, captured at
35/// the moment a run starts.  Answers "what was planned?" without requiring the
36/// original `ResolvedRunPlan` to remain in scope.
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct PlanSnapshot {
39    pub export_name: String,
40    pub base_query: String,
41    pub strategy: String,
42    pub format: String,
43    pub compression: String,
44    pub destination_type: String,
45    pub tuning_profile: String,
46    pub batch_size: usize,
47    pub validate: bool,
48    pub reconcile: bool,
49    pub resume: bool,
50}
51
52// ─── Events ──────────────────────────────────────────────────────────────────
53
54/// A single typed event emitted during a pipeline run.
55///
56/// Variants are grouped by DoD question:
57/// - *Planned* — `PlanResolved`, `PlanWarning`
58/// - *Happened* — `FileWritten`, `ChunkStarted`, `ChunkCompleted`, `ChunkFailed`, `RetryAttempted`
59/// - *Degraded* — `QualityIssue`, `SchemaChanged`, `Warning`
60/// - *Succeeded* — `ValidationResult`, `ReconciliationResult`
61/// - *Outcome* — `RunCompleted`
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub enum RunEvent {
64    // ── Planned ──────────────────────────────────────────────
65    /// Emitted once at the start of a run with a snapshot of the resolved plan.
66    PlanResolved(PlanSnapshot),
67    /// A plan validation diagnostic at `Warning` or `Degraded` level.
68    PlanWarning { rule: String, message: String },
69
70    // ── Happened ─────────────────────────────────────────────
71    /// One output file was successfully written to the destination.
72    FileWritten {
73        file_name: String,
74        rows: i64,
75        bytes: u64,
76        part_index: usize,
77    },
78    /// A chunk task transitioned from `pending` to `running`.
79    ChunkStarted {
80        chunk_index: i64,
81        start_key: String,
82        end_key: String,
83    },
84    /// A chunk task completed successfully.
85    ChunkCompleted {
86        chunk_index: i64,
87        rows: i64,
88        file_name: Option<String>,
89    },
90    /// A chunk task failed (may be retried up to `max_chunk_attempts`).
91    ChunkFailed {
92        chunk_index: i64,
93        error: String,
94        attempt: i64,
95    },
96    /// The pipeline is about to retry after a transient error.
97    RetryAttempted {
98        attempt: u32,
99        reason: String,
100        backoff_ms: u64,
101    },
102
103    // ── Degraded ─────────────────────────────────────────────
104    /// One quality rule fired (severity: `"FAIL"` or `"WARN"`).
105    QualityIssue { severity: String, message: String },
106    /// The output schema differs from the previously stored snapshot.
107    SchemaChanged {
108        /// Columns added since the last run, formatted as `"name (type)"`.
109        added: Vec<String>,
110        /// Column names removed since the last run.
111        removed: Vec<String>,
112        /// `(column, old_type, new_type)` for columns whose type changed.
113        type_changed: Vec<(String, String, String)>,
114    },
115    /// A non-fatal warning that does not fit another variant.
116    Warning { context: String, message: String },
117    /// The OPT-2 concurrency governor changed the active parallelism in
118    /// response to source pressure. `from`/`to` are permit counts; `reason`
119    /// describes the trigger (e.g. `"pressure rising: backed off"`).
120    ParallelismAdjusted {
121        from: usize,
122        to: usize,
123        reason: String,
124    },
125
126    // ── Succeeded ────────────────────────────────────────────
127    /// Output file row-count validation completed.
128    ValidationResult { passed: bool },
129    /// Source COUNT(*) reconciliation completed.
130    ReconciliationResult {
131        source_count: i64,
132        exported_rows: i64,
133        matched: bool,
134    },
135
136    // ── Outcome ──────────────────────────────────────────────
137    /// Terminal event — the run has reached its final state.
138    RunCompleted {
139        status: String,
140        error_message: Option<String>,
141        duration_ms: i64,
142    },
143}
144
145// ─── Journal entry ───────────────────────────────────────────────────────────
146
147/// A timestamped wrapper around a `RunEvent`.
148#[derive(Debug, Clone, Serialize, Deserialize)]
149pub struct JournalEntry {
150    pub recorded_at: DateTime<Utc>,
151    pub event: RunEvent,
152}
153
154// ─── RunJournal ──────────────────────────────────────────────────────────────
155
156/// Canonical in-memory record of a pipeline run.
157///
158/// Accumulated during execution via `record()`.  Query methods let callers
159/// answer the four DoD questions without iterating `entries` directly.
160#[derive(Debug, Clone, Serialize, Deserialize)]
161pub struct RunJournal {
162    pub run_id: String,
163    pub export_name: String,
164    /// All events in insertion order.
165    pub entries: Vec<JournalEntry>,
166}
167
168impl RunJournal {
169    pub fn new(run_id: impl Into<String>, export_name: impl Into<String>) -> Self {
170        Self {
171            run_id: run_id.into(),
172            export_name: export_name.into(),
173            entries: Vec::new(),
174        }
175    }
176
177    /// Append an event with the current UTC timestamp.
178    pub fn record(&mut self, event: RunEvent) {
179        self.entries.push(JournalEntry {
180            recorded_at: Utc::now(),
181            event,
182        });
183    }
184
185    // ── What was planned? ─────────────────────────────────────
186
187    /// Returns the plan snapshot recorded at the start of the run.
188    pub fn plan_snapshot(&self) -> Option<&PlanSnapshot> {
189        self.entries.iter().find_map(|e| {
190            if let RunEvent::PlanResolved(s) = &e.event {
191                Some(s)
192            } else {
193                None
194            }
195        })
196    }
197
198    // ── What happened? ────────────────────────────────────────
199
200    /// All `FileWritten` entries, in the order files were committed.
201    pub fn files(&self) -> Vec<&JournalEntry> {
202        self.entries
203            .iter()
204            .filter(|e| matches!(e.event, RunEvent::FileWritten { .. }))
205            .collect()
206    }
207
208    /// All `RetryAttempted` entries.
209    pub fn retries(&self) -> Vec<&JournalEntry> {
210        self.entries
211            .iter()
212            .filter(|e| matches!(e.event, RunEvent::RetryAttempted { .. }))
213            .collect()
214    }
215
216    /// All chunk lifecycle entries (`ChunkStarted`, `ChunkCompleted`, `ChunkFailed`).
217    pub fn chunk_events(&self) -> Vec<&JournalEntry> {
218        self.entries
219            .iter()
220            .filter(|e| {
221                matches!(
222                    e.event,
223                    RunEvent::ChunkStarted { .. }
224                        | RunEvent::ChunkCompleted { .. }
225                        | RunEvent::ChunkFailed { .. }
226                )
227            })
228            .collect()
229    }
230
231    // ── What degraded? ────────────────────────────────────────
232
233    /// All `QualityIssue` entries (both FAIL and WARN severity).
234    pub fn quality_issues(&self) -> Vec<&JournalEntry> {
235        self.entries
236            .iter()
237            .filter(|e| matches!(e.event, RunEvent::QualityIssue { .. }))
238            .collect()
239    }
240
241    /// All `SchemaChanged` entries.
242    pub fn schema_changes(&self) -> Vec<&JournalEntry> {
243        self.entries
244            .iter()
245            .filter(|e| matches!(e.event, RunEvent::SchemaChanged { .. }))
246            .collect()
247    }
248
249    /// All `Warning` and `PlanWarning` entries.
250    pub fn warnings(&self) -> Vec<&JournalEntry> {
251        self.entries
252            .iter()
253            .filter(|e| {
254                matches!(
255                    e.event,
256                    RunEvent::Warning { .. } | RunEvent::PlanWarning { .. }
257                )
258            })
259            .collect()
260    }
261
262    // ── What was the final outcome? ───────────────────────────
263
264    /// The last `RunCompleted` entry, or `None` if the run has not yet finished.
265    pub fn final_outcome(&self) -> Option<&JournalEntry> {
266        self.entries
267            .iter()
268            .rev()
269            .find(|e| matches!(e.event, RunEvent::RunCompleted { .. }))
270    }
271}
272
#[cfg(test)]
mod tests {
    use super::*;

    fn journal() -> RunJournal {
        RunJournal::new("run-1", "orders")
    }

    fn snap() -> PlanSnapshot {
        PlanSnapshot {
            export_name: "orders".into(),
            base_query: "SELECT 1".into(),
            strategy: "snapshot".into(),
            format: "parquet".into(),
            compression: "zstd".into(),
            destination_type: "local".into(),
            tuning_profile: "balanced".into(),
            batch_size: 1000,
            validate: false,
            reconcile: false,
            resume: false,
        }
    }

    /// Shorthand for a generic `Warning` event.
    fn warning(context: &str, message: &str) -> RunEvent {
        RunEvent::Warning {
            context: context.into(),
            message: message.into(),
        }
    }

    /// The `context` of a `Warning` entry; panics on any other variant.
    fn warning_context(entry: &JournalEntry) -> &str {
        match &entry.event {
            RunEvent::Warning { context, .. } => context.as_str(),
            other => panic!("expected Warning, got {:?}", other),
        }
    }

    /// The `file_name` of a `FileWritten` entry; panics on any other variant.
    fn written_file_name(entry: &JournalEntry) -> &str {
        match &entry.event {
            RunEvent::FileWritten { file_name, .. } => file_name.as_str(),
            other => panic!("expected FileWritten, got {:?}", other),
        }
    }

    // ── construction ────────────────────────────────────────────────────────

    #[test]
    fn new_journal_is_empty() {
        let j = journal();
        assert_eq!(j.run_id, "run-1");
        assert_eq!(j.export_name, "orders");
        assert!(j.entries.is_empty());
    }

    // ── record ───────────────────────────────────────────────────────────────

    #[test]
    fn record_appends_entry() {
        let mut j = journal();
        j.record(warning("test", "w"));
        assert_eq!(j.entries.len(), 1);
        assert_eq!(warning_context(&j.entries[0]), "test");
    }

    #[test]
    fn record_multiple_entries_in_order() {
        let mut j = journal();
        j.record(warning("a", "1"));
        j.record(warning("b", "2"));
        let contexts: Vec<&str> = j.entries.iter().map(warning_context).collect();
        assert_eq!(contexts, ["a", "b"]);
    }

    // ── plan_snapshot ────────────────────────────────────────────────────────

    #[test]
    fn plan_snapshot_none_when_empty() {
        assert!(journal().plan_snapshot().is_none());
    }

    #[test]
    fn plan_snapshot_found_after_other_events() {
        let mut j = journal();
        j.record(warning("x", "y"));
        j.record(RunEvent::PlanResolved(snap()));
        assert_eq!(j.plan_snapshot().unwrap().export_name, "orders");
    }

    #[test]
    fn plan_snapshot_returns_first_resolved() {
        let mut j = journal();
        j.record(RunEvent::PlanResolved(snap()));
        j.record(RunEvent::PlanResolved(PlanSnapshot {
            export_name: "other".into(),
            batch_size: 5,
            ..snap()
        }));
        let s = j.plan_snapshot().unwrap();
        assert_eq!(s.export_name, "orders");
        assert_eq!(s.batch_size, 1000);
    }

    // ── files ────────────────────────────────────────────────────────────────

    #[test]
    fn files_empty_when_no_file_written() {
        let mut j = journal();
        j.record(warning("x", "y"));
        assert!(j.files().is_empty());
    }

    #[test]
    fn files_returns_file_written_entries_in_commit_order() {
        let mut j = journal();
        j.record(RunEvent::FileWritten {
            file_name: "f.parquet".into(),
            rows: 100,
            bytes: 4096,
            part_index: 0,
        });
        j.record(warning("x", "y"));
        j.record(RunEvent::FileWritten {
            file_name: "g.parquet".into(),
            rows: 50,
            bytes: 2048,
            part_index: 1,
        });
        let names: Vec<&str> = j.files().into_iter().map(written_file_name).collect();
        assert_eq!(names, ["f.parquet", "g.parquet"]);
    }

    // ── retries ──────────────────────────────────────────────────────────────

    #[test]
    fn retries_empty_when_none_recorded() {
        assert!(journal().retries().is_empty());
    }

    #[test]
    fn retries_returns_retry_attempted_entries() {
        let mut j = journal();
        j.record(RunEvent::RetryAttempted {
            attempt: 1,
            reason: "timeout".into(),
            backoff_ms: 500,
        });
        j.record(RunEvent::RetryAttempted {
            attempt: 2,
            reason: "timeout".into(),
            backoff_ms: 1000,
        });
        assert_eq!(j.retries().len(), 2);
    }

    // ── chunk_events ─────────────────────────────────────────────────────────

    #[test]
    fn chunk_events_collects_all_three_variant_types() {
        let mut j = journal();
        j.record(RunEvent::ChunkStarted {
            chunk_index: 0,
            start_key: "0".into(),
            end_key: "100".into(),
        });
        j.record(RunEvent::ChunkCompleted {
            chunk_index: 0,
            rows: 100,
            file_name: None,
        });
        j.record(RunEvent::ChunkFailed {
            chunk_index: 1,
            error: "err".into(),
            attempt: 1,
        });
        j.record(warning("x", "y"));
        assert_eq!(j.chunk_events().len(), 3);
    }

    // ── quality_issues ───────────────────────────────────────────────────────

    #[test]
    fn quality_issues_filters_correctly() {
        let mut j = journal();
        j.record(RunEvent::QualityIssue {
            severity: "FAIL".into(),
            message: "null check".into(),
        });
        j.record(warning("x", "y"));
        assert_eq!(j.quality_issues().len(), 1);
    }

    // ── schema_changes ───────────────────────────────────────────────────────

    #[test]
    fn schema_changes_filters_correctly() {
        let mut j = journal();
        j.record(warning("x", "y"));
        j.record(RunEvent::SchemaChanged {
            added: vec!["new_col (Int64)".into()],
            removed: vec![],
            type_changed: vec![],
        });
        assert_eq!(j.schema_changes().len(), 1);
    }

    // ── warnings ─────────────────────────────────────────────────────────────

    #[test]
    fn warnings_includes_both_warning_and_plan_warning() {
        let mut j = journal();
        j.record(warning("ctx", "w1"));
        j.record(RunEvent::PlanWarning {
            rule: "r".into(),
            message: "w2".into(),
        });
        j.record(RunEvent::QualityIssue {
            severity: "WARN".into(),
            message: "q".into(),
        });
        j.record(RunEvent::ParallelismAdjusted {
            from: 4,
            to: 2,
            reason: "pressure rising: backed off".into(),
        });
        assert_eq!(j.warnings().len(), 2);
    }

    // ── final_outcome ─────────────────────────────────────────────────────────

    #[test]
    fn final_outcome_none_when_not_completed() {
        let mut j = journal();
        j.record(warning("x", "y"));
        assert!(j.final_outcome().is_none());
    }

    #[test]
    fn final_outcome_returns_last_run_completed() {
        let mut j = journal();
        j.record(RunEvent::RunCompleted {
            status: "success".into(),
            error_message: None,
            duration_ms: 1234,
        });
        j.record(warning("x", "y"));
        j.record(RunEvent::RunCompleted {
            status: "failed".into(),
            error_message: Some("err".into()),
            duration_ms: 5678,
        });
        let outcome = j.final_outcome().unwrap();
        if let RunEvent::RunCompleted { status, .. } = &outcome.event {
            assert_eq!(status, "failed");
        } else {
            panic!("expected RunCompleted");
        }
    }
}