use std::collections::HashMap;
use std::io::{BufRead, BufReader};
use std::path::Path;
use std::process::{Command, Stdio};
use std::sync::mpsc;
use crate::config::ExportConfig;
use crate::error::Result;
use crate::state::StateStore;
use super::ipc::{ChildEvent, ENV_IPC_EVENTS};
use super::parent_ui::{ChildWaitStatus, UiMessage};
/// Runs each export in its own child process of the current executable and
/// aggregates the outcomes.
///
/// Every child is started as `<current exe> run --config <path> --export <name>`,
/// followed by whichever of `--validate`, `--reconcile`, `--resume`, `--force`
/// and `--param k=v` were requested, with the `ENV_IPC_EVENTS` environment
/// variable set to `1`. Child stdout is read line by line; every non-blank line
/// is parsed as a JSON [`ChildEvent`] and forwarded to the UI thread. Child
/// stderr is buffered per export (at most `CHILD_STDERR_LINE_CAP` lines, further
/// lines are only counted) and rendered once the UI thread has finished.
///
/// # Returns
///
/// A tuple of:
/// 1. `Ok(())` when every child was spawned and exited successfully, otherwise
///    an error whose message joins all per-export failures with `"; "`;
/// 2. a map from export name to its failure message (spawn and wait failures);
/// 3. the rendered stderr dump (empty when no child wrote to stderr).
///
/// # Panics
///
/// Re-raises the panic of a reaper thread if one panicked.
#[allow(clippy::too_many_arguments)] // one flag per CLI switch forwarded to the children
pub(super) fn run_exports_as_child_processes(
    config_path: &str,
    exports: &[&ExportConfig],
    validate: bool,
    reconcile: bool,
    resume: bool,
    force: bool,
    params: Option<&std::collections::HashMap<String, String>>,
) -> (Result<()>, HashMap<String, String>, String) {
    let exe = match std::env::current_exe() {
        Ok(p) => p,
        Err(e) => {
            return (
                Err(anyhow::anyhow!(
                    "failed to resolve rivet executable for child processes: {:#}",
                    e
                )),
                HashMap::new(),
                String::new(),
            );
        }
    };
    // Children may run with a different working directory assumption, so hand
    // them an absolute path when it can be resolved; otherwise pass it through.
    let config_arg = Path::new(config_path)
        .canonicalize()
        .unwrap_or_else(|_| std::path::PathBuf::from(config_path));
    // Open the state store once in the parent before any child starts (per the
    // error text below, this is where the state DB gets opened / migrated).
    if let Err(e) = StateStore::open(config_path) {
        return (
            Err(anyhow::anyhow!(
                "failed to open / migrate state DB before spawning children: {:#}",
                e
            )),
            HashMap::new(),
            String::new(),
        );
    }
    log::info!(
        "running {} exports as separate rivet processes (each child: single `--export`; SQLite state WAL allows concurrent writers; IPC card UI on)",
        exports.len()
    );

    // The UI thread consumes events until every sender (ours and the readers') is dropped.
    let (tx, rx) = mpsc::channel::<UiMessage>();
    let ui_handle = std::thread::Builder::new()
        .name("rivet-ipc-ui".into())
        .spawn(move || super::parent_ui::run_ui(rx))
        .ok();

    const CHILD_STDERR_LINE_CAP: usize = 5_000;
    // (captured lines, number of lines dropped after the cap was reached)
    type StderrBuf = std::sync::Arc<std::sync::Mutex<(Vec<String>, usize)>>;
    let stderr_bufs: std::sync::Arc<HashMap<String, StderrBuf>> = std::sync::Arc::new(
        exports
            .iter()
            .map(|e| {
                (
                    e.name.clone(),
                    std::sync::Arc::new(std::sync::Mutex::new((Vec::new(), 0usize))),
                )
            })
            .collect(),
    );

    let mut children: Vec<(String, std::process::Child)> = Vec::with_capacity(exports.len());
    let mut reader_handles: Vec<std::thread::JoinHandle<()>> = Vec::with_capacity(exports.len());
    let mut spawn_failures: HashMap<String, String> = HashMap::new();

    for export in exports {
        let mut cmd = Command::new(&exe);
        cmd.arg("run")
            .arg("--config")
            .arg(&config_arg)
            .arg("--export")
            .arg(export.name.as_str());
        if validate {
            cmd.arg("--validate");
        }
        if reconcile {
            cmd.arg("--reconcile");
        }
        if resume {
            cmd.arg("--resume");
        }
        if force {
            cmd.arg("--force");
        }
        if let Some(p) = params {
            for (k, v) in p {
                cmd.arg("--param").arg(format!("{k}={v}"));
            }
        }
        cmd.stdin(Stdio::null())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .env(ENV_IPC_EVENTS, "1");
        log::debug!("spawning child for export '{}': {:?}", export.name, cmd);

        match cmd.spawn() {
            Ok(mut child) => {
                if let Some(stdout) = child.stdout.take() {
                    let tx = tx.clone();
                    let export_name = export.name.clone();
                    let h = std::thread::Builder::new()
                        .name(format!("rivet-ipc-rx-{}", export.name))
                        .spawn(move || {
                            let mut reader = BufReader::new(stdout);
                            let mut scratch = Vec::new();
                            loop {
                                // Lossy decoding: a stray non-UTF-8 byte must not end the
                                // stream, or the pipe would close under the child.
                                let line = match read_line_lossy(&mut reader, &mut scratch) {
                                    Ok(Some(l)) => l,
                                    Ok(None) => break,
                                    Err(e) => {
                                        log::debug!(
                                            "ipc: child '{}' stdout read error: {:#}",
                                            export_name,
                                            e
                                        );
                                        break;
                                    }
                                };
                                let trimmed = line.trim();
                                if trimmed.is_empty() {
                                    continue;
                                }
                                match serde_json::from_str::<ChildEvent>(trimmed) {
                                    Ok(ev) => {
                                        // The UI may already be gone; nothing useful to do then.
                                        let _ = tx.send(UiMessage::Event(ev));
                                    }
                                    Err(e) => {
                                        log::debug!(
                                            "ipc: child '{}' emitted unparsable line: {} ({:#})",
                                            export_name,
                                            trimmed,
                                            e
                                        );
                                    }
                                }
                            }
                        })
                        .ok();
                    if let Some(h) = h {
                        reader_handles.push(h);
                    }
                }
                if let Some(stderr) = child.stderr.take() {
                    let export_name = export.name.clone();
                    let buf = std::sync::Arc::clone(
                        stderr_bufs.get(&export_name).expect("buf pre-allocated"),
                    );
                    let h = std::thread::Builder::new()
                        .name(format!("rivet-ipc-err-{}", export.name))
                        .spawn(move || {
                            let mut reader = BufReader::new(stderr);
                            let mut scratch = Vec::new();
                            loop {
                                let line = match read_line_lossy(&mut reader, &mut scratch) {
                                    Ok(Some(l)) => l,
                                    Ok(None) => break,
                                    Err(e) => {
                                        log::debug!(
                                            "ipc: child '{}' stderr read error: {:#}",
                                            export_name,
                                            e
                                        );
                                        break;
                                    }
                                };
                                // A poisoned lock still holds usable data; keep capturing.
                                let mut guard = match buf.lock() {
                                    Ok(g) => g,
                                    Err(p) => p.into_inner(),
                                };
                                if guard.0.len() >= CHILD_STDERR_LINE_CAP {
                                    guard.1 += 1;
                                } else {
                                    guard.0.push(line);
                                }
                            }
                        })
                        .ok();
                    if let Some(h) = h {
                        reader_handles.push(h);
                    }
                }
                children.push((export.name.clone(), child));
            }
            Err(e) => {
                spawn_failures.insert(export.name.clone(), format!("spawn failed: {e:#}"));
            }
        }
    }

    let mut failures = Vec::new();
    let mut wait_failures: HashMap<String, String> = HashMap::new();
    type WaitOutcome = (String, std::io::Result<std::process::ExitStatus>);
    let mut reaper_handles: Vec<std::thread::JoinHandle<WaitOutcome>> =
        Vec::with_capacity(children.len());
    for (name, mut child) in children {
        // Keep `name` on this side so a failed thread spawn can still be attributed.
        let reaper_name = name.clone();
        let handle = std::thread::Builder::new()
            .name(format!("rivet-reap-{}", name))
            .spawn(move || {
                let status = child.wait();
                (reaper_name, status)
            });
        match handle {
            Ok(h) => reaper_handles.push(h),
            Err(e) => {
                // The child handle was consumed by the closure, so its exit status can
                // no longer be observed: report the export as failed, never as a success.
                log::debug!("ipc: failed to spawn reaper thread: {:#}", e);
                let msg = format!("could not spawn reaper thread: {e:#}");
                failures.push(format!("export '{name}': {msg}"));
                wait_failures.insert(name.clone(), msg.clone());
                let _ = tx.send(UiMessage::ChildClosed {
                    export_name: name,
                    wait_status: ChildWaitStatus::Failed(msg),
                });
            }
        }
    }

    for h in reaper_handles {
        let (name, status) = match h.join() {
            Ok(pair) => pair,
            Err(payload) => std::panic::resume_unwind(payload),
        };
        let wait_status = match status {
            Ok(s) if s.success() => ChildWaitStatus::Success,
            Ok(s) => {
                // No exit code means the child was terminated by a signal.
                let code = s
                    .code()
                    .map(|c| c.to_string())
                    .unwrap_or_else(|| "signal".to_string());
                let msg = format!("exited with status {code}");
                failures.push(format!("export '{name}' {msg}"));
                wait_failures.insert(name.clone(), msg.clone());
                ChildWaitStatus::Failed(msg)
            }
            Err(e) => {
                let msg = format!("wait failed: {e:#}");
                failures.push(format!("export '{name}': {msg}"));
                wait_failures.insert(name.clone(), msg.clone());
                ChildWaitStatus::Failed(msg)
            }
        };
        let _ = tx.send(UiMessage::ChildClosed {
            export_name: name,
            wait_status,
        });
    }

    // Drop our sender so the UI loop ends once the reader threads (holding the
    // remaining clones) have drained their pipes.
    drop(tx);
    for h in reader_handles {
        let _ = h.join();
    }
    if let Some(h) = ui_handle {
        let _ = h.join();
    }

    let mut stderr_snapshot: HashMap<String, Vec<String>> = HashMap::new();
    let mut truncated_snapshot: HashMap<String, usize> = HashMap::new();
    for (name, buf) in stderr_bufs.as_ref() {
        let guard = match buf.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        if !guard.0.is_empty() {
            stderr_snapshot.insert(name.clone(), guard.0.clone());
        }
        if guard.1 > 0 {
            truncated_snapshot.insert(name.clone(), guard.1);
        }
    }
    let stderr_dump = render_child_stderr(exports, &stderr_snapshot, &truncated_snapshot);

    // Wait failures were appended as they were observed; spawn failures are added
    // here, keyed by exact export name and in sorted order so the message is
    // complete and deterministic.
    let mut spawn_failed: Vec<(&String, &String)> = spawn_failures.iter().collect();
    spawn_failed.sort();
    for (name, msg) in spawn_failed {
        failures.push(format!("export '{name}': {msg}"));
    }

    let mut all_failures = spawn_failures;
    all_failures.extend(wait_failures);

    let result = if failures.is_empty() {
        Ok(())
    } else {
        Err(anyhow::anyhow!("{}", failures.join("; ")))
    };
    (result, all_failures, stderr_dump)
}

/// Reads the next line from `reader`, decoding it as lossy UTF-8.
///
/// Returns `Ok(None)` at end of stream. The trailing `\n` (and a `\r` directly
/// before it) is removed, matching `BufRead::lines`; invalid UTF-8 sequences are
/// replaced with U+FFFD instead of producing an error. `scratch` is a reusable
/// byte buffer that is cleared on every call.
fn read_line_lossy<R: BufRead>(
    reader: &mut R,
    scratch: &mut Vec<u8>,
) -> std::io::Result<Option<String>> {
    scratch.clear();
    if reader.read_until(b'\n', scratch)? == 0 {
        return Ok(None);
    }
    if scratch.last() == Some(&b'\n') {
        scratch.pop();
        if scratch.last() == Some(&b'\r') {
            scratch.pop();
        }
    }
    Ok(Some(String::from_utf8_lossy(scratch).into_owned()))
}
/// Renders the captured stderr of child processes as one text block.
///
/// Sections appear in the order of `exports`; an export is included only when
/// `buffers` holds at least one line for its name. When `truncated` has an entry
/// for a rendered export, a trailing note with the dropped-line count is added.
/// Returns an empty string when no export has any captured line.
fn render_child_stderr(
    exports: &[&ExportConfig],
    buffers: &HashMap<String, Vec<String>>,
    truncated: &HashMap<String, usize>,
) -> String {
    // Resolve, in `exports` order, the exports that actually have output.
    let sections: Vec<(&str, &Vec<String>)> = exports
        .iter()
        .filter_map(|export| {
            let captured = buffers.get(&export.name)?;
            if captured.is_empty() {
                None
            } else {
                Some((export.name.as_str(), captured))
            }
        })
        .collect();
    if sections.is_empty() {
        return String::new();
    }

    let mut rendered = String::new();
    rendered.push('\n');
    rendered.push_str(" child stderr (captured to keep the live card stack stable):\n");
    for (name, captured) in sections {
        rendered.push_str(&format!(" ── {} ──\n", name));
        for captured_line in captured {
            rendered.push_str(" | ");
            rendered.push_str(captured_line);
            rendered.push('\n');
        }
        if let Some(dropped) = truncated.get(name) {
            rendered.push_str(&format!(
                " | … (truncated, {} more line(s) dropped)\n",
                dropped
            ));
        }
    }
    rendered
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserializes a minimal export definition named `name` from inline YAML.
    fn make_export(name: &str) -> crate::config::ExportConfig {
        let yaml = format!(
            "name: {name}\nquery: \"SELECT 1\"\nformat: parquet\ndestination:\n type: local\n path: /tmp\n"
        );
        serde_yaml_ng::from_str(&yaml).expect("parse test ExportConfig")
    }

    /// Builds a buffer map holding `lines` as the captured stderr of export `name`.
    fn single_buffer(name: &str, lines: &[&str]) -> HashMap<String, Vec<String>> {
        let captured: Vec<String> = lines.iter().map(|l| (*l).to_string()).collect();
        HashMap::from([(name.to_string(), captured)])
    }

    /// Nothing captured at all yields an empty rendering.
    #[test]
    fn empty_buffers_returns_empty_string() {
        let orders = make_export("orders");
        let out = render_child_stderr(&[&orders], &HashMap::new(), &HashMap::new());
        assert!(out.is_empty(), "no stderr → empty output, got: {out:?}");
    }

    /// A single export's lines appear under its header, each prefixed with `| `.
    #[test]
    fn single_export_with_stderr_lines_rendered() {
        let orders = make_export("orders");
        let buffers = single_buffer("orders", &["INFO starting", "WARN slow query"]);
        let out = render_child_stderr(&[&orders], &buffers, &HashMap::new());
        assert!(out.contains("── orders ──"), "should have header: {out}");
        assert!(out.contains("INFO starting"), "should have line 1: {out}");
        assert!(out.contains("WARN slow query"), "should have line 2: {out}");
        assert!(out.contains("| "), "lines prefixed with |: {out}");
    }

    /// A truncation count for a rendered export produces the dropped-lines note.
    #[test]
    fn truncated_count_appended() {
        let payments = make_export("payments");
        let buffers = single_buffer("payments", &["some line"]);
        let truncated = HashMap::from([("payments".to_string(), 42usize)]);
        let out = render_child_stderr(&[&payments], &buffers, &truncated);
        assert!(
            out.contains("42 more line(s) dropped"),
            "truncation note: {out}"
        );
    }

    /// Buffers keyed by a name that is not among the exports are ignored.
    #[test]
    fn export_not_in_buffers_is_skipped() {
        let users = make_export("users");
        let buffers = single_buffer("other", &["line"]);
        let out = render_child_stderr(&[&users], &buffers, &HashMap::new());
        assert!(out.is_empty(), "unrelated export not rendered: {out:?}");
    }

    /// Sections follow the order of the exports slice, not the map's order.
    #[test]
    fn multiple_exports_ordering_matches_exports_slice() {
        let first = make_export("alpha");
        let second = make_export("beta");
        let mut buffers = single_buffer("alpha", &["line_a"]);
        buffers.extend(single_buffer("beta", &["line_b"]));
        let out = render_child_stderr(&[&first, &second], &buffers, &HashMap::new());
        let pos_a = out.find("alpha").expect("alpha in output");
        let pos_b = out.find("beta").expect("beta in output");
        assert!(pos_a < pos_b, "alpha rendered before beta");
    }

    /// An export whose captured line list is empty is treated as having no output.
    #[test]
    fn export_with_empty_lines_vec_not_rendered() {
        let events = make_export("events");
        let buffers = single_buffer("events", &[]);
        let out = render_child_stderr(&[&events], &buffers, &HashMap::new());
        assert!(out.is_empty(), "empty lines vec → no output: {out:?}");
    }
}