rivet/pipeline/run.rs
1//! **Layer: Coordinator entrypoint** — the `rivet run` orchestrator.
2//!
3//! Single bridge between planning, execution, and persistence/observability.
4//! Owns the multi-export render-mode flags, decides between sequential vs
5//! thread-parallel vs process-parallel, and produces the run aggregate at
6//! the end.
7//!
8//! Lives in its own file so [`crate::pipeline`] (which is read as a facade
9//! by every other module) stays a thin re-export layer rather than a
10//! ~300-LOC orchestrator wrapped in mod-level declarations.
11
12use std::path::Path;
13use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
14
15use crate::config::{Config, ExportConfig};
16use crate::error::Result;
17use crate::state::StateStore;
18
19use super::summary::RunSummary;
20use super::{aggregate, finalize, ipc, job, parallel_children, parent_ui, partition_expand};
21
/// Named per-run switches handed from the CLI down to every export job.
///
/// Bundling the switches into one small `Copy` struct — rather than passing a
/// row of positional `bool`s through `run`, `run_export_job`, and the
/// child-process command line — means two flags can no longer be transposed
/// silently at a call site (e.g. `validate` and `reconcile`).
#[derive(Debug, Clone, Copy)]
pub struct RunOptions<'a> {
    /// Validation switch for the export job (`run` forwards its `validate`
    /// argument; `run_waves` always passes `false`).
    pub validate: bool,
    /// Reconciliation switch for the export job (`run` forwards its
    /// `reconcile` argument; `run_waves` always passes `false`).
    pub reconcile: bool,
    /// Continue a previous run: an incomplete chunked export picks up from
    /// its checkpoint instead of starting over.
    pub resume: bool,
    /// Override safety gates that would otherwise refuse to start the run.
    ///
    /// Today this only covers ADR-0012 M8: `--resume` against a prefix that
    /// already carries a `_SUCCESS` marker is refused unless `--force` is
    /// given, so a verified dataset is not re-exported over by accident.
    /// Per ADR-0013 there is a single `--force`, scoped to whichever gate it
    /// overrides, so future gates may reuse it.
    pub force: bool,
    /// Config parameter overrides (the same map `run` hands to
    /// `Config::load_with_params`); `None` when no parameters were given.
    pub params: Option<&'a std::collections::HashMap<String, String>>,
}
43
/// Set while this process is driving more than one export within a single
/// `rivet run` invocation — sequentially or via `--parallel-exports` — and
/// also by `run_waves` on its sequential path.
///
/// Per-export renderers (`RunSummary::print`, `ChunkProgress`) consult it to
/// switch to the compact one-line format and to drop the indicatif chunk bar.
/// The result is that 15 exports produce 15 lines instead of 100+, and
/// threads do not stack progress bars on top of one another.
///
/// A `--parallel-export-processes` child is itself a single-export run
/// (`exports.len() == 1` in its own process), so the flag stays `false`
/// there. The parent draws the cards itself through `parent_ui`.
pub(crate) static MULTI_EXPORT_MODE: AtomicBool = AtomicBool::new(false);

/// Set only while several exports run **at the same time** inside this
/// process (the threaded `--parallel-exports` mode). It exists to suppress
/// per-export `indicatif` chunk bars, whose terminal writes would otherwise
/// interleave across threads and corrupt each other.
pub(crate) static MULTI_EXPORT_CONCURRENT: AtomicBool = AtomicBool::new(false);

/// Current value of [`MULTI_EXPORT_MODE`].
pub(crate) fn multi_export_mode() -> bool {
    MULTI_EXPORT_MODE.load(AtomicOrdering::Relaxed)
}

/// Current value of [`MULTI_EXPORT_CONCURRENT`].
#[allow(dead_code)] // kept for future renderers; flag is still set in `run` below.
pub(crate) fn multi_export_concurrent() -> bool {
    MULTI_EXPORT_CONCURRENT.load(AtomicOrdering::Relaxed)
}
70
71fn print_json_summary(agg: &crate::state::RunAggregate) {
72 match serde_json::to_string_pretty(agg) {
73 Ok(json) => println!("{json}"),
74 Err(e) => eprintln!(
75 "rivet: error: failed to serialize run summary as JSON: {:#}",
76 e
77 ),
78 }
79}
80
81/// Emit captured child stderr from a parallel run. It's verbose — every child's
82/// full run card — so write it to a timestamped log beside the config and print
83/// a one-line pointer, instead of flooding the console with all N exports'
84/// stderr. Falls back to the inline console dump if the file can't be written.
85fn emit_child_stderr(dump: &str, dir: &Path) {
86 if dump.is_empty() {
87 return;
88 }
89 let name = format!(
90 "rivet-child-stderr-{}.log",
91 chrono::Utc::now().format("%Y%m%dT%H%M%S")
92 );
93 let path = dir.join(name);
94 match std::fs::write(&path, dump) {
95 // stderr, not stdout — stdout may carry the machine-readable `--json`
96 // run summary, which this pointer would otherwise corrupt.
97 Ok(()) => eprintln!(
98 "\n child stderr (full per-export logs) → {}",
99 path.display()
100 ),
101 Err(e) => {
102 log::warn!(
103 "could not write child stderr to {} ({e}); printing inline",
104 path.display()
105 );
106 use std::io::Write;
107 let mut h = std::io::stderr().lock();
108 let _ = h.write_all(dump.as_bytes());
109 let _ = h.flush();
110 }
111 }
112}
113
114#[allow(clippy::too_many_arguments)] // CLI fan-in; surface stays stable per ADR-0013
115pub fn run(
116 config_path: &str,
117 export_name: Option<&str>,
118 validate: bool,
119 reconcile: bool,
120 resume: bool,
121 force: bool,
122 params: Option<&std::collections::HashMap<String, String>>,
123 parallel_exports_cli: bool,
124 parallel_export_processes_cli: bool,
125 summary_output: Option<&Path>,
126 json_output: bool,
127) -> Result<()> {
128 // F-NEW-B (0.7.5 audit): `--force` is scoped to whichever gate it
129 // overrides (today: the `_SUCCESS`-already-present refusal on
130 // resume). When the operator passes `--force` without `--resume`,
131 // the flag is a no-op — surface that explicitly so a typo or
132 // copy-paste mistake does not pass silently.
133 if force && !resume {
134 log::warn!(
135 "--force without --resume is a no-op today (force only overrides the resume safety \
136 gate against a destination prefix whose _SUCCESS is already present)"
137 );
138 }
139 let config = Config::load_with_params(config_path, params)?;
140
141 let config_dir = Path::new(config_path)
142 .parent()
143 .unwrap_or(Path::new("."))
144 .to_path_buf();
145
146 let selected: Vec<&ExportConfig> = if let Some(name) = export_name {
147 let e = config
148 .exports
149 .iter()
150 .find(|e| e.name == name)
151 .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
152 vec![e]
153 } else {
154 config.exports.iter().collect()
155 };
156
157 // Value-based partitioning: rewrite any `partition_by` export into one
158 // concrete child export per bucket *before* the run loop. Non-partitioned
159 // exports pass through. The owned vec must outlive the borrowed `exports`
160 // view rebuilt over it, so it is declared in the enclosing scope.
161 let partitioned = partition_expand::any_partitioned(&selected);
162 let expanded_owned: Vec<ExportConfig>;
163 let exports: Vec<&ExportConfig> = if partitioned {
164 expanded_owned = partition_expand::expand_partitioned_exports(
165 &selected,
166 &config.source,
167 &config_dir,
168 params,
169 )?;
170 expanded_owned.iter().collect()
171 } else {
172 selected
173 };
174
175 let opts = RunOptions {
176 validate,
177 reconcile,
178 resume,
179 force,
180 params,
181 };
182
183 // Seeds the card-table name column so it aligns from the first redraw
184 // (the renderer can't see a long name until its export emits `Started`).
185 let name_floor = exports
186 .iter()
187 .map(|e| e.name.chars().count())
188 .max()
189 .unwrap_or(0);
190 let process_mode_requested = parallel_export_processes_cli || config.parallel_export_processes;
191 // Process-mode children re-exec `rivet run --export <name>` and re-load the
192 // config from disk, so they cannot see the synthesised partition child
193 // names. Force in-process execution when partitioning is active.
194 if partitioned && process_mode_requested {
195 log::warn!(
196 "partition_by: --parallel-export-processes is disabled with partitioned exports \
197 (child processes re-load the config and can't see synthesised partitions); \
198 running in-process"
199 );
200 }
201 let run_parallel_processes =
202 process_mode_requested && export_name.is_none() && exports.len() > 1 && !partitioned;
203
204 let started_at = chrono::Utc::now();
205
206 if run_parallel_processes {
207 // Run schema migrations once in the parent BEFORE forking children.
208 // Otherwise N children race for the exclusive write lock on a
209 // brand-new `.rivet_state.db` and `busy_timeout` is not enough to
210 // serialise them — most fail with `migration v1 failed: database is
211 // locked`. After this open succeeds the schema is at the latest
212 // version and children's `StateStore::open` calls become idempotent
213 // (the `MIGRATIONS` loop is a no-op when `ver <= current`).
214 if let Err(e) = StateStore::open(config_path) {
215 return Err(anyhow::anyhow!(
216 "state: failed to initialize state DB before spawning children: {:#}",
217 e
218 ));
219 }
220
221 let (result, child_failures, stderr_dump) =
222 parallel_children::run_exports_as_child_processes(
223 config_path,
224 &exports,
225 validate,
226 reconcile,
227 resume,
228 force,
229 params,
230 name_floor,
231 );
232 let finished_at = chrono::Utc::now();
233 // Best-effort aggregate: open the state DB read-only-ish and reconstruct
234 // entries from the per-child `record_metric` rows. Failure to open the
235 // DB here only suppresses the aggregate, not the run itself.
236 match StateStore::open(config_path) {
237 Ok(state) => {
238 let entries =
239 aggregate::collect_child_entries(&state, &exports, started_at, &child_failures);
240 let agg = aggregate::build(
241 entries,
242 started_at,
243 finished_at,
244 Some(config_path),
245 "parallel-processes",
246 );
247 aggregate::print(&agg);
248 aggregate::persist(&state, &agg, summary_output);
249 if json_output {
250 print_json_summary(&agg);
251 }
252 }
253 Err(e) => log::warn!(
254 "aggregate: cannot open state DB to record run aggregate: {:#}",
255 e
256 ),
257 }
258 // Captured child stderr (verbose per-export cards) goes to a file
259 // artifact beside the config, with a one-line console pointer — the run
260 // summary stays clean instead of flooding with every child's stderr.
261 emit_child_stderr(&stderr_dump, &config_dir);
262 return result;
263 }
264
265 let run_parallel = (parallel_exports_cli || config.parallel_exports)
266 && export_name.is_none()
267 && exports.len() > 1;
268
269 // Compact-rendering hints for the per-export renderers. Set once here so
270 // every code path below — sequential, `--parallel-exports`, the apply
271 // path, etc. — sees a consistent mode. Restored at the end of the run
272 // so subsequent invocations within the same process (tests, library
273 // callers) start with a clean slate.
274 let multi_export = export_name.is_none() && exports.len() > 1;
275 let prev_multi = MULTI_EXPORT_MODE.swap(multi_export, AtomicOrdering::Relaxed);
276 let prev_concurrent = MULTI_EXPORT_CONCURRENT.swap(run_parallel, AtomicOrdering::Relaxed);
277 struct ResetMultiExport(bool, bool);
278 impl Drop for ResetMultiExport {
279 fn drop(&mut self) {
280 MULTI_EXPORT_MODE.store(self.0, AtomicOrdering::Relaxed);
281 MULTI_EXPORT_CONCURRENT.store(self.1, AtomicOrdering::Relaxed);
282 }
283 }
284 let _reset_multi = ResetMultiExport(prev_multi, prev_concurrent);
285
286 let mut summaries: Vec<RunSummary> = Vec::with_capacity(exports.len());
287 // Keep the typed `anyhow::Error`s (not flattened strings) so the final bail
288 // can carry a representative one — its DataIntegrityError / SchemaDriftError /
289 // transient marker downcasts through anyhow's context chain in
290 // `error::classify_exit`, giving the right process exit code without grepping
291 // the message.
292 let mut failures: Vec<anyhow::Error> = Vec::new();
293
294 if run_parallel {
295 log::info!(
296 "running {} exports in parallel (separate state DB connection per export)",
297 exports.len()
298 );
299
300 // In threads mode every export emits the same `ChildEvent` stream
301 // that `--parallel-export-processes` children emit, but routed
302 // through an in-process `mpsc` channel. A single UI thread (the
303 // same `parent_ui::run_ui` used for the process-mode parent) owns
304 // stderr and renders one card line per export — no indicatif, no
305 // multi-bar coordination headache, no scrollback artefacts from
306 // concurrent redraws. Ensure stderr is also pre-migrated so child
307 // threads opening their own `StateStore` don't race on schema DDL.
308 if let Err(e) = StateStore::open(config_path) {
309 return Err(anyhow::anyhow!(
310 "state: failed to initialize state DB before spawning export threads: {:#}",
311 e
312 ));
313 }
314 let (tx, rx) = std::sync::mpsc::channel::<parent_ui::UiMessage>();
315 ipc::install_in_process_tx(tx);
316 let ui_thread = std::thread::Builder::new()
317 .name("rivet-ui".to_string())
318 .spawn(move || parent_ui::run_ui(rx, name_floor))
319 .ok();
320
321 let collected: std::sync::Mutex<Vec<(Result<()>, RunSummary)>> =
322 std::sync::Mutex::new(Vec::with_capacity(exports.len()));
323 std::thread::scope(|s| {
324 let mut handles = Vec::new();
325 for &export in &exports {
326 handles.push(s.spawn(|| {
327 let state = match StateStore::open(config_path) {
328 Ok(s) => s,
329 Err(e) => {
330 let err = anyhow::anyhow!(
331 "export '{}': failed to open state database: {:#}",
332 export.name,
333 e
334 );
335 let summary = job::synthetic_failed_summary(&export.name, &err);
336 return (Err(err), summary);
337 }
338 };
339 job::run_export_job(config_path, &config, export, &state, &config_dir, &opts)
340 }));
341 }
342 for h in handles {
343 match h.join() {
344 Ok(pair) => collected.lock().unwrap().push(pair),
345 Err(payload) => std::panic::resume_unwind(payload),
346 }
347 }
348 });
349
350 // All exports are done → drop the sender so `parent_ui::run_ui`
351 // sees the channel close and exits cleanly (committing the final
352 // card stack to scrollback). Joining is best-effort: even if the
353 // UI thread is wedged we still want to print the run aggregate
354 // below.
355 ipc::clear_in_process_tx();
356 if let Some(t) = ui_thread {
357 let _ = t.join();
358 }
359
360 for (res, summary) in collected.into_inner().unwrap() {
361 if let Err(e) = res {
362 failures.push(e);
363 }
364 summaries.push(summary);
365 }
366 } else {
367 let state = StateStore::open(config_path)?;
368
369 // Always route through `parent_ui` — same as `--parallel-exports`.
370 // Gating on `is_attended()` left VHS/ttyd on indicatif when the
371 // attended bit is unset; `run_ui` already falls back to linear
372 // mode for piped stderr.
373 let (tx, rx) = std::sync::mpsc::channel::<parent_ui::UiMessage>();
374 ipc::install_in_process_tx(tx);
375 let ui_thread = std::thread::Builder::new()
376 .name("rivet-ui".to_string())
377 .spawn(move || parent_ui::run_ui(rx, name_floor))
378 .ok();
379
380 for export in &exports {
381 let (res, summary) =
382 job::run_export_job(config_path, &config, export, &state, &config_dir, &opts);
383 if let Err(e) = res {
384 failures.push(e);
385 }
386 summaries.push(summary);
387 }
388
389 ipc::clear_in_process_tx();
390 if let Some(t) = ui_thread {
391 let _ = t.join();
392 }
393 // Single-export sequential runs still emit the detailed block after
394 // the card commits to scrollback.
395 if exports.len() == 1
396 && let Some(summary) = summaries.last()
397 {
398 summary.print_stderr_block();
399 }
400 }
401
402 let finished_at = chrono::Utc::now();
403 // Skip the aggregate for single-export runs. Two cases this catches:
404 // 1) `rivet run --export X` (manual one-off): the per-export block
405 // already says everything, an aggregate of one row is just noise.
406 // 2) Children spawned by `--parallel-export-processes`: each child
407 // enters this code path with exports.len() == 1. The parent
408 // (parallel_processes branch above) builds the run-wide aggregate
409 // from every child's `export_metrics` row, so a child-level
410 // aggregate would just write a duplicate into `run_aggregate`.
411 // Force-write the JSON file even when skipping, so `--summary-output`
412 // remains useful for one-off runs.
413 if exports.len() > 1 {
414 let parallel_mode = if run_parallel {
415 "parallel-threads"
416 } else {
417 "sequential"
418 };
419 let entries: Vec<_> = summaries
420 .iter()
421 .map(aggregate::entry_from_summary)
422 .collect();
423 let agg = aggregate::build(
424 entries,
425 started_at,
426 finished_at,
427 Some(config_path),
428 parallel_mode,
429 );
430 aggregate::print(&agg);
431 // Open a fresh state handle for persisting the aggregate so we don't
432 // assume which thread owned the per-export `StateStore` above.
433 match StateStore::open(config_path) {
434 Ok(state) => aggregate::persist(&state, &agg, summary_output),
435 Err(e) => log::warn!(
436 "aggregate: cannot open state DB to record run aggregate: {:#}",
437 e
438 ),
439 }
440 if json_output {
441 print_json_summary(&agg);
442 }
443 } else if summary_output.is_some() || json_output {
444 // One export, but the user asked for a summary file and/or JSON stdout —
445 // honour both without polluting the DB or stderr.
446 let entries: Vec<_> = summaries
447 .iter()
448 .map(aggregate::entry_from_summary)
449 .collect();
450 let agg = aggregate::build(
451 entries,
452 started_at,
453 finished_at,
454 Some(config_path),
455 "sequential",
456 );
457 if let Some(out) = summary_output
458 && let Err(e) =
459 std::fs::write(out, serde_json::to_string_pretty(&agg).unwrap_or_default())
460 {
461 log::warn!(
462 "aggregate: failed to write summary JSON to {}: {:#}",
463 out.display(),
464 e
465 );
466 }
467 if json_output {
468 print_json_summary(&agg);
469 }
470 }
471
472 if !failures.is_empty() {
473 // Carry a representative typed failure as the returned error so
474 // `error::classify_exit` downcasts the marker (DataIntegrityError=3,
475 // SchemaDriftError=4, transient=2) through anyhow's context chain. Pick
476 // the most "stop-worthy" class — data-integrity (possibly-wrong data)
477 // outranks schema-drift, which outranks retryable, which outranks
478 // generic — so a mixed batch exits on the scariest reason.
479 let primary_idx = representative_failure_idx(&failures).unwrap();
480 let primary = failures.remove(primary_idx);
481 if failures.is_empty() {
482 // Single failure — return it verbatim (its own message + marker).
483 return Err(primary);
484 }
485 // Multiple failures: list the others as higher-level context; `primary`
486 // (with its typed marker) rides underneath so the downcast still finds it.
487 let others = failures
488 .iter()
489 .map(|e| format!("{e:#}"))
490 .collect::<Vec<_>>()
491 .join("; ");
492 return Err(primary.context(format!(
493 "{} export(s) failed; representative error follows (also: {others})",
494 failures.len() + 1
495 )));
496 }
497
498 Ok(())
499}
500
501/// `rivet apply -c config.yaml` (plan→apply cycle): run every export of the
502/// config **wave by wave** in ascending `wave:` order — exports with no `wave:`
503/// run last — reusing the same per-export job + run aggregate as [`run`]. This
504/// first cut runs each wave's exports SEQUENTIALLY (deterministic); safety-aware
505/// within-wave parallelism is a follow-up, and `partition_by` exports are not
506/// expanded here yet (use `rivet run` for those).
507pub(crate) fn run_waves(
508 config_path: &str,
509 force: bool,
510 parallel_cli: bool,
511 resume: bool,
512) -> Result<()> {
513 let config = Config::load_with_params(config_path, None)?;
514 let config_dir = Path::new(config_path)
515 .parent()
516 .unwrap_or(Path::new("."))
517 .to_path_buf();
518 let opts = RunOptions {
519 validate: false,
520 reconcile: false,
521 resume,
522 force,
523 params: None,
524 };
525
526 // Group exports by wave (ascending; an export with no `wave:` runs last).
527 // The ordering is the contract apply depends on, so it lives in a pure
528 // tested helper rather than hiding inline here.
529 let by_wave = group_exports_by_wave(&config.exports);
530 let total: usize = by_wave.iter().map(|(_, v)| v.len()).sum();
531 if total == 0 {
532 log::warn!("apply: config '{config_path}' defines no exports");
533 return Ok(());
534 }
535
536 // `--parallel` (or `parallel_export_processes: true` in the config) opts into
537 // within-wave parallelism: each wave's exports run as concurrent child
538 // processes (per-child governor keeps each one source-safe), the call blocks
539 // until all exit = the wave barrier. Default stays sequential.
540 let parallel = parallel_cli || config.parallel_export_processes;
541
542 // Compact per-export rendering for the SEQUENTIAL path only. The parallel
543 // (subprocess) path renders the parent card stack itself and each child sees
544 // `exports.len() == 1`, so the flag must stay clear there — matching `run`'s
545 // parallel-processes branch.
546 let prev_multi = MULTI_EXPORT_MODE.swap(total > 1 && !parallel, AtomicOrdering::Relaxed);
547 struct ResetMulti(bool);
548 impl Drop for ResetMulti {
549 fn drop(&mut self) {
550 MULTI_EXPORT_MODE.store(self.0, AtomicOrdering::Relaxed);
551 }
552 }
553 let _reset = ResetMulti(prev_multi);
554
555 let state = StateStore::open(config_path)?;
556 let started_at = chrono::Utc::now();
557 let mut summaries: Vec<RunSummary> = Vec::with_capacity(total);
558 let mut failures: Vec<anyhow::Error> = Vec::new();
559 // Parallel-path accumulators: per-child metrics live in the state DB, so the
560 // parent reconstructs one aggregate from them after every wave has joined.
561 let mut all_exports: Vec<&ExportConfig> = Vec::with_capacity(total);
562 let mut child_failures: std::collections::HashMap<String, String> =
563 std::collections::HashMap::new();
564 let mut combined_stderr = String::new();
565
566 for (wave, exports) in &by_wave {
567 let label = if *wave == u32::MAX {
568 "unscheduled".to_string()
569 } else {
570 wave.to_string()
571 };
572 // Skip-completed under --resume: an export whose destination already has
573 // `_SUCCESS` is done — re-running must not redo it (and would hit the
574 // resume gate). The rest run with `resume`, so an incomplete chunked
575 // export continues from its checkpoint. Reuses `finalize`'s prior-run
576 // probe rather than re-implementing the marker check.
577 let pending: Vec<&ExportConfig> = exports
578 .iter()
579 .copied()
580 .filter(|e| {
581 let done = resume && finalize::destination_has_success(&e.destination);
582 if done {
583 log::info!(
584 "apply: skipping '{}' — destination already complete (_SUCCESS)",
585 e.name
586 );
587 }
588 !done
589 })
590 .collect();
591 if pending.is_empty() {
592 continue;
593 }
594 if total > 1 {
595 println!("\n ── wave {label} · {} export(s) ──", pending.len());
596 }
597 // The wave barrier is the loop itself: each strategy below fully drains
598 // the wave (the sequential loop, or the blocking child-process join)
599 // before the next iteration starts the next wave.
600 if parallel {
601 // Cost safety-gate: within the wave, the cheap (`parallel_safe`)
602 // exports run together in ONE concurrent batch; every heavier export
603 // runs ALONE in its own single-child batch, since a big table already
604 // chunk-parallelizes internally and two at once would overload the
605 // source. The per-child governor still bounds each one; this gate also
606 // bounds the concurrent connection count.
607 let (safe, lone): (Vec<&ExportConfig>, Vec<&ExportConfig>) =
608 pending.iter().copied().partition(|e| is_parallel_safe(e));
609 log::info!(
610 "apply: wave {} — {} parallel-safe export(s) in parallel, {} run alone",
611 label,
612 safe.len(),
613 lone.len()
614 );
615 // One single-child batch per lone export (run sequentially), then
616 // one concurrent batch for all parallel-safe exports.
617 let mut batches: Vec<Vec<&ExportConfig>> = lone.iter().map(|e| vec![*e]).collect();
618 if !safe.is_empty() {
619 batches.push(safe);
620 }
621 // Wave-wide name floor so cards align across the safe/lone batches
622 // (the cost gate splits a wave into one safe batch + N lone batches,
623 // each its own renderer — without a shared floor they'd each pad to
624 // their own widest name and the table would step).
625 let wave_name_floor = pending
626 .iter()
627 .map(|e| e.name.chars().count())
628 .max()
629 .unwrap_or(0);
630 for batch in &batches {
631 let (result, cf, stderr_dump) = parallel_children::run_exports_as_child_processes(
632 config_path,
633 batch,
634 false,
635 false,
636 resume,
637 force,
638 None,
639 wave_name_floor,
640 );
641 child_failures.extend(cf);
642 combined_stderr.push_str(&stderr_dump);
643 if let Err(e) = result {
644 failures.push(e);
645 }
646 }
647 all_exports.extend_from_slice(&pending);
648 } else {
649 log::info!(
650 "apply: wave {} — {} export(s), sequential",
651 label,
652 pending.len()
653 );
654 for export in &pending {
655 let (res, summary) =
656 job::run_export_job(config_path, &config, export, &state, &config_dir, &opts);
657 if let Err(e) = res {
658 failures.push(e);
659 }
660 summaries.push(summary);
661 }
662 }
663 }
664
665 let finished_at = chrono::Utc::now();
666 if total > 1 {
667 let entries = if parallel {
668 aggregate::collect_child_entries(&state, &all_exports, started_at, &child_failures)
669 } else {
670 summaries
671 .iter()
672 .map(aggregate::entry_from_summary)
673 .collect()
674 };
675 let agg = aggregate::build(
676 entries,
677 started_at,
678 finished_at,
679 Some(config_path),
680 if parallel {
681 "wave-parallel-processes"
682 } else {
683 "wave-sequential"
684 },
685 );
686 aggregate::print(&agg);
687 aggregate::persist(&state, &agg, None);
688 }
689 // Captured child stderr (verbose per-export cards, parallel path only) goes
690 // to a file artifact beside the config, with a one-line console pointer.
691 emit_child_stderr(&combined_stderr, &config_dir);
692
693 if !failures.is_empty() {
694 let primary_idx = representative_failure_idx(&failures).unwrap();
695 let primary = failures.remove(primary_idx);
696 if failures.is_empty() {
697 return Err(primary);
698 }
699 let others = failures
700 .iter()
701 .map(|e| format!("{e:#}"))
702 .collect::<Vec<_>>()
703 .join("; ");
704 return Err(primary.context(format!(
705 "{} export(s) failed across waves; representative error follows (also: {others})",
706 failures.len() + 1
707 )));
708 }
709 Ok(())
710}
711
712/// Group exports by `wave:` in ascending order; an export with no `wave:` runs
713/// last (sorted as `u32::MAX`). Pure + unit-tested — the ordering is the
714/// contract `apply` depends on, so it does not hide inside [`run_waves`].
715fn group_exports_by_wave(exports: &[ExportConfig]) -> Vec<(u32, Vec<&ExportConfig>)> {
716 let mut by_wave: std::collections::BTreeMap<u32, Vec<&ExportConfig>> =
717 std::collections::BTreeMap::new();
718 for e in exports {
719 by_wave
720 .entry(e.wave.unwrap_or(u32::MAX))
721 .or_default()
722 .push(e);
723 }
724 by_wave.into_iter().collect()
725}
726
727/// Whether an export may run concurrently with its wave-mates: the
728/// `parallel_safe` flag that `rivet plan` records from the source-aware cost
729/// class (true only for cheap, `Low`-cost tables — see
730/// [`ExportConfig::parallel_safe`]). A heavy table already chunk-parallelizes
731/// internally, so it runs ALONE within its wave; only the cheap exports share a
732/// concurrent batch. `None` (un-planned / hand-written) is treated as not-safe.
733fn is_parallel_safe(export: &ExportConfig) -> bool {
734 export.parallel_safe.unwrap_or(false)
735}
736
#[cfg(test)]
mod wave_grouping_tests {
    use super::{group_exports_by_wave, is_parallel_safe};

    /// Waves come out ascending, the no-wave export is last, and exports that
    /// share a wave keep their input order.
    #[test]
    fn groups_ascending_with_unscheduled_last() {
        let mut a = crate::config::sample_export("a");
        a.wave = Some(3);
        let mut b = crate::config::sample_export("b");
        b.wave = None; // unscheduled → must sort last
        let mut c = crate::config::sample_export("c");
        c.wave = Some(1);
        let mut d = crate::config::sample_export("d");
        d.wave = Some(1); // shares wave 1 with c, preserves input order

        let exports = vec![a, b, c, d];
        let grouped = group_exports_by_wave(&exports);

        let waves: Vec<u32> = grouped.iter().map(|(w, _)| *w).collect();
        assert_eq!(waves, vec![1, 3, u32::MAX], "ascending, unscheduled last");
        let wave1: Vec<&str> = grouped[0].1.iter().map(|e| e.name.as_str()).collect();
        assert_eq!(wave1, vec!["c", "d"], "same-wave keeps input order");
        assert_eq!(grouped[2].1.len(), 1);
        assert_eq!(
            grouped[2].1[0].name, "b",
            "the no-wave export lands in the last group"
        );
    }

    /// Only an explicit `parallel_safe: Some(true)` lets an export share a
    /// concurrent batch.
    #[test]
    fn parallel_safe_reads_the_plan_flag() {
        // default sample_export leaves `parallel_safe` None → not safe
        let unset = crate::config::sample_export("unset");
        assert!(!is_parallel_safe(&unset), "None is treated as not-safe");

        let mut safe = crate::config::sample_export("safe");
        safe.parallel_safe = Some(true);
        assert!(is_parallel_safe(&safe), "parallel_safe: true → concurrent");

        let mut not_safe = crate::config::sample_export("heavy");
        not_safe.parallel_safe = Some(false);
        assert!(!is_parallel_safe(&not_safe), "parallel_safe: false → alone");
    }
}
781
782/// Index of the most "stop-worthy" failure in a batch: data-integrity (exit 3)
783/// outranks schema-drift (4), which outranks retryable (2), which outranks
784/// generic (1). The chosen error's typed marker then rides up so `classify_exit`
785/// exits the process on the scariest reason rather than whichever export happened
786/// to fail first. Returns `None` for an empty slice.
787fn representative_failure_idx(failures: &[anyhow::Error]) -> Option<usize> {
788 let rank = |e: &anyhow::Error| match crate::error::classify_exit(e) {
789 c if c == crate::error::ExitClass::DataIntegrity.code() => 3,
790 c if c == crate::error::ExitClass::SchemaDrift.code() => 2,
791 c if c == crate::error::ExitClass::Retryable.code() => 1,
792 _ => 0,
793 };
794 (0..failures.len()).max_by_key(|&i| rank(&failures[i]))
795}
796
#[cfg(test)]
mod representative_failure_tests {
    use super::representative_failure_idx;
    use crate::error::{DataIntegrityError, ExitClass, SchemaDriftError, classify_exit};

    #[test]
    fn empty_batch_has_no_representative() {
        assert_eq!(representative_failure_idx(&[]), None);
    }

    #[test]
    fn single_failure_is_its_own_representative() {
        // Callers `expect` a representative for every non-empty batch, so a
        // lone failure must be picked whatever its class.
        let failures = vec![anyhow::anyhow!("only one")];
        assert_eq!(representative_failure_idx(&failures), Some(0));
    }

    #[test]
    fn data_integrity_outranks_everything_regardless_of_position() {
        // Data-integrity sits LAST so a naive "first failure" or a flipped
        // min/max selector would pick the generic error instead.
        let failures = vec![
            anyhow::anyhow!("generic boom"),
            SchemaDriftError::new("shape changed").into(),
            anyhow::anyhow!("another generic"),
            DataIntegrityError::new("reconcile mismatch").into(),
        ];
        let idx = representative_failure_idx(&failures).unwrap();
        assert_eq!(
            classify_exit(&failures[idx]),
            ExitClass::DataIntegrity.code(),
            "a mixed batch must surface the data-integrity (exit 3) failure"
        );
    }

    #[test]
    fn schema_drift_outranks_retryable_and_generic() {
        // No data-integrity present → schema-drift (exit 4) is the scariest.
        let failures = vec![
            anyhow::anyhow!("generic"),
            SchemaDriftError::new("drift").into(),
        ];
        let idx = representative_failure_idx(&failures).unwrap();
        assert_eq!(classify_exit(&failures[idx]), ExitClass::SchemaDrift.code());
    }
}