Skip to main content

rivet/pipeline/
run.rs

1//! **Layer: Coordinator entrypoint** — the `rivet run` orchestrator.
2//!
3//! Single bridge between planning, execution, and persistence/observability.
4//! Owns the multi-export render-mode flags, decides between sequential vs
5//! thread-parallel vs process-parallel, and produces the run aggregate at
6//! the end.
7//!
8//! Lives in its own file so [`crate::pipeline`] (which is read as a facade
9//! by every other module) stays a thin re-export layer rather than a
10//! ~300-LOC orchestrator wrapped in mod-level declarations.
11
12use std::path::Path;
13use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
14
15use crate::config::{Config, ExportConfig};
16use crate::error::Result;
17use crate::state::StateStore;
18
19use super::summary::RunSummary;
20use super::{aggregate, ipc, job, parallel_children, parent_ui, partition_expand};
21
/// Named per-run switches handed from the CLI layer down into the pipeline.
///
/// Bundling the switches into one struct, instead of passing a row of
/// positional `bool` parameters through `run`, `run_export_job`, and the
/// child-process invocations, means two flags can never be silently swapped at
/// a call site (e.g. passing `reconcile` where `validate` was meant).
#[derive(Debug, Clone, Copy)]
pub struct RunOptions<'a> {
    /// The run's `validate` switch, forwarded unchanged from the CLI.
    pub validate: bool,
    /// The run's `reconcile` switch, forwarded unchanged from the CLI.
    pub reconcile: bool,
    /// The run's `resume` switch, forwarded unchanged from the CLI.
    pub resume: bool,
    /// Escape hatch for safety gates that would otherwise refuse to start.
    ///
    /// Today the only consumer is ADR-0012 M8: `--resume` against a prefix
    /// that already carries a `_SUCCESS` marker is refused unless `--force`
    /// is also given, so a verified dataset cannot be re-exported over by
    /// accident. Per ADR-0013 there is a single `--force`, scoped to whichever
    /// gate it overrides, so later gates may reuse this field.
    pub force: bool,
    /// Optional key/value parameters. `run` hands the same map to
    /// `Config::load_with_params` when loading the config.
    pub params: Option<&'a std::collections::HashMap<String, String>>,
}
43
/// Process-wide "several exports in this `rivet run`" hint.
///
/// It is meant to be `true` whenever the current process drives more than one
/// export in a single invocation, whether sequentially or with
/// `--parallel-exports`. Per-export renderers read it:
/// - `RunSummary::print` switches to its compact one-line layout.
/// - `ChunkProgress` drops its indicatif chunk bar.
///
/// Together these keep 15 exports to about 15 lines instead of 100+, and stop
/// threads from stacking progress bars on top of each other.
///
/// A child of `--parallel-export-processes` only ever sees a single export in
/// its own process, so the flag stays `false` there. The parent renders the
/// cards itself through `parent_ui`.
pub(crate) static MULTI_EXPORT_MODE: AtomicBool = AtomicBool::new(false);

/// Narrower hint: `true` only while several exports run at the same time in
/// this process (the thread-based `--parallel-exports` mode).
///
/// Exists so per-export `indicatif` chunk bars can be suppressed. Their
/// terminal writes would otherwise interleave across threads and corrupt one
/// another.
pub(crate) static MULTI_EXPORT_CONCURRENT: AtomicBool = AtomicBool::new(false);

/// Returns the current value of [`MULTI_EXPORT_MODE`].
pub(crate) fn multi_export_mode() -> bool {
    AtomicBool::load(&MULTI_EXPORT_MODE, AtomicOrdering::Relaxed)
}

/// Returns the current value of [`MULTI_EXPORT_CONCURRENT`].
#[allow(dead_code)] // kept for future renderers; flag is still set in `run` below.
pub(crate) fn multi_export_concurrent() -> bool {
    AtomicBool::load(&MULTI_EXPORT_CONCURRENT, AtomicOrdering::Relaxed)
}
70
71fn print_json_summary(agg: &crate::state::RunAggregate) {
72    match serde_json::to_string_pretty(agg) {
73        Ok(json) => println!("{json}"),
74        Err(e) => eprintln!(
75            "rivet: error: failed to serialize run summary as JSON: {:#}",
76            e
77        ),
78    }
79}
80
81#[allow(clippy::too_many_arguments)] // CLI fan-in; surface stays stable per ADR-0013
82pub fn run(
83    config_path: &str,
84    export_name: Option<&str>,
85    validate: bool,
86    reconcile: bool,
87    resume: bool,
88    force: bool,
89    params: Option<&std::collections::HashMap<String, String>>,
90    parallel_exports_cli: bool,
91    parallel_export_processes_cli: bool,
92    summary_output: Option<&Path>,
93    json_output: bool,
94) -> Result<()> {
95    // F-NEW-B (0.7.5 audit): `--force` is scoped to whichever gate it
96    // overrides (today: the `_SUCCESS`-already-present refusal on
97    // resume).  When the operator passes `--force` without `--resume`,
98    // the flag is a no-op — surface that explicitly so a typo or
99    // copy-paste mistake does not pass silently.
100    if force && !resume {
101        log::warn!(
102            "--force without --resume is a no-op today (force only overrides the resume safety \
103             gate against a destination prefix whose _SUCCESS is already present)"
104        );
105    }
106    let config = Config::load_with_params(config_path, params)?;
107
108    let config_dir = Path::new(config_path)
109        .parent()
110        .unwrap_or(Path::new("."))
111        .to_path_buf();
112
113    let selected: Vec<&ExportConfig> = if let Some(name) = export_name {
114        let e = config
115            .exports
116            .iter()
117            .find(|e| e.name == name)
118            .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
119        vec![e]
120    } else {
121        config.exports.iter().collect()
122    };
123
124    // Value-based partitioning: rewrite any `partition_by` export into one
125    // concrete child export per bucket *before* the run loop. Non-partitioned
126    // exports pass through. The owned vec must outlive the borrowed `exports`
127    // view rebuilt over it, so it is declared in the enclosing scope.
128    let partitioned = partition_expand::any_partitioned(&selected);
129    let expanded_owned: Vec<ExportConfig>;
130    let exports: Vec<&ExportConfig> = if partitioned {
131        expanded_owned = partition_expand::expand_partitioned_exports(
132            &selected,
133            &config.source,
134            &config_dir,
135            params,
136        )?;
137        expanded_owned.iter().collect()
138    } else {
139        selected
140    };
141
142    let opts = RunOptions {
143        validate,
144        reconcile,
145        resume,
146        force,
147        params,
148    };
149
150    let process_mode_requested = parallel_export_processes_cli || config.parallel_export_processes;
151    // Process-mode children re-exec `rivet run --export <name>` and re-load the
152    // config from disk, so they cannot see the synthesised partition child
153    // names. Force in-process execution when partitioning is active.
154    if partitioned && process_mode_requested {
155        log::warn!(
156            "partition_by: --parallel-export-processes is disabled with partitioned exports \
157             (child processes re-load the config and can't see synthesised partitions); \
158             running in-process"
159        );
160    }
161    let run_parallel_processes =
162        process_mode_requested && export_name.is_none() && exports.len() > 1 && !partitioned;
163
164    let started_at = chrono::Utc::now();
165
166    if run_parallel_processes {
167        // Run schema migrations once in the parent BEFORE forking children.
168        // Otherwise N children race for the exclusive write lock on a
169        // brand-new `.rivet_state.db` and `busy_timeout` is not enough to
170        // serialise them — most fail with `migration v1 failed: database is
171        // locked`.  After this open succeeds the schema is at the latest
172        // version and children's `StateStore::open` calls become idempotent
173        // (the `MIGRATIONS` loop is a no-op when `ver <= current`).
174        if let Err(e) = StateStore::open(config_path) {
175            return Err(anyhow::anyhow!(
176                "state: failed to initialize state DB before spawning children: {:#}",
177                e
178            ));
179        }
180
181        let (result, child_failures, stderr_dump) =
182            parallel_children::run_exports_as_child_processes(
183                config_path,
184                &exports,
185                validate,
186                reconcile,
187                resume,
188                force,
189                params,
190            );
191        let finished_at = chrono::Utc::now();
192        // Best-effort aggregate: open the state DB read-only-ish and reconstruct
193        // entries from the per-child `record_metric` rows.  Failure to open the
194        // DB here only suppresses the aggregate, not the run itself.
195        match StateStore::open(config_path) {
196            Ok(state) => {
197                let entries =
198                    aggregate::collect_child_entries(&state, &exports, started_at, &child_failures);
199                let agg = aggregate::build(
200                    entries,
201                    started_at,
202                    finished_at,
203                    Some(config_path),
204                    "parallel-processes",
205                );
206                aggregate::print(&agg);
207                aggregate::persist(&state, &agg, summary_output);
208                if json_output {
209                    print_json_summary(&agg);
210                }
211            }
212            Err(e) => log::warn!(
213                "aggregate: cannot open state DB to record run aggregate: {:#}",
214                e
215            ),
216        }
217        // Captured child stderr is printed AFTER the aggregate so the run
218        // summary stays immediately under the card stack — verbose log
219        // output sits below for triage when needed.
220        if !stderr_dump.is_empty() {
221            use std::io::Write;
222            let mut h = std::io::stderr().lock();
223            let _ = h.write_all(stderr_dump.as_bytes());
224            let _ = h.flush();
225        }
226        return result;
227    }
228
229    let run_parallel = (parallel_exports_cli || config.parallel_exports)
230        && export_name.is_none()
231        && exports.len() > 1;
232
233    // Compact-rendering hints for the per-export renderers.  Set once here so
234    // every code path below — sequential, `--parallel-exports`, the apply
235    // path, etc. — sees a consistent mode.  Restored at the end of the run
236    // so subsequent invocations within the same process (tests, library
237    // callers) start with a clean slate.
238    let multi_export = export_name.is_none() && exports.len() > 1;
239    let prev_multi = MULTI_EXPORT_MODE.swap(multi_export, AtomicOrdering::Relaxed);
240    let prev_concurrent = MULTI_EXPORT_CONCURRENT.swap(run_parallel, AtomicOrdering::Relaxed);
241    struct ResetMultiExport(bool, bool);
242    impl Drop for ResetMultiExport {
243        fn drop(&mut self) {
244            MULTI_EXPORT_MODE.store(self.0, AtomicOrdering::Relaxed);
245            MULTI_EXPORT_CONCURRENT.store(self.1, AtomicOrdering::Relaxed);
246        }
247    }
248    let _reset_multi = ResetMultiExport(prev_multi, prev_concurrent);
249
250    let mut summaries: Vec<RunSummary> = Vec::with_capacity(exports.len());
251    // Keep the typed `anyhow::Error`s (not flattened strings) so the final bail
252    // can carry a representative one — its DataIntegrityError / SchemaDriftError /
253    // transient marker downcasts through anyhow's context chain in
254    // `error::classify_exit`, giving the right process exit code without grepping
255    // the message.
256    let mut failures: Vec<anyhow::Error> = Vec::new();
257
258    if run_parallel {
259        log::info!(
260            "running {} exports in parallel (separate state DB connection per export)",
261            exports.len()
262        );
263
264        // In threads mode every export emits the same `ChildEvent` stream
265        // that `--parallel-export-processes` children emit, but routed
266        // through an in-process `mpsc` channel.  A single UI thread (the
267        // same `parent_ui::run_ui` used for the process-mode parent) owns
268        // stderr and renders one card line per export — no indicatif, no
269        // multi-bar coordination headache, no scrollback artefacts from
270        // concurrent redraws.  Ensure stderr is also pre-migrated so child
271        // threads opening their own `StateStore` don't race on schema DDL.
272        if let Err(e) = StateStore::open(config_path) {
273            return Err(anyhow::anyhow!(
274                "state: failed to initialize state DB before spawning export threads: {:#}",
275                e
276            ));
277        }
278        let (tx, rx) = std::sync::mpsc::channel::<parent_ui::UiMessage>();
279        ipc::install_in_process_tx(tx);
280        let ui_thread = std::thread::Builder::new()
281            .name("rivet-ui".to_string())
282            .spawn(move || parent_ui::run_ui(rx))
283            .ok();
284
285        let collected: std::sync::Mutex<Vec<(Result<()>, RunSummary)>> =
286            std::sync::Mutex::new(Vec::with_capacity(exports.len()));
287        std::thread::scope(|s| {
288            let mut handles = Vec::new();
289            for &export in &exports {
290                handles.push(s.spawn(|| {
291                    let state = match StateStore::open(config_path) {
292                        Ok(s) => s,
293                        Err(e) => {
294                            let err = anyhow::anyhow!(
295                                "export '{}': failed to open state database: {:#}",
296                                export.name,
297                                e
298                            );
299                            let summary = job::synthetic_failed_summary(&export.name, &err);
300                            return (Err(err), summary);
301                        }
302                    };
303                    job::run_export_job(config_path, &config, export, &state, &config_dir, &opts)
304                }));
305            }
306            for h in handles {
307                match h.join() {
308                    Ok(pair) => collected.lock().unwrap().push(pair),
309                    Err(payload) => std::panic::resume_unwind(payload),
310                }
311            }
312        });
313
314        // All exports are done → drop the sender so `parent_ui::run_ui`
315        // sees the channel close and exits cleanly (committing the final
316        // card stack to scrollback).  Joining is best-effort: even if the
317        // UI thread is wedged we still want to print the run aggregate
318        // below.
319        ipc::clear_in_process_tx();
320        if let Some(t) = ui_thread {
321            let _ = t.join();
322        }
323
324        for (res, summary) in collected.into_inner().unwrap() {
325            if let Err(e) = res {
326                failures.push(e);
327            }
328            summaries.push(summary);
329        }
330    } else {
331        let state = StateStore::open(config_path)?;
332
333        // Always route through `parent_ui` — same as `--parallel-exports`.
334        // Gating on `is_attended()` left VHS/ttyd on indicatif when the
335        // attended bit is unset; `run_ui` already falls back to linear
336        // mode for piped stderr.
337        let (tx, rx) = std::sync::mpsc::channel::<parent_ui::UiMessage>();
338        ipc::install_in_process_tx(tx);
339        let ui_thread = std::thread::Builder::new()
340            .name("rivet-ui".to_string())
341            .spawn(move || parent_ui::run_ui(rx))
342            .ok();
343
344        for export in &exports {
345            let (res, summary) =
346                job::run_export_job(config_path, &config, export, &state, &config_dir, &opts);
347            if let Err(e) = res {
348                failures.push(e);
349            }
350            summaries.push(summary);
351        }
352
353        ipc::clear_in_process_tx();
354        if let Some(t) = ui_thread {
355            let _ = t.join();
356        }
357        // Single-export sequential runs still emit the detailed block after
358        // the card commits to scrollback.
359        if exports.len() == 1
360            && let Some(summary) = summaries.last()
361        {
362            summary.print_stderr_block();
363        }
364    }
365
366    let finished_at = chrono::Utc::now();
367    // Skip the aggregate for single-export runs.  Two cases this catches:
368    //   1) `rivet run --export X` (manual one-off): the per-export block
369    //      already says everything, an aggregate of one row is just noise.
370    //   2) Children spawned by `--parallel-export-processes`: each child
371    //      enters this code path with exports.len() == 1.  The parent
372    //      (parallel_processes branch above) builds the run-wide aggregate
373    //      from every child's `export_metrics` row, so a child-level
374    //      aggregate would just write a duplicate into `run_aggregate`.
375    // Force-write the JSON file even when skipping, so `--summary-output`
376    // remains useful for one-off runs.
377    if exports.len() > 1 {
378        let parallel_mode = if run_parallel {
379            "parallel-threads"
380        } else {
381            "sequential"
382        };
383        let entries: Vec<_> = summaries
384            .iter()
385            .map(aggregate::entry_from_summary)
386            .collect();
387        let agg = aggregate::build(
388            entries,
389            started_at,
390            finished_at,
391            Some(config_path),
392            parallel_mode,
393        );
394        aggregate::print(&agg);
395        // Open a fresh state handle for persisting the aggregate so we don't
396        // assume which thread owned the per-export `StateStore` above.
397        match StateStore::open(config_path) {
398            Ok(state) => aggregate::persist(&state, &agg, summary_output),
399            Err(e) => log::warn!(
400                "aggregate: cannot open state DB to record run aggregate: {:#}",
401                e
402            ),
403        }
404        if json_output {
405            print_json_summary(&agg);
406        }
407    } else if summary_output.is_some() || json_output {
408        // One export, but the user asked for a summary file and/or JSON stdout —
409        // honour both without polluting the DB or stderr.
410        let entries: Vec<_> = summaries
411            .iter()
412            .map(aggregate::entry_from_summary)
413            .collect();
414        let agg = aggregate::build(
415            entries,
416            started_at,
417            finished_at,
418            Some(config_path),
419            "sequential",
420        );
421        if let Some(out) = summary_output
422            && let Err(e) =
423                std::fs::write(out, serde_json::to_string_pretty(&agg).unwrap_or_default())
424        {
425            log::warn!(
426                "aggregate: failed to write summary JSON to {}: {:#}",
427                out.display(),
428                e
429            );
430        }
431        if json_output {
432            print_json_summary(&agg);
433        }
434    }
435
436    if !failures.is_empty() {
437        // Carry a representative typed failure as the returned error so
438        // `error::classify_exit` downcasts the marker (DataIntegrityError=3,
439        // SchemaDriftError=4, transient=2) through anyhow's context chain. Pick
440        // the most "stop-worthy" class — data-integrity (possibly-wrong data)
441        // outranks schema-drift, which outranks retryable, which outranks
442        // generic — so a mixed batch exits on the scariest reason.
443        let primary_idx = representative_failure_idx(&failures).unwrap();
444        let primary = failures.remove(primary_idx);
445        if failures.is_empty() {
446            // Single failure — return it verbatim (its own message + marker).
447            return Err(primary);
448        }
449        // Multiple failures: list the others as higher-level context; `primary`
450        // (with its typed marker) rides underneath so the downcast still finds it.
451        let others = failures
452            .iter()
453            .map(|e| format!("{e:#}"))
454            .collect::<Vec<_>>()
455            .join("; ");
456        return Err(primary.context(format!(
457            "{} export(s) failed; representative error follows (also: {others})",
458            failures.len() + 1
459        )));
460    }
461
462    Ok(())
463}
464
465/// Index of the most "stop-worthy" failure in a batch: data-integrity (exit 3)
466/// outranks schema-drift (4), which outranks retryable (2), which outranks
467/// generic (1). The chosen error's typed marker then rides up so `classify_exit`
468/// exits the process on the scariest reason rather than whichever export happened
469/// to fail first. Returns `None` for an empty slice.
470fn representative_failure_idx(failures: &[anyhow::Error]) -> Option<usize> {
471    let rank = |e: &anyhow::Error| match crate::error::classify_exit(e) {
472        c if c == crate::error::ExitClass::DataIntegrity.code() => 3,
473        c if c == crate::error::ExitClass::SchemaDrift.code() => 2,
474        c if c == crate::error::ExitClass::Retryable.code() => 1,
475        _ => 0,
476    };
477    (0..failures.len()).max_by_key(|&i| rank(&failures[i]))
478}
479
#[cfg(test)]
mod representative_failure_tests {
    use super::representative_failure_idx;
    use crate::error::{DataIntegrityError, ExitClass, SchemaDriftError, classify_exit};

    #[test]
    fn empty_batch_has_no_representative() {
        assert_eq!(representative_failure_idx(&[]), None);
    }

    #[test]
    fn single_failure_is_its_own_representative() {
        // The caller's `expect` relies on a non-empty batch always yielding an
        // index; the one-element case is the boundary of that contract.
        let failures = vec![anyhow::anyhow!("only failure")];
        assert_eq!(representative_failure_idx(&failures), Some(0));
    }

    #[test]
    fn data_integrity_outranks_everything_regardless_of_position() {
        // Data-integrity sits LAST so a naive "first failure" or a flipped
        // min/max selector would pick the generic error instead.
        let failures = vec![
            anyhow::anyhow!("generic boom"),
            SchemaDriftError::new("shape changed").into(),
            anyhow::anyhow!("another generic"),
            DataIntegrityError::new("reconcile mismatch").into(),
        ];
        let idx = representative_failure_idx(&failures).unwrap();
        assert_eq!(
            classify_exit(&failures[idx]),
            ExitClass::DataIntegrity.code(),
            "a mixed batch must surface the data-integrity (exit 3) failure"
        );
    }

    #[test]
    fn schema_drift_outranks_retryable_and_generic() {
        // No data-integrity present → schema-drift (exit 4) is the scariest.
        let failures = vec![
            anyhow::anyhow!("generic"),
            SchemaDriftError::new("drift").into(),
        ];
        let idx = representative_failure_idx(&failures).unwrap();
        assert_eq!(classify_exit(&failures[idx]), ExitClass::SchemaDrift.code());
    }

    #[test]
    fn schema_drift_wins_when_listed_first() {
        // Mirror of the test above: a selector that merely prefers later
        // entries would wrongly return the trailing generic error here.
        let failures = vec![
            SchemaDriftError::new("drift").into(),
            anyhow::anyhow!("generic"),
        ];
        let idx = representative_failure_idx(&failures).unwrap();
        assert_eq!(idx, 0);
        assert_eq!(classify_exit(&failures[idx]), ExitClass::SchemaDrift.code());
    }
}