Skip to main content

rivet/pipeline/
mod.rs

1//! **Layer: Coordinator** (planning → execution → persistence/observability)
2//!
3//! `pipeline/mod.rs` is the only module allowed to bridge all three layers.
4//! It reads a resolved plan (planning), dispatches to execution modules, then
5//! records metrics and sends notifications (persistence/observability).
6//!
7//! See `docs/adr/0003-layer-classification.md` for the full module taxonomy.
8
9mod aggregate;
10mod apply_cmd;
11pub(crate) mod chunked;
12mod cli;
13mod commit;
14mod finalize;
15pub(crate) mod ipc;
16mod job;
17mod keyset;
18mod manifest_reconcile;
19mod manifest_writer;
20mod parallel_children;
21pub(crate) mod parent_ui;
22mod partition_expand;
23mod plan_cmd;
24pub(crate) mod progress;
25mod reconcile_cmd;
26mod repair_cmd;
27pub(crate) mod report;
28mod resume_decisions;
29// `pub(crate)` so `error::classify_exit` can reach `retry::classify_error`
30// (transient → exit-code 2) without routing through the test-only re-export.
31pub(crate) mod retry;
// The `rivet run` orchestrator (~290 LOC) lives next door so this facade
// stays a thin re-export layer.  The module name matches the `run`
// function re-exported below (`pub use run::{RunOptions, run}`); the two
// coexist under Rust's namespace rules (modules live in the type
// namespace, fns in the value namespace), so every call site is
// unambiguous (`pipeline::run(...)` is the function).
37mod run;
38mod run_store;
39mod schema_drift;
40mod single;
41mod sink;
42mod summary;
43mod validate;
44mod validate_cmd;
45mod validate_manifest;
46
47// ── Public API surface (consumed by `src/cli/dispatch.rs` + binaries) ──────
48//
49// These items are the contract the binary depends on.  Adding to this list
50// is an API change that requires a release-note entry; removing or
51// renaming requires a deprecation cycle.
52
53pub use apply_cmd::run_apply_command;
54pub use cli::{
55    reset_chunk_checkpoint, reset_chunk_checkpoints_stuck, reset_state, show_chunk_checkpoint,
56    show_files, show_journal, show_metrics, show_progression, show_state,
57};
58pub use plan_cmd::{PlanOutputFormat, run_plan_command};
59pub use reconcile_cmd::{ReconcileOutputFormat, run_reconcile_command};
60pub use repair_cmd::{RepairOutputFormat, RepairReportSource, run_repair_command};
61pub use validate_cmd::{ValidateOutputFormat, ValidateTarget, run_validate_command};
62
63// `RunSummary` is consumed by `notify::*` (via the Coordinator path) plus
64// integration-test fixtures.  It is the canonical observability struct so
65// it stays in the regular public surface.
66pub use summary::RunSummary;
67
68// ── Crate-internal cross-module use ────────────────────────────────────────
69
70pub(crate) use job::run_export_job_with_chunk_source;
71#[cfg(test)]
72#[allow(unused_imports)]
73pub(crate) use retry::is_transient;
74
75// ── Test-only surface ──────────────────────────────────────────────────────
76//
77// The integration tests in `tests/*.rs` exercise the trust-contract writers,
78// readers, and decision logic without spinning up a full pipeline.  These
79// items are NOT part of the public CLI contract — operators get them only
80// transitively (via `summary.json`, `manifest.json`, `--validate`, etc.).
81//
82// Hidden behind `#[doc(hidden)] pub mod for_tests` so they don't pollute
83// the rendered crate docs and so a renaming refactor here is a clear
84// "test-only" change rather than appearing as a public API break.
85//
86// Convention matches the existing `destination_for_tests` window in
87// `lib.rs`: tests reach these via `rivet::pipeline::for_tests::*`.
88
89#[doc(hidden)]
90pub mod for_tests {
91    pub use super::chunked::generate_chunks;
92    pub use super::manifest_writer::{ManifestBuilder, WriteOutcome, write_manifest};
93    pub use super::report::{RunReport, report_dir, write_run_report};
94    pub use super::resume_decisions::{
95        PartDecision, QuarantineReason, ResumeDecision, ResumePlan, UntrackedDecision,
96        build_resume_plan,
97    };
98    pub use super::retry::{RetryClass, classify_error};
99    pub use super::validate::validate_output;
100    pub use super::validate_manifest::{
101        Failure as ManifestVerificationFailure, ManifestVerification, verify_at_destination,
102    };
103    pub use crate::plan::build_time_window_query;
104}
105
106// Backwards-compat re-exports at the crate root so existing test files
107// keep compiling without a sweeping import-site update.  Each is delegated
108// to `for_tests::*`; new test code should import from `for_tests` directly.
109//
110// `#[allow(unused_imports)]` because the bin target's dead-code analysis
111// doesn't see the integration tests that consume these — same situation
112// as `RunSummary::stub_for_testing`.
113#[doc(hidden)]
114#[allow(unused_imports)]
115pub use for_tests::{
116    ManifestBuilder, ManifestVerification, ManifestVerificationFailure, PartDecision,
117    QuarantineReason, ResumeDecision, ResumePlan, RetryClass, RunReport, UntrackedDecision,
118    WriteOutcome, build_resume_plan, build_time_window_query, classify_error, generate_chunks,
119    report_dir, validate_output, verify_at_destination, write_manifest, write_run_report,
120};
121
122// The orchestrator and its `RunOptions` live in `run.rs`.  Re-exported
123// here so external call sites keep using `pipeline::run(...)` and
124// `pipeline::RunOptions`.  Multi-export render-mode flags ride along
125// because `RunSummary::print` and the in-place card renderer read them.
126pub use run::{RunOptions, run};
127#[allow(unused_imports)] // `multi_export_concurrent` is wired for future use
128pub(crate) use run::{multi_export_concurrent, multi_export_mode};
129
/// Render a byte count as a short human-readable string.
///
/// Uses binary (1024-based) scales labelled `KB` / `MB` / `GB` with one
/// decimal place; values below 1024 are printed as whole bytes (`"512 B"`).
/// There is no scale above `GB`, so very large values keep growing in GB.
pub(crate) fn format_bytes(b: u64) -> String {
    // Largest scale first so the first match is the most compact unit.
    const SCALES: [(u64, &str); 3] = [(1 << 30, "GB"), (1 << 20, "MB"), (1 << 10, "KB")];

    match SCALES.iter().find(|&&(scale, _)| b >= scale) {
        Some(&(scale, unit)) => format!("{:.1} {}", b as f64 / scale as f64, unit),
        None => format!("{} B", b),
    }
}
141
/// Strip the trailing recovery-hint portion of a chunked-pipeline error
/// message produced by `pipeline::chunked`.  Returns the cause prefix and
/// whether a chunked-checkpoint hint was detected.
///
/// Hints emitted by `pipeline::chunked` follow the pattern
/// `<cause>; <connector> \`rivet …\` …`.  For each backtick-quoted
/// `rivet` invocation (in order of appearance), the nearest `; ` before it
/// is taken as the cause/hint separator; the first invocation that has such
/// a separator wins.  Cutting at the *nearest* separator, rather than the
/// first `; ` in the message, keeps a cause that itself contains `; `
/// (e.g. a wrapped driver error) intact instead of truncating it at its
/// first segment.
///
/// Used by both the per-export card renderer (`parent_ui`) and the run
/// aggregator (`aggregate`) so the long inline command doesn't wrap, distort
/// the in-place card layout, and doesn't repeat the consolidated recovery
/// block printed by the aggregator.
pub(crate) fn strip_chunked_recovery_hint(msg: &str) -> (&str, bool) {
    const SEPARATOR: &str = "; ";
    const INVOCATION: &str = "`rivet ";

    for (cmd_at, _) in msg.match_indices(INVOCATION) {
        // Both needles are ASCII, so every offset found here is a valid
        // char boundary and the slicing below cannot panic.
        if let Some(cut) = msg[..cmd_at].rfind(SEPARATOR) {
            return (&msg[..cut], true);
        }
    }
    (msg, false)
}
166
/// Limit `s` to at most `max_chars` Unicode scalar values.
///
/// When `s` is longer, the result keeps the first `max_chars - 1` chars and
/// ends with `…`, so it is exactly `max_chars` chars long.  A `max_chars` of
/// zero yields an empty string.  The in-place card renderer relies on this
/// to keep every line inside the terminal width: a wrapped line breaks the
/// cursor-up redraw math and makes cards drift down the screen.
pub(crate) fn clamp_line(s: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }

    let mut starts = s.char_indices().map(|(idx, _)| idx);
    // Byte offset of the last visible slot; the ellipsis replaces that char.
    let ellipsis_at = starts.nth(max_chars - 1);
    // A char after that slot means `s` is strictly longer than `max_chars`.
    match (ellipsis_at, starts.next()) {
        (Some(cut), Some(_)) => {
            let mut clamped = String::with_capacity(cut + '…'.len_utf8());
            clamped.push_str(&s[..cut]);
            clamped.push('…');
            clamped
        }
        _ => s.to_string(),
    }
}
184
/// Unit tests for the facade helpers (`format_bytes`,
/// `strip_chunked_recovery_hint`, `clamp_line`) plus cross-module invariants
/// that are convenient to exercise from here: `RunSummary` journal setup,
/// the plan-validation rejection precondition, and aggregate-entry mapping.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{SourceConfig, SourceType};
    use crate::plan::{
        CompressionType, DestinationConfig, DestinationType, DiagnosticLevel, ExtractionStrategy,
        FormatType, MetaColumns, ResolvedRunPlan, validate_plan,
    };
    use crate::tuning::SourceTuning;

    #[test]
    fn test_format_bytes() {
        assert_eq!(format_bytes(500), "500 B");
        assert_eq!(format_bytes(1024), "1.0 KB");
        assert_eq!(format_bytes(1536), "1.5 KB");
        assert_eq!(format_bytes(1_048_576), "1.0 MB");
        assert_eq!(format_bytes(1_073_741_824), "1.0 GB");
        assert_eq!(format_bytes(2_684_354_560), "2.5 GB");
    }

    #[test]
    fn strip_chunked_recovery_hint_strips_use_form() {
        let m = "export 'users': chunk checkpoint run 'users_x' still in progress; \
                 use `rivet run --config foo.yaml --export users --resume` or \
                 `rivet state reset-chunks --config foo.yaml --export users`";
        let (cause, hinted) = strip_chunked_recovery_hint(m);
        assert!(hinted);
        assert_eq!(
            cause,
            "export 'users': chunk checkpoint run 'users_x' still in progress"
        );
    }

    #[test]
    fn strip_chunked_recovery_hint_strips_fix_errors_form() {
        let m = "export 'a': chunk checkpoint incomplete (3 tasks not completed); \
                 fix errors and `rivet run --config c.yaml --export a --resume` or \
                 `rivet state reset-chunks --config c.yaml --export a`";
        let (cause, hinted) = strip_chunked_recovery_hint(m);
        assert!(hinted);
        assert_eq!(
            cause,
            "export 'a': chunk checkpoint incomplete (3 tasks not completed)"
        );
    }

    #[test]
    fn strip_chunked_recovery_hint_passthrough_when_no_hint() {
        let m = "export 'q': source connection refused; retry exhausted";
        let (cause, hinted) = strip_chunked_recovery_hint(m);
        assert!(!hinted);
        assert_eq!(cause, m);
    }

    #[test]
    fn clamp_line_truncates_with_ellipsis() {
        assert_eq!(clamp_line("short", 80), "short");
        assert_eq!(clamp_line("hello world", 8), "hello w…");
        let s = "αβγδ".repeat(50);
        let out = clamp_line(&s, 10);
        assert_eq!(out.chars().count(), 10);
        assert!(out.ends_with('…'));
    }

    /// Pins the unit thresholds, including the quirk that the value just
    /// below a threshold renders as `1024.0` of the smaller unit.
    #[test]
    fn format_bytes_boundary_values() {
        assert_eq!(format_bytes(0), "0 B");
        assert_eq!(format_bytes(1), "1 B");
        assert_eq!(format_bytes(1023), "1023 B");
        assert_eq!(format_bytes(1024), "1.0 KB");
        assert_eq!(format_bytes(1025), "1.0 KB");
        assert_eq!(format_bytes(1_048_575), "1024.0 KB");
        assert_eq!(format_bytes(1_048_576), "1.0 MB");
        assert_eq!(format_bytes(1_073_741_823), "1024.0 MB");
        assert_eq!(format_bytes(1_073_741_824), "1.0 GB");
    }

    /// Baseline plan shared by the tests below: a Postgres snapshot export
    /// written as Parquet to a local `./out` destination.  Individual tests
    /// mutate the fields they care about.
    fn minimal_plan() -> ResolvedRunPlan {
        ResolvedRunPlan {
            export_name: "test_export".into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::default(),
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".into()),
                ..Default::default()
            },
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source: SourceConfig {
                source_type: SourceType::Postgres,
                url: Some("postgresql://localhost/test".into()),
                url_env: None,
                url_file: None,
                host: None,
                port: None,
                user: None,
                password: None,
                password_env: None,
                database: None,
                environment: None,
                tuning: None,
                tls: None,
            },
            column_overrides: Default::default(),
            verify: crate::config::VerifyMode::Size,
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    #[test]
    fn test_run_summary_fields() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);
        assert_eq!(summary.export_name, "test_export");
        assert_eq!(summary.status, "running");
        assert_eq!(summary.total_rows, 0);
        assert_eq!(summary.files_produced, 0);
        assert_eq!(summary.tuning_profile, "balanced (default)");
        assert_eq!(summary.batch_size, 10_000);
        assert_eq!(summary.format, "parquet");
        assert_eq!(summary.mode, "full");
        assert!(
            summary.run_id.starts_with("test_export_"),
            "run_id should start with export name, got: {}",
            summary.run_id
        );
    }

    // ─── RunSummary::new() journal invariants ────────────────────────────────

    /// `RunSummary::new()` must immediately record a `PlanResolved` event as the
    /// first journal entry.  This satisfies the "what was planned?" query from ADR-0001.
    #[test]
    fn run_summary_new_records_plan_resolved_as_first_event() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);

        assert!(
            !summary.journal.entries.is_empty(),
            "journal must have at least one entry after RunSummary::new()"
        );
        assert!(
            matches!(
                summary.journal.entries[0].event,
                crate::journal::RunEvent::PlanResolved(_)
            ),
            "first journal event must be PlanResolved, got: {:?}",
            summary.journal.entries[0].event
        );
    }

    /// The `PlanSnapshot` recorded inside `PlanResolved` must faithfully capture
    /// key fields from the `ResolvedRunPlan`.
    #[test]
    fn run_summary_plan_snapshot_matches_plan_fields() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);

        let snap = summary
            .journal
            .plan_snapshot()
            .expect("plan_snapshot() must be Some after RunSummary::new()");

        assert_eq!(snap.export_name, plan.export_name);
        assert_eq!(snap.validate, plan.validate);
        assert_eq!(snap.reconcile, plan.reconcile);
        assert_eq!(snap.resume, plan.resume);
        assert_eq!(snap.batch_size, plan.tuning.batch_size);
    }

    /// The journal's `run_id` must match the `RunSummary`'s `run_id`.
    #[test]
    fn run_summary_journal_run_id_matches_summary_run_id() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);
        assert_eq!(
            summary.journal.run_id, summary.run_id,
            "journal run_id must match summary run_id"
        );
    }

    // ─── Rejected plan gate ──────────────────────────────────────────────────

    /// Gap 7 — `run_export_job` bails before execution when `validate_plan` returns
    /// a `Rejected` diagnostic.  That gate lives in `run_export_job`, which is not
    /// defined in this facade file (NOTE(review): presumably the `job` submodule,
    /// alongside `run_export_job_with_chunk_source` — confirm).
    ///
    /// This test verifies the *condition* that triggers the bail: that `validate_plan`
    /// does in fact produce a `Rejected` diagnostic for the stdout+split combination.
    /// The gate itself (`run_export_job`) cannot be called directly in tests because
    /// it requires a live database connection and config; we test its precondition here.
    #[test]
    fn rejected_plan_produces_rejected_diagnostic_blocking_run_export_job() {
        let mut plan = minimal_plan();
        // stdout + max_file_size triggers check_stdout_split → Rejected.
        plan.destination.destination_type = DestinationType::Stdout;
        plan.max_file_size_bytes = Some(10 * 1024 * 1024);

        let diags = validate_plan(&plan);
        let has_rejected = diags
            .iter()
            .any(|d| d.level == DiagnosticLevel::Rejected);

        assert!(
            has_rejected,
            "stdout + max_file_size must produce a Rejected diagnostic so that \
             run_export_job bails before calling run_with_reconnect; got: {:?}",
            diags
                .iter()
                .map(|d| (&d.rule, &d.level))
                .collect::<Vec<_>>()
        );
    }

    /// stdout + chunked strategy also triggers a Rejected diagnostic (check_stdout_chunked).
    #[test]
    fn rejected_plan_stdout_chunked_blocks_run_export_job() {
        use crate::plan::ChunkedPlan;
        let mut plan = minimal_plan();
        plan.destination.destination_type = DestinationType::Stdout;
        plan.strategy = ExtractionStrategy::Chunked(ChunkedPlan {
            column: "id".into(),
            chunk_size: 1000,
            chunk_count: None,
            parallel: 1,
            dense: false,
            by_days: None,
            max_attempts: 3,
            checkpoint: false,
        });

        let diags = validate_plan(&plan);
        assert!(
            diags.iter().any(|d| d.level == DiagnosticLevel::Rejected),
            "stdout + chunked must produce a Rejected diagnostic"
        );
    }

    // ─── synthetic_failed_summary ────────────────────────────────────────────

    /// Pre-`RunSummary::new` failures (plan-build error, plan-validation
    /// rejection) still need to be aggregated.  `synthetic_failed_summary`
    /// produces a minimally-populated summary that aggregation can consume
    /// without panicking.
    #[test]
    fn synthetic_failed_summary_carries_error_and_status() {
        let err = anyhow::anyhow!("could not connect to source: timeout");
        let s = job::synthetic_failed_summary("orders", &err);
        assert_eq!(s.export_name, "orders");
        assert_eq!(s.status, "failed");
        assert_eq!(
            s.error_message.as_deref(),
            Some("could not connect to source: timeout")
        );
        assert!(
            s.run_id.starts_with("orders_"),
            "run_id must be derived from export name, got {}",
            s.run_id
        );
        // Aggregation reads these fields directly — they must default to zero.
        assert_eq!(s.total_rows, 0);
        assert_eq!(s.files_produced, 0);
        assert_eq!(s.bytes_written, 0);
        assert_eq!(s.duration_ms, 0);
    }

    /// `entry_from_summary` must faithfully copy fields the aggregate cares
    /// about.  This guards against silent drift if `RunAggregateEntry` or
    /// `RunSummary` gain new fields.
    #[test]
    fn aggregate_entry_from_summary_copies_observable_fields() {
        let plan = minimal_plan();
        let mut summary = RunSummary::new(&plan);
        summary.status = "success".into();
        summary.total_rows = 12_345;
        summary.files_produced = 3;
        summary.bytes_written = 9_876_543;
        summary.duration_ms = 5_000;

        let entry = aggregate::entry_from_summary(&summary);
        assert_eq!(entry.export_name, summary.export_name);
        assert_eq!(entry.status, "success");
        assert_eq!(entry.run_id, summary.run_id);
        assert_eq!(entry.rows, 12_345);
        assert_eq!(entry.files, 3);
        assert_eq!(entry.bytes, 9_876_543);
        assert_eq!(entry.duration_ms, 5_000);
        assert_eq!(entry.mode, summary.mode);
        assert_eq!(entry.error_message, None);
    }
}
485        assert_eq!(entry.duration_ms, 5_000);
486        assert_eq!(entry.mode, summary.mode);
487        assert_eq!(entry.error_message, None);
488    }
489}