Skip to main content

rivet/pipeline/
run.rs

1//! **Layer: Coordinator entrypoint** — the `rivet run` orchestrator.
2//!
3//! Single bridge between planning, execution, and persistence/observability.
4//! Owns the multi-export render-mode flags, decides between sequential vs
5//! thread-parallel vs process-parallel, and produces the run aggregate at
6//! the end.
7//!
8//! Lives in its own file so [`crate::pipeline`] (which is read as a facade
9//! by every other module) stays a thin re-export layer rather than a
10//! ~300-LOC orchestrator wrapped in mod-level declarations.
11
12use std::path::Path;
13use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
14
15use crate::config::{Config, ExportConfig};
16use crate::error::Result;
17use crate::state::StateStore;
18
19use super::summary::RunSummary;
20use super::{aggregate, finalize, ipc, job, parallel_children, parent_ui, partition_expand};
21
22/// Per-run configuration flags passed from the CLI to the pipeline.
23///
24/// Replaces the previous pattern of threading 4+ positional `bool` arguments
25/// through `run`, `run_export_job`, and child-process invocations.  Named fields
26/// prevent silent argument transposition (e.g., `validate` and `reconcile`
27/// swapped).
28#[derive(Debug, Clone, Copy)]
29pub struct RunOptions<'a> {
30    pub validate: bool,
31    pub reconcile: bool,
32    pub resume: bool,
33    /// Override safety gates that would otherwise refuse to start the run.
34    ///
35    /// Currently used by ADR-0012 M8 — `--resume` against a prefix whose
36    /// `_SUCCESS` marker is present is refused unless `--force` is given,
37    /// so an operator cannot accidentally re-export over a verified
38    /// dataset.  Other gates may share the same flag in the future
39    /// (per ADR-0013: one `--force`, scoped to whichever gate it overrides).
40    pub force: bool,
41    pub params: Option<&'a std::collections::HashMap<String, String>>,
42}
43
44/// True when the current process is running more than one export in this
45/// `rivet run` invocation (sequential or `--parallel-exports`).  Per-export
46/// renderers (`RunSummary::print`, `ChunkProgress`) read this to switch to
47/// the compact one-line format and to suppress the indicatif chunk bar
48/// respectively, so 15 exports take 15 lines instead of 100+ and threads
49/// don't stack progress bars on top of each other.
50///
51/// Children of `--parallel-export-processes` always have `exports.len() == 1`
52/// in their own process so this flag stays `false` for them; the parent
53/// renders cards itself via `parent_ui`.
54pub(crate) static MULTI_EXPORT_MODE: AtomicBool = AtomicBool::new(false);
55
56/// True only when multiple exports run **concurrently** in the current
57/// process (i.e. `--parallel-exports`, threads).  Used to suppress
58/// per-export `indicatif` chunk progress bars whose terminal writes
59/// otherwise interleave across threads and corrupt each other.
60pub(crate) static MULTI_EXPORT_CONCURRENT: AtomicBool = AtomicBool::new(false);
61
62pub(crate) fn multi_export_mode() -> bool {
63    MULTI_EXPORT_MODE.load(AtomicOrdering::Relaxed)
64}
65
66#[allow(dead_code)] // kept for future renderers; flag is still set in `run` below.
67pub(crate) fn multi_export_concurrent() -> bool {
68    MULTI_EXPORT_CONCURRENT.load(AtomicOrdering::Relaxed)
69}
70
71fn print_json_summary(agg: &crate::state::RunAggregate) {
72    match serde_json::to_string_pretty(agg) {
73        Ok(json) => println!("{json}"),
74        Err(e) => eprintln!(
75            "rivet: error: failed to serialize run summary as JSON: {:#}",
76            e
77        ),
78    }
79}
80
81/// Emit captured child stderr from a parallel run. It's verbose — every child's
82/// full run card — so write it to a timestamped log beside the config and print
83/// a one-line pointer, instead of flooding the console with all N exports'
84/// stderr. Falls back to the inline console dump if the file can't be written.
85fn emit_child_stderr(dump: &str, dir: &Path) {
86    if dump.is_empty() {
87        return;
88    }
89    let name = format!(
90        "rivet-child-stderr-{}.log",
91        chrono::Utc::now().format("%Y%m%dT%H%M%S")
92    );
93    let path = dir.join(name);
94    match std::fs::write(&path, dump) {
95        // stderr, not stdout — stdout may carry the machine-readable `--json`
96        // run summary, which this pointer would otherwise corrupt.
97        Ok(()) => eprintln!(
98            "\n  child stderr (full per-export logs) → {}",
99            path.display()
100        ),
101        Err(e) => {
102            log::warn!(
103                "could not write child stderr to {} ({e}); printing inline",
104                path.display()
105            );
106            use std::io::Write;
107            let mut h = std::io::stderr().lock();
108            let _ = h.write_all(dump.as_bytes());
109            let _ = h.flush();
110        }
111    }
112}
113
114#[allow(clippy::too_many_arguments)] // CLI fan-in; surface stays stable per ADR-0013
115pub fn run(
116    config_path: &str,
117    export_name: Option<&str>,
118    validate: bool,
119    reconcile: bool,
120    resume: bool,
121    force: bool,
122    params: Option<&std::collections::HashMap<String, String>>,
123    parallel_exports_cli: bool,
124    parallel_export_processes_cli: bool,
125    summary_output: Option<&Path>,
126    json_output: bool,
127) -> Result<()> {
128    // F-NEW-B (0.7.5 audit): `--force` is scoped to whichever gate it
129    // overrides (today: the `_SUCCESS`-already-present refusal on
130    // resume).  When the operator passes `--force` without `--resume`,
131    // the flag is a no-op — surface that explicitly so a typo or
132    // copy-paste mistake does not pass silently.
133    if force && !resume {
134        log::warn!(
135            "--force without --resume is a no-op today (force only overrides the resume safety \
136             gate against a destination prefix whose _SUCCESS is already present)"
137        );
138    }
139    let config = Config::load_with_params(config_path, params)?;
140
141    let config_dir = Path::new(config_path)
142        .parent()
143        .unwrap_or(Path::new("."))
144        .to_path_buf();
145
146    let selected: Vec<&ExportConfig> = if let Some(name) = export_name {
147        let e = config
148            .exports
149            .iter()
150            .find(|e| e.name == name)
151            .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
152        vec![e]
153    } else {
154        config.exports.iter().collect()
155    };
156
157    // Value-based partitioning: rewrite any `partition_by` export into one
158    // concrete child export per bucket *before* the run loop. Non-partitioned
159    // exports pass through. The owned vec must outlive the borrowed `exports`
160    // view rebuilt over it, so it is declared in the enclosing scope.
161    let partitioned = partition_expand::any_partitioned(&selected);
162    let expanded_owned: Vec<ExportConfig>;
163    let exports: Vec<&ExportConfig> = if partitioned {
164        expanded_owned = partition_expand::expand_partitioned_exports(
165            &selected,
166            &config.source,
167            &config_dir,
168            params,
169        )?;
170        expanded_owned.iter().collect()
171    } else {
172        selected
173    };
174
175    let opts = RunOptions {
176        validate,
177        reconcile,
178        resume,
179        force,
180        params,
181    };
182
183    // Seeds the card-table name column so it aligns from the first redraw
184    // (the renderer can't see a long name until its export emits `Started`).
185    let name_floor = exports
186        .iter()
187        .map(|e| e.name.chars().count())
188        .max()
189        .unwrap_or(0);
190    let process_mode_requested = parallel_export_processes_cli || config.parallel_export_processes;
191    // Process-mode children re-exec `rivet run --export <name>` and re-load the
192    // config from disk, so they cannot see the synthesised partition child
193    // names. Force in-process execution when partitioning is active.
194    if partitioned && process_mode_requested {
195        log::warn!(
196            "partition_by: --parallel-export-processes is disabled with partitioned exports \
197             (child processes re-load the config and can't see synthesised partitions); \
198             running in-process"
199        );
200    }
201    let run_parallel_processes =
202        process_mode_requested && export_name.is_none() && exports.len() > 1 && !partitioned;
203
204    let started_at = chrono::Utc::now();
205
206    if run_parallel_processes {
207        // Run schema migrations once in the parent BEFORE forking children.
208        // Otherwise N children race for the exclusive write lock on a
209        // brand-new `.rivet_state.db` and `busy_timeout` is not enough to
210        // serialise them — most fail with `migration v1 failed: database is
211        // locked`.  After this open succeeds the schema is at the latest
212        // version and children's `StateStore::open` calls become idempotent
213        // (the `MIGRATIONS` loop is a no-op when `ver <= current`).
214        if let Err(e) = StateStore::open(config_path) {
215            return Err(anyhow::anyhow!(
216                "state: failed to initialize state DB before spawning children: {:#}",
217                e
218            ));
219        }
220
221        let (result, child_failures, stderr_dump) =
222            parallel_children::run_exports_as_child_processes(
223                config_path,
224                &exports,
225                validate,
226                reconcile,
227                resume,
228                force,
229                params,
230                name_floor,
231            );
232        let finished_at = chrono::Utc::now();
233        // Best-effort aggregate: open the state DB read-only-ish and reconstruct
234        // entries from the per-child `record_metric` rows.  Failure to open the
235        // DB here only suppresses the aggregate, not the run itself.
236        match StateStore::open(config_path) {
237            Ok(state) => {
238                let entries =
239                    aggregate::collect_child_entries(&state, &exports, started_at, &child_failures);
240                let agg = aggregate::build(
241                    entries,
242                    started_at,
243                    finished_at,
244                    Some(config_path),
245                    "parallel-processes",
246                );
247                aggregate::print(&agg);
248                aggregate::persist(&state, &agg, summary_output);
249                if json_output {
250                    print_json_summary(&agg);
251                }
252            }
253            Err(e) => log::warn!(
254                "aggregate: cannot open state DB to record run aggregate: {:#}",
255                e
256            ),
257        }
258        // Captured child stderr (verbose per-export cards) goes to a file
259        // artifact beside the config, with a one-line console pointer — the run
260        // summary stays clean instead of flooding with every child's stderr.
261        emit_child_stderr(&stderr_dump, &config_dir);
262        return result;
263    }
264
265    let run_parallel = (parallel_exports_cli || config.parallel_exports)
266        && export_name.is_none()
267        && exports.len() > 1;
268
269    // Compact-rendering hints for the per-export renderers.  Set once here so
270    // every code path below — sequential, `--parallel-exports`, the apply
271    // path, etc. — sees a consistent mode.  Restored at the end of the run
272    // so subsequent invocations within the same process (tests, library
273    // callers) start with a clean slate.
274    let multi_export = export_name.is_none() && exports.len() > 1;
275    let prev_multi = MULTI_EXPORT_MODE.swap(multi_export, AtomicOrdering::Relaxed);
276    let prev_concurrent = MULTI_EXPORT_CONCURRENT.swap(run_parallel, AtomicOrdering::Relaxed);
277    struct ResetMultiExport(bool, bool);
278    impl Drop for ResetMultiExport {
279        fn drop(&mut self) {
280            MULTI_EXPORT_MODE.store(self.0, AtomicOrdering::Relaxed);
281            MULTI_EXPORT_CONCURRENT.store(self.1, AtomicOrdering::Relaxed);
282        }
283    }
284    let _reset_multi = ResetMultiExport(prev_multi, prev_concurrent);
285
286    let mut summaries: Vec<RunSummary> = Vec::with_capacity(exports.len());
287    // Keep the typed `anyhow::Error`s (not flattened strings) so the final bail
288    // can carry a representative one — its DataIntegrityError / SchemaDriftError /
289    // transient marker downcasts through anyhow's context chain in
290    // `error::classify_exit`, giving the right process exit code without grepping
291    // the message.
292    let mut failures: Vec<anyhow::Error> = Vec::new();
293
294    if run_parallel {
295        log::info!(
296            "running {} exports in parallel (separate state DB connection per export)",
297            exports.len()
298        );
299
300        // In threads mode every export emits the same `ChildEvent` stream
301        // that `--parallel-export-processes` children emit, but routed
302        // through an in-process `mpsc` channel.  A single UI thread (the
303        // same `parent_ui::run_ui` used for the process-mode parent) owns
304        // stderr and renders one card line per export — no indicatif, no
305        // multi-bar coordination headache, no scrollback artefacts from
306        // concurrent redraws.  Ensure stderr is also pre-migrated so child
307        // threads opening their own `StateStore` don't race on schema DDL.
308        if let Err(e) = StateStore::open(config_path) {
309            return Err(anyhow::anyhow!(
310                "state: failed to initialize state DB before spawning export threads: {:#}",
311                e
312            ));
313        }
314        let (tx, rx) = std::sync::mpsc::channel::<parent_ui::UiMessage>();
315        ipc::install_in_process_tx(tx);
316        let ui_thread = std::thread::Builder::new()
317            .name("rivet-ui".to_string())
318            .spawn(move || parent_ui::run_ui(rx, name_floor))
319            .ok();
320
321        let collected: std::sync::Mutex<Vec<(Result<()>, RunSummary)>> =
322            std::sync::Mutex::new(Vec::with_capacity(exports.len()));
323        std::thread::scope(|s| {
324            let mut handles = Vec::new();
325            for &export in &exports {
326                handles.push(s.spawn(|| {
327                    let state = match StateStore::open(config_path) {
328                        Ok(s) => s,
329                        Err(e) => {
330                            let err = anyhow::anyhow!(
331                                "export '{}': failed to open state database: {:#}",
332                                export.name,
333                                e
334                            );
335                            let summary = job::synthetic_failed_summary(&export.name, &err);
336                            return (Err(err), summary);
337                        }
338                    };
339                    job::run_export_job(config_path, &config, export, &state, &config_dir, &opts)
340                }));
341            }
342            for h in handles {
343                match h.join() {
344                    Ok(pair) => collected.lock().unwrap().push(pair),
345                    Err(payload) => std::panic::resume_unwind(payload),
346                }
347            }
348        });
349
350        // All exports are done → drop the sender so `parent_ui::run_ui`
351        // sees the channel close and exits cleanly (committing the final
352        // card stack to scrollback).  Joining is best-effort: even if the
353        // UI thread is wedged we still want to print the run aggregate
354        // below.
355        ipc::clear_in_process_tx();
356        if let Some(t) = ui_thread {
357            let _ = t.join();
358        }
359
360        for (res, summary) in collected.into_inner().unwrap() {
361            if let Err(e) = res {
362                failures.push(e);
363            }
364            summaries.push(summary);
365        }
366    } else {
367        let state = StateStore::open(config_path)?;
368
369        // Always route through `parent_ui` — same as `--parallel-exports`.
370        // Gating on `is_attended()` left VHS/ttyd on indicatif when the
371        // attended bit is unset; `run_ui` already falls back to linear
372        // mode for piped stderr.
373        let (tx, rx) = std::sync::mpsc::channel::<parent_ui::UiMessage>();
374        ipc::install_in_process_tx(tx);
375        let ui_thread = std::thread::Builder::new()
376            .name("rivet-ui".to_string())
377            .spawn(move || parent_ui::run_ui(rx, name_floor))
378            .ok();
379
380        for export in &exports {
381            let (res, summary) =
382                job::run_export_job(config_path, &config, export, &state, &config_dir, &opts);
383            if let Err(e) = res {
384                failures.push(e);
385            }
386            summaries.push(summary);
387        }
388
389        ipc::clear_in_process_tx();
390        if let Some(t) = ui_thread {
391            let _ = t.join();
392        }
393        // Single-export sequential runs still emit the detailed block after
394        // the card commits to scrollback.
395        if exports.len() == 1
396            && let Some(summary) = summaries.last()
397        {
398            summary.print_stderr_block();
399        }
400    }
401
402    let finished_at = chrono::Utc::now();
403    // Skip the aggregate for single-export runs.  Two cases this catches:
404    //   1) `rivet run --export X` (manual one-off): the per-export block
405    //      already says everything, an aggregate of one row is just noise.
406    //   2) Children spawned by `--parallel-export-processes`: each child
407    //      enters this code path with exports.len() == 1.  The parent
408    //      (parallel_processes branch above) builds the run-wide aggregate
409    //      from every child's `export_metrics` row, so a child-level
410    //      aggregate would just write a duplicate into `run_aggregate`.
411    // Force-write the JSON file even when skipping, so `--summary-output`
412    // remains useful for one-off runs.
413    if exports.len() > 1 {
414        let parallel_mode = if run_parallel {
415            "parallel-threads"
416        } else {
417            "sequential"
418        };
419        let entries: Vec<_> = summaries
420            .iter()
421            .map(aggregate::entry_from_summary)
422            .collect();
423        let agg = aggregate::build(
424            entries,
425            started_at,
426            finished_at,
427            Some(config_path),
428            parallel_mode,
429        );
430        aggregate::print(&agg);
431        // Open a fresh state handle for persisting the aggregate so we don't
432        // assume which thread owned the per-export `StateStore` above.
433        match StateStore::open(config_path) {
434            Ok(state) => aggregate::persist(&state, &agg, summary_output),
435            Err(e) => log::warn!(
436                "aggregate: cannot open state DB to record run aggregate: {:#}",
437                e
438            ),
439        }
440        if json_output {
441            print_json_summary(&agg);
442        }
443    } else if summary_output.is_some() || json_output {
444        // One export, but the user asked for a summary file and/or JSON stdout —
445        // honour both without polluting the DB or stderr.
446        let entries: Vec<_> = summaries
447            .iter()
448            .map(aggregate::entry_from_summary)
449            .collect();
450        let agg = aggregate::build(
451            entries,
452            started_at,
453            finished_at,
454            Some(config_path),
455            "sequential",
456        );
457        if let Some(out) = summary_output
458            && let Err(e) =
459                std::fs::write(out, serde_json::to_string_pretty(&agg).unwrap_or_default())
460        {
461            log::warn!(
462                "aggregate: failed to write summary JSON to {}: {:#}",
463                out.display(),
464                e
465            );
466        }
467        if json_output {
468            print_json_summary(&agg);
469        }
470    }
471
472    if !failures.is_empty() {
473        // Carry a representative typed failure as the returned error so
474        // `error::classify_exit` downcasts the marker (DataIntegrityError=3,
475        // SchemaDriftError=4, transient=2) through anyhow's context chain. Pick
476        // the most "stop-worthy" class — data-integrity (possibly-wrong data)
477        // outranks schema-drift, which outranks retryable, which outranks
478        // generic — so a mixed batch exits on the scariest reason.
479        let primary_idx = representative_failure_idx(&failures).unwrap();
480        let primary = failures.remove(primary_idx);
481        if failures.is_empty() {
482            // Single failure — return it verbatim (its own message + marker).
483            return Err(primary);
484        }
485        // Multiple failures: list the others as higher-level context; `primary`
486        // (with its typed marker) rides underneath so the downcast still finds it.
487        let others = failures
488            .iter()
489            .map(|e| format!("{e:#}"))
490            .collect::<Vec<_>>()
491            .join("; ");
492        return Err(primary.context(format!(
493            "{} export(s) failed; representative error follows (also: {others})",
494            failures.len() + 1
495        )));
496    }
497
498    Ok(())
499}
500
501/// `rivet apply -c config.yaml` (plan→apply cycle): run every export of the
502/// config **wave by wave** in ascending `wave:` order — exports with no `wave:`
503/// run last — reusing the same per-export job + run aggregate as [`run`]. This
504/// first cut runs each wave's exports SEQUENTIALLY (deterministic); safety-aware
505/// within-wave parallelism is a follow-up, and `partition_by` exports are not
506/// expanded here yet (use `rivet run` for those).
507pub(crate) fn run_waves(
508    config_path: &str,
509    force: bool,
510    parallel_cli: bool,
511    resume: bool,
512) -> Result<()> {
513    let config = Config::load_with_params(config_path, None)?;
514    let config_dir = Path::new(config_path)
515        .parent()
516        .unwrap_or(Path::new("."))
517        .to_path_buf();
518    let opts = RunOptions {
519        validate: false,
520        reconcile: false,
521        resume,
522        force,
523        params: None,
524    };
525
526    // Group exports by wave (ascending; an export with no `wave:` runs last).
527    // The ordering is the contract apply depends on, so it lives in a pure
528    // tested helper rather than hiding inline here.
529    let by_wave = group_exports_by_wave(&config.exports);
530    let total: usize = by_wave.iter().map(|(_, v)| v.len()).sum();
531    if total == 0 {
532        log::warn!("apply: config '{config_path}' defines no exports");
533        return Ok(());
534    }
535
536    // `--parallel` (or `parallel_export_processes: true` in the config) opts into
537    // within-wave parallelism: each wave's exports run as concurrent child
538    // processes (per-child governor keeps each one source-safe), the call blocks
539    // until all exit = the wave barrier. Default stays sequential.
540    let parallel = parallel_cli || config.parallel_export_processes;
541
542    // Compact per-export rendering for the SEQUENTIAL path only. The parallel
543    // (subprocess) path renders the parent card stack itself and each child sees
544    // `exports.len() == 1`, so the flag must stay clear there — matching `run`'s
545    // parallel-processes branch.
546    let prev_multi = MULTI_EXPORT_MODE.swap(total > 1 && !parallel, AtomicOrdering::Relaxed);
547    struct ResetMulti(bool);
548    impl Drop for ResetMulti {
549        fn drop(&mut self) {
550            MULTI_EXPORT_MODE.store(self.0, AtomicOrdering::Relaxed);
551        }
552    }
553    let _reset = ResetMulti(prev_multi);
554
555    let state = StateStore::open(config_path)?;
556    let started_at = chrono::Utc::now();
557    let mut summaries: Vec<RunSummary> = Vec::with_capacity(total);
558    let mut failures: Vec<anyhow::Error> = Vec::new();
559    // Parallel-path accumulators: per-child metrics live in the state DB, so the
560    // parent reconstructs one aggregate from them after every wave has joined.
561    let mut all_exports: Vec<&ExportConfig> = Vec::with_capacity(total);
562    let mut child_failures: std::collections::HashMap<String, String> =
563        std::collections::HashMap::new();
564    let mut combined_stderr = String::new();
565
566    for (wave, exports) in &by_wave {
567        let label = if *wave == u32::MAX {
568            "unscheduled".to_string()
569        } else {
570            wave.to_string()
571        };
572        // Skip-completed under --resume: an export whose destination already has
573        // `_SUCCESS` is done — re-running must not redo it (and would hit the
574        // resume gate). The rest run with `resume`, so an incomplete chunked
575        // export continues from its checkpoint. Reuses `finalize`'s prior-run
576        // probe rather than re-implementing the marker check.
577        let pending: Vec<&ExportConfig> = exports
578            .iter()
579            .copied()
580            .filter(|e| {
581                let done = resume && finalize::destination_has_success(&e.destination);
582                if done {
583                    log::info!(
584                        "apply: skipping '{}' — destination already complete (_SUCCESS)",
585                        e.name
586                    );
587                }
588                !done
589            })
590            .collect();
591        if pending.is_empty() {
592            continue;
593        }
594        if total > 1 {
595            println!("\n  ── wave {label} · {} export(s) ──", pending.len());
596        }
597        // The wave barrier is the loop itself: each strategy below fully drains
598        // the wave (the sequential loop, or the blocking child-process join)
599        // before the next iteration starts the next wave.
600        if parallel {
601            // Cost safety-gate: within the wave, the cheap (`parallel_safe`)
602            // exports run together in ONE concurrent batch; every heavier export
603            // runs ALONE in its own single-child batch, since a big table already
604            // chunk-parallelizes internally and two at once would overload the
605            // source. The per-child governor still bounds each one; this gate also
606            // bounds the concurrent connection count.
607            let (safe, lone): (Vec<&ExportConfig>, Vec<&ExportConfig>) =
608                pending.iter().copied().partition(|e| is_parallel_safe(e));
609            log::info!(
610                "apply: wave {} — {} parallel-safe export(s) in parallel, {} run alone",
611                label,
612                safe.len(),
613                lone.len()
614            );
615            // One single-child batch per lone export (run sequentially), then
616            // one concurrent batch for all parallel-safe exports.
617            let mut batches: Vec<Vec<&ExportConfig>> = lone.iter().map(|e| vec![*e]).collect();
618            if !safe.is_empty() {
619                batches.push(safe);
620            }
621            // Wave-wide name floor so cards align across the safe/lone batches
622            // (the cost gate splits a wave into one safe batch + N lone batches,
623            // each its own renderer — without a shared floor they'd each pad to
624            // their own widest name and the table would step).
625            let wave_name_floor = pending
626                .iter()
627                .map(|e| e.name.chars().count())
628                .max()
629                .unwrap_or(0);
630            for batch in &batches {
631                let (result, cf, stderr_dump) = parallel_children::run_exports_as_child_processes(
632                    config_path,
633                    batch,
634                    false,
635                    false,
636                    resume,
637                    force,
638                    None,
639                    wave_name_floor,
640                );
641                child_failures.extend(cf);
642                combined_stderr.push_str(&stderr_dump);
643                if let Err(e) = result {
644                    failures.push(e);
645                }
646            }
647            all_exports.extend_from_slice(&pending);
648        } else {
649            log::info!(
650                "apply: wave {} — {} export(s), sequential",
651                label,
652                pending.len()
653            );
654            for export in &pending {
655                let (res, summary) =
656                    job::run_export_job(config_path, &config, export, &state, &config_dir, &opts);
657                if let Err(e) = res {
658                    failures.push(e);
659                }
660                summaries.push(summary);
661            }
662        }
663    }
664
665    let finished_at = chrono::Utc::now();
666    if total > 1 {
667        let entries = if parallel {
668            aggregate::collect_child_entries(&state, &all_exports, started_at, &child_failures)
669        } else {
670            summaries
671                .iter()
672                .map(aggregate::entry_from_summary)
673                .collect()
674        };
675        let agg = aggregate::build(
676            entries,
677            started_at,
678            finished_at,
679            Some(config_path),
680            if parallel {
681                "wave-parallel-processes"
682            } else {
683                "wave-sequential"
684            },
685        );
686        aggregate::print(&agg);
687        aggregate::persist(&state, &agg, None);
688    }
689    // Captured child stderr (verbose per-export cards, parallel path only) goes
690    // to a file artifact beside the config, with a one-line console pointer.
691    emit_child_stderr(&combined_stderr, &config_dir);
692
693    if !failures.is_empty() {
694        let primary_idx = representative_failure_idx(&failures).unwrap();
695        let primary = failures.remove(primary_idx);
696        if failures.is_empty() {
697            return Err(primary);
698        }
699        let others = failures
700            .iter()
701            .map(|e| format!("{e:#}"))
702            .collect::<Vec<_>>()
703            .join("; ");
704        return Err(primary.context(format!(
705            "{} export(s) failed across waves; representative error follows (also: {others})",
706            failures.len() + 1
707        )));
708    }
709    Ok(())
710}
711
712/// Group exports by `wave:` in ascending order; an export with no `wave:` runs
713/// last (sorted as `u32::MAX`). Pure + unit-tested — the ordering is the
714/// contract `apply` depends on, so it does not hide inside [`run_waves`].
715fn group_exports_by_wave(exports: &[ExportConfig]) -> Vec<(u32, Vec<&ExportConfig>)> {
716    let mut by_wave: std::collections::BTreeMap<u32, Vec<&ExportConfig>> =
717        std::collections::BTreeMap::new();
718    for e in exports {
719        by_wave
720            .entry(e.wave.unwrap_or(u32::MAX))
721            .or_default()
722            .push(e);
723    }
724    by_wave.into_iter().collect()
725}
726
727/// Whether an export may run concurrently with its wave-mates: the
728/// `parallel_safe` flag that `rivet plan` records from the source-aware cost
729/// class (true only for cheap, `Low`-cost tables — see
730/// [`ExportConfig::parallel_safe`]). A heavy table already chunk-parallelizes
731/// internally, so it runs ALONE within its wave; only the cheap exports share a
732/// concurrent batch. `None` (un-planned / hand-written) is treated as not-safe.
733fn is_parallel_safe(export: &ExportConfig) -> bool {
734    export.parallel_safe.unwrap_or(false)
735}
736
#[cfg(test)]
mod wave_grouping_tests {
    use super::{group_exports_by_wave, is_parallel_safe};
    use crate::config::{ExportConfig, sample_export};

    /// A sample export named `name`, scheduled in `wave` (`None` = unscheduled).
    fn in_wave(name: &str, wave: Option<u32>) -> ExportConfig {
        let mut export = sample_export(name);
        export.wave = wave;
        export
    }

    /// A sample export named `name` whose plan flag is explicitly `Some(flag)`.
    fn with_parallel_safe(name: &str, flag: bool) -> ExportConfig {
        let mut export = sample_export(name);
        export.parallel_safe = Some(flag);
        export
    }

    #[test]
    fn groups_ascending_with_unscheduled_last() {
        // Input order deliberately mixes waves: `c` and `d` share wave 1
        // (their relative order must survive), and `b` is unscheduled so it
        // must sort last.
        let exports = vec![
            in_wave("a", Some(3)),
            in_wave("b", None),
            in_wave("c", Some(1)),
            in_wave("d", Some(1)),
        ];
        let grouped = group_exports_by_wave(&exports);

        let wave_keys: Vec<u32> = grouped.iter().map(|(wave, _)| *wave).collect();
        assert_eq!(wave_keys, vec![1, 3, u32::MAX], "ascending, unscheduled last");

        let (_, first_wave) = &grouped[0];
        let first_names: Vec<&str> = first_wave.iter().map(|e| e.name.as_str()).collect();
        assert_eq!(first_names, vec!["c", "d"], "same-wave keeps input order");

        let (_, last_wave) = &grouped[2];
        assert_eq!(last_wave.len(), 1);
        assert_eq!(
            last_wave[0].name, "b",
            "the no-wave export lands in the last group"
        );
    }

    #[test]
    fn parallel_safe_reads_the_plan_flag() {
        // `sample_export` leaves `parallel_safe` at its default (None).
        let unset = sample_export("unset");
        assert!(!is_parallel_safe(&unset), "None is treated as not-safe");

        let safe = with_parallel_safe("safe", true);
        assert!(is_parallel_safe(&safe), "parallel_safe: true → concurrent");

        let heavy = with_parallel_safe("heavy", false);
        assert!(!is_parallel_safe(&heavy), "parallel_safe: false → alone");
    }
}
781
782/// Index of the most "stop-worthy" failure in a batch: data-integrity (exit 3)
783/// outranks schema-drift (4), which outranks retryable (2), which outranks
784/// generic (1). The chosen error's typed marker then rides up so `classify_exit`
785/// exits the process on the scariest reason rather than whichever export happened
786/// to fail first. Returns `None` for an empty slice.
787fn representative_failure_idx(failures: &[anyhow::Error]) -> Option<usize> {
788    let rank = |e: &anyhow::Error| match crate::error::classify_exit(e) {
789        c if c == crate::error::ExitClass::DataIntegrity.code() => 3,
790        c if c == crate::error::ExitClass::SchemaDrift.code() => 2,
791        c if c == crate::error::ExitClass::Retryable.code() => 1,
792        _ => 0,
793    };
794    (0..failures.len()).max_by_key(|&i| rank(&failures[i]))
795}
796
797#[cfg(test)]
798mod representative_failure_tests {
799    use super::representative_failure_idx;
800    use crate::error::{DataIntegrityError, ExitClass, SchemaDriftError, classify_exit};
801
802    #[test]
803    fn empty_batch_has_no_representative() {
804        assert_eq!(representative_failure_idx(&[]), None);
805    }
806
807    #[test]
808    fn data_integrity_outranks_everything_regardless_of_position() {
809        // Data-integrity sits LAST so a naive "first failure" or a flipped
810        // min/max selector would pick the generic error instead.
811        let failures = vec![
812            anyhow::anyhow!("generic boom"),
813            SchemaDriftError::new("shape changed").into(),
814            anyhow::anyhow!("another generic"),
815            DataIntegrityError::new("reconcile mismatch").into(),
816        ];
817        let idx = representative_failure_idx(&failures).unwrap();
818        assert_eq!(
819            classify_exit(&failures[idx]),
820            ExitClass::DataIntegrity.code(),
821            "a mixed batch must surface the data-integrity (exit 3) failure"
822        );
823    }
824
825    #[test]
826    fn schema_drift_outranks_retryable_and_generic() {
827        // No data-integrity present → schema-drift (exit 4) is the scariest.
828        let failures = vec![
829            anyhow::anyhow!("generic"),
830            SchemaDriftError::new("drift").into(),
831        ];
832        let idx = representative_failure_idx(&failures).unwrap();
833        assert_eq!(classify_exit(&failures[idx]), ExitClass::SchemaDrift.code());
834    }
835}