use crate::core::NormalizedPath;
use std::path::Path;
/// Drives `future` to completion on a freshly built current-thread Tokio runtime.
///
/// # Errors
/// Returns `"failed to create tokio runtime: …"` when the runtime cannot be
/// built; otherwise yields whatever `Result` the future resolves to.
pub fn run_async<T>(
    future: impl std::future::Future<Output = Result<T, String>>,
) -> Result<T, String> {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all();
    match builder.build() {
        Ok(runtime) => runtime.block_on(future),
        Err(e) => Err(format!("failed to create tokio runtime: {e}")),
    }
}
/// Classification of a daemon status probe, as produced by `check_daemon_version`.
#[derive(Debug)]
enum VersionCheck {
/// The daemon answered and its version equals the client's.
Ok,
/// The probe failed with an error that `is_daemon_unreachable_err` accepts.
Unreachable,
/// The daemon's version compares lower than the client's, or could not be parsed.
DaemonOlder { daemon_ver: String },
/// The daemon's version compares higher than the client's.
DaemonNewer,
/// Any other probe outcome (unexpected response or unclassified error).
CommError,
/// Endpoint error whose message contains `WIRE_FORMAT_ENV`; carries that message.
ClientConfigError(String),
}
/// Connects to the daemon at `endpoint` and sets the connection's receive
/// timeout to `crate::ipc::DEFAULT_CLIENT_RECV_TIMEOUT` (Unix build).
///
/// # Errors
/// Propagates the `IpcError` returned by `crate::ipc::connect_daemon`.
#[cfg(unix)]
pub async fn connect_client(
endpoint: &str,
) -> Result<crate::ipc::IpcConnection, crate::ipc::IpcError> {
let mut conn = crate::ipc::connect_daemon(endpoint).await?;
conn.set_recv_timeout(crate::ipc::DEFAULT_CLIENT_RECV_TIMEOUT);
Ok(conn)
}
/// Connects to the daemon at `endpoint` and sets the connection's receive
/// timeout to `crate::ipc::DEFAULT_CLIENT_RECV_TIMEOUT` (Windows build).
///
/// # Errors
/// Propagates the `IpcError` returned by `crate::ipc::connect_daemon`.
#[cfg(windows)]
pub async fn connect_client(
endpoint: &str,
) -> Result<crate::ipc::IpcClientConnection, crate::ipc::IpcError> {
let mut conn = crate::ipc::connect_daemon(endpoint).await?;
conn.set_recv_timeout(crate::ipc::DEFAULT_CLIENT_RECV_TIMEOUT);
Ok(conn)
}
/// Sends a status request to the daemon at `endpoint` and classifies the reply.
///
/// A status response is compared against the client version: an exact string
/// match (or equal parsed versions) yields `Ok`, a higher daemon version yields
/// `DaemonNewer`, and a lower or unparseable one yields `DaemonOlder`.
/// Probe failures map to `ClientConfigError`, `Unreachable`, or `CommError`.
async fn check_daemon_version(endpoint: &str) -> VersionCheck {
    let probe = crate::ipc::daemon_control_roundtrip(
        endpoint,
        crate::ipc::DaemonControlRequest::Status,
        Some(super::status_probe_timeout()),
    )
    .await;

    // Peel off every non-status outcome first so the version logic reads flat.
    let status = match probe {
        Ok(Some(crate::protocol::Response::Status(status))) => status,
        Err(crate::ipc::IpcError::Endpoint(message))
            if message.contains(crate::protocol::wire_prost::WIRE_FORMAT_ENV) =>
        {
            return VersionCheck::ClientConfigError(message);
        }
        Err(err) if crate::cli::client::is_daemon_unreachable_err(&err) => {
            return VersionCheck::Unreachable;
        }
        _ => return VersionCheck::CommError,
    };

    // Fast path: identical version strings need no parsing.
    if status.version == crate::core::VERSION {
        return VersionCheck::Ok;
    }

    let client_ver = crate::core::version::current();
    let ordering = crate::core::version::Version::parse(&status.version)
        .map(|daemon_ver| daemon_ver.cmp(&client_ver));
    match ordering {
        Some(std::cmp::Ordering::Equal) => VersionCheck::Ok,
        Some(std::cmp::Ordering::Greater) => VersionCheck::DaemonNewer,
        // An unparseable daemon version is treated the same as an older one.
        Some(std::cmp::Ordering::Less) | None => VersionCheck::DaemonOlder {
            daemon_ver: status.version,
        },
    }
}
/// Locates the daemon binary, records a spawn-attempt lifecycle event, spawns
/// the daemon for `endpoint`, and waits until it accepts connections.
///
/// `reason` is recorded verbatim in the lifecycle event.
///
/// # Errors
/// Fails when the binary cannot be found, when spawning fails, or when the
/// daemon does not become ready.
async fn spawn_and_wait(endpoint: &str, reason: &str) -> Result<(), String> {
    let Some(daemon_bin) = find_daemon_binary() else {
        return Err(String::from("cannot find zccache-daemon binary"));
    };

    let event_payload = serde_json::json!({
        "reason": reason,
        "endpoint": endpoint,
        "daemon_namespace": crate::core::config::daemon_namespace_label(),
        "client_pid": std::process::id(),
    });
    crate::core::lifecycle::write_event(
        crate::core::lifecycle::EVENT_SPAWN_ATTEMPT,
        event_payload,
    );

    spawn_daemon(&daemon_bin, endpoint)?;
    wait_for_daemon_ready(endpoint).await
}
/// Timing knobs for the post-spawn readiness wait.
#[derive(Debug, Clone, Copy)]
pub(crate) struct AdaptiveWaitConfig {
    /// Sleep between successive connection attempts.
    pub poll_interval: std::time::Duration,
    /// How long to keep waiting while no daemon pid has ever been observed.
    pub no_daemon_grace: std::time::Duration,
    /// Upper bound on the wait while a daemon pid is still being observed.
    pub hard_ceiling: std::time::Duration,
}

impl Default for AdaptiveWaitConfig {
    /// 100 ms polling, 10 s no-daemon grace, 60 s hard ceiling.
    fn default() -> Self {
        use std::time::Duration;

        let poll_interval = Duration::from_millis(100);
        let no_daemon_grace = Duration::from_secs(10);
        let hard_ceiling = Duration::from_secs(60);
        Self {
            poll_interval,
            no_daemon_grace,
            hard_ceiling,
        }
    }
}
/// Verdict for one polling iteration of the readiness wait, as returned by
/// `classify_wait_tick`.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum WaitTick {
/// Keep polling.
Pending,
/// A daemon pid is visible but the hard ceiling has elapsed.
HardCeilingHit { observed_pid: Option<u32> },
/// No daemon pid was ever observed and the grace period has elapsed.
NoDaemonGracePassed,
/// A daemon pid was observed earlier but is no longer reported.
DaemonExited { pid: u32 },
}
/// Decides what the readiness loop should do on this tick.
///
/// * `elapsed` — time since the wait began.
/// * `daemon_pid` — pid reported by the liveness probe on this tick, if any.
/// * `last_observed_pid` — most recent pid seen on any tick so far.
/// * `cfg` — grace / ceiling thresholds.
///
/// A currently visible daemon is only ever bounded by `hard_ceiling`; a daemon
/// that was seen and then vanished is reported as exited immediately; a daemon
/// that was never seen is bounded by `no_daemon_grace`.
pub(crate) fn classify_wait_tick(
    elapsed: std::time::Duration,
    daemon_pid: Option<u32>,
    last_observed_pid: Option<u32>,
    cfg: &AdaptiveWaitConfig,
) -> WaitTick {
    match (daemon_pid, last_observed_pid) {
        (Some(pid), _) if elapsed >= cfg.hard_ceiling => WaitTick::HardCeilingHit {
            observed_pid: Some(pid),
        },
        (Some(_), _) => WaitTick::Pending,
        (None, Some(pid)) => WaitTick::DaemonExited { pid },
        (None, None) if elapsed >= cfg.no_daemon_grace => WaitTick::NoDaemonGracePassed,
        (None, None) => WaitTick::Pending,
    }
}
/// Waits for the daemon at `endpoint` to accept connections, using
/// `crate::ipc::check_running_daemon` as the liveness probe and
/// `AdaptiveWaitConfig::default()` for timing.
///
/// # Errors
/// Returns the error string produced by `wait_for_daemon_ready_with`.
pub async fn wait_for_daemon_ready(endpoint: &str) -> Result<(), String> {
wait_for_daemon_ready_with(
endpoint,
crate::ipc::check_running_daemon,
AdaptiveWaitConfig::default(),
)
.await
}
/// Polls `endpoint` until a client connection succeeds or the wait is abandoned.
///
/// Each iteration sleeps for `cfg.poll_interval`, attempts a connection, then
/// asks `daemon_alive_check` for the daemon pid and lets `classify_wait_tick`
/// decide whether to keep going.
///
/// # Errors
/// Returns a descriptive message when the hard ceiling is hit, when no daemon
/// was ever observed within the grace period, or when a previously observed
/// daemon disappears.
pub(crate) async fn wait_for_daemon_ready_with(
    endpoint: &str,
    daemon_alive_check: impl Fn() -> Option<u32>,
    cfg: AdaptiveWaitConfig,
) -> Result<(), String> {
    let started_at = std::time::Instant::now();
    let mut last_observed_pid: Option<u32> = None;

    let failure = loop {
        tokio::time::sleep(cfg.poll_interval).await;
        if connect_client(endpoint).await.is_ok() {
            return Ok(());
        }

        let elapsed = started_at.elapsed();
        let daemon_pid = daemon_alive_check();
        // Remember the newest pid we have seen; keep the old one otherwise.
        last_observed_pid = daemon_pid.or(last_observed_pid);

        match classify_wait_tick(elapsed, daemon_pid, last_observed_pid, &cfg) {
            WaitTick::Pending => {}
            WaitTick::HardCeilingHit { observed_pid } => {
                let pid_str = match observed_pid {
                    Some(p) => p.to_string(),
                    None => "<unknown>".to_string(),
                };
                break format!(
                    "daemon process {pid_str} still not accepting connections after {}s (hard cap)",
                    cfg.hard_ceiling.as_secs()
                );
            }
            WaitTick::NoDaemonGracePassed => {
                break format!(
                    "no daemon lockfile observed within {}s of spawn (spawn likely failed)",
                    cfg.no_daemon_grace.as_secs()
                );
            }
            WaitTick::DaemonExited { pid } => {
                break format!("daemon process {pid} exited before accepting connections");
            }
        }
    };
    Err(failure)
}
/// Best-effort shutdown of the daemon currently serving `endpoint`.
///
/// Sends a shutdown request (ignoring the result), pauses, and if a daemon pid
/// is still reported, force-kills it, waits up to 50 × 100 ms for it to die,
/// and removes the lock file. A final pause follows in every case.
async fn stop_stale_daemon(endpoint: &str) {
    const SETTLE_DELAY: std::time::Duration = std::time::Duration::from_millis(200);
    const KILL_POLL_DELAY: std::time::Duration = std::time::Duration::from_millis(100);
    const KILL_POLL_LIMIT: u32 = 50;

    let _ = crate::ipc::daemon_control_roundtrip(
        endpoint,
        crate::ipc::DaemonControlRequest::Shutdown,
        None,
    )
    .await;
    tokio::time::sleep(SETTLE_DELAY).await;

    if let Some(pid) = crate::ipc::check_running_daemon() {
        if crate::ipc::force_kill_process(pid).is_ok() {
            let mut polls_left = KILL_POLL_LIMIT;
            while polls_left > 0 && crate::ipc::is_process_alive(pid) {
                tokio::time::sleep(KILL_POLL_DELAY).await;
                polls_left -= 1;
            }
        }
        // The lock file is removed whether or not the kill succeeded.
        crate::ipc::remove_lock_file();
    }

    tokio::time::sleep(SETTLE_DELAY).await;
}
/// Makes sure a usable daemon is serving `endpoint`, spawning or replacing one
/// when necessary.
///
/// * A matching or newer daemon is accepted as-is.
/// * An older daemon, or one that cannot be talked to, is stopped and replaced.
/// * If the daemon is unreachable but a daemon pid is reported, the probe is
///   retried up to 20 times with a 100 ms → 500 ms capped backoff.
/// * If nothing is running, a fresh daemon is spawned.
///
/// # Errors
/// Returns the client-configuration message from the probe, a spawn/wait
/// failure, or a timeout message when the reported daemon never answers.
pub async fn ensure_daemon(endpoint: &str) -> Result<(), String> {
    match check_daemon_version(endpoint).await {
        VersionCheck::Unreachable => {}
        VersionCheck::Ok | VersionCheck::DaemonNewer => return Ok(()),
        VersionCheck::ClientConfigError(message) => return Err(message),
        VersionCheck::DaemonOlder { daemon_ver } => {
            tracing::info!(
                daemon_ver,
                client_ver = crate::core::VERSION,
                "daemon is older than client, auto-recovering"
            );
            stop_stale_daemon(endpoint).await;
            let reason = crate::core::lifecycle::REASON_REPLACED_STALE_VERSION;
            return spawn_and_wait(endpoint, reason).await;
        }
        VersionCheck::CommError => {
            tracing::info!("cannot communicate with daemon, auto-recovering");
            stop_stale_daemon(endpoint).await;
            let reason = crate::core::lifecycle::REASON_REPLACED_COMM_ERROR;
            return spawn_and_wait(endpoint, reason).await;
        }
    }

    // Unreachable and no daemon pid reported: plain first start.
    let Some(pid) = crate::ipc::check_running_daemon() else {
        return spawn_and_wait(endpoint, crate::core::lifecycle::REASON_INITIAL_START).await;
    };

    // A daemon pid is reported but it is not answering yet; retry the probe.
    let backoff_cap = std::time::Duration::from_millis(500);
    let mut backoff = std::time::Duration::from_millis(100);
    for _ in 0..20 {
        tokio::time::sleep(backoff).await;
        backoff = (backoff * 2).min(backoff_cap);

        match check_daemon_version(endpoint).await {
            VersionCheck::Unreachable => continue,
            VersionCheck::Ok | VersionCheck::DaemonNewer => return Ok(()),
            VersionCheck::ClientConfigError(message) => return Err(message),
            VersionCheck::DaemonOlder { daemon_ver } => {
                tracing::info!(
                    daemon_ver,
                    client_ver = crate::core::VERSION,
                    "daemon is older than client during startup, auto-recovering"
                );
                stop_stale_daemon(endpoint).await;
                let reason = crate::core::lifecycle::REASON_REPLACED_STALE_VERSION;
                return spawn_and_wait(endpoint, reason).await;
            }
            VersionCheck::CommError => {
                stop_stale_daemon(endpoint).await;
                let reason = crate::core::lifecycle::REASON_REPLACED_COMM_ERROR;
                return spawn_and_wait(endpoint, reason).await;
            }
        }
    }

    Err(format!(
        "daemon process {pid} exists but not accepting connections after retrying"
    ))
}
/// Locates the `zccache-daemon` executable.
///
/// Looks first for a regular file with the platform-specific name next to the
/// current executable, then falls back to searching `PATH`.
///
/// The sibling candidate must be a file (`is_file`), matching the check used by
/// `which_on_path`; a directory that merely shares the daemon's name is not
/// mistaken for the binary.
fn find_daemon_binary() -> Option<NormalizedPath> {
    let name = if cfg!(windows) {
        "zccache-daemon.exe"
    } else {
        "zccache-daemon"
    };

    let sibling = std::env::current_exe()
        .ok()
        .and_then(|exe| exe.parent().map(|dir| dir.join(name)))
        .filter(|candidate| candidate.is_file());

    match sibling {
        Some(candidate) => Some(candidate.into()),
        None => which_on_path(name),
    }
}
/// Searches each directory in the `PATH` environment variable for a regular
/// file called `name` and returns the first hit.
///
/// On Windows, when `name` has no extension, `name.exe` is also tried in each
/// directory. Returns `None` when `PATH` is unset or nothing matches.
fn which_on_path(name: &str) -> Option<NormalizedPath> {
    let search_path = std::env::var_os("PATH")?;
    for dir in std::env::split_paths(&search_path) {
        let exact = dir.join(name);
        if exact.is_file() {
            return Some(exact.into());
        }
        #[cfg(windows)]
        {
            let lacks_extension = Path::new(name).extension().is_none();
            if lacks_extension {
                let with_exe = dir.join(format!("{name}.exe"));
                if with_exe.is_file() {
                    return Some(with_exe.into());
                }
            }
        }
    }
    None
}
/// Sets every key/value pair from `cli_spawn_lineage_env` on `cmd`'s
/// environment (non-Windows builds).
#[cfg(not(windows))]
fn apply_cli_spawn_lineage(cmd: &mut std::process::Command) {
    cmd.envs(cli_spawn_lineage_env());
}
/// Builds the environment pairs that identify this CLI process to a spawned child.
///
/// * `RUNNING_PROCESS_ORIGINATOR` — added as `zccache-cli:<pid>` only when the
///   variable cannot be read from the current environment.
/// * `ZCCACHE_LINEAGE` — the inherited chain with this pid appended after `>`,
///   unless the chain already ends with this pid; just the pid when unset.
/// * `ZCCACHE_PARENT_PID` and `ZCCACHE_CLIENT_PID` — this process's pid.
fn cli_spawn_lineage_env() -> Vec<(String, String)> {
    const ENV_ORIGINATOR: &str = "RUNNING_PROCESS_ORIGINATOR";
    const ENV_LINEAGE: &str = "ZCCACHE_LINEAGE";
    const ENV_PARENT_PID: &str = "ZCCACHE_PARENT_PID";
    const ENV_CLIENT_PID: &str = "ZCCACHE_CLIENT_PID";

    let pid_text = std::process::id().to_string();
    let mut pairs: Vec<(String, String)> = Vec::with_capacity(4);

    if std::env::var(ENV_ORIGINATOR).is_err() {
        pairs.push((ENV_ORIGINATOR.to_string(), format!("zccache-cli:{pid_text}")));
    }

    let lineage = match std::env::var(ENV_LINEAGE) {
        Err(_) => pid_text.clone(),
        Ok(existing) => {
            // The last `>`-separated segment (or the whole value) is the most
            // recent link in the chain.
            let already_tail = existing.rsplit('>').next() == Some(pid_text.as_str());
            if already_tail {
                existing
            } else {
                format!("{existing}>{pid_text}")
            }
        }
    };
    pairs.push((ENV_LINEAGE.to_string(), lineage));

    for key in [ENV_PARENT_PID, ENV_CLIENT_PID] {
        pairs.push((key.to_string(), pid_text.clone()));
    }
    pairs
}
/// Name of the sub-directory, under the default cache directory, returned by
/// `runtime_binaries_dir`.
const RUNTIME_BINARIES_SUBDIR: &str = "runtime-binaries";
/// Returns the default cache directory joined with `runtime-binaries`.
#[must_use]
pub fn runtime_binaries_dir() -> NormalizedPath {
crate::core::config::default_cache_dir().join(RUNTIME_BINARIES_SUBDIR)
}
/// Copies the daemon binary at `canonical` into `runtime_binaries_dir()` and
/// returns the path of the copy.
///
/// # Errors
/// Propagates any I/O error from `prepare_daemon_exe_in`.
pub fn prepare_daemon_exe(canonical: &Path) -> Result<std::path::PathBuf, std::io::Error> {
prepare_daemon_exe_in(canonical, runtime_binaries_dir().as_path())
}
/// Copies `canonical` into `dir` under a per-call file name and returns the
/// destination path.
///
/// `dir` is created if missing. The copy is named `zccache-daemon.<id>` with
/// the source's UTF-8 extension appended when it has one; `<id>` is the process
/// id XOR-ed with the sub-second nanoseconds of the current wall-clock time.
///
/// # Errors
/// Returns the I/O error from creating `dir` or from copying the file.
pub fn prepare_daemon_exe_in(
    canonical: &Path,
    dir: &Path,
) -> Result<std::path::PathBuf, std::io::Error> {
    std::fs::create_dir_all(dir)?;

    let clock_nanos = std::time::UNIX_EPOCH
        .elapsed()
        .unwrap_or_default()
        .subsec_nanos();
    let rand_id: u32 = std::process::id() ^ clock_nanos;

    let mut file_name = format!("zccache-daemon.{rand_id}");
    let source_ext = canonical
        .extension()
        .and_then(|ext| ext.to_str())
        .filter(|ext| !ext.is_empty());
    if let Some(ext) = source_ext {
        file_name.push('.');
        file_name.push_str(ext);
    }

    let dest = dir.join(file_name);
    std::fs::copy(canonical, &dest)?;
    Ok(dest)
}
/// Runs `gc_runtime_binaries_in` on `runtime_binaries_dir()`.
pub fn gc_runtime_binaries() {
gc_runtime_binaries_in(runtime_binaries_dir().as_path());
}
/// Attempts to delete every entry directly inside `dir` with `remove_file`.
///
/// Best-effort: an unreadable directory is a no-op and individual removal
/// failures are ignored.
pub fn gc_runtime_binaries_in(dir: &Path) {
    if let Ok(entries) = std::fs::read_dir(dir) {
        entries.flatten().for_each(|entry| {
            let _ = std::fs::remove_file(entry.path());
        });
    }
}
/// Sub-directory of the default cache directory that holds daemon log files.
const DAEMON_SPAWN_LOGS_SUBDIR: &str = "logs";

/// Picks a fresh log file path for a daemon about to be spawned.
///
/// The logs directory is created on a best-effort basis. The file is named
/// `daemon-spawn-[<namespace>-]<pid>-<nanos>.log`, where `<nanos>` is the
/// wall-clock time since the Unix epoch (0 if the clock reads before it).
fn allocate_daemon_spawn_log_path() -> std::path::PathBuf {
    let logs_dir = crate::core::config::default_cache_dir().join(DAEMON_SPAWN_LOGS_SUBDIR);
    let _ = std::fs::create_dir_all(logs_dir.as_path());

    let nanos = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    };
    let pid = std::process::id();

    let file_name = if let Some(namespace) = crate::core::config::daemon_namespace() {
        format!("daemon-spawn-{namespace}-{pid}-{nanos}.log")
    } else {
        format!("daemon-spawn-{pid}-{nanos}.log")
    };
    logs_dir.as_path().join(file_name)
}
/// Age threshold (24 hours) that `gc_log_directory` passes to `gc_log_directory_in`.
const LOG_GC_CUTOFF: std::time::Duration = std::time::Duration::from_secs(60 * 60 * 24);
/// Sweeps the `logs` sub-directory of the default cache directory, using
/// `LOG_GC_CUTOFF` as the age threshold.
pub fn gc_log_directory() {
let dir = crate::core::config::default_cache_dir().join(DAEMON_SPAWN_LOGS_SUBDIR);
gc_log_directory_in(dir.as_path(), LOG_GC_CUTOFF);
}
/// Deletes regular files directly inside `dir` whose modification time is more
/// than `cutoff` in the past.
///
/// Skipped entries: names that are not valid UTF-8, names accepted by
/// `is_live_lifecycle_log_name`, anything that is not a regular file, and files
/// whose age cannot be determined (including modification times in the
/// future). All failures are ignored; an unreadable directory is a no-op.
pub fn gc_log_directory_in(dir: &Path, cutoff: std::time::Duration) {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return;
    };
    let now = std::time::SystemTime::now();

    for entry in entries.flatten() {
        let name = match entry.file_name().to_str() {
            Some(text) => text.to_owned(),
            None => continue,
        };
        if crate::core::lifecycle::is_live_lifecycle_log_name(&name) {
            continue;
        }

        let is_regular_file = entry.file_type().map(|t| t.is_file()).unwrap_or(false);
        if !is_regular_file {
            continue;
        }

        let Ok(modified_at) = entry.metadata().and_then(|m| m.modified()) else {
            continue;
        };
        let Ok(age) = now.duration_since(modified_at) else {
            continue;
        };
        if age > cutoff {
            let _ = std::fs::remove_file(entry.path());
        }
    }
}
/// Deprecated alias kept for existing callers; it simply calls `gc_log_directory`.
#[deprecated(note = "use gc_log_directory instead — sweeps the full logs/ directory")]
pub fn gc_daemon_spawn_logs() {
gc_log_directory();
}
/// Launches the daemon binary `bin` in the foreground for `endpoint`.
///
/// Before spawning, the runtime-binaries and logs directories are swept. The
/// daemon is started from a private copy of `bin` when one can be prepared,
/// and from `bin` itself otherwise. It is given a freshly allocated
/// `--log-file` path and the CLI lineage environment variables.
///
/// # Errors
/// Returns `"failed to spawn daemon (sanitized): …"` when the process cannot
/// be started.
pub fn spawn_daemon(bin: &Path, endpoint: &str) -> Result<(), String> {
    gc_runtime_binaries();
    gc_log_directory();

    // Fall back to the original binary if the private copy could not be made.
    let private_copy = prepare_daemon_exe(bin).ok();
    let spawn_bin: &Path = private_copy.as_deref().unwrap_or(bin);

    let log_arg = allocate_daemon_spawn_log_path()
        .to_string_lossy()
        .into_owned();

    let mut cmd = std::process::Command::new(spawn_bin);
    cmd.arg("--foreground")
        .arg("--endpoint")
        .arg(endpoint)
        .arg("--log-file")
        .arg(&log_arg);

    #[cfg(not(windows))]
    apply_cli_spawn_lineage(&mut cmd);
    #[cfg(windows)]
    {
        for (key, value) in cli_spawn_lineage_env() {
            cmd.env(key, value);
        }
    }

    match running_process::spawn_daemon(&mut cmd) {
        Ok(_child) => Ok(()),
        Err(e) => Err(format!("failed to spawn daemon (sanitized): {e}")),
    }
}
// Unit tests for the readiness-wait classifier and the polling loop built on it.
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
// Builds an `AdaptiveWaitConfig` from millisecond values; note the argument
// order is (grace, ceiling, poll).
fn cfg(grace_ms: u64, ceiling_ms: u64, poll_ms: u64) -> AdaptiveWaitConfig {
AdaptiveWaitConfig {
poll_interval: Duration::from_millis(poll_ms),
no_daemon_grace: Duration::from_millis(grace_ms),
hard_ceiling: Duration::from_millis(ceiling_ms),
}
}
// Platform-specific endpoint that the async tests expect every connection
// attempt to fail against (assumes nothing is listening there).
fn dead_endpoint() -> &'static str {
if cfg!(windows) {
r"\\.\pipe\zccache-test-issue-673-dead"
} else {
"/tmp/zccache-test-issue-673-dead.sock"
}
}
// A visible daemon below the hard ceiling keeps the wait going.
#[test]
fn pending_when_daemon_visible_and_below_hard_ceiling() {
let c = cfg(1_000, 5_000, 100);
let tick = classify_wait_tick(Duration::from_millis(500), Some(42), Some(42), &c);
assert_eq!(tick, WaitTick::Pending);
}
// Reaching the hard ceiling with a visible daemon reports the ceiling hit and its pid.
#[test]
fn hard_ceiling_hit_only_when_daemon_visible() {
let c = cfg(1_000, 5_000, 100);
let tick = classify_wait_tick(Duration::from_millis(5_000), Some(42), Some(42), &c);
assert_eq!(
tick,
WaitTick::HardCeilingHit {
observed_pid: Some(42)
}
);
}
// A pid that was seen before but is gone now is reported as an exit.
#[test]
fn daemon_exited_when_previously_observed_then_gone() {
let c = cfg(1_000, 5_000, 100);
let tick = classify_wait_tick(Duration::from_millis(200), None, Some(42), &c);
assert_eq!(tick, WaitTick::DaemonExited { pid: 42 });
}
// No pid ever seen and the grace period reached: give up.
#[test]
fn no_daemon_grace_passed_when_never_observed_and_grace_elapsed() {
let c = cfg(1_000, 5_000, 100);
let tick = classify_wait_tick(Duration::from_millis(1_000), None, None, &c);
assert_eq!(tick, WaitTick::NoDaemonGracePassed);
}
// No pid ever seen but still inside the grace period: keep waiting.
#[test]
fn pending_when_never_observed_but_grace_still_running() {
let c = cfg(1_000, 5_000, 100);
let tick = classify_wait_tick(Duration::from_millis(500), None, None, &c);
assert_eq!(tick, WaitTick::Pending);
}
// End-to-end: the liveness probe never reports a pid, so the loop must fail
// with the grace-period error (150 ms grace, 25 ms poll).
#[tokio::test(flavor = "current_thread")]
async fn returns_grace_error_when_no_lockfile_ever_observed() {
let c = cfg(150, 5_000, 25);
let err = wait_for_daemon_ready_with(dead_endpoint(), || None, c)
.await
.expect_err("no-daemon path must fail, not hang");
assert!(
err.contains("no daemon lockfile observed"),
"wrong error: {err}"
);
}
// End-to-end: the probe always reports a pid but connections fail, so the
// loop must stop at the 200 ms hard ceiling and mention the pid.
#[tokio::test(flavor = "current_thread")]
async fn returns_hard_ceiling_error_when_daemon_visible_but_unreachable() {
let c = cfg(5_000, 200, 25);
let err = wait_for_daemon_ready_with(dead_endpoint(), || Some(12_345), c)
.await
.expect_err("hard ceiling path must fail, not hang");
assert!(err.contains("hard cap"), "wrong error: {err}");
assert!(err.contains("12345"), "PID should appear: {err}");
}
// End-to-end: the probe reports a pid on its first call only; the second
// call returns `None`, which must surface as the daemon-exited error.
#[tokio::test(flavor = "current_thread")]
async fn returns_daemon_exited_error_when_lockfile_disappears() {
let polls = Arc::new(AtomicU32::new(0));
let c = cfg(10_000, 10_000, 25);
let polls_for_check = Arc::clone(&polls);
let err = wait_for_daemon_ready_with(
dead_endpoint(),
move || {
let n = polls_for_check.fetch_add(1, Ordering::SeqCst);
if n == 0 {
Some(99_999)
} else {
None
}
},
c,
)
.await
.expect_err("daemon-exit path must fail, not hang");
assert!(err.contains("exited"), "wrong error: {err}");
assert!(err.contains("99999"), "PID should appear: {err}");
}
}