use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use crate::config::{Config, ExportConfig};
use crate::error::Result;
use crate::state::StateStore;
use super::summary::RunSummary;
use super::{aggregate, ipc, job, parallel_children, parent_ui};
#[derive(Debug, Clone, Copy)]
pub struct RunOptions<'a> {
pub validate: bool,
pub reconcile: bool,
pub resume: bool,
pub force: bool,
pub params: Option<&'a std::collections::HashMap<String, String>>,
}
pub(crate) static MULTI_EXPORT_MODE: AtomicBool = AtomicBool::new(false);
pub(crate) static MULTI_EXPORT_CONCURRENT: AtomicBool = AtomicBool::new(false);
pub(crate) fn multi_export_mode() -> bool {
MULTI_EXPORT_MODE.load(AtomicOrdering::Relaxed)
}
#[allow(dead_code)] pub(crate) fn multi_export_concurrent() -> bool {
MULTI_EXPORT_CONCURRENT.load(AtomicOrdering::Relaxed)
}
fn print_json_summary(agg: &crate::state::RunAggregate) {
match serde_json::to_string_pretty(agg) {
Ok(json) => println!("{json}"),
Err(e) => eprintln!(
"rivet: error: failed to serialize run summary as JSON: {:#}",
e
),
}
}
#[allow(clippy::too_many_arguments)] pub fn run(
config_path: &str,
export_name: Option<&str>,
validate: bool,
reconcile: bool,
resume: bool,
force: bool,
params: Option<&std::collections::HashMap<String, String>>,
parallel_exports_cli: bool,
parallel_export_processes_cli: bool,
summary_output: Option<&Path>,
json_output: bool,
) -> Result<()> {
if force && !resume {
log::warn!(
"--force without --resume is a no-op today (force only overrides the resume safety \
gate against a destination prefix whose _SUCCESS is already present)"
);
}
let config = Config::load_with_params(config_path, params)?;
let config_dir = Path::new(config_path)
.parent()
.unwrap_or(Path::new("."))
.to_path_buf();
let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
let e = config
.exports
.iter()
.find(|e| e.name == name)
.ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
vec![e]
} else {
config.exports.iter().collect()
};
let opts = RunOptions {
validate,
reconcile,
resume,
force,
params,
};
let run_parallel_processes = (parallel_export_processes_cli
|| config.parallel_export_processes)
&& export_name.is_none()
&& exports.len() > 1;
let started_at = chrono::Utc::now();
if run_parallel_processes {
if let Err(e) = StateStore::open(config_path) {
return Err(anyhow::anyhow!(
"state: failed to initialize state DB before spawning children: {:#}",
e
));
}
let (result, child_failures, stderr_dump) =
parallel_children::run_exports_as_child_processes(
config_path,
&exports,
validate,
reconcile,
resume,
force,
params,
);
let finished_at = chrono::Utc::now();
match StateStore::open(config_path) {
Ok(state) => {
let entries =
aggregate::collect_child_entries(&state, &exports, started_at, &child_failures);
let agg = aggregate::build(
entries,
started_at,
finished_at,
Some(config_path),
"parallel-processes",
);
aggregate::print(&agg);
aggregate::persist(&state, &agg, summary_output);
if json_output {
print_json_summary(&agg);
}
}
Err(e) => log::warn!(
"aggregate: cannot open state DB to record run aggregate: {:#}",
e
),
}
if !stderr_dump.is_empty() {
use std::io::Write;
let mut h = std::io::stderr().lock();
let _ = h.write_all(stderr_dump.as_bytes());
let _ = h.flush();
}
return result;
}
let run_parallel = (parallel_exports_cli || config.parallel_exports)
&& export_name.is_none()
&& exports.len() > 1;
let multi_export = export_name.is_none() && exports.len() > 1;
let prev_multi = MULTI_EXPORT_MODE.swap(multi_export, AtomicOrdering::Relaxed);
let prev_concurrent = MULTI_EXPORT_CONCURRENT.swap(run_parallel, AtomicOrdering::Relaxed);
struct ResetMultiExport(bool, bool);
impl Drop for ResetMultiExport {
fn drop(&mut self) {
MULTI_EXPORT_MODE.store(self.0, AtomicOrdering::Relaxed);
MULTI_EXPORT_CONCURRENT.store(self.1, AtomicOrdering::Relaxed);
}
}
let _reset_multi = ResetMultiExport(prev_multi, prev_concurrent);
let mut summaries: Vec<RunSummary> = Vec::with_capacity(exports.len());
let mut failures: Vec<String> = Vec::new();
if run_parallel {
log::info!(
"running {} exports in parallel (separate state DB connection per export)",
exports.len()
);
if let Err(e) = StateStore::open(config_path) {
return Err(anyhow::anyhow!(
"state: failed to initialize state DB before spawning export threads: {:#}",
e
));
}
let (tx, rx) = std::sync::mpsc::channel::<parent_ui::UiMessage>();
ipc::install_in_process_tx(tx);
let ui_thread = std::thread::Builder::new()
.name("rivet-ui".to_string())
.spawn(move || parent_ui::run_ui(rx))
.ok();
let collected: std::sync::Mutex<Vec<(Result<()>, RunSummary)>> =
std::sync::Mutex::new(Vec::with_capacity(exports.len()));
std::thread::scope(|s| {
let mut handles = Vec::new();
for &export in &exports {
handles.push(s.spawn(|| {
let state = match StateStore::open(config_path) {
Ok(s) => s,
Err(e) => {
let err = anyhow::anyhow!(
"export '{}': failed to open state database: {:#}",
export.name,
e
);
let summary = job::synthetic_failed_summary(&export.name, &err);
return (Err(err), summary);
}
};
job::run_export_job(config_path, &config, export, &state, &config_dir, &opts)
}));
}
for h in handles {
match h.join() {
Ok(pair) => collected.lock().unwrap().push(pair),
Err(payload) => std::panic::resume_unwind(payload),
}
}
});
ipc::clear_in_process_tx();
if let Some(t) = ui_thread {
let _ = t.join();
}
for (res, summary) in collected.into_inner().unwrap() {
if let Err(e) = res {
failures.push(format!("{e:#}"));
}
summaries.push(summary);
}
} else {
let state = StateStore::open(config_path)?;
let (tx, rx) = std::sync::mpsc::channel::<parent_ui::UiMessage>();
ipc::install_in_process_tx(tx);
let ui_thread = std::thread::Builder::new()
.name("rivet-ui".to_string())
.spawn(move || parent_ui::run_ui(rx))
.ok();
for export in &exports {
let (res, summary) =
job::run_export_job(config_path, &config, export, &state, &config_dir, &opts);
if let Err(e) = res {
failures.push(format!("{e:#}"));
}
summaries.push(summary);
}
ipc::clear_in_process_tx();
if let Some(t) = ui_thread {
let _ = t.join();
}
if exports.len() == 1
&& let Some(summary) = summaries.last()
{
summary.print_stderr_block();
}
}
let finished_at = chrono::Utc::now();
if exports.len() > 1 {
let parallel_mode = if run_parallel {
"parallel-threads"
} else {
"sequential"
};
let entries: Vec<_> = summaries
.iter()
.map(aggregate::entry_from_summary)
.collect();
let agg = aggregate::build(
entries,
started_at,
finished_at,
Some(config_path),
parallel_mode,
);
aggregate::print(&agg);
match StateStore::open(config_path) {
Ok(state) => aggregate::persist(&state, &agg, summary_output),
Err(e) => log::warn!(
"aggregate: cannot open state DB to record run aggregate: {:#}",
e
),
}
if json_output {
print_json_summary(&agg);
}
} else if summary_output.is_some() || json_output {
let entries: Vec<_> = summaries
.iter()
.map(aggregate::entry_from_summary)
.collect();
let agg = aggregate::build(
entries,
started_at,
finished_at,
Some(config_path),
"sequential",
);
if let Some(out) = summary_output
&& let Err(e) =
std::fs::write(out, serde_json::to_string_pretty(&agg).unwrap_or_default())
{
log::warn!(
"aggregate: failed to write summary JSON to {}: {:#}",
out.display(),
e
);
}
if json_output {
print_json_summary(&agg);
}
}
if !failures.is_empty() {
anyhow::bail!("{}", failures.join("; "));
}
Ok(())
}