Skip to main content

rivet/pipeline/
mod.rs

1//! **Layer: Coordinator** (planning → execution → persistence/observability)
2//!
3//! `pipeline/mod.rs` is the only module allowed to bridge all three layers.
4//! It reads a resolved plan (planning), dispatches to execution modules, then
5//! records metrics and sends notifications (persistence/observability).
6//!
7//! See `docs/adr/0003-layer-classification.md` for the full module taxonomy.
8
9mod aggregate;
10mod apply_cmd;
11mod cdc_job;
12pub(crate) mod chunked;
13mod cli;
14pub(crate) mod commit;
15mod finalize;
16pub(crate) mod ipc;
17mod job;
18mod keyset;
19mod manifest_reconcile;
20pub(crate) mod manifest_writer;
21mod parallel_children;
22pub(crate) mod parent_ui;
23mod partition_expand;
24mod plan_cmd;
25pub(crate) mod progress;
26mod reconcile_cmd;
27mod repair_cmd;
28pub(crate) mod report;
29mod resume_decisions;
30// `pub(crate)` so `error::classify_exit` can reach `retry::classify_error`
31// (transient → exit-code 2) without routing through the test-only re-export.
32pub(crate) mod retry;
33// The `rivet run` orchestrator (~290 LOC) lives next door so this facade
34// stays a thin re-export layer.  Module name shadows `pub fn run` below;
35// the duplicate is resolved by Rust's namespace rules (modules live in
36// the type namespace, fns in the value namespace) and unambiguous at
37// every call site (`pipeline::run(...)` is the function).
38mod run;
39mod run_store;
40mod schema_drift;
41mod single;
42mod sink;
43mod summary;
44mod validate;
45mod validate_cmd;
46mod validate_manifest;
47
48// ── Public API surface (consumed by `src/cli/dispatch.rs` + binaries) ──────
49//
50// These items are the contract the binary depends on.  Adding to this list
51// is an API change that requires a release-note entry; removing or
52// renaming requires a deprecation cycle.
53
54pub use apply_cmd::run_apply_command;
55pub use cli::{
56    reset_chunk_checkpoint, reset_chunk_checkpoints_stuck, reset_state, show_chunk_checkpoint,
57    show_files, show_journal, show_metrics, show_progression, show_state,
58};
59pub use plan_cmd::{PlanOutputFormat, run_plan_command};
60pub use reconcile_cmd::{ReconcileOutputFormat, run_reconcile_command};
61pub use repair_cmd::{RepairOutputFormat, RepairReportSource, run_repair_command};
62pub use validate_cmd::{ValidateOutputFormat, ValidateTarget, run_validate_command};
63// The graded verify depth is part of the CLI contract (`--depth`) and is the
64// same enum that gates the checks in `verify_at_destination`.  Defined in the
65// pipeline layer; re-exported here so `cli::args` can parse it on the flag
66// without the CLI→pipeline layering inversion of defining it in `cli`.
67pub use validate_manifest::ValidateDepth;
68
69// `RunSummary` is consumed by `notify::*` (via the Coordinator path) plus
70// integration-test fixtures.  It is the canonical observability struct so
71// it stays in the regular public surface.
72pub use summary::RunSummary;
73
74// ── Crate-internal cross-module use ────────────────────────────────────────
75
76pub(crate) use job::run_export_job_with_chunk_source;
77#[cfg(test)]
78#[allow(unused_imports)]
79pub(crate) use retry::is_transient;
80
81// ── Test-only surface ──────────────────────────────────────────────────────
82//
83// The integration tests in `tests/*.rs` exercise the trust-contract writers,
84// readers, and decision logic without spinning up a full pipeline.  These
85// items are NOT part of the public CLI contract — operators get them only
86// transitively (via `summary.json`, `manifest.json`, `--validate`, etc.).
87//
88// Hidden behind `#[doc(hidden)] pub mod for_tests` so they don't pollute
89// the rendered crate docs and so a renaming refactor here is a clear
90// "test-only" change rather than appearing as a public API break.
91//
92// Convention matches the existing `destination_for_tests` window in
93// `lib.rs`: tests reach these via `rivet::pipeline::for_tests::*`.
94
95#[doc(hidden)]
96pub mod for_tests {
97    pub use super::chunked::generate_chunks;
98    pub use super::manifest_writer::{ManifestBuilder, WriteOutcome, write_manifest};
99    pub use super::report::{RunReport, report_dir, write_run_report};
100    pub use super::resume_decisions::{
101        PartDecision, QuarantineReason, ResumeDecision, ResumePlan, UntrackedDecision,
102        build_resume_plan,
103    };
104    pub use super::retry::{RetryClass, classify_error};
105    pub use super::validate::validate_output;
106    pub use super::validate_manifest::{
107        Failure as ManifestVerificationFailure, ManifestVerification, verify_at_destination,
108    };
109    pub use crate::plan::build_time_window_query;
110}
111
// Backwards-compat re-exports at the `pipeline` module root so existing
// test files keep compiling without a sweeping import-site update.  Each is
// delegated to `for_tests::*`; new test code should import from `for_tests`
// directly.
115//
116// `#[allow(unused_imports)]` because the bin target's dead-code analysis
117// doesn't see the integration tests that consume these — same situation
118// as `RunSummary::stub_for_testing`.
119#[doc(hidden)]
120#[allow(unused_imports)]
121pub use for_tests::{
122    ManifestBuilder, ManifestVerification, ManifestVerificationFailure, PartDecision,
123    QuarantineReason, ResumeDecision, ResumePlan, RetryClass, RunReport, UntrackedDecision,
124    WriteOutcome, build_resume_plan, build_time_window_query, classify_error, generate_chunks,
125    report_dir, validate_output, verify_at_destination, write_manifest, write_run_report,
126};
127
128// The orchestrator and its `RunOptions` live in `run.rs`.  Re-exported
129// here so external call sites keep using `pipeline::run(...)` and
130// `pipeline::RunOptions`.  Multi-export render-mode flags ride along
131// because `RunSummary::print` and the in-place card renderer read them.
132pub use run::{RunOptions, run};
133#[allow(unused_imports)] // `multi_export_concurrent` is wired for future use
134pub(crate) use run::{multi_export_concurrent, multi_export_mode};
135
/// Render a byte count as a short human-readable size.
///
/// Units are binary (powers of 1024) but labelled `KB` / `MB` / `GB`.
/// Values below 1024 are printed as a whole number of bytes (`"500 B"`);
/// everything else is scaled to the largest unit it reaches and printed
/// with one decimal place (`"1.5 KB"`).  Because of the one-decimal
/// rounding, a value just under a unit boundary can render as `1024.0` of
/// the smaller unit (e.g. `1_048_575` → `"1024.0 KB"`).
pub(crate) fn format_bytes(b: u64) -> String {
    const KIB: u64 = 1 << 10;
    const MIB: u64 = 1 << 20;
    const GIB: u64 = 1 << 30;

    match b {
        n if n >= GIB => format!("{:.1} GB", n as f64 / GIB as f64),
        n if n >= MIB => format!("{:.1} MB", n as f64 / MIB as f64),
        n if n >= KIB => format!("{:.1} KB", n as f64 / KIB as f64),
        n => format!("{} B", n),
    }
}
147
/// Strip the trailing recovery-hint portion of a chunked-pipeline error
/// message produced by `pipeline::chunked`.
///
/// Returns `(cause, hinted)`:
/// * `cause` — the text before the hint when one is detected, otherwise the
///   whole of `msg` (always a sub-slice of `msg`, never a copy);
/// * `hinted` — `true` when a chunked-checkpoint hint was detected.
///
/// Hints emitted by `pipeline::chunked` follow the shape
/// ``<cause>; <connector> `rivet …` …``, so the message is cut at the first
/// `"; "` whose remainder contains a backtick-quoted `rivet` invocation
/// (the literal text `` `rivet `` followed by a space-separated command).
///
/// Only the first `"; "` has to be inspected: the text after any later
/// separator is a suffix of the text after the first one, so if the first
/// remainder has no `` `rivet `` invocation, no later remainder can have one
/// either.  A single `split_once` therefore gives the same answer as
/// scanning every separator, without re-searching the tail repeatedly.
///
/// Used by both the per-export card renderer (`parent_ui`) and the run
/// aggregator (`aggregate`) so the long inline command doesn't wrap, distort
/// the in-place card layout, or repeat the consolidated recovery block
/// printed by the aggregator.
pub(crate) fn strip_chunked_recovery_hint(msg: &str) -> (&str, bool) {
    match msg.split_once("; ") {
        Some((cause, tail)) if tail.contains("`rivet ") => (cause, true),
        _ => (msg, false),
    }
}
172
/// Limit `s` to at most `max_chars` Unicode scalar values (`char`s).
///
/// * `max_chars == 0` yields an empty string.
/// * A string that already fits is returned as an owned copy, unchanged.
/// * Otherwise the first `max_chars - 1` characters are kept and a single
///   `…` is appended, so the result is exactly `max_chars` characters long.
///
/// The in-place card renderer relies on this to keep every line within the
/// chosen terminal width: a wrapped line breaks the cursor-up redraw math
/// and makes cards drift down the screen.
pub(crate) fn clamp_line(s: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }

    // `s` fits exactly when there is no character at index `max_chars`.
    if s.chars().nth(max_chars).is_none() {
        return s.to_owned();
    }

    // Byte offset of the first character to drop: the one at char index
    // `max_chars - 1`, whose slot is taken by the ellipsis.  It always
    // exists here because `s` has more than `max_chars` characters.
    let cut = s
        .char_indices()
        .nth(max_chars - 1)
        .map_or(s.len(), |(byte_idx, _)| byte_idx);

    let mut clamped = String::with_capacity(cut + '…'.len_utf8());
    clamped.push_str(&s[..cut]);
    clamped.push('…');
    clamped
}
190
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{SourceConfig, SourceType};
    use crate::plan::{
        CompressionType, DestinationConfig, DestinationType, DiagnosticLevel, ExtractionStrategy,
        FormatType, MetaColumns, ResolvedRunPlan, validate_plan,
    };
    use crate::tuning::SourceTuning;

    // ─── format_bytes / strip_chunked_recovery_hint / clamp_line ─────────────

    /// One representative value per unit bucket.
    #[test]
    fn test_format_bytes() {
        let cases: [(u64, &str); 6] = [
            (500, "500 B"),
            (1024, "1.0 KB"),
            (1536, "1.5 KB"),
            (1_048_576, "1.0 MB"),
            (1_073_741_824, "1.0 GB"),
            (2_684_354_560, "2.5 GB"),
        ];
        for &(bytes, expected) in &cases {
            assert_eq!(format_bytes(bytes), expected);
        }
    }

    /// The "use `rivet run …`" hint form is cut off and reported as a hint.
    #[test]
    fn strip_chunked_recovery_hint_strips_use_form() {
        let message = concat!(
            "export 'users': chunk checkpoint run 'users_x' still in progress; ",
            "use `rivet run --config foo.yaml --export users --resume` or ",
            "`rivet state reset-chunks --config foo.yaml --export users`",
        );
        let (cause, hinted) = strip_chunked_recovery_hint(message);
        assert!(hinted);
        assert_eq!(
            cause,
            "export 'users': chunk checkpoint run 'users_x' still in progress"
        );
    }

    /// The "fix errors and `rivet run …`" hint form is cut off as well.
    #[test]
    fn strip_chunked_recovery_hint_strips_fix_errors_form() {
        let message = concat!(
            "export 'a': chunk checkpoint incomplete (3 tasks not completed); ",
            "fix errors and `rivet run --config c.yaml --export a --resume` or ",
            "`rivet state reset-chunks --config c.yaml --export a`",
        );
        let (cause, hinted) = strip_chunked_recovery_hint(message);
        assert!(hinted);
        assert_eq!(
            cause,
            "export 'a': chunk checkpoint incomplete (3 tasks not completed)"
        );
    }

    /// A `; ` separator without a `rivet` invocation after it is left alone.
    #[test]
    fn strip_chunked_recovery_hint_passthrough_when_no_hint() {
        let message = "export 'q': source connection refused; retry exhausted";
        let (cause, hinted) = strip_chunked_recovery_hint(message);
        assert!(!hinted);
        assert_eq!(cause, message);
    }

    /// Short input passes through; long input (ASCII and multi-byte) is cut
    /// to exactly the requested number of characters, ending in `…`.
    #[test]
    fn clamp_line_truncates_with_ellipsis() {
        assert_eq!(clamp_line("short", 80), "short");
        assert_eq!(clamp_line("hello world", 8), "hello w…");

        let greek = "αβγδ".repeat(50);
        let clamped = clamp_line(&greek, 10);
        assert_eq!(clamped.chars().count(), 10);
        assert!(clamped.ends_with('…'));
    }

    /// Values on and directly around each unit boundary.
    #[test]
    fn format_bytes_boundary_values() {
        let cases: [(u64, &str); 9] = [
            (0, "0 B"),
            (1, "1 B"),
            (1023, "1023 B"),
            (1024, "1.0 KB"),
            (1025, "1.0 KB"),
            (1_048_575, "1024.0 KB"),
            (1_048_576, "1.0 MB"),
            (1_073_741_823, "1024.0 MB"),
            (1_073_741_824, "1.0 GB"),
        ];
        for &(bytes, expected) in &cases {
            assert_eq!(format_bytes(bytes), expected);
        }
    }

    // ─── Shared fixture ──────────────────────────────────────────────────────

    /// Smallest plan the tests below need: a Postgres snapshot export written
    /// as Parquet to a local `./out` directory, with every optional knob off.
    fn minimal_plan() -> ResolvedRunPlan {
        let source = SourceConfig {
            source_type: SourceType::Postgres,
            url: Some("postgresql://localhost/test".into()),
            url_env: None,
            url_file: None,
            host: None,
            port: None,
            user: None,
            password: None,
            password_env: None,
            database: None,
            environment: None,
            tuning: None,
            tls: None,
        };

        let destination = DestinationConfig {
            destination_type: DestinationType::Local,
            path: Some("./out".into()),
            ..Default::default()
        };

        ResolvedRunPlan {
            export_name: "test_export".into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::default(),
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination,
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source,
            column_overrides: Default::default(),
            verify: crate::config::VerifyMode::Size,
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    // ─── RunSummary::new() ───────────────────────────────────────────────────

    /// A freshly created summary reflects the plan and starts out "running"
    /// with zeroed counters.
    #[test]
    fn test_run_summary_fields() {
        let summary = RunSummary::new(&minimal_plan());

        assert_eq!(summary.export_name, "test_export");
        assert_eq!(summary.status, "running");
        assert_eq!(summary.total_rows, 0);
        assert_eq!(summary.files_produced, 0);
        assert_eq!(summary.tuning_profile, "balanced (default)");
        assert_eq!(summary.batch_size, 10_000);
        assert_eq!(summary.format, "parquet");
        assert_eq!(summary.mode, "full");
        assert!(
            summary.run_id.starts_with("test_export_"),
            "run_id should start with export name, got: {}",
            summary.run_id
        );
    }

    // ─── RunSummary::new() journal invariants ────────────────────────────────

    /// `RunSummary::new()` must immediately record a `PlanResolved` event as
    /// the first journal entry, which answers the "what was planned?" query
    /// from ADR-0001.
    #[test]
    fn run_summary_new_records_plan_resolved_as_first_event() {
        let summary = RunSummary::new(&minimal_plan());

        assert!(
            !summary.journal.entries.is_empty(),
            "journal must have at least one entry after RunSummary::new()"
        );

        let first_event = &summary.journal.entries[0].event;
        assert!(
            matches!(first_event, crate::journal::RunEvent::PlanResolved(_)),
            "first journal event must be PlanResolved, got: {:?}",
            first_event
        );
    }

    /// The `PlanSnapshot` stored inside `PlanResolved` must mirror the key
    /// fields of the `ResolvedRunPlan` it was built from.
    #[test]
    fn run_summary_plan_snapshot_matches_plan_fields() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);

        let snapshot = summary
            .journal
            .plan_snapshot()
            .expect("plan_snapshot() must be Some after RunSummary::new()");

        assert_eq!(snapshot.export_name, plan.export_name);
        assert_eq!(snapshot.validate, plan.validate);
        assert_eq!(snapshot.reconcile, plan.reconcile);
        assert_eq!(snapshot.resume, plan.resume);
        assert_eq!(snapshot.batch_size, plan.tuning.batch_size);
    }

    /// The journal and the summary that owns it must share one `run_id`.
    #[test]
    fn run_summary_journal_run_id_matches_summary_run_id() {
        let summary = RunSummary::new(&minimal_plan());
        assert_eq!(
            summary.journal.run_id, summary.run_id,
            "journal run_id must match summary run_id"
        );
    }

    // ─── Rejected plan gate ──────────────────────────────────────────────────

    /// Gap 7 — `run_export_job` bails (via `anyhow::bail!`) before execution
    /// when `validate_plan` returns a `Rejected` diagnostic.
    /// NOTE(review): `run_export_job` is not defined in this file; presumably
    /// it lives in the `job` module — confirm before relying on this pointer.
    ///
    /// This test checks the *condition* that triggers the bail: `validate_plan`
    /// really does produce a `Rejected` diagnostic for the stdout + split
    /// combination.  The gate itself cannot be called directly in tests
    /// because it requires a live database connection and config, so only its
    /// precondition is tested here.
    #[test]
    fn rejected_plan_produces_rejected_diagnostic_blocking_run_export_job() {
        // stdout + max_file_size triggers check_stdout_split → Rejected.
        let mut plan = minimal_plan();
        plan.max_file_size_bytes = Some(10 * 1024 * 1024);
        plan.destination.destination_type = DestinationType::Stdout;

        let diags = validate_plan(&plan);
        let has_rejection = diags
            .iter()
            .any(|d| d.level == DiagnosticLevel::Rejected);

        assert!(
            has_rejection,
            "stdout + max_file_size must produce a Rejected diagnostic so that \
             run_export_job bails before calling run_with_reconnect; got: {:?}",
            diags
                .iter()
                .map(|d| (&d.rule, &d.level))
                .collect::<Vec<_>>()
        );
    }

    /// stdout + chunked strategy also triggers a `Rejected` diagnostic
    /// (check_stdout_chunked).
    #[test]
    fn rejected_plan_stdout_chunked_blocks_run_export_job() {
        use crate::plan::ChunkedPlan;

        let chunked = ChunkedPlan {
            column: "id".into(),
            chunk_size: 1000,
            chunk_count: None,
            parallel: 1,
            dense: false,
            by_days: None,
            max_attempts: 3,
            checkpoint: false,
        };

        let mut plan = minimal_plan();
        plan.strategy = ExtractionStrategy::Chunked(chunked);
        plan.destination.destination_type = DestinationType::Stdout;

        let has_rejection = validate_plan(&plan)
            .iter()
            .any(|d| d.level == DiagnosticLevel::Rejected);
        assert!(
            has_rejection,
            "stdout + chunked must produce a Rejected diagnostic"
        );
    }

    // ─── synthetic_failed_summary ────────────────────────────────────────────

    /// Failures that happen before `RunSummary::new` (plan-build error,
    /// plan-validation rejection) still have to be aggregated.
    /// `synthetic_failed_summary` produces a minimally populated summary that
    /// aggregation can consume without panicking.
    #[test]
    fn synthetic_failed_summary_carries_error_and_status() {
        let error = anyhow::anyhow!("could not connect to source: timeout");
        let failed = job::synthetic_failed_summary("orders", &error);

        assert_eq!(failed.export_name, "orders");
        assert_eq!(failed.status, "failed");
        assert_eq!(
            failed.error_message.as_deref(),
            Some("could not connect to source: timeout")
        );
        assert!(
            failed.run_id.starts_with("orders_"),
            "run_id must be derived from export name, got {}",
            failed.run_id
        );

        // Aggregation reads these fields directly — they must default to zero.
        assert_eq!(failed.total_rows, 0);
        assert_eq!(failed.files_produced, 0);
        assert_eq!(failed.bytes_written, 0);
        assert_eq!(failed.duration_ms, 0);
    }

    /// `entry_from_summary` must faithfully copy the fields the aggregate
    /// cares about.  Guards against silent drift if `RunAggregateEntry` or
    /// `RunSummary` gain new fields.
    #[test]
    fn aggregate_entry_from_summary_copies_observable_fields() {
        let mut summary = RunSummary::new(&minimal_plan());
        summary.status = "success".into();
        summary.total_rows = 12_345;
        summary.files_produced = 3;
        summary.bytes_written = 9_876_543;
        summary.duration_ms = 5_000;

        let entry = aggregate::entry_from_summary(&summary);

        assert_eq!(entry.export_name, summary.export_name);
        assert_eq!(entry.status, "success");
        assert_eq!(entry.run_id, summary.run_id);
        assert_eq!(entry.rows, 12_345);
        assert_eq!(entry.files, 3);
        assert_eq!(entry.bytes, 9_876_543);
        assert_eq!(entry.duration_ms, 5_000);
        assert_eq!(entry.mode, summary.mode);
        assert_eq!(entry.error_message, None);
    }
}
495}