Skip to main content

rivet/
journal.rs

//! **Layer: Observability**
//!
//! `RunJournal` is the canonical in-memory record of everything that happened
//! during a pipeline run.  It accumulates typed, timestamped events and can
//! answer the four observability questions from the Epic 10 DoD:
//!
//! | Question               | Method               |
//! |------------------------|----------------------|
//! | What was planned?      | `plan_snapshot()`    |
//! | What happened?         | `files()`, `retries()`, `chunk_events()` |
//! | What degraded?         | `quality_issues()`, `schema_changes()`, `warnings()` |
//! | What was the outcome?  | `final_outcome()`    |
//!
//! `RunJournal` is currently embedded in `RunSummary` so that all pipeline
//! modules — which already hold `&mut RunSummary` — can record events without
//! signature changes.  A future epic will invert the relationship so that
//! `RunSummary` is derived from `RunJournal`.
//!
//! This module is the canonical home for journal types. It deliberately has no
//! dependencies on `plan`, `state`, or `pipeline` so that storage (state) and
//! orchestration (pipeline) can both depend on it without creating a cycle.
//! The `From<&ResolvedRunPlan>` conversion lives in `pipeline/summary.rs`
//! beside the call site that needs it.

// Query methods and event fields are intentionally defined for future consumers
// (storage, CLI inspection, notifications), so `dead_code` is suppressed for the
// whole module rather than item by item.
#![allow(dead_code)]
28
29use chrono::{DateTime, Utc};
30use serde::{Deserialize, Serialize};
31
32// ─── Plan snapshot ───────────────────────────────────────────────────────────
33
/// Owned, serialisable snapshot of the resolved execution plan, captured at
/// the moment a run starts.  Answers "what was planned?" without requiring the
/// original `ResolvedRunPlan` to remain in scope.
///
/// Every field is a plain owned value (`String`, `usize`, `bool`), so the
/// snapshot can be cloned and serialised independently of the plan it was
/// built from.  Settings such as `strategy`, `format` and `compression` are
/// stored as strings; the conversion from the plan lives outside this module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlanSnapshot {
    /// Name of the export being run.
    pub export_name: String,
    /// Base query text of the export (e.g. `"SELECT 1"`).
    pub base_query: String,
    /// Extraction strategy name (e.g. `"snapshot"`).
    pub strategy: String,
    /// Output format name (e.g. `"parquet"`).
    pub format: String,
    /// Compression codec name (e.g. `"zstd"`).
    pub compression: String,
    /// Destination kind (e.g. `"local"`).
    pub destination_type: String,
    /// Tuning profile name (e.g. `"balanced"`).
    pub tuning_profile: String,
    /// Batch size taken from the plan.
    /// NOTE(review): the unit (rows per fetch?) is not established here — confirm.
    pub batch_size: usize,
    /// Whether output validation was enabled; presumably governs whether a
    /// `RunEvent::ValidationResult` is recorded — confirm.
    pub validate: bool,
    /// Whether source reconciliation was enabled; presumably governs whether a
    /// `RunEvent::ReconciliationResult` is recorded — confirm.
    pub reconcile: bool,
    /// Whether the run was planned as a resume of earlier work.
    pub resume: bool,
}
51
52// ─── Events ──────────────────────────────────────────────────────────────────
53
/// A single typed event emitted during a pipeline run.
///
/// Variants are grouped by the question they help answer:
/// - *Planned* — `PlanResolved`, `PlanWarning`
/// - *Happened* — `FileWritten`, `ChunkStarted`, `ChunkCompleted`, `ChunkFailed`, `RetryAttempted`
/// - *Degraded* — `QualityIssue`, `SchemaChanged`, `Warning`
/// - *Verified* — `ValidationResult`, `ReconciliationResult`.  These record the
///   result of a check whether it passed or not, so success is not implied;
///   they are not one of the four DoD questions and no `RunJournal` query
///   method selects them.
/// - *Outcome* — `RunCompleted`
///
/// NOTE(review): serde's derive passes each variant's *index* to the
/// serializer, and index-based formats encode it.  Prefer appending new
/// variants over reordering existing ones if journals are persisted in such a
/// format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RunEvent {
    // ── Planned ──────────────────────────────────────────────
    /// Emitted once at the start of a run with a snapshot of the resolved plan.
    PlanResolved(PlanSnapshot),
    /// A plan validation diagnostic at `Warning` or `Degraded` level.
    /// `rule` names the diagnostic's rule; `message` is its human-readable text.
    PlanWarning { rule: String, message: String },

    // ── Happened ─────────────────────────────────────────────
    /// One output file was successfully written to the destination.
    FileWritten {
        /// Name of the written file.
        file_name: String,
        /// Rows contained in the file.
        rows: i64,
        /// Size of the file in bytes.
        bytes: u64,
        /// Position of this file among the run's output parts.
        /// NOTE(review): appears zero-based (first part is `0`) — confirm with the writer.
        part_index: usize,
    },
    /// A chunk task transitioned from `pending` to `running`.
    ChunkStarted {
        /// Index of the chunk within the run.
        chunk_index: i64,
        /// Lower bound of the chunk's key range, rendered as text
        /// (inclusivity not specified here).
        start_key: String,
        /// Upper bound of the chunk's key range, rendered as text
        /// (inclusivity not specified here).
        end_key: String,
    },
    /// A chunk task completed successfully.
    ChunkCompleted {
        /// Index of the chunk within the run.
        chunk_index: i64,
        /// Rows produced by the chunk.
        rows: i64,
        /// Output file produced by the chunk, if any.
        file_name: Option<String>,
    },
    /// A chunk task failed (may be retried up to `max_chunk_attempts`).
    ChunkFailed {
        /// Index of the chunk within the run.
        chunk_index: i64,
        /// Error description, stringified.
        error: String,
        /// Attempt number that failed.
        /// NOTE(review): `i64` here but `u32` in `RetryAttempted::attempt`.
        attempt: i64,
    },
    /// The pipeline is about to retry after a transient error.
    RetryAttempted {
        /// Attempt number of the upcoming retry.
        attempt: u32,
        /// Why the retry is happening (e.g. `"timeout"`).
        reason: String,
        /// Delay before the retry, in milliseconds.
        backoff_ms: u64,
    },

    // ── Degraded ─────────────────────────────────────────────
    /// One quality rule fired (severity: `"FAIL"` or `"WARN"`).
    QualityIssue { severity: String, message: String },
    /// The output schema differs from the previously stored snapshot.
    SchemaChanged {
        /// Columns added since the last run, formatted as `"name (type)"`.
        added: Vec<String>,
        /// Column names removed since the last run.
        removed: Vec<String>,
        /// `(column, old_type, new_type)` for columns whose type changed.
        type_changed: Vec<(String, String, String)>,
    },
    /// A non-fatal warning that does not fit another variant.
    /// `context` says where it arose; `message` is the warning text.
    Warning { context: String, message: String },

    // ── Verified ─────────────────────────────────────────────
    /// Output file row-count validation completed; `passed` carries the verdict.
    ValidationResult { passed: bool },
    /// Source COUNT(*) reconciliation completed.
    ReconciliationResult {
        /// Row count reported by the source `COUNT(*)`.
        source_count: i64,
        /// Rows the run exported.
        exported_rows: i64,
        /// Whether the counts agreed (presumably `source_count == exported_rows`).
        matched: bool,
    },

    // ── Outcome ──────────────────────────────────────────────
    /// Terminal event — the run has reached its final state.
    RunCompleted {
        /// Final status label (e.g. `"success"`, `"failed"`).
        status: String,
        /// Error text, presumably `Some` only for failed runs.
        error_message: Option<String>,
        /// Total run duration in milliseconds.
        duration_ms: i64,
    },
}
136
137// ─── Journal entry ───────────────────────────────────────────────────────────
138
/// A timestamped wrapper around a `RunEvent`.
///
/// `RunJournal::record` builds these, stamping `recorded_at` with `Utc::now()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JournalEntry {
    /// UTC time at which the event was appended to the journal.  This may lag
    /// the moment the underlying action actually happened.
    pub recorded_at: DateTime<Utc>,
    /// The event payload.
    pub event: RunEvent,
}
145
146// ─── RunJournal ──────────────────────────────────────────────────────────────
147
/// Canonical in-memory record of a pipeline run.
///
/// Accumulated during execution via `record()`.  Query methods let callers
/// answer the four DoD questions without iterating `entries` directly.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunJournal {
    /// Identifier of the run this journal belongs to.
    pub run_id: String,
    /// Name of the export being run.
    pub export_name: String,
    /// All events in insertion order.
    ///
    /// Prefer `RunJournal::record` over pushing here directly so every entry
    /// is timestamped the same way.
    pub entries: Vec<JournalEntry>,
}
159
160impl RunJournal {
161    pub fn new(run_id: impl Into<String>, export_name: impl Into<String>) -> Self {
162        Self {
163            run_id: run_id.into(),
164            export_name: export_name.into(),
165            entries: Vec::new(),
166        }
167    }
168
169    /// Append an event with the current UTC timestamp.
170    pub fn record(&mut self, event: RunEvent) {
171        self.entries.push(JournalEntry {
172            recorded_at: Utc::now(),
173            event,
174        });
175    }
176
177    // ── What was planned? ─────────────────────────────────────
178
179    /// Returns the plan snapshot recorded at the start of the run.
180    pub fn plan_snapshot(&self) -> Option<&PlanSnapshot> {
181        self.entries.iter().find_map(|e| {
182            if let RunEvent::PlanResolved(s) = &e.event {
183                Some(s)
184            } else {
185                None
186            }
187        })
188    }
189
190    // ── What happened? ────────────────────────────────────────
191
192    /// All `FileWritten` entries, in the order files were committed.
193    pub fn files(&self) -> Vec<&JournalEntry> {
194        self.entries
195            .iter()
196            .filter(|e| matches!(e.event, RunEvent::FileWritten { .. }))
197            .collect()
198    }
199
200    /// All `RetryAttempted` entries.
201    pub fn retries(&self) -> Vec<&JournalEntry> {
202        self.entries
203            .iter()
204            .filter(|e| matches!(e.event, RunEvent::RetryAttempted { .. }))
205            .collect()
206    }
207
208    /// All chunk lifecycle entries (`ChunkStarted`, `ChunkCompleted`, `ChunkFailed`).
209    pub fn chunk_events(&self) -> Vec<&JournalEntry> {
210        self.entries
211            .iter()
212            .filter(|e| {
213                matches!(
214                    e.event,
215                    RunEvent::ChunkStarted { .. }
216                        | RunEvent::ChunkCompleted { .. }
217                        | RunEvent::ChunkFailed { .. }
218                )
219            })
220            .collect()
221    }
222
223    // ── What degraded? ────────────────────────────────────────
224
225    /// All `QualityIssue` entries (both FAIL and WARN severity).
226    pub fn quality_issues(&self) -> Vec<&JournalEntry> {
227        self.entries
228            .iter()
229            .filter(|e| matches!(e.event, RunEvent::QualityIssue { .. }))
230            .collect()
231    }
232
233    /// All `SchemaChanged` entries.
234    pub fn schema_changes(&self) -> Vec<&JournalEntry> {
235        self.entries
236            .iter()
237            .filter(|e| matches!(e.event, RunEvent::SchemaChanged { .. }))
238            .collect()
239    }
240
241    /// All `Warning` and `PlanWarning` entries.
242    pub fn warnings(&self) -> Vec<&JournalEntry> {
243        self.entries
244            .iter()
245            .filter(|e| {
246                matches!(
247                    e.event,
248                    RunEvent::Warning { .. } | RunEvent::PlanWarning { .. }
249                )
250            })
251            .collect()
252    }
253
254    // ── What was the final outcome? ───────────────────────────
255
256    /// The last `RunCompleted` entry, or `None` if the run has not yet finished.
257    pub fn final_outcome(&self) -> Option<&JournalEntry> {
258        self.entries
259            .iter()
260            .rev()
261            .find(|e| matches!(e.event, RunEvent::RunCompleted { .. }))
262    }
263}
264
#[cfg(test)]
mod tests {
    use super::*;

    fn journal() -> RunJournal {
        RunJournal::new("run-1", "orders")
    }

    fn snap() -> PlanSnapshot {
        PlanSnapshot {
            export_name: "orders".into(),
            base_query: "SELECT 1".into(),
            strategy: "snapshot".into(),
            format: "parquet".into(),
            compression: "zstd".into(),
            destination_type: "local".into(),
            tuning_profile: "balanced".into(),
            batch_size: 1000,
            validate: false,
            reconcile: false,
            resume: false,
        }
    }

    /// Builds a generic `Warning` event, used as filler that every
    /// variant-specific query must skip.
    fn warning(context: &str, message: &str) -> RunEvent {
        RunEvent::Warning {
            context: context.into(),
            message: message.into(),
        }
    }

    // ── construction ────────────────────────────────────────────────────────

    #[test]
    fn new_journal_is_empty() {
        let j = journal();
        assert_eq!(j.run_id, "run-1");
        assert_eq!(j.export_name, "orders");
        assert!(j.entries.is_empty());
    }

    // ── record ───────────────────────────────────────────────────────────────

    #[test]
    fn record_appends_entry() {
        let mut j = journal();
        j.record(warning("test", "w"));
        assert_eq!(j.entries.len(), 1);
        assert!(matches!(
            &j.entries[0].event,
            RunEvent::Warning { context, message } if context == "test" && message == "w"
        ));
    }

    #[test]
    fn record_multiple_entries_in_order() {
        let mut j = journal();
        j.record(warning("a", "1"));
        j.record(warning("b", "2"));
        let contexts: Vec<&str> = j
            .entries
            .iter()
            .map(|e| match &e.event {
                RunEvent::Warning { context, .. } => context.as_str(),
                other => panic!("unexpected event: {:?}", other),
            })
            .collect();
        assert_eq!(contexts, ["a", "b"]);
    }

    // ── plan_snapshot ────────────────────────────────────────────────────────

    #[test]
    fn plan_snapshot_none_when_empty() {
        assert!(journal().plan_snapshot().is_none());
    }

    #[test]
    fn plan_snapshot_returns_first_resolved() {
        let mut j = journal();
        j.record(RunEvent::PlanResolved(snap()));
        // A later snapshot must not shadow the one recorded first.
        j.record(RunEvent::PlanResolved(PlanSnapshot {
            batch_size: 2000,
            ..snap()
        }));
        let s = j.plan_snapshot().unwrap();
        assert_eq!(s.export_name, "orders");
        assert_eq!(s.batch_size, 1000);
    }

    // ── files ────────────────────────────────────────────────────────────────

    #[test]
    fn files_empty_when_no_file_written() {
        let mut j = journal();
        j.record(warning("x", "y"));
        assert!(j.files().is_empty());
    }

    #[test]
    fn files_returns_file_written_entries() {
        let mut j = journal();
        j.record(RunEvent::FileWritten {
            file_name: "f.parquet".into(),
            rows: 100,
            bytes: 4096,
            part_index: 0,
        });
        j.record(warning("x", "y"));
        j.record(RunEvent::FileWritten {
            file_name: "g.parquet".into(),
            rows: 50,
            bytes: 2048,
            part_index: 1,
        });
        let names: Vec<&str> = j
            .files()
            .into_iter()
            .map(|e| match &e.event {
                RunEvent::FileWritten { file_name, .. } => file_name.as_str(),
                other => panic!("expected FileWritten, got {:?}", other),
            })
            .collect();
        assert_eq!(names, ["f.parquet", "g.parquet"]);
    }

    // ── retries ──────────────────────────────────────────────────────────────

    #[test]
    fn retries_empty_when_none_recorded() {
        assert!(journal().retries().is_empty());
    }

    #[test]
    fn retries_returns_retry_attempted_entries() {
        let mut j = journal();
        j.record(RunEvent::RetryAttempted {
            attempt: 1,
            reason: "timeout".into(),
            backoff_ms: 500,
        });
        j.record(warning("x", "y"));
        j.record(RunEvent::RetryAttempted {
            attempt: 2,
            reason: "timeout".into(),
            backoff_ms: 1000,
        });
        let attempts: Vec<u32> = j
            .retries()
            .into_iter()
            .map(|e| match &e.event {
                RunEvent::RetryAttempted { attempt, .. } => *attempt,
                other => panic!("expected RetryAttempted, got {:?}", other),
            })
            .collect();
        assert_eq!(attempts, [1, 2]);
    }

    // ── chunk_events ─────────────────────────────────────────────────────────

    #[test]
    fn chunk_events_collects_all_three_variant_types() {
        let mut j = journal();
        j.record(RunEvent::ChunkStarted {
            chunk_index: 0,
            start_key: "0".into(),
            end_key: "100".into(),
        });
        j.record(RunEvent::ChunkCompleted {
            chunk_index: 0,
            rows: 100,
            file_name: None,
        });
        j.record(RunEvent::ChunkFailed {
            chunk_index: 1,
            error: "err".into(),
            attempt: 1,
        });
        j.record(warning("x", "y"));
        let events = j.chunk_events();
        assert_eq!(events.len(), 3);
        assert!(matches!(
            events[0].event,
            RunEvent::ChunkStarted { chunk_index: 0, .. }
        ));
        assert!(matches!(
            events[1].event,
            RunEvent::ChunkCompleted { chunk_index: 0, .. }
        ));
        assert!(matches!(
            events[2].event,
            RunEvent::ChunkFailed { chunk_index: 1, .. }
        ));
    }

    // ── quality_issues ───────────────────────────────────────────────────────

    #[test]
    fn quality_issues_filters_correctly() {
        let mut j = journal();
        j.record(RunEvent::QualityIssue {
            severity: "FAIL".into(),
            message: "null check".into(),
        });
        j.record(warning("x", "y"));
        j.record(RunEvent::QualityIssue {
            severity: "WARN".into(),
            message: "row count".into(),
        });
        let severities: Vec<&str> = j
            .quality_issues()
            .into_iter()
            .map(|e| match &e.event {
                RunEvent::QualityIssue { severity, .. } => severity.as_str(),
                other => panic!("expected QualityIssue, got {:?}", other),
            })
            .collect();
        assert_eq!(severities, ["FAIL", "WARN"]);
    }

    // ── schema_changes ───────────────────────────────────────────────────────

    #[test]
    fn schema_changes_filters_correctly() {
        let mut j = journal();
        j.record(warning("x", "y"));
        j.record(RunEvent::SchemaChanged {
            added: vec!["new_col (Int64)".into()],
            removed: vec![],
            type_changed: vec![],
        });
        let changes = j.schema_changes();
        assert_eq!(changes.len(), 1);
        assert!(matches!(
            &changes[0].event,
            RunEvent::SchemaChanged { added, .. } if added.len() == 1
        ));
    }

    // ── warnings ─────────────────────────────────────────────────────────────

    #[test]
    fn warnings_includes_both_warning_and_plan_warning() {
        let mut j = journal();
        j.record(warning("ctx", "w1"));
        j.record(RunEvent::PlanWarning {
            rule: "r".into(),
            message: "w2".into(),
        });
        j.record(RunEvent::QualityIssue {
            severity: "WARN".into(),
            message: "q".into(),
        });
        let warnings = j.warnings();
        assert_eq!(warnings.len(), 2);
        assert!(matches!(warnings[0].event, RunEvent::Warning { .. }));
        assert!(matches!(warnings[1].event, RunEvent::PlanWarning { .. }));
    }

    // ── verification events ──────────────────────────────────────────────────

    #[test]
    fn verification_events_are_excluded_from_other_queries() {
        let mut j = journal();
        j.record(RunEvent::ValidationResult { passed: false });
        j.record(RunEvent::ReconciliationResult {
            source_count: 10,
            exported_rows: 9,
            matched: false,
        });
        assert_eq!(j.entries.len(), 2);
        assert!(j.warnings().is_empty());
        assert!(j.quality_issues().is_empty());
        assert!(j.chunk_events().is_empty());
        assert!(j.final_outcome().is_none());
    }

    // ── final_outcome ─────────────────────────────────────────────────────────

    #[test]
    fn final_outcome_none_when_not_completed() {
        let mut j = journal();
        j.record(warning("x", "y"));
        assert!(j.final_outcome().is_none());
    }

    #[test]
    fn final_outcome_returns_last_run_completed() {
        let mut j = journal();
        j.record(RunEvent::RunCompleted {
            status: "success".into(),
            error_message: None,
            duration_ms: 1234,
        });
        j.record(warning("x", "y"));
        j.record(RunEvent::RunCompleted {
            status: "failed".into(),
            error_message: Some("err".into()),
            duration_ms: 5678,
        });
        let outcome = j.final_outcome().unwrap();
        if let RunEvent::RunCompleted { status, .. } = &outcome.event {
            assert_eq!(status, "failed");
        } else {
            panic!("expected RunCompleted");
        }
    }
}