use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use crate::config::{Config, ExportConfig};
use crate::error::Result;
use crate::state::StateStore;
use super::summary::RunSummary;
use super::{aggregate, finalize, ipc, job, parallel_children, parent_ui, partition_expand};
/// Per-invocation switches shared by every export job of a run.
///
/// Built once by `run` / `run_waves` and handed by reference to `job::run_export_job`.
/// `params` borrows the same parameter map the config was loaded with (if any).
#[derive(Clone, Copy, Debug)]
pub struct RunOptions<'p> {
    /// Mirrors the `validate` flag given to the run.
    pub validate: bool,
    /// Mirrors the `reconcile` flag given to the run.
    pub reconcile: bool,
    /// Mirrors the `resume` flag given to the run.
    pub resume: bool,
    /// Mirrors the `force` flag given to the run.
    pub force: bool,
    /// Optional key/value parameters the config was loaded with.
    pub params: Option<&'p std::collections::HashMap<String, String>>,
}
/// Process-wide flag: `true` while an in-process run drives more than one export.
///
/// Set (and restored on exit by a drop guard) by `run` and `run_waves`.
pub(crate) static MULTI_EXPORT_MODE: AtomicBool = AtomicBool::new(false);

/// Process-wide flag: `true` while `run` executes several exports on parallel threads.
///
/// Set (and restored on exit by a drop guard) by `run`.
pub(crate) static MULTI_EXPORT_CONCURRENT: AtomicBool = AtomicBool::new(false);

/// Reports whether the current run covers more than one export (see [`MULTI_EXPORT_MODE`]).
pub(crate) fn multi_export_mode() -> bool {
    let flag: &AtomicBool = &MULTI_EXPORT_MODE;
    flag.load(AtomicOrdering::Relaxed)
}

/// Reports whether the current run executes exports concurrently in-process
/// (see [`MULTI_EXPORT_CONCURRENT`]).
// NOTE(review): `dead_code` is allowed because nothing may read this accessor in every
// build configuration yet; confirm and drop the allowance once it has a caller.
#[allow(dead_code)]
pub(crate) fn multi_export_concurrent() -> bool {
    let flag: &AtomicBool = &MULTI_EXPORT_CONCURRENT;
    flag.load(AtomicOrdering::Relaxed)
}
/// Prints the run aggregate as pretty JSON on stdout.
///
/// Serialization failures are reported on stderr instead; nothing is printed to stdout then.
fn print_json_summary(agg: &crate::state::RunAggregate) {
    let rendered = serde_json::to_string_pretty(agg);
    match rendered {
        Err(e) => eprintln!("rivet: error: failed to serialize run summary as JSON: {e:#}"),
        Ok(json) => println!("{json}"),
    }
}
/// Saves captured child-process stderr to a timestamped log file in `dir`.
///
/// Does nothing when `dump` is empty. On success the log path is announced on stderr; if the
/// file cannot be written, a warning is logged and the dump is written to stderr directly
/// (best-effort: write/flush errors there are ignored).
fn emit_child_stderr(dump: &str, dir: &Path) {
    if !dump.is_empty() {
        let stamp = chrono::Utc::now().format("%Y%m%dT%H%M%S");
        let path = dir.join(format!("rivet-child-stderr-{}.log", stamp));
        if let Err(e) = std::fs::write(&path, dump) {
            log::warn!(
                "could not write child stderr to {} ({e}); printing inline",
                path.display()
            );
            use std::io::Write;
            let mut sink = std::io::stderr().lock();
            let _ = sink.write_all(dump.as_bytes());
            let _ = sink.flush();
        } else {
            eprintln!(
                "\n child stderr (full per-export logs) → {}",
                path.display()
            );
        }
    }
}
#[allow(clippy::too_many_arguments)] pub fn run(
config_path: &str,
export_name: Option<&str>,
validate: bool,
reconcile: bool,
resume: bool,
force: bool,
params: Option<&std::collections::HashMap<String, String>>,
parallel_exports_cli: bool,
parallel_export_processes_cli: bool,
summary_output: Option<&Path>,
json_output: bool,
) -> Result<()> {
if force && !resume {
log::warn!(
"--force without --resume is a no-op today (force only overrides the resume safety \
gate against a destination prefix whose _SUCCESS is already present)"
);
}
let config = Config::load_with_params(config_path, params)?;
let config_dir = Path::new(config_path)
.parent()
.unwrap_or(Path::new("."))
.to_path_buf();
let selected: Vec<&ExportConfig> = if let Some(name) = export_name {
let e = config
.exports
.iter()
.find(|e| e.name == name)
.ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
vec![e]
} else {
config.exports.iter().collect()
};
let partitioned = partition_expand::any_partitioned(&selected);
let expanded_owned: Vec<ExportConfig>;
let exports: Vec<&ExportConfig> = if partitioned {
expanded_owned = partition_expand::expand_partitioned_exports(
&selected,
&config.source,
&config_dir,
params,
)?;
expanded_owned.iter().collect()
} else {
selected
};
let opts = RunOptions {
validate,
reconcile,
resume,
force,
params,
};
let name_floor = exports
.iter()
.map(|e| e.name.chars().count())
.max()
.unwrap_or(0);
let process_mode_requested = parallel_export_processes_cli || config.parallel_export_processes;
if partitioned && process_mode_requested {
log::warn!(
"partition_by: --parallel-export-processes is disabled with partitioned exports \
(child processes re-load the config and can't see synthesised partitions); \
running in-process"
);
}
let run_parallel_processes =
process_mode_requested && export_name.is_none() && exports.len() > 1 && !partitioned;
let started_at = chrono::Utc::now();
if run_parallel_processes {
if let Err(e) = StateStore::open(config_path) {
return Err(anyhow::anyhow!(
"state: failed to initialize state DB before spawning children: {:#}",
e
));
}
let (result, child_failures, stderr_dump) =
parallel_children::run_exports_as_child_processes(
config_path,
&exports,
validate,
reconcile,
resume,
force,
params,
name_floor,
);
let finished_at = chrono::Utc::now();
match StateStore::open(config_path) {
Ok(state) => {
let entries =
aggregate::collect_child_entries(&state, &exports, started_at, &child_failures);
let agg = aggregate::build(
entries,
started_at,
finished_at,
Some(config_path),
"parallel-processes",
);
aggregate::print(&agg);
aggregate::persist(&state, &agg, summary_output);
if json_output {
print_json_summary(&agg);
}
}
Err(e) => log::warn!(
"aggregate: cannot open state DB to record run aggregate: {:#}",
e
),
}
emit_child_stderr(&stderr_dump, &config_dir);
return result;
}
let run_parallel = (parallel_exports_cli || config.parallel_exports)
&& export_name.is_none()
&& exports.len() > 1;
let multi_export = export_name.is_none() && exports.len() > 1;
let prev_multi = MULTI_EXPORT_MODE.swap(multi_export, AtomicOrdering::Relaxed);
let prev_concurrent = MULTI_EXPORT_CONCURRENT.swap(run_parallel, AtomicOrdering::Relaxed);
struct ResetMultiExport(bool, bool);
impl Drop for ResetMultiExport {
fn drop(&mut self) {
MULTI_EXPORT_MODE.store(self.0, AtomicOrdering::Relaxed);
MULTI_EXPORT_CONCURRENT.store(self.1, AtomicOrdering::Relaxed);
}
}
let _reset_multi = ResetMultiExport(prev_multi, prev_concurrent);
let mut summaries: Vec<RunSummary> = Vec::with_capacity(exports.len());
let mut failures: Vec<anyhow::Error> = Vec::new();
if run_parallel {
log::info!(
"running {} exports in parallel (separate state DB connection per export)",
exports.len()
);
if let Err(e) = StateStore::open(config_path) {
return Err(anyhow::anyhow!(
"state: failed to initialize state DB before spawning export threads: {:#}",
e
));
}
let (tx, rx) = std::sync::mpsc::channel::<parent_ui::UiMessage>();
ipc::install_in_process_tx(tx);
let ui_thread = std::thread::Builder::new()
.name("rivet-ui".to_string())
.spawn(move || parent_ui::run_ui(rx, name_floor))
.ok();
let collected: std::sync::Mutex<Vec<(Result<()>, RunSummary)>> =
std::sync::Mutex::new(Vec::with_capacity(exports.len()));
std::thread::scope(|s| {
let mut handles = Vec::new();
for &export in &exports {
handles.push(s.spawn(|| {
let state = match StateStore::open(config_path) {
Ok(s) => s,
Err(e) => {
let err = anyhow::anyhow!(
"export '{}': failed to open state database: {:#}",
export.name,
e
);
let summary = job::synthetic_failed_summary(&export.name, &err);
return (Err(err), summary);
}
};
job::run_export_job(config_path, &config, export, &state, &config_dir, &opts)
}));
}
for h in handles {
match h.join() {
Ok(pair) => collected.lock().unwrap().push(pair),
Err(payload) => std::panic::resume_unwind(payload),
}
}
});
ipc::clear_in_process_tx();
if let Some(t) = ui_thread {
let _ = t.join();
}
for (res, summary) in collected.into_inner().unwrap() {
if let Err(e) = res {
failures.push(e);
}
summaries.push(summary);
}
} else {
let state = StateStore::open(config_path)?;
let (tx, rx) = std::sync::mpsc::channel::<parent_ui::UiMessage>();
ipc::install_in_process_tx(tx);
let ui_thread = std::thread::Builder::new()
.name("rivet-ui".to_string())
.spawn(move || parent_ui::run_ui(rx, name_floor))
.ok();
for export in &exports {
let (res, summary) =
job::run_export_job(config_path, &config, export, &state, &config_dir, &opts);
if let Err(e) = res {
failures.push(e);
}
summaries.push(summary);
}
ipc::clear_in_process_tx();
if let Some(t) = ui_thread {
let _ = t.join();
}
if exports.len() == 1
&& let Some(summary) = summaries.last()
{
summary.print_stderr_block();
}
}
let finished_at = chrono::Utc::now();
if exports.len() > 1 {
let parallel_mode = if run_parallel {
"parallel-threads"
} else {
"sequential"
};
let entries: Vec<_> = summaries
.iter()
.map(aggregate::entry_from_summary)
.collect();
let agg = aggregate::build(
entries,
started_at,
finished_at,
Some(config_path),
parallel_mode,
);
aggregate::print(&agg);
match StateStore::open(config_path) {
Ok(state) => aggregate::persist(&state, &agg, summary_output),
Err(e) => log::warn!(
"aggregate: cannot open state DB to record run aggregate: {:#}",
e
),
}
if json_output {
print_json_summary(&agg);
}
} else if summary_output.is_some() || json_output {
let entries: Vec<_> = summaries
.iter()
.map(aggregate::entry_from_summary)
.collect();
let agg = aggregate::build(
entries,
started_at,
finished_at,
Some(config_path),
"sequential",
);
if let Some(out) = summary_output
&& let Err(e) =
std::fs::write(out, serde_json::to_string_pretty(&agg).unwrap_or_default())
{
log::warn!(
"aggregate: failed to write summary JSON to {}: {:#}",
out.display(),
e
);
}
if json_output {
print_json_summary(&agg);
}
}
if !failures.is_empty() {
let primary_idx = representative_failure_idx(&failures).unwrap();
let primary = failures.remove(primary_idx);
if failures.is_empty() {
return Err(primary);
}
let others = failures
.iter()
.map(|e| format!("{e:#}"))
.collect::<Vec<_>>()
.join("; ");
return Err(primary.context(format!(
"{} export(s) failed; representative error follows (also: {others})",
failures.len() + 1
)));
}
Ok(())
}
/// Applies every export of the config at `config_path`, one wave at a time.
///
/// Exports are grouped by `wave` (ascending; exports without a wave form a final
/// "unscheduled" group). With `resume`, exports whose destination already reports
/// `_SUCCESS` are skipped. Each wave then runs either:
///
/// * in child processes (`parallel_cli` or the config's `parallel_export_processes`). Every
///   export not marked `parallel_safe` gets its own batch, and all `parallel_safe` exports
///   share one final batch. Batches are launched one after another.
/// * sequentially in-process, on one shared state DB connection.
///
/// Validation and reconciliation are off and no CLI params are applied. When the config
/// defines more than one export, an aggregate summary is printed and persisted to the
/// state DB.
///
/// # Errors
///
/// Returns config and state-DB errors raised before any export runs. When anything fails,
/// returns the highest-ranked failure (per `representative_failure_idx`), with context
/// listing the rest.
pub(crate) fn run_waves(
    config_path: &str,
    force: bool,
    parallel_cli: bool,
    resume: bool,
) -> Result<()> {
    let config = Config::load_with_params(config_path, None)?;
    let config_dir = Path::new(config_path)
        .parent()
        .unwrap_or(Path::new("."))
        .to_path_buf();
    let opts = RunOptions {
        validate: false,
        reconcile: false,
        resume,
        force,
        params: None,
    };
    let by_wave = group_exports_by_wave(&config.exports);
    let total: usize = by_wave.iter().map(|(_, v)| v.len()).sum();
    if total == 0 {
        log::warn!("apply: config '{config_path}' defines no exports");
        return Ok(());
    }
    let parallel = parallel_cli || config.parallel_export_processes;
    // Multi-export mode is only flagged for in-process (sequential) multi-export runs; the
    // guard restores the previous value on every exit path.
    let prev_multi = MULTI_EXPORT_MODE.swap(total > 1 && !parallel, AtomicOrdering::Relaxed);
    struct ResetMulti(bool);
    impl Drop for ResetMulti {
        fn drop(&mut self) {
            MULTI_EXPORT_MODE.store(self.0, AtomicOrdering::Relaxed);
        }
    }
    let _reset = ResetMulti(prev_multi);
    // Opened before any wave runs; used for sequential jobs and for the final aggregate.
    let state = StateStore::open(config_path)?;
    let started_at = chrono::Utc::now();
    // Sequential mode fills `summaries`; parallel mode fills `all_exports`,
    // `child_failures` and `combined_stderr` instead.
    let mut summaries: Vec<RunSummary> = Vec::with_capacity(total);
    let mut failures: Vec<anyhow::Error> = Vec::new();
    let mut all_exports: Vec<&ExportConfig> = Vec::with_capacity(total);
    let mut child_failures: std::collections::HashMap<String, String> =
        std::collections::HashMap::new();
    let mut combined_stderr = String::new();
    for (wave, exports) in &by_wave {
        // `u32::MAX` is the sentinel `group_exports_by_wave` uses for exports without a wave.
        let label = if *wave == u32::MAX {
            "unscheduled".to_string()
        } else {
            wave.to_string()
        };
        let pending: Vec<&ExportConfig> = exports
            .iter()
            .copied()
            .filter(|e| {
                let done = resume && finalize::destination_has_success(&e.destination);
                if done {
                    log::info!(
                        "apply: skipping '{}' — destination already complete (_SUCCESS)",
                        e.name
                    );
                }
                !done
            })
            .collect();
        if pending.is_empty() {
            continue;
        }
        if total > 1 {
            println!("\n  ── wave {label} · {} export(s) ──", pending.len());
        }
        if parallel {
            let (safe, lone): (Vec<&ExportConfig>, Vec<&ExportConfig>) =
                pending.iter().copied().partition(|e| is_parallel_safe(e));
            log::info!(
                "apply: wave {} — {} parallel-safe export(s) in parallel, {} run alone",
                label,
                safe.len(),
                lone.len()
            );
            // One single-export batch per non-parallel-safe export, then one batch holding
            // every parallel-safe export.
            let mut batches: Vec<Vec<&ExportConfig>> = lone.iter().map(|e| vec![*e]).collect();
            if !safe.is_empty() {
                batches.push(safe);
            }
            let wave_name_floor = pending
                .iter()
                .map(|e| e.name.chars().count())
                .max()
                .unwrap_or(0);
            for batch in &batches {
                // validate = false, reconcile = false, params = None (matching `opts`).
                let (result, cf, stderr_dump) = parallel_children::run_exports_as_child_processes(
                    config_path,
                    batch,
                    false,
                    false,
                    resume,
                    force,
                    None,
                    wave_name_floor,
                );
                child_failures.extend(cf);
                combined_stderr.push_str(&stderr_dump);
                // NOTE(review): one entry per failed *batch*, not per failed export, so the
                // "export(s) failed" count below may undercount in parallel mode — confirm.
                if let Err(e) = result {
                    failures.push(e);
                }
            }
            all_exports.extend_from_slice(&pending);
        } else {
            log::info!(
                "apply: wave {} — {} export(s), sequential",
                label,
                pending.len()
            );
            for export in &pending {
                let (res, summary) =
                    job::run_export_job(config_path, &config, export, &state, &config_dir, &opts);
                if let Err(e) = res {
                    failures.push(e);
                }
                summaries.push(summary);
            }
        }
    }
    let finished_at = chrono::Utc::now();
    if total > 1 {
        // Parallel children recorded their results in the state DB; sequential runs kept
        // their summaries in memory.
        let entries = if parallel {
            aggregate::collect_child_entries(&state, &all_exports, started_at, &child_failures)
        } else {
            summaries
                .iter()
                .map(aggregate::entry_from_summary)
                .collect()
        };
        let agg = aggregate::build(
            entries,
            started_at,
            finished_at,
            Some(config_path),
            if parallel {
                "wave-parallel-processes"
            } else {
                "wave-sequential"
            },
        );
        aggregate::print(&agg);
        aggregate::persist(&state, &agg, None);
    }
    emit_child_stderr(&combined_stderr, &config_dir);
    if !failures.is_empty() {
        // Non-empty, so a representative always exists.
        let primary_idx = representative_failure_idx(&failures).unwrap();
        let primary = failures.remove(primary_idx);
        if failures.is_empty() {
            return Err(primary);
        }
        let others = failures
            .iter()
            .map(|e| format!("{e:#}"))
            .collect::<Vec<_>>()
            .join("; ");
        return Err(primary.context(format!(
            "{} export(s) failed across waves; representative error follows (also: {others})",
            failures.len() + 1
        )));
    }
    Ok(())
}
/// Buckets exports by their `wave`, returning the buckets in ascending wave order.
///
/// Exports without a wave are keyed by `u32::MAX`, so they form the last bucket. Within a
/// bucket, exports keep their input order.
fn group_exports_by_wave(exports: &[ExportConfig]) -> Vec<(u32, Vec<&ExportConfig>)> {
    use std::collections::BTreeMap;

    exports
        .iter()
        .fold(
            BTreeMap::<u32, Vec<&ExportConfig>>::new(),
            |mut buckets, export| {
                let key = export.wave.unwrap_or(u32::MAX);
                buckets.entry(key).or_default().push(export);
                buckets
            },
        )
        .into_iter()
        .collect()
}
/// Whether an export may share a concurrent batch with other exports of its wave.
///
/// Only an explicit `parallel_safe: true` opts in; an unset flag counts as not safe.
fn is_parallel_safe(export: &ExportConfig) -> bool {
    matches!(export.parallel_safe, Some(true))
}
/// Tests for wave grouping and the `parallel_safe` plan flag.
#[cfg(test)]
mod wave_grouping_tests {
    use super::{group_exports_by_wave, is_parallel_safe};

    #[test]
    fn groups_ascending_with_unscheduled_last() {
        let mut a = crate::config::sample_export("a");
        a.wave = Some(3);
        let mut b = crate::config::sample_export("b");
        b.wave = None;
        let mut c = crate::config::sample_export("c");
        c.wave = Some(1);
        let mut d = crate::config::sample_export("d");
        d.wave = Some(1);
        let exports = vec![a, b, c, d];
        let grouped = group_exports_by_wave(&exports);
        let waves: Vec<u32> = grouped.iter().map(|(w, _)| *w).collect();
        assert_eq!(waves, vec![1, 3, u32::MAX], "ascending, unscheduled last");
        let wave1: Vec<&str> = grouped[0].1.iter().map(|e| e.name.as_str()).collect();
        assert_eq!(wave1, vec!["c", "d"], "same-wave keeps input order");
        assert_eq!(grouped[2].1.len(), 1);
        assert_eq!(
            grouped[2].1[0].name, "b",
            "the no-wave export lands in the last group"
        );
    }

    #[test]
    fn empty_config_yields_no_waves() {
        assert!(group_exports_by_wave(&[]).is_empty());
    }

    #[test]
    fn parallel_safe_reads_the_plan_flag() {
        let unset = crate::config::sample_export("unset");
        assert!(!is_parallel_safe(&unset), "None is treated as not-safe");
        let mut safe = crate::config::sample_export("safe");
        safe.parallel_safe = Some(true);
        assert!(is_parallel_safe(&safe), "parallel_safe: true → concurrent");
        let mut not_safe = crate::config::sample_export("heavy");
        not_safe.parallel_safe = Some(false);
        // Was mis-encoded as `¬_safe` (an `&not` entity garble), which does not compile.
        assert!(!is_parallel_safe(&not_safe), "parallel_safe: false → alone");
    }
}
fn representative_failure_idx(failures: &[anyhow::Error]) -> Option<usize> {
let rank = |e: &anyhow::Error| match crate::error::classify_exit(e) {
c if c == crate::error::ExitClass::DataIntegrity.code() => 3,
c if c == crate::error::ExitClass::SchemaDrift.code() => 2,
c if c == crate::error::ExitClass::Retryable.code() => 1,
_ => 0,
};
(0..failures.len()).max_by_key(|&i| rank(&failures[i]))
}
/// Tests for choosing the representative failure of a batch.
#[cfg(test)]
mod representative_failure_tests {
    use super::representative_failure_idx;
    use crate::error::{DataIntegrityError, ExitClass, SchemaDriftError, classify_exit};

    #[test]
    fn empty_batch_has_no_representative() {
        let nothing: &[anyhow::Error] = &[];
        assert_eq!(representative_failure_idx(nothing), None);
    }

    #[test]
    fn data_integrity_outranks_everything_regardless_of_position() {
        let generic_first = anyhow::anyhow!("generic boom");
        let drift: anyhow::Error = SchemaDriftError::new("shape changed").into();
        let generic_second = anyhow::anyhow!("another generic");
        let integrity: anyhow::Error = DataIntegrityError::new("reconcile mismatch").into();
        let batch = vec![generic_first, drift, generic_second, integrity];

        let chosen = &batch[representative_failure_idx(&batch).unwrap()];
        assert_eq!(
            classify_exit(chosen),
            ExitClass::DataIntegrity.code(),
            "a mixed batch must surface the data-integrity (exit 3) failure"
        );
    }

    #[test]
    fn schema_drift_outranks_retryable_and_generic() {
        let batch: Vec<anyhow::Error> = vec![
            anyhow::anyhow!("generic"),
            SchemaDriftError::new("drift").into(),
        ];
        let chosen = &batch[representative_failure_idx(&batch).unwrap()];
        assert_eq!(classify_exit(chosen), ExitClass::SchemaDrift.code());
    }
}