use crate::core::NormalizedPath;
use std::process::ExitCode;
use super::super::status_probe_timeout;
use super::util::{connect, resolve_endpoint, run_async, LOST_CONNECTION_MSG};
/// Result of probing the daemon's status and comparing its version with the
/// client's, as produced by `check_daemon_version`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum VersionCheck {
    /// The daemon answered and its version equals the client's.
    Ok,
    /// The daemon answered with a version that compares greater than the
    /// client's.
    DaemonNewer {
        /// Version string exactly as reported by the daemon.
        daemon_ver: String,
    },
    /// The daemon answered with a version that compares lower than the
    /// client's, or with a version string that could not be parsed.
    DaemonOlder {
        /// Version string exactly as reported by the daemon.
        daemon_ver: String,
    },
    /// The probe failed with an error classified as "daemon unreachable".
    Unreachable,
    /// The probe failed in any other way, or returned something other than a
    /// status response.
    CommError,
    /// The probe failed with an endpoint error whose message mentions the
    /// wire-format environment variable; carries that message.
    ClientConfigError(String),
}
/// Sends a status request to the daemon at `endpoint` and classifies the
/// reply relative to this client's version.
///
/// The request is bounded by `status_probe_timeout()`. Identical version
/// strings short-circuit to [`VersionCheck::Ok`]; otherwise the daemon's
/// version is parsed and ordered against the client's, and an unparsable
/// daemon version is reported as [`VersionCheck::DaemonOlder`].
pub(crate) async fn check_daemon_version(endpoint: &str) -> VersionCheck {
    use std::cmp::Ordering;

    let reply = crate::ipc::daemon_control_roundtrip(
        endpoint,
        crate::ipc::DaemonControlRequest::Status,
        Some(status_probe_timeout()),
    )
    .await;

    // Peel off every non-status outcome first so the version comparison
    // below only deals with a real status payload.
    let status = match reply {
        Ok(Some(crate::protocol::Response::Status(status))) => status,
        Err(crate::ipc::IpcError::Endpoint(message))
            if message.contains(crate::protocol::wire_prost::WIRE_FORMAT_ENV) =>
        {
            return VersionCheck::ClientConfigError(message);
        }
        Err(err) if crate::cli::client::is_daemon_unreachable_err(&err) => {
            return VersionCheck::Unreachable;
        }
        _ => return VersionCheck::CommError,
    };

    // Fast path: byte-identical version strings need no parsing.
    if status.version == crate::core::VERSION {
        return VersionCheck::Ok;
    }

    let client_ver = crate::core::version::current();
    let ordering = crate::core::version::Version::parse(&status.version)
        .map(|daemon_ver| daemon_ver.cmp(&client_ver));

    match ordering {
        Some(Ordering::Equal) => VersionCheck::Ok,
        Some(Ordering::Greater) => VersionCheck::DaemonNewer {
            daemon_ver: status.version,
        },
        // An unparsable daemon version is handled like an older daemon.
        Some(Ordering::Less) | None => VersionCheck::DaemonOlder {
            daemon_ver: status.version,
        },
    }
}
/// Spawns the daemon binary for `endpoint` and waits until it reports ready.
///
/// A spawn-attempt lifecycle event carrying `reason` is written before the
/// spawn. When `outbound_pid` is given (a daemon that was just killed) and a
/// running daemon is found after startup, takeover lifecycle events are
/// emitted as well.
///
/// # Errors
///
/// Returns an error string when the daemon binary cannot be located, the
/// spawn fails, or the daemon does not become ready.
pub(crate) async fn spawn_and_wait(
    endpoint: &str,
    reason: &str,
    outbound_pid: Option<u32>,
) -> Result<(), String> {
    let Some(daemon_bin) = find_daemon_binary() else {
        return Err("cannot find zccache-daemon binary".into());
    };
    tracing::debug!(?daemon_bin, %endpoint, reason, "spawning daemon");

    let meta = crate::core::lifecycle::client_meta(crate::core::VERSION);
    let spawn_details = serde_json::json!({
        "reason": reason,
        "endpoint": endpoint,
        "daemon_namespace": crate::core::config::daemon_namespace_label(),
        "client_pid": std::process::id(),
        "client_version": meta["client_version"],
        "client_binary_path": meta["client_binary_path"],
    });
    crate::core::lifecycle::write_event(
        crate::core::lifecycle::EVENT_SPAWN_ATTEMPT,
        spawn_details,
    );

    super::super::spawn_daemon(&daemon_bin, endpoint)?;
    super::super::wait_for_daemon_ready(endpoint).await?;

    // Takeover events only apply when this spawn replaced a killed daemon.
    let Some(killed_pid) = outbound_pid else {
        return Ok(());
    };
    if let Some(new_pid) = crate::ipc::check_running_daemon() {
        crate::core::lifecycle::emit_takeover_lifecycle_events(
            killed_pid,
            new_pid,
            crate::core::VERSION,
            endpoint,
        );
    }
    Ok(())
}
/// Asks the daemon at `endpoint` to shut down, then force-kills the daemon
/// process if `crate::ipc::check_running_daemon` still reports one.
///
/// Returns the pid that was force-killed, or `None` when no daemon process
/// was reported or the kill request failed. The lock file is removed whenever
/// a daemon process was reported, regardless of whether the kill succeeded.
pub(crate) async fn stop_stale_daemon(endpoint: &str) -> Option<u32> {
    const SETTLE_DELAY: std::time::Duration = std::time::Duration::from_millis(200);
    const EXIT_POLL_DELAY: std::time::Duration = std::time::Duration::from_millis(100);
    const EXIT_POLL_ATTEMPTS: u32 = 50;

    // Best effort: the outcome of the graceful shutdown request is ignored
    // because the force-kill below covers the failure case.
    let _ = crate::ipc::daemon_control_roundtrip(
        endpoint,
        crate::ipc::DaemonControlRequest::Shutdown,
        None,
    )
    .await;
    tokio::time::sleep(SETTLE_DELAY).await;

    let killed_pid = match crate::ipc::check_running_daemon() {
        None => None,
        Some(pid) => {
            tracing::debug!(pid, "force-killing stale daemon process");
            let kill_ok = crate::ipc::force_kill_process(pid).is_ok();

            // Only wait for the process to exit when the kill was delivered.
            let mut attempts_left = if kill_ok { EXIT_POLL_ATTEMPTS } else { 0 };
            while attempts_left > 0 && crate::ipc::is_process_alive(pid) {
                tokio::time::sleep(EXIT_POLL_DELAY).await;
                attempts_left -= 1;
            }

            crate::ipc::remove_lock_file();
            if kill_ok {
                Some(pid)
            } else {
                None
            }
        }
    };

    tokio::time::sleep(SETTLE_DELAY).await;
    killed_pid
}
pub(crate) async fn ensure_daemon(endpoint: &str) -> Result<(), String> {
match check_daemon_version(endpoint).await {
VersionCheck::Ok => return Ok(()),
VersionCheck::DaemonNewer { daemon_ver } => {
tracing::debug!(
daemon_ver,
client_ver = crate::core::VERSION,
"daemon is newer than client, proceeding"
);
return Ok(());
}
VersionCheck::DaemonOlder { daemon_ver } => {
tracing::info!(
daemon_ver,
client_ver = crate::core::VERSION,
"daemon is older than client, auto-recovering"
);
let killed_pid = stop_stale_daemon(endpoint).await;
return spawn_and_wait(
endpoint,
crate::core::lifecycle::REASON_REPLACED_STALE_VERSION,
killed_pid,
)
.await;
}
VersionCheck::CommError => {
tracing::info!("cannot communicate with daemon, auto-recovering");
let killed_pid = stop_stale_daemon(endpoint).await;
return spawn_and_wait(
endpoint,
crate::core::lifecycle::REASON_REPLACED_COMM_ERROR,
killed_pid,
)
.await;
}
VersionCheck::ClientConfigError(message) => return Err(message),
VersionCheck::Unreachable => {
}
}
if let Some(pid) = crate::ipc::check_running_daemon() {
for _ in 0..20 {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
match check_daemon_version(endpoint).await {
VersionCheck::Ok => return Ok(()),
VersionCheck::DaemonNewer { daemon_ver } => {
tracing::debug!(
daemon_ver,
client_ver = crate::core::VERSION,
"daemon is newer than client, proceeding"
);
return Ok(());
}
VersionCheck::DaemonOlder { daemon_ver } => {
tracing::info!(
daemon_ver,
client_ver = crate::core::VERSION,
"daemon is older than client during startup, auto-recovering"
);
let killed_pid = stop_stale_daemon(endpoint).await;
return spawn_and_wait(
endpoint,
crate::core::lifecycle::REASON_REPLACED_STALE_VERSION,
killed_pid,
)
.await;
}
VersionCheck::CommError => {
tracing::info!(
"cannot communicate with daemon during startup, auto-recovering"
);
let killed_pid = stop_stale_daemon(endpoint).await;
return spawn_and_wait(
endpoint,
crate::core::lifecycle::REASON_REPLACED_COMM_ERROR,
killed_pid,
)
.await;
}
VersionCheck::ClientConfigError(message) => return Err(message),
VersionCheck::Unreachable => continue,
}
}
return Err(format!(
"daemon process {pid} exists but not accepting connections"
));
}
spawn_and_wait(endpoint, crate::core::lifecycle::REASON_INITIAL_START, None).await
}
/// Locates the `zccache-daemon` executable.
///
/// A file with the platform-specific daemon name sitting next to the current
/// executable is preferred; otherwise `PATH` is searched via `which_on_path`.
/// Returns `None` when neither location has the binary.
pub(crate) fn find_daemon_binary() -> Option<NormalizedPath> {
    let name = if cfg!(windows) {
        "zccache-daemon.exe"
    } else {
        "zccache-daemon"
    };

    // Require a regular file, matching `which_on_path`: a directory that
    // happens to carry the daemon's name must not shadow a real binary
    // reachable through PATH.
    let sibling = std::env::current_exe()
        .ok()
        .and_then(|exe| exe.parent().map(|dir| dir.join(name)))
        .filter(|candidate| candidate.is_file());

    match sibling {
        Some(candidate) => Some(candidate.into()),
        None => which_on_path(name),
    }
}
/// Searches the directories listed in the `PATH` environment variable for a
/// file called `name`.
///
/// Directories are tried in `PATH` order and the first hit is returned. On
/// Windows, when `name` has no extension, `name.exe` is also tried in each
/// directory. Returns `None` when `PATH` is unset or nothing matches.
pub(crate) fn which_on_path(name: &str) -> Option<NormalizedPath> {
    let path_var = std::env::var_os("PATH")?;
    std::env::split_paths(&path_var).find_map(|dir| {
        let exact = dir.join(name);
        if exact.is_file() {
            return Some(exact.into());
        }
        #[cfg(windows)]
        if std::path::Path::new(name).extension().is_none() {
            let with_exe = dir.join(format!("{name}.exe"));
            if with_exe.is_file() {
                return Some(with_exe.into());
            }
        }
        None
    })
}
/// Implements the `start` command: makes sure a daemon is running at
/// `endpoint` and reports the outcome on stderr.
///
/// Returns `ExitCode::SUCCESS` when `ensure_daemon` succeeds and
/// `ExitCode::FAILURE` otherwise.
pub(crate) async fn cmd_start(endpoint: &str) -> ExitCode {
    if let Err(e) = ensure_daemon(endpoint).await {
        eprintln!("failed to start daemon: {e}");
        return ExitCode::FAILURE;
    }
    eprintln!("daemon running at {endpoint}");
    ExitCode::SUCCESS
}
/// Implements the `stop` command for the daemon at `endpoint`.
///
/// A shutdown request is sent first. If the daemon is unreachable, the
/// command falls back to force-killing the daemon process reported by
/// `crate::ipc::check_running_daemon` (or succeeds right away when none is
/// reported). After a confirmed stop it waits for the endpoint to disappear.
/// Progress and errors are written to stderr.
pub(crate) async fn cmd_stop(endpoint: &str) -> ExitCode {
    let shutdown_reply = crate::ipc::daemon_control_roundtrip(
        endpoint,
        crate::ipc::DaemonControlRequest::Shutdown,
        None,
    )
    .await;

    let recv_result = match shutdown_reply {
        Ok(response) => response,
        Err(e) if !crate::cli::client::is_daemon_unreachable_err(&e) => {
            eprintln!("zccache[err][R]: broken connection to daemon: {e}");
            return ExitCode::FAILURE;
        }
        Err(_) => {
            // Unreachable over IPC: fall back to the recorded daemon process.
            let Some(pid) = crate::ipc::check_running_daemon() else {
                eprintln!("daemon not running at {endpoint}");
                wait_for_daemon_teardown(endpoint).await;
                return ExitCode::SUCCESS;
            };

            if let Err(e) = crate::ipc::force_kill_process(pid) {
                eprintln!(
                    "zccache: cannot connect to daemon at {endpoint}, and failed to kill \
                     locked process {pid}: {e}"
                );
                return ExitCode::FAILURE;
            }

            // Poll for the process to exit: up to 50 checks, 100 ms apart.
            let mut polls_left = 50;
            while polls_left > 0 {
                if !crate::ipc::is_process_alive(pid) {
                    crate::ipc::remove_lock_file();
                    eprintln!("daemon process {pid} terminated after IPC connection failed");
                    wait_for_daemon_teardown(endpoint).await;
                    return ExitCode::SUCCESS;
                }
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                polls_left -= 1;
            }

            eprintln!("zccache: sent termination to daemon process {pid}, but it did not exit");
            return ExitCode::FAILURE;
        }
    };

    match recv_result {
        None => {
            eprintln!("{LOST_CONNECTION_MSG}");
            ExitCode::FAILURE
        }
        Some(crate::protocol::Response::ShuttingDown) => {
            wait_for_daemon_teardown(endpoint).await;
            eprintln!("daemon stopped");
            ExitCode::SUCCESS
        }
        Some(other) => {
            eprintln!("zccache[err][U]: unexpected response from daemon: {other:?}");
            ExitCode::FAILURE
        }
    }
}
/// Seconds to wait for the daemon endpoint to disappear when
/// `ZCCACHE_STOP_TIMEOUT_SECS` is unset or not a valid unsigned integer.
const STOP_WAIT_DEFAULT_SECS: u64 = 10;

/// Delay between successive endpoint reachability checks during teardown.
const STOP_WAIT_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);

/// Returns the teardown timeout, taken from `ZCCACHE_STOP_TIMEOUT_SECS`
/// (whole seconds, surrounding whitespace ignored) and falling back to
/// `STOP_WAIT_DEFAULT_SECS` when the variable is missing or unparsable.
fn stop_wait_timeout() -> std::time::Duration {
    let configured_secs = match std::env::var("ZCCACHE_STOP_TIMEOUT_SECS") {
        Ok(raw) => raw.trim().parse::<u64>().ok(),
        Err(_) => None,
    };
    std::time::Duration::from_secs(configured_secs.unwrap_or(STOP_WAIT_DEFAULT_SECS))
}
/// Waits until the daemon's IPC endpoint stops accepting connections.
///
/// The endpoint is polled every `STOP_WAIT_POLL_INTERVAL`. If it is still
/// reachable once `stop_wait_timeout()` has elapsed, a warning is printed to
/// stderr and the function returns anyway.
pub(crate) async fn wait_for_daemon_teardown(endpoint: &str) {
    let timeout = stop_wait_timeout();
    // Measure elapsed time instead of computing `now + timeout`: adding a
    // very large user-supplied timeout to an `Instant` panics on overflow.
    let started = std::time::Instant::now();
    loop {
        if !is_ipc_endpoint_reachable(endpoint).await {
            return;
        }
        if started.elapsed() >= timeout {
            eprintln!(
                "zccache: timed out waiting for daemon endpoint to disappear after stop; \
                 continuing anyway. set ZCCACHE_STOP_TIMEOUT_SECS to override."
            );
            return;
        }
        tokio::time::sleep(STOP_WAIT_POLL_INTERVAL).await;
    }
}
/// Reports whether a connection to `endpoint` can currently be established.
///
/// The connection result is only inspected for success and is not kept.
async fn is_ipc_endpoint_reachable(endpoint: &str) -> bool {
    match connect(endpoint).await {
        Ok(_) => true,
        Err(_) => false,
    }
}
/// Synchronous entry point for the `start` command: resolves the endpoint
/// with no override and drives `cmd_start` to completion via `run_async`.
pub(crate) fn run_start() -> ExitCode {
    let target = resolve_endpoint(None);
    let start_task = cmd_start(&target);
    run_async(start_task)
}
/// Synchronous entry point for the `stop` command: resolves the endpoint
/// with no override and drives `cmd_stop` to completion via `run_async`.
pub(crate) fn run_stop() -> ExitCode {
    let target = resolve_endpoint(None);
    let stop_task = cmd_stop(&target);
    run_async(stop_task)
}