Skip to main content

rivet/pipeline/
mod.rs

1//! **Layer: Coordinator** (planning → execution → persistence/observability)
2//!
3//! `pipeline/mod.rs` is the only module allowed to bridge all three layers.
4//! It reads a resolved plan (planning), dispatches to execution modules, then
5//! records metrics and sends notifications (persistence/observability).
6//!
7//! See `docs/adr/0003-layer-classification.md` for the full module taxonomy.
8
9mod aggregate;
10mod apply_cmd;
11pub(crate) mod chunked;
12mod cli;
13mod finalize;
14pub(crate) mod ipc;
15mod job;
16mod manifest_writer;
17mod parallel_children;
18pub(crate) mod parent_ui;
19mod plan_cmd;
20pub(crate) mod progress;
21mod reconcile_cmd;
22mod repair_cmd;
23pub(crate) mod report;
24mod resume_decisions;
25mod retry;
// The `rivet run` orchestrator (~290 LOC) lives next door so this facade
// stays a thin re-export layer.  The module shares its name with the `run`
// function re-exported below (`pub use run::{RunOptions, run}`); the two do
// not clash because modules live in the type namespace and fns in the value
// namespace, so every call site stays unambiguous (`pipeline::run(...)` is
// the function).
31mod run;
32mod single;
33mod sink;
34mod summary;
35mod validate;
36mod validate_cmd;
37mod validate_manifest;
38
39// ── Public API surface (consumed by `src/cli/dispatch.rs` + binaries) ──────
40//
41// These items are the contract the binary depends on.  Adding to this list
42// is an API change that requires a release-note entry; removing or
43// renaming requires a deprecation cycle.
44
45pub use apply_cmd::run_apply_command;
46pub use cli::{
47    reset_chunk_checkpoint, reset_chunk_checkpoints_stuck, reset_state, show_chunk_checkpoint,
48    show_files, show_journal, show_metrics, show_progression, show_state,
49};
50pub use plan_cmd::{PlanOutputFormat, run_plan_command};
51pub use reconcile_cmd::{ReconcileOutputFormat, run_reconcile_command};
52pub use repair_cmd::{RepairOutputFormat, RepairReportSource, run_repair_command};
53pub use validate_cmd::{ValidateOutputFormat, ValidateTarget, run_validate_command};
54
55// `RunSummary` is consumed by `notify::*` (via the Coordinator path) plus
56// integration-test fixtures.  It is the canonical observability struct so
57// it stays in the regular public surface.
58pub use summary::RunSummary;
59
60// ── Crate-internal cross-module use ────────────────────────────────────────
61
62pub(crate) use job::run_export_job_with_chunk_source;
63#[cfg(test)]
64#[allow(unused_imports)]
65pub(crate) use retry::is_transient;
66
67// ── Test-only surface ──────────────────────────────────────────────────────
68//
69// The integration tests in `tests/*.rs` exercise the trust-contract writers,
70// readers, and decision logic without spinning up a full pipeline.  These
71// items are NOT part of the public CLI contract — operators get them only
72// transitively (via `summary.json`, `manifest.json`, `--validate`, etc.).
73//
74// Hidden behind `#[doc(hidden)] pub mod for_tests` so they don't pollute
75// the rendered crate docs and so a renaming refactor here is a clear
76// "test-only" change rather than appearing as a public API break.
77//
78// Convention matches the existing `destination_for_tests` window in
79// `lib.rs`: tests reach these via `rivet::pipeline::for_tests::*`.
80
81#[doc(hidden)]
82pub mod for_tests {
83    pub use super::chunked::generate_chunks;
84    pub use super::manifest_writer::{ManifestBuilder, WriteOutcome, write_manifest};
85    pub use super::report::{RunReport, report_dir, write_run_report};
86    pub use super::resume_decisions::{
87        PartDecision, QuarantineReason, ResumeDecision, ResumePlan, UntrackedDecision,
88        build_resume_plan,
89    };
90    pub use super::retry::{RetryClass, classify_error};
91    pub use super::validate::validate_output;
92    pub use super::validate_manifest::{
93        Failure as ManifestVerificationFailure, ManifestVerification, verify_at_destination,
94    };
95    pub use crate::plan::build_time_window_query;
96}
97
98// Backwards-compat re-exports at the crate root so existing test files
99// keep compiling without a sweeping import-site update.  Each is delegated
100// to `for_tests::*`; new test code should import from `for_tests` directly.
101//
102// `#[allow(unused_imports)]` because the bin target's dead-code analysis
103// doesn't see the integration tests that consume these — same situation
104// as `RunSummary::stub_for_testing`.
105#[doc(hidden)]
106#[allow(unused_imports)]
107pub use for_tests::{
108    ManifestBuilder, ManifestVerification, ManifestVerificationFailure, PartDecision,
109    QuarantineReason, ResumeDecision, ResumePlan, RetryClass, RunReport, UntrackedDecision,
110    WriteOutcome, build_resume_plan, build_time_window_query, classify_error, generate_chunks,
111    report_dir, validate_output, verify_at_destination, write_manifest, write_run_report,
112};
113
114// The orchestrator and its `RunOptions` live in `run.rs`.  Re-exported
115// here so external call sites keep using `pipeline::run(...)` and
116// `pipeline::RunOptions`.  Multi-export render-mode flags ride along
117// because `RunSummary::print` and the in-place card renderer read them.
118pub use run::{RunOptions, run};
119#[allow(unused_imports)] // `multi_export_concurrent` is wired for future use
120pub(crate) use run::{multi_export_concurrent, multi_export_mode};
121
/// Render a byte count as a short human-readable size string.
///
/// Binary multiples are used (1 KB = 1024 B).  The largest unit whose
/// threshold `b` reaches is selected and the scaled value is printed with
/// one decimal place; counts below 1024 are printed as a whole number of
/// bytes.  Because the unit is chosen by threshold rather than by the rounded
/// value, a count just under a threshold renders in the smaller unit
/// (e.g. `1_048_575` → `"1024.0 KB"`).
pub(crate) fn format_bytes(b: u64) -> String {
    // Ordered largest-first so the first threshold reached names the unit.
    const UNITS: [(u64, &str); 3] = [
        (1_073_741_824, "GB"),
        (1_048_576, "MB"),
        (1024, "KB"),
    ];

    match UNITS.iter().find(|(threshold, _)| b >= *threshold) {
        Some(&(threshold, unit)) => format!("{:.1} {}", b as f64 / threshold as f64, unit),
        None => format!("{} B", b),
    }
}
133
/// Strip the trailing recovery-hint portion of a chunked-pipeline error
/// message produced by `pipeline::chunked`.  Returns the cause prefix and
/// whether a chunked-checkpoint hint was detected.
///
/// Hints emitted by `pipeline::chunked` always follow the pattern
/// `<cause>; <connector> \`rivet …\` …`, so the message is cut at its first
/// `; ` when the text after that separator contains a backtick-quoted
/// `rivet` invocation.  Otherwise the message is returned untouched with
/// `false`.
///
/// Only the first separator has to be inspected: the text after any later
/// `; ` is a suffix of the text after the first one, so if the first
/// remainder holds no `rivet` invocation, no later remainder can either.
///
/// Used by both the per-export card renderer (`parent_ui`) and the run
/// aggregator (`aggregate`) so the long inline command doesn't wrap, distort
/// the in-place card layout, and doesn't repeat the consolidated recovery
/// block printed by the aggregator.
pub(crate) fn strip_chunked_recovery_hint(msg: &str) -> (&str, bool) {
    match msg.split_once("; ") {
        Some((cause, tail)) if tail.contains("`rivet ") => (cause, true),
        // No separator at all, or nothing hint-like after it.
        _ => (msg, false),
    }
}
158
/// Limit `s` to at most `max_chars` characters, ending the result with `…`
/// when anything had to be cut.
///
/// A string that already fits is returned as an owned copy, and a
/// `max_chars` of zero yields an empty string.  When truncation happens the
/// ellipsis occupies the final slot, so the result is exactly `max_chars`
/// characters long.  Length is measured in Rust `char`s (Unicode scalar
/// values), not in terminal display columns.
///
/// The in-place card renderer relies on this to keep every line within the
/// chosen terminal width — a wrapped line breaks the cursor-up redraw math
/// and makes cards drift down the screen.
pub(crate) fn clamp_line(s: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }
    // No character at index `max_chars` means the string already fits.
    if s.chars().nth(max_chars).is_none() {
        return s.to_owned();
    }
    // Byte offset of the character that the ellipsis replaces; everything
    // before it is the `max_chars - 1` characters that are kept.
    let cut = s
        .char_indices()
        .nth(max_chars - 1)
        .map_or(s.len(), |(offset, _)| offset);
    let mut clamped = String::with_capacity(cut + '…'.len_utf8());
    clamped.push_str(&s[..cut]);
    clamped.push('…');
    clamped
}
176
/// Unit tests for the facade's small formatting helpers plus invariants of
/// `RunSummary`, plan validation, and the aggregate/job helpers reached
/// through this module.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{SourceConfig, SourceType};
    use crate::plan::{
        CompressionType, DestinationConfig, DestinationType, DiagnosticLevel, ExtractionStrategy,
        FormatType, MetaColumns, ResolvedRunPlan, validate_plan,
    };
    use crate::tuning::SourceTuning;

    // ─── format_bytes ────────────────────────────────────────────────────────

    /// One representative value per unit, including fractional scaling.
    #[test]
    fn test_format_bytes() {
        assert_eq!(format_bytes(500), "500 B");
        assert_eq!(format_bytes(1024), "1.0 KB");
        assert_eq!(format_bytes(1536), "1.5 KB");
        assert_eq!(format_bytes(1_048_576), "1.0 MB");
        assert_eq!(format_bytes(1_073_741_824), "1.0 GB");
        assert_eq!(format_bytes(2_684_354_560), "2.5 GB");
    }

    /// Values on either side of each unit threshold.  The unit is chosen by
    /// threshold, so a count one byte short of the next unit still renders in
    /// the smaller unit (`"1024.0 KB"`, `"1024.0 MB"`).
    #[test]
    fn format_bytes_boundary_values() {
        assert_eq!(format_bytes(0), "0 B");
        assert_eq!(format_bytes(1), "1 B");
        assert_eq!(format_bytes(1023), "1023 B");
        assert_eq!(format_bytes(1024), "1.0 KB");
        assert_eq!(format_bytes(1025), "1.0 KB");
        assert_eq!(format_bytes(1_048_575), "1024.0 KB");
        assert_eq!(format_bytes(1_048_576), "1.0 MB");
        assert_eq!(format_bytes(1_073_741_823), "1024.0 MB");
        assert_eq!(format_bytes(1_073_741_824), "1.0 GB");
    }

    // ─── strip_chunked_recovery_hint ─────────────────────────────────────────

    /// Hint introduced by the `use` connector is removed.
    #[test]
    fn strip_chunked_recovery_hint_strips_use_form() {
        let m = "export 'users': chunk checkpoint run 'users_x' still in progress; \
                 use `rivet run --config foo.yaml --export users --resume` or \
                 `rivet state reset-chunks --config foo.yaml --export users`";
        let (cause, hinted) = strip_chunked_recovery_hint(m);
        assert!(hinted);
        assert_eq!(
            cause,
            "export 'users': chunk checkpoint run 'users_x' still in progress"
        );
    }

    /// Hint introduced by the `fix errors and` connector is removed.
    #[test]
    fn strip_chunked_recovery_hint_strips_fix_errors_form() {
        let m = "export 'a': chunk checkpoint incomplete (3 tasks not completed); \
                 fix errors and `rivet run --config c.yaml --export a --resume` or \
                 `rivet state reset-chunks --config c.yaml --export a`";
        let (cause, hinted) = strip_chunked_recovery_hint(m);
        assert!(hinted);
        assert_eq!(
            cause,
            "export 'a': chunk checkpoint incomplete (3 tasks not completed)"
        );
    }

    /// A `; ` separator without a `rivet` invocation after it is not a hint.
    #[test]
    fn strip_chunked_recovery_hint_passthrough_when_no_hint() {
        let m = "export 'q': source connection refused; retry exhausted";
        let (cause, hinted) = strip_chunked_recovery_hint(m);
        assert!(!hinted);
        assert_eq!(cause, m);
    }

    /// A `rivet` invocation with no `; ` separator before it does not match
    /// the hint pattern, so the message is returned untouched.
    #[test]
    fn strip_chunked_recovery_hint_passthrough_without_separator() {
        let m = "chunk checkpoint stale `rivet state reset-chunks --export a`";
        let (cause, hinted) = strip_chunked_recovery_hint(m);
        assert!(!hinted);
        assert_eq!(cause, m);
    }

    // ─── clamp_line ──────────────────────────────────────────────────────────

    /// Short input passes through; long input (ASCII and multi-byte) is cut
    /// to exactly the requested number of characters, ending in `…`.
    #[test]
    fn clamp_line_truncates_with_ellipsis() {
        assert_eq!(clamp_line("short", 80), "short");
        assert_eq!(clamp_line("hello world", 8), "hello w…");
        let s = "αβγδ".repeat(50);
        let out = clamp_line(&s, 10);
        assert_eq!(out.chars().count(), 10);
        assert!(out.ends_with('…'));
    }

    /// Zero width, empty input, exact fit, and the smallest non-zero width.
    #[test]
    fn clamp_line_edge_cases() {
        assert_eq!(clamp_line("abc", 0), "");
        assert_eq!(clamp_line("", 5), "");
        assert_eq!(clamp_line("abc", 3), "abc");
        assert_eq!(clamp_line("abcd", 3), "ab…");
        assert_eq!(clamp_line("abcd", 1), "…");
    }

    // ─── Fixtures ────────────────────────────────────────────────────────────

    /// Smallest plan the tests need: a snapshot export from a Postgres URL
    /// to a local `./out` directory in Parquet, with every optional knob
    /// left unset or at its default.
    fn minimal_plan() -> ResolvedRunPlan {
        ResolvedRunPlan {
            export_name: "test_export".into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::default(),
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".into()),
                ..Default::default()
            },
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source: SourceConfig {
                source_type: SourceType::Postgres,
                url: Some("postgresql://localhost/test".into()),
                url_env: None,
                url_file: None,
                host: None,
                port: None,
                user: None,
                password: None,
                password_env: None,
                database: None,
                environment: None,
                tuning: None,
                tls: None,
            },
            column_overrides: Default::default(),
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    // ─── RunSummary::new() field defaults ────────────────────────────────────

    /// A freshly built summary mirrors the plan's identity fields and starts
    /// in the `running` state with zeroed counters.
    #[test]
    fn test_run_summary_fields() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);
        assert_eq!(summary.export_name, "test_export");
        assert_eq!(summary.status, "running");
        assert_eq!(summary.total_rows, 0);
        assert_eq!(summary.files_produced, 0);
        assert_eq!(summary.tuning_profile, "balanced (default)");
        assert_eq!(summary.batch_size, 10_000);
        assert_eq!(summary.format, "parquet");
        assert_eq!(summary.mode, "full");
        assert!(
            summary.run_id.starts_with("test_export_"),
            "run_id should start with export name, got: {}",
            summary.run_id
        );
    }

    // ─── RunSummary::new() journal invariants ────────────────────────────────

    /// `RunSummary::new()` must immediately record a `PlanResolved` event as the
    /// first journal entry.  This satisfies the "what was planned?" query from ADR-0001.
    #[test]
    fn run_summary_new_records_plan_resolved_as_first_event() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);

        assert!(
            !summary.journal.entries.is_empty(),
            "journal must have at least one entry after RunSummary::new()"
        );
        assert!(
            matches!(
                summary.journal.entries[0].event,
                crate::journal::RunEvent::PlanResolved(_)
            ),
            "first journal event must be PlanResolved, got: {:?}",
            summary.journal.entries[0].event
        );
    }

    /// The `PlanSnapshot` recorded inside `PlanResolved` must faithfully capture
    /// key fields from the `ResolvedRunPlan`.
    #[test]
    fn run_summary_plan_snapshot_matches_plan_fields() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);

        let snap = summary
            .journal
            .plan_snapshot()
            .expect("plan_snapshot() must be Some after RunSummary::new()");

        assert_eq!(snap.export_name, plan.export_name);
        assert_eq!(snap.validate, plan.validate);
        assert_eq!(snap.reconcile, plan.reconcile);
        assert_eq!(snap.resume, plan.resume);
        assert_eq!(snap.batch_size, plan.tuning.batch_size);
    }

    /// The journal's `run_id` must match the `RunSummary`'s `run_id`.
    #[test]
    fn run_summary_journal_run_id_matches_summary_run_id() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);
        assert_eq!(
            summary.journal.run_id, summary.run_id,
            "journal run_id must match summary run_id"
        );
    }

    // ─── Rejected plan gate ──────────────────────────────────────────────────

    /// Gap 7 — `run_export_job` bails before execution when `validate_plan` returns
    /// a `Rejected` diagnostic.  The gate is wired inside `run_export_job`, which
    /// is not defined in this facade file (NOTE(review): presumably it lives in
    /// the `job` submodule — confirm): if `rejected` is non-empty, the function
    /// returns `anyhow::bail!`.
    ///
    /// This test verifies the *condition* that triggers the bail: that `validate_plan`
    /// does in fact produce a `Rejected` diagnostic for the stdout+split combination.
    /// The gate itself (`run_export_job`) cannot be called directly in tests because
    /// it requires a live database connection and config; we test its precondition here.
    #[test]
    fn rejected_plan_produces_rejected_diagnostic_blocking_run_export_job() {
        let mut plan = minimal_plan();
        // stdout + max_file_size triggers check_stdout_split → Rejected.
        plan.destination.destination_type = DestinationType::Stdout;
        plan.max_file_size_bytes = Some(10 * 1024 * 1024);

        let diags = validate_plan(&plan);
        let rejected_count = diags
            .iter()
            .filter(|d| d.level == DiagnosticLevel::Rejected)
            .count();

        assert!(
            rejected_count > 0,
            "stdout + max_file_size must produce a Rejected diagnostic so that \
             run_export_job bails before calling run_with_reconnect; got: {:?}",
            diags
                .iter()
                .map(|d| (&d.rule, &d.level))
                .collect::<Vec<_>>()
        );
    }

    /// stdout + chunked strategy also triggers a Rejected diagnostic (check_stdout_chunked).
    #[test]
    fn rejected_plan_stdout_chunked_blocks_run_export_job() {
        use crate::plan::ChunkedPlan;
        let mut plan = minimal_plan();
        plan.destination.destination_type = DestinationType::Stdout;
        plan.strategy = ExtractionStrategy::Chunked(ChunkedPlan {
            column: "id".into(),
            chunk_size: 1000,
            chunk_count: None,
            parallel: 1,
            dense: false,
            by_days: None,
            max_attempts: 3,
            checkpoint: false,
        });

        let diags = validate_plan(&plan);
        assert!(
            diags.iter().any(|d| d.level == DiagnosticLevel::Rejected),
            "stdout + chunked must produce a Rejected diagnostic"
        );
    }

    // ─── synthetic_failed_summary ────────────────────────────────────────────

    /// Pre-`RunSummary::new` failures (plan-build error, plan-validation
    /// rejection) still need to be aggregated.  `synthetic_failed_summary`
    /// produces a minimally-populated summary that aggregation can consume
    /// without panicking.
    #[test]
    fn synthetic_failed_summary_carries_error_and_status() {
        let err = anyhow::anyhow!("could not connect to source: timeout");
        let s = job::synthetic_failed_summary("orders", &err);
        assert_eq!(s.export_name, "orders");
        assert_eq!(s.status, "failed");
        assert_eq!(
            s.error_message.as_deref(),
            Some("could not connect to source: timeout")
        );
        assert!(
            s.run_id.starts_with("orders_"),
            "run_id must be derived from export name, got {}",
            s.run_id
        );
        // Aggregation reads these fields directly — they must default to zero.
        assert_eq!(s.total_rows, 0);
        assert_eq!(s.files_produced, 0);
        assert_eq!(s.bytes_written, 0);
        assert_eq!(s.duration_ms, 0);
    }

    // ─── aggregate::entry_from_summary ───────────────────────────────────────

    /// `entry_from_summary` must faithfully copy fields the aggregate cares
    /// about.  This guards against silent drift if `RunAggregateEntry` or
    /// `RunSummary` gain new fields.
    #[test]
    fn aggregate_entry_from_summary_copies_observable_fields() {
        let plan = minimal_plan();
        let mut summary = RunSummary::new(&plan);
        summary.status = "success".into();
        summary.total_rows = 12_345;
        summary.files_produced = 3;
        summary.bytes_written = 9_876_543;
        summary.duration_ms = 5_000;

        let entry = aggregate::entry_from_summary(&summary);
        assert_eq!(entry.export_name, summary.export_name);
        assert_eq!(entry.status, "success");
        assert_eq!(entry.run_id, summary.run_id);
        assert_eq!(entry.rows, 12_345);
        assert_eq!(entry.files, 3);
        assert_eq!(entry.bytes, 9_876_543);
        assert_eq!(entry.duration_ms, 5_000);
        assert_eq!(entry.mode, summary.mode);
        assert_eq!(entry.error_message, None);
    }
}