rivet/pipeline/run.rs
1//! **Layer: Coordinator entrypoint** — the `rivet run` orchestrator.
2//!
3//! Single bridge between planning, execution, and persistence/observability.
4//! Owns the multi-export render-mode flags, decides between sequential vs
5//! thread-parallel vs process-parallel, and produces the run aggregate at
6//! the end.
7//!
8//! Lives in its own file so [`crate::pipeline`] (which is read as a facade
9//! by every other module) stays a thin re-export layer rather than a
10//! ~300-LOC orchestrator wrapped in mod-level declarations.
11
12use std::path::Path;
13use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
14
15use crate::config::{Config, ExportConfig};
16use crate::error::Result;
17use crate::state::StateStore;
18
19use super::summary::RunSummary;
20use super::{aggregate, ipc, job, parallel_children, parent_ui, partition_expand};
21
/// Per-run configuration flags handed from [`run`] to every export job.
///
/// [`run`] itself still receives these as positional CLI arguments (its
/// surface is kept stable per ADR-0013). It bundles them into one
/// `RunOptions` and passes `&RunOptions` to `job::run_export_job`. On that
/// internal path, the named fields prevent silent argument transposition
/// (e.g., `validate` and `reconcile` swapped).
///
/// `Default` yields every flag off and no parameter overrides. Library
/// callers and tests can therefore write
/// `RunOptions { resume: true, ..Default::default() }`.
#[derive(Debug, Clone, Copy, Default)]
pub struct RunOptions<'a> {
    /// Mirrors the `validate` argument of [`run`].
    pub validate: bool,
    /// Mirrors the `reconcile` argument of [`run`].
    pub reconcile: bool,
    /// Mirrors the `resume` argument of [`run`].
    pub resume: bool,
    /// Override safety gates that would otherwise refuse to start the run.
    ///
    /// Currently used by ADR-0012 M8: `--resume` against a prefix whose
    /// `_SUCCESS` marker is present is refused unless `--force` is given.
    /// This stops an operator from accidentally re-exporting over a verified
    /// dataset. Other gates may share the same flag in the future
    /// (per ADR-0013: one `--force`, scoped to whichever gate it overrides).
    pub force: bool,
    /// Config parameter overrides exactly as passed to [`run`] (`None` when
    /// none were given).
    pub params: Option<&'a std::collections::HashMap<String, String>>,
}
43
/// Set while this process executes more than one export for the current
/// `rivet run` invocation, whether sequentially or via `--parallel-exports`.
///
/// Per-export renderers (`RunSummary::print`, `ChunkProgress`) consult it.
/// When set, they switch to the compact one-line format and drop the
/// indicatif chunk bar. Fifteen exports then produce fifteen lines rather
/// than 100+, and threads do not stack progress bars on one another.
///
/// A `--parallel-export-processes` child only ever sees
/// `exports.len() == 1` inside its own process, so the flag remains `false`
/// there. The parent draws the cards itself through `parent_ui`.
pub(crate) static MULTI_EXPORT_MODE: AtomicBool = AtomicBool::new(false);

/// Set only while several exports run *at the same time* inside this
/// process (thread-based `--parallel-exports`).
///
/// Meant to let renderers hide per-export `indicatif` chunk bars, whose
/// terminal writes would otherwise interleave across threads and corrupt
/// each other. NOTE(review): the accessor below is currently marked
/// `dead_code`. Confirm whether any reader still consults this flag.
pub(crate) static MULTI_EXPORT_CONCURRENT: AtomicBool = AtomicBool::new(false);

/// Common loader for the render-mode flags above, using `Relaxed` ordering.
fn load_render_flag(flag: &AtomicBool) -> bool {
    flag.load(AtomicOrdering::Relaxed)
}

/// Current value of [`MULTI_EXPORT_MODE`].
pub(crate) fn multi_export_mode() -> bool {
    load_render_flag(&MULTI_EXPORT_MODE)
}

/// Current value of [`MULTI_EXPORT_CONCURRENT`].
#[allow(dead_code)] // kept for future renderers; flag is still set in `run` below.
pub(crate) fn multi_export_concurrent() -> bool {
    load_render_flag(&MULTI_EXPORT_CONCURRENT)
}
70
71fn print_json_summary(agg: &crate::state::RunAggregate) {
72 match serde_json::to_string_pretty(agg) {
73 Ok(json) => println!("{json}"),
74 Err(e) => eprintln!(
75 "rivet: error: failed to serialize run summary as JSON: {:#}",
76 e
77 ),
78 }
79}
80
81#[allow(clippy::too_many_arguments)] // CLI fan-in; surface stays stable per ADR-0013
82pub fn run(
83 config_path: &str,
84 export_name: Option<&str>,
85 validate: bool,
86 reconcile: bool,
87 resume: bool,
88 force: bool,
89 params: Option<&std::collections::HashMap<String, String>>,
90 parallel_exports_cli: bool,
91 parallel_export_processes_cli: bool,
92 summary_output: Option<&Path>,
93 json_output: bool,
94) -> Result<()> {
95 // F-NEW-B (0.7.5 audit): `--force` is scoped to whichever gate it
96 // overrides (today: the `_SUCCESS`-already-present refusal on
97 // resume). When the operator passes `--force` without `--resume`,
98 // the flag is a no-op — surface that explicitly so a typo or
99 // copy-paste mistake does not pass silently.
100 if force && !resume {
101 log::warn!(
102 "--force without --resume is a no-op today (force only overrides the resume safety \
103 gate against a destination prefix whose _SUCCESS is already present)"
104 );
105 }
106 let config = Config::load_with_params(config_path, params)?;
107
108 let config_dir = Path::new(config_path)
109 .parent()
110 .unwrap_or(Path::new("."))
111 .to_path_buf();
112
113 let selected: Vec<&ExportConfig> = if let Some(name) = export_name {
114 let e = config
115 .exports
116 .iter()
117 .find(|e| e.name == name)
118 .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
119 vec![e]
120 } else {
121 config.exports.iter().collect()
122 };
123
124 // Value-based partitioning: rewrite any `partition_by` export into one
125 // concrete child export per bucket *before* the run loop. Non-partitioned
126 // exports pass through. The owned vec must outlive the borrowed `exports`
127 // view rebuilt over it, so it is declared in the enclosing scope.
128 let partitioned = partition_expand::any_partitioned(&selected);
129 let expanded_owned: Vec<ExportConfig>;
130 let exports: Vec<&ExportConfig> = if partitioned {
131 expanded_owned = partition_expand::expand_partitioned_exports(
132 &selected,
133 &config.source,
134 &config_dir,
135 params,
136 )?;
137 expanded_owned.iter().collect()
138 } else {
139 selected
140 };
141
142 let opts = RunOptions {
143 validate,
144 reconcile,
145 resume,
146 force,
147 params,
148 };
149
150 let process_mode_requested = parallel_export_processes_cli || config.parallel_export_processes;
151 // Process-mode children re-exec `rivet run --export <name>` and re-load the
152 // config from disk, so they cannot see the synthesised partition child
153 // names. Force in-process execution when partitioning is active.
154 if partitioned && process_mode_requested {
155 log::warn!(
156 "partition_by: --parallel-export-processes is disabled with partitioned exports \
157 (child processes re-load the config and can't see synthesised partitions); \
158 running in-process"
159 );
160 }
161 let run_parallel_processes =
162 process_mode_requested && export_name.is_none() && exports.len() > 1 && !partitioned;
163
164 let started_at = chrono::Utc::now();
165
166 if run_parallel_processes {
167 // Run schema migrations once in the parent BEFORE forking children.
168 // Otherwise N children race for the exclusive write lock on a
169 // brand-new `.rivet_state.db` and `busy_timeout` is not enough to
170 // serialise them — most fail with `migration v1 failed: database is
171 // locked`. After this open succeeds the schema is at the latest
172 // version and children's `StateStore::open` calls become idempotent
173 // (the `MIGRATIONS` loop is a no-op when `ver <= current`).
174 if let Err(e) = StateStore::open(config_path) {
175 return Err(anyhow::anyhow!(
176 "state: failed to initialize state DB before spawning children: {:#}",
177 e
178 ));
179 }
180
181 let (result, child_failures, stderr_dump) =
182 parallel_children::run_exports_as_child_processes(
183 config_path,
184 &exports,
185 validate,
186 reconcile,
187 resume,
188 force,
189 params,
190 );
191 let finished_at = chrono::Utc::now();
192 // Best-effort aggregate: open the state DB read-only-ish and reconstruct
193 // entries from the per-child `record_metric` rows. Failure to open the
194 // DB here only suppresses the aggregate, not the run itself.
195 match StateStore::open(config_path) {
196 Ok(state) => {
197 let entries =
198 aggregate::collect_child_entries(&state, &exports, started_at, &child_failures);
199 let agg = aggregate::build(
200 entries,
201 started_at,
202 finished_at,
203 Some(config_path),
204 "parallel-processes",
205 );
206 aggregate::print(&agg);
207 aggregate::persist(&state, &agg, summary_output);
208 if json_output {
209 print_json_summary(&agg);
210 }
211 }
212 Err(e) => log::warn!(
213 "aggregate: cannot open state DB to record run aggregate: {:#}",
214 e
215 ),
216 }
217 // Captured child stderr is printed AFTER the aggregate so the run
218 // summary stays immediately under the card stack — verbose log
219 // output sits below for triage when needed.
220 if !stderr_dump.is_empty() {
221 use std::io::Write;
222 let mut h = std::io::stderr().lock();
223 let _ = h.write_all(stderr_dump.as_bytes());
224 let _ = h.flush();
225 }
226 return result;
227 }
228
229 let run_parallel = (parallel_exports_cli || config.parallel_exports)
230 && export_name.is_none()
231 && exports.len() > 1;
232
233 // Compact-rendering hints for the per-export renderers. Set once here so
234 // every code path below — sequential, `--parallel-exports`, the apply
235 // path, etc. — sees a consistent mode. Restored at the end of the run
236 // so subsequent invocations within the same process (tests, library
237 // callers) start with a clean slate.
238 let multi_export = export_name.is_none() && exports.len() > 1;
239 let prev_multi = MULTI_EXPORT_MODE.swap(multi_export, AtomicOrdering::Relaxed);
240 let prev_concurrent = MULTI_EXPORT_CONCURRENT.swap(run_parallel, AtomicOrdering::Relaxed);
241 struct ResetMultiExport(bool, bool);
242 impl Drop for ResetMultiExport {
243 fn drop(&mut self) {
244 MULTI_EXPORT_MODE.store(self.0, AtomicOrdering::Relaxed);
245 MULTI_EXPORT_CONCURRENT.store(self.1, AtomicOrdering::Relaxed);
246 }
247 }
248 let _reset_multi = ResetMultiExport(prev_multi, prev_concurrent);
249
250 let mut summaries: Vec<RunSummary> = Vec::with_capacity(exports.len());
251 let mut failures: Vec<String> = Vec::new();
252
253 if run_parallel {
254 log::info!(
255 "running {} exports in parallel (separate state DB connection per export)",
256 exports.len()
257 );
258
259 // In threads mode every export emits the same `ChildEvent` stream
260 // that `--parallel-export-processes` children emit, but routed
261 // through an in-process `mpsc` channel. A single UI thread (the
262 // same `parent_ui::run_ui` used for the process-mode parent) owns
263 // stderr and renders one card line per export — no indicatif, no
264 // multi-bar coordination headache, no scrollback artefacts from
265 // concurrent redraws. Ensure stderr is also pre-migrated so child
266 // threads opening their own `StateStore` don't race on schema DDL.
267 if let Err(e) = StateStore::open(config_path) {
268 return Err(anyhow::anyhow!(
269 "state: failed to initialize state DB before spawning export threads: {:#}",
270 e
271 ));
272 }
273 let (tx, rx) = std::sync::mpsc::channel::<parent_ui::UiMessage>();
274 ipc::install_in_process_tx(tx);
275 let ui_thread = std::thread::Builder::new()
276 .name("rivet-ui".to_string())
277 .spawn(move || parent_ui::run_ui(rx))
278 .ok();
279
280 let collected: std::sync::Mutex<Vec<(Result<()>, RunSummary)>> =
281 std::sync::Mutex::new(Vec::with_capacity(exports.len()));
282 std::thread::scope(|s| {
283 let mut handles = Vec::new();
284 for &export in &exports {
285 handles.push(s.spawn(|| {
286 let state = match StateStore::open(config_path) {
287 Ok(s) => s,
288 Err(e) => {
289 let err = anyhow::anyhow!(
290 "export '{}': failed to open state database: {:#}",
291 export.name,
292 e
293 );
294 let summary = job::synthetic_failed_summary(&export.name, &err);
295 return (Err(err), summary);
296 }
297 };
298 job::run_export_job(config_path, &config, export, &state, &config_dir, &opts)
299 }));
300 }
301 for h in handles {
302 match h.join() {
303 Ok(pair) => collected.lock().unwrap().push(pair),
304 Err(payload) => std::panic::resume_unwind(payload),
305 }
306 }
307 });
308
309 // All exports are done → drop the sender so `parent_ui::run_ui`
310 // sees the channel close and exits cleanly (committing the final
311 // card stack to scrollback). Joining is best-effort: even if the
312 // UI thread is wedged we still want to print the run aggregate
313 // below.
314 ipc::clear_in_process_tx();
315 if let Some(t) = ui_thread {
316 let _ = t.join();
317 }
318
319 for (res, summary) in collected.into_inner().unwrap() {
320 if let Err(e) = res {
321 failures.push(format!("{e:#}"));
322 }
323 summaries.push(summary);
324 }
325 } else {
326 let state = StateStore::open(config_path)?;
327
328 // Always route through `parent_ui` — same as `--parallel-exports`.
329 // Gating on `is_attended()` left VHS/ttyd on indicatif when the
330 // attended bit is unset; `run_ui` already falls back to linear
331 // mode for piped stderr.
332 let (tx, rx) = std::sync::mpsc::channel::<parent_ui::UiMessage>();
333 ipc::install_in_process_tx(tx);
334 let ui_thread = std::thread::Builder::new()
335 .name("rivet-ui".to_string())
336 .spawn(move || parent_ui::run_ui(rx))
337 .ok();
338
339 for export in &exports {
340 let (res, summary) =
341 job::run_export_job(config_path, &config, export, &state, &config_dir, &opts);
342 if let Err(e) = res {
343 failures.push(format!("{e:#}"));
344 }
345 summaries.push(summary);
346 }
347
348 ipc::clear_in_process_tx();
349 if let Some(t) = ui_thread {
350 let _ = t.join();
351 }
352 // Single-export sequential runs still emit the detailed block after
353 // the card commits to scrollback.
354 if exports.len() == 1
355 && let Some(summary) = summaries.last()
356 {
357 summary.print_stderr_block();
358 }
359 }
360
361 let finished_at = chrono::Utc::now();
362 // Skip the aggregate for single-export runs. Two cases this catches:
363 // 1) `rivet run --export X` (manual one-off): the per-export block
364 // already says everything, an aggregate of one row is just noise.
365 // 2) Children spawned by `--parallel-export-processes`: each child
366 // enters this code path with exports.len() == 1. The parent
367 // (parallel_processes branch above) builds the run-wide aggregate
368 // from every child's `export_metrics` row, so a child-level
369 // aggregate would just write a duplicate into `run_aggregate`.
370 // Force-write the JSON file even when skipping, so `--summary-output`
371 // remains useful for one-off runs.
372 if exports.len() > 1 {
373 let parallel_mode = if run_parallel {
374 "parallel-threads"
375 } else {
376 "sequential"
377 };
378 let entries: Vec<_> = summaries
379 .iter()
380 .map(aggregate::entry_from_summary)
381 .collect();
382 let agg = aggregate::build(
383 entries,
384 started_at,
385 finished_at,
386 Some(config_path),
387 parallel_mode,
388 );
389 aggregate::print(&agg);
390 // Open a fresh state handle for persisting the aggregate so we don't
391 // assume which thread owned the per-export `StateStore` above.
392 match StateStore::open(config_path) {
393 Ok(state) => aggregate::persist(&state, &agg, summary_output),
394 Err(e) => log::warn!(
395 "aggregate: cannot open state DB to record run aggregate: {:#}",
396 e
397 ),
398 }
399 if json_output {
400 print_json_summary(&agg);
401 }
402 } else if summary_output.is_some() || json_output {
403 // One export, but the user asked for a summary file and/or JSON stdout —
404 // honour both without polluting the DB or stderr.
405 let entries: Vec<_> = summaries
406 .iter()
407 .map(aggregate::entry_from_summary)
408 .collect();
409 let agg = aggregate::build(
410 entries,
411 started_at,
412 finished_at,
413 Some(config_path),
414 "sequential",
415 );
416 if let Some(out) = summary_output
417 && let Err(e) =
418 std::fs::write(out, serde_json::to_string_pretty(&agg).unwrap_or_default())
419 {
420 log::warn!(
421 "aggregate: failed to write summary JSON to {}: {:#}",
422 out.display(),
423 e
424 );
425 }
426 if json_output {
427 print_json_summary(&agg);
428 }
429 }
430
431 if !failures.is_empty() {
432 anyhow::bail!("{}", failures.join("; "));
433 }
434
435 Ok(())
436}