use std::io::Write;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::{Duration, Instant};
/// Number of wrapper invocations raced against each other when `N_ENV` is
/// unset or does not parse as an integer.
const DEFAULT_N: usize = 16;
/// Wall-clock budget, in seconds, used when `BUDGET_ENV` is unset or does not
/// parse as an integer.
const DEFAULT_BUDGET_SECS: u64 = 90;
/// Environment variable that must be set for the reproducer to run at all;
/// the test only checks that it can be read, not what value it holds.
const GATE_ENV: &str = "ZCCACHE_RUN_REPRO_691";
/// Environment variable overriding the number of concurrent wrappers.
const N_ENV: &str = "ZCCACHE_SPAWN_STORM_N";
/// Environment variable overriding the wall-clock budget (whole seconds).
const BUDGET_ENV: &str = "ZCCACHE_SPAWN_STORM_BUDGET_SECS";
/// Returns the directory two levels above the currently running executable.
///
/// The path of the running test binary is taken and its last two components
/// (the file name and its parent directory) are removed. For a Cargo
/// integration test this is presumably `target/<profile>` — layout assumed,
/// confirm against the build setup.
///
/// # Panics
/// Panics if the path of the current executable cannot be determined.
fn target_bin_dir() -> PathBuf {
    let mut dir = std::env::current_exe().expect("current_exe");
    // Strip `<exe-name>` and then the directory that contains it.
    for _ in 0..2 {
        dir.pop();
    }
    dir
}
/// Builds the expected path of a binary named `stem` inside
/// [`target_bin_dir`].
///
/// On Windows the `.exe` suffix is appended to `stem`; elsewhere the stem is
/// used unchanged. The path is only constructed, not checked for existence.
fn binary_path(stem: &str) -> PathBuf {
    let file_name = if cfg!(windows) {
        format!("{stem}.exe")
    } else {
        stem.to_owned()
    };
    let mut path = target_bin_dir();
    path.push(file_name);
    path
}
/// Runs `<zccache> stop` with `ZCCACHE_CACHE_DIR` pointing at `cache_dir`.
///
/// This is best effort: the child's stdout/stderr are discarded and both a
/// failure to launch the binary and a non-zero exit status are ignored.
fn stop_daemon(zccache: &Path, cache_dir: &Path) {
    let mut stop = Command::new(zccache);
    stop.arg("stop")
        .env("ZCCACHE_CACHE_DIR", cache_dir)
        .stdout(Stdio::null())
        .stderr(Stdio::null());
    // The outcome is intentionally dropped — callers only need the attempt.
    let _outcome = stop.status();
}
/// Launches one `zccache <echo_shim> 0` wrapper invocation and collects its
/// output.
///
/// The command is fully prepared first, then the calling thread blocks on
/// `gate` so that all participants call `spawn` as close together as
/// possible. After spawning, `payload` is written to the child's stdin, stdin
/// is closed, and the child's stdout/stderr/exit status are gathered.
///
/// * `zccache` — path of the wrapper binary to execute.
/// * `echo_shim` — passed as the first argument, followed by the literal `0`
///   (presumably the exit code the shim should return — confirm against the
///   shim's CLI).
/// * `cache_dir` — exported to the child as `ZCCACHE_CACHE_DIR`;
///   `ZCCACHE_SESSION_ID` is removed from the child's environment.
/// * `payload` — bytes fed to the child's stdin.
/// * `gate` — barrier released once every racing thread is ready to spawn.
///
/// # Errors
/// Returns an error if the process cannot be spawned, if writing the payload
/// fails for a reason other than a broken pipe, or if collecting the output
/// fails. A broken pipe while writing is tolerated: it only means the child
/// stopped reading (typically because it already exited), and the child's
/// exit status and stderr are exactly what the caller needs to diagnose that.
///
/// # Panics
/// Panics if the spawned child has no stdin handle, which cannot happen
/// because stdin is configured as piped above.
fn run_one_wrapper(
    zccache: &Path,
    echo_shim: &Path,
    cache_dir: &Path,
    payload: &[u8],
    gate: &Barrier,
) -> std::io::Result<std::process::Output> {
    let mut cmd = Command::new(zccache);
    cmd.arg(echo_shim)
        .arg("0")
        .env("ZCCACHE_CACHE_DIR", cache_dir)
        .env_remove("ZCCACHE_SESSION_ID")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());

    // Rendezvous with the other threads right before spawning so the spawns
    // overlap as tightly as possible.
    gate.wait();
    let mut child = cmd.spawn()?;

    // Scope the handle so stdin is closed (EOF for the child) before waiting.
    let write_result = {
        let mut stdin = child.stdin.take().expect("piped stdin");
        stdin.write_all(payload)
    };
    match write_result {
        Ok(()) => {}
        // The child closed stdin or exited early; fall through and report
        // its real exit status and stderr instead of a bare I/O error.
        Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {}
        Err(e) => {
            // Do not leave an un-reaped child behind on the error path.
            let _ = child.kill();
            let _ = child.wait();
            return Err(e);
        }
    }

    child.wait_with_output()
}
/// Collects the paths of all `daemon-spawn-*.log` files directly inside
/// `logs_dir`, sorted ascending.
///
/// An unreadable or missing directory yields an empty list, and directory
/// entries that fail to be read are skipped. File names are matched after a
/// lossy UTF-8 conversion.
fn list_spawn_logs(logs_dir: &Path) -> Vec<PathBuf> {
    let entries = match std::fs::read_dir(logs_dir) {
        Ok(entries) => entries,
        Err(_) => return Vec::new(),
    };
    let mut spawn_logs: Vec<PathBuf> = entries
        .flatten()
        .filter(|entry| {
            let file_name = entry.file_name();
            let file_name = file_name.to_string_lossy();
            file_name.starts_with("daemon-spawn-") && file_name.ends_with(".log")
        })
        .map(|entry| entry.path())
        .collect();
    // Directory iteration order is unspecified; sort for stable output.
    spawn_logs.sort();
    spawn_logs
}
/// Reads `daemon-lifecycle.log` from `logs_dir` and parses every non-blank
/// line as one JSON value, preserving file order.
///
/// Returns an empty list when the file cannot be read.
///
/// # Panics
/// Panics if any non-blank line is not valid JSON.
fn parse_lifecycle_events(logs_dir: &Path) -> Vec<serde_json::Value> {
    let log_path = logs_dir.join("daemon-lifecycle.log");
    let text = match std::fs::read_to_string(&log_path) {
        Ok(text) => text,
        Err(_) => return Vec::new(),
    };

    let mut events: Vec<serde_json::Value> = Vec::new();
    for line in text.lines() {
        // Whitespace-only lines carry no event.
        if line.trim().is_empty() {
            continue;
        }
        let event =
            serde_json::from_str(line).expect("daemon-lifecycle.log line must be valid JSON");
        events.push(event);
    }
    events
}
/// Reads the environment variable `name` as a `usize`.
///
/// Falls back to `default` when the variable is unset, is not valid Unicode,
/// or does not parse as a `usize` (no trimming is applied before parsing).
fn env_usize(name: &str, default: usize) -> usize {
    match std::env::var(name) {
        Ok(raw) => raw.parse().unwrap_or(default),
        Err(_) => default,
    }
}
/// Reads the environment variable `name` as a whole number of seconds and
/// returns it as a [`Duration`].
///
/// `default` (in seconds) is used when the variable is unset, is not valid
/// Unicode, or does not parse as a `u64`.
fn env_secs(name: &str, default: u64) -> Duration {
    let secs = match std::env::var(name) {
        Ok(raw) => raw.parse::<u64>().unwrap_or(default),
        Err(_) => default,
    };
    Duration::from_secs(secs)
}
/// Reproducer for #691: `n` wrapper invocations started at the same instant
/// against a fresh cache directory must result in exactly one daemon spawn.
///
/// The test is skipped (returns early) unless `GATE_ENV` is set and both the
/// `zccache` and `echo_shim` binaries exist next to the test executable.
/// `N_ENV` and `BUDGET_ENV` override the number of racing wrappers and the
/// wall-clock budget.
///
/// It asserts, in order: the run finished within the budget, every wrapper
/// exited successfully, a `logs/` directory exists, exactly one
/// `daemon-spawn-*.log` file was written, and exactly one lifecycle event
/// with `"event": "spawn"` was recorded.
#[test]
#[ignore = "reproducer for #691 — gated on ZCCACHE_RUN_REPRO_691"]
fn parallel_wrappers_must_share_one_daemon() {
    /// Stops the daemon for `cache_dir` when dropped, so that a failing
    /// assertion (or a panicking worker thread) cannot leave daemons running
    /// against a temporary directory that is about to be deleted.
    struct StopDaemonOnDrop<'a> {
        zccache: &'a Path,
        cache_dir: &'a Path,
    }

    impl<'a> Drop for StopDaemonOnDrop<'a> {
        fn drop(&mut self) {
            stop_daemon(self.zccache, self.cache_dir);
        }
    }

    if std::env::var(GATE_ENV).is_err() {
        eprintln!(
            "skipping: set {GATE_ENV}=1 to run this reproducer (it is expected to FAIL on \
             main until the #691 spawn-coordination fix lands)"
        );
        return;
    }

    let zccache = binary_path("zccache");
    let echo_shim = binary_path("echo_shim");
    if !zccache.exists() || !echo_shim.exists() {
        eprintln!(
            "skipping: required binaries not built ({} or {})",
            zccache.display(),
            echo_shim.display()
        );
        return;
    }

    let n = env_usize(N_ENV, DEFAULT_N);
    assert!(n >= 2, "{N_ENV}={n} — need at least 2 to race");
    let budget = env_secs(BUDGET_ENV, DEFAULT_BUDGET_SECS);

    let cache_dir = tempfile::Builder::new()
        .prefix("zccache-spawn-storm-")
        .tempdir()
        .expect("tempdir");

    // Issue a stop for this cache dir before racing the wrappers.
    stop_daemon(&zccache, cache_dir.path());

    // Declared after `cache_dir`, so it is dropped (and the daemon stopped)
    // before the temporary directory is removed — on success and on panic.
    let _daemon_guard = StopDaemonOnDrop {
        zccache: zccache.as_path(),
        cache_dir: cache_dir.path(),
    };

    let gate = Arc::new(Barrier::new(n));
    let payload: &[u8] = b"spawn-storm-repro-691\n";
    let started = Instant::now();

    // One thread per wrapper; each blocks on the barrier right before
    // spawning its process so all spawns happen together.
    let handles: Vec<thread::JoinHandle<std::process::Output>> = (0..n)
        .map(|i| {
            let zccache = zccache.clone();
            let echo_shim = echo_shim.clone();
            let cache_path = cache_dir.path().to_path_buf();
            let gate = Arc::clone(&gate);
            thread::Builder::new()
                .name(format!("spawn-storm-{i}"))
                .spawn(move || {
                    run_one_wrapper(&zccache, &echo_shim, &cache_path, payload, &gate)
                        .unwrap_or_else(|e| panic!("wrapper {i} spawn failed: {e}"))
                })
                .expect("thread spawn")
        })
        .collect();

    let mut failures: Vec<String> = Vec::new();
    for (i, h) in handles.into_iter().enumerate() {
        let out = h.join().unwrap_or_else(|_| panic!("thread {i} panicked"));
        if !out.status.success() {
            failures.push(format!(
                "wrapper {i} exit={:?} stderr={}",
                out.status.code(),
                String::from_utf8_lossy(&out.stderr).trim()
            ));
        }
    }
    let elapsed = started.elapsed();

    assert!(
        elapsed < budget,
        "spawn-storm reproducer exceeded {budget:?} (took {elapsed:?}); raise \
         {BUDGET_ENV} or lower {N_ENV}={n} if this is expected on the runner"
    );
    assert!(
        failures.is_empty(),
        "{} of {} wrapper invocations failed:\n{}",
        failures.len(),
        n,
        failures.join("\n")
    );

    // Stop the daemon explicitly before inspecting its logs. The guard above
    // will issue one more (harmless) stop when the test returns.
    stop_daemon(&zccache, cache_dir.path());

    let logs_dir = cache_dir.path().join("logs");
    assert!(
        logs_dir.exists(),
        "logs/ directory must exist after running the wrapper"
    );

    let spawn_logs = list_spawn_logs(&logs_dir);
    let events = parse_lifecycle_events(&logs_dir);
    let spawn_events: Vec<&serde_json::Value> =
        events.iter().filter(|e| e["event"] == "spawn").collect();
    let attempt_events: Vec<&serde_json::Value> = events
        .iter()
        .filter(|e| e["event"] == "spawn-attempt")
        .collect();

    assert_eq!(
        spawn_logs.len(),
        1,
        "expected exactly one daemon-spawn-*.log under #691 fix, got {} logs and {} \
         lifecycle spawn events ({} attempts). Files: {:?}",
        spawn_logs.len(),
        spawn_events.len(),
        attempt_events.len(),
        spawn_logs
            .iter()
            .map(|p| p.file_name().map(|n| n.to_string_lossy().into_owned()))
            .collect::<Vec<_>>()
    );
    assert_eq!(
        spawn_events.len(),
        1,
        "expected exactly one event:\"spawn\" under #691 fix, got {} (attempts: {}) — \
         full lifecycle: {:#?}",
        spawn_events.len(),
        attempt_events.len(),
        events
    );
}