use crate::core::NormalizedPath;
use std::path::Path;
/// Drives `future` to completion on a freshly built current-thread tokio
/// runtime (created with `enable_all()`), blocking the calling thread.
///
/// # Errors
///
/// Returns an error string if the runtime cannot be built; otherwise returns
/// whatever `future` resolves to.
pub fn run_async<T>(
    future: impl std::future::Future<Output = Result<T, String>>,
) -> Result<T, String> {
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(rt) => rt,
        Err(e) => return Err(format!("failed to create tokio runtime: {e}")),
    };
    runtime.block_on(future)
}
/// Outcome of probing the daemon at an endpoint and comparing the version it
/// reports with this client's version (produced by `check_daemon_version`).
#[derive(Debug)]
enum VersionCheck {
    /// The daemon answered and its version equals the client's.
    Ok,
    /// No connection to the endpoint could be established.
    Unreachable,
    /// The daemon reported a version lower than the client's, or a version
    /// string that could not be parsed; `daemon_ver` is the string it reported.
    DaemonOlder { daemon_ver: String },
    /// The daemon reported a version higher than the client's.
    DaemonNewer,
    /// A connection was made, but sending the status request failed or the
    /// reply was not a status response.
    CommError,
}
/// Connects to the daemon at `endpoint` (Unix build) and sets the
/// connection's receive timeout to `DEFAULT_CLIENT_RECV_TIMEOUT`.
///
/// # Errors
///
/// Propagates the error returned by `crate::ipc::connect`.
#[cfg(unix)]
pub async fn connect_client(
    endpoint: &str,
) -> Result<crate::ipc::IpcConnection, crate::ipc::IpcError> {
    let recv_timeout = crate::ipc::DEFAULT_CLIENT_RECV_TIMEOUT;
    let mut client = crate::ipc::connect(endpoint).await?;
    client.set_recv_timeout(recv_timeout);
    Ok(client)
}
/// Connects to the daemon at `endpoint` (Windows build) and sets the
/// connection's receive timeout to `DEFAULT_CLIENT_RECV_TIMEOUT`.
///
/// # Errors
///
/// Propagates the error returned by `crate::ipc::connect`.
#[cfg(windows)]
pub async fn connect_client(
    endpoint: &str,
) -> Result<crate::ipc::IpcClientConnection, crate::ipc::IpcError> {
    let recv_timeout = crate::ipc::DEFAULT_CLIENT_RECV_TIMEOUT;
    let mut client = crate::ipc::connect(endpoint).await?;
    client.set_recv_timeout(recv_timeout);
    Ok(client)
}
/// Probes the daemon at `endpoint` with a `Status` request and classifies the
/// version it reports relative to this client.
///
/// * Connection failure yields `Unreachable`.
/// * A failed send, a receive error, an empty reply, or any non-status
///   response yields `CommError`.
/// * A version string identical to `crate::core::VERSION` yields `Ok` without
///   parsing.
/// * Otherwise the parsed versions are compared; a version that fails to parse
///   is reported as `DaemonOlder`.
async fn check_daemon_version(endpoint: &str) -> VersionCheck {
    let Ok(mut conn) = connect_client(endpoint).await else {
        return VersionCheck::Unreachable;
    };
    if conn.send(&crate::protocol::Request::Status).await.is_err() {
        return VersionCheck::CommError;
    }

    let reply = conn
        .recv_with_timeout::<crate::protocol::Response>(super::status_probe_timeout())
        .await;
    let Ok(Some(crate::protocol::Response::Status(status))) = reply else {
        return VersionCheck::CommError;
    };

    // Fast path: identical version strings need no parsing.
    if status.version == crate::core::VERSION {
        return VersionCheck::Ok;
    }

    let client_ver = crate::core::version::current();
    let ordering = crate::core::version::Version::parse(&status.version)
        .map(|daemon| daemon.cmp(&client_ver));
    match ordering {
        Some(std::cmp::Ordering::Equal) => VersionCheck::Ok,
        Some(std::cmp::Ordering::Greater) => VersionCheck::DaemonNewer,
        // An unparseable version is handled the same way as an older one.
        Some(std::cmp::Ordering::Less) | None => VersionCheck::DaemonOlder {
            daemon_ver: status.version,
        },
    }
}
/// Locates the daemon binary, records a spawn-attempt lifecycle event tagged
/// with `reason`, spawns the daemon for `endpoint`, and polls until it accepts
/// a connection.
///
/// Polling sleeps 100 ms before each of up to 100 connection attempts.
///
/// # Errors
///
/// Returns an error string when the binary cannot be found, when spawning
/// fails, or when no connection attempt succeeds.
async fn spawn_and_wait(endpoint: &str, reason: &str) -> Result<(), String> {
    let Some(daemon_bin) = find_daemon_binary() else {
        return Err("cannot find zccache-daemon binary".to_string());
    };

    let event_payload = serde_json::json!({
        "reason": reason,
        "endpoint": endpoint,
        "daemon_namespace": crate::core::config::daemon_namespace_label(),
        "client_pid": std::process::id(),
    });
    crate::core::lifecycle::write_event(
        crate::core::lifecycle::EVENT_SPAWN_ATTEMPT,
        event_payload,
    );

    spawn_daemon(&daemon_bin, endpoint)?;

    let poll_interval = std::time::Duration::from_millis(100);
    let mut polls_left = 100u32;
    while polls_left > 0 {
        polls_left -= 1;
        tokio::time::sleep(poll_interval).await;
        if connect_client(endpoint).await.is_ok() {
            return Ok(());
        }
    }
    Err("daemon started but not accepting connections after 10s".to_string())
}
/// Best-effort teardown of a daemon that should be replaced.
///
/// Steps, in order:
/// 1. If a connection can be opened, send `Shutdown` (result ignored) and
///    wait 200 ms while the connection is still held.
/// 2. If `check_running_daemon` still reports a pid, force-kill it; when the
///    kill call succeeds, poll up to 50 times at 100 ms intervals until the
///    process is no longer alive. The lock file is removed whether or not the
///    kill succeeded.
/// 3. Wait a further 200 ms before returning.
async fn stop_stale_daemon(endpoint: &str) {
    let settle = std::time::Duration::from_millis(200);
    let poll_interval = std::time::Duration::from_millis(100);

    if let Ok(mut conn) = connect_client(endpoint).await {
        let _ = conn.send(&crate::protocol::Request::Shutdown).await;
        tokio::time::sleep(settle).await;
    }

    if let Some(pid) = crate::ipc::check_running_daemon() {
        if crate::ipc::force_kill_process(pid).is_ok() {
            let mut polls = 0;
            while polls < 50 && crate::ipc::is_process_alive(pid) {
                tokio::time::sleep(poll_interval).await;
                polls += 1;
            }
        }
        crate::ipc::remove_lock_file();
    }

    tokio::time::sleep(settle).await;
}
/// Makes sure a usable daemon is listening on `endpoint`, starting or
/// replacing one if necessary.
///
/// 1. Probe once. A matching or newer daemon is accepted as-is. An older
///    daemon or a communication error triggers stop-and-respawn.
/// 2. If the endpoint is unreachable but `check_running_daemon` reports a pid,
///    re-probe up to 20 times with a doubling backoff (100 ms, capped at
///    500 ms), applying the same accept/replace rules to each probe.
/// 3. If the endpoint is unreachable and no pid is reported, spawn a fresh
///    daemon.
///
/// # Errors
///
/// Returns the error from `spawn_and_wait`, or an error string when a daemon
/// process exists but never becomes reachable within the retries.
pub async fn ensure_daemon(endpoint: &str) -> Result<(), String> {
    let recovery_reason: Option<&str> = match check_daemon_version(endpoint).await {
        VersionCheck::Ok | VersionCheck::DaemonNewer => return Ok(()),
        VersionCheck::Unreachable => None,
        VersionCheck::DaemonOlder { daemon_ver } => {
            tracing::info!(
                daemon_ver,
                client_ver = crate::core::VERSION,
                "daemon is older than client, auto-recovering"
            );
            Some(crate::core::lifecycle::REASON_REPLACED_STALE_VERSION)
        }
        VersionCheck::CommError => {
            tracing::info!("cannot communicate with daemon, auto-recovering");
            Some(crate::core::lifecycle::REASON_REPLACED_COMM_ERROR)
        }
    };
    if let Some(reason) = recovery_reason {
        stop_stale_daemon(endpoint).await;
        return spawn_and_wait(endpoint, reason).await;
    }

    // Unreachable and no recorded daemon process: plain first start.
    let Some(pid) = crate::ipc::check_running_daemon() else {
        return spawn_and_wait(endpoint, crate::core::lifecycle::REASON_INITIAL_START).await;
    };

    // A daemon process is recorded but not reachable yet; keep re-probing.
    let max_backoff = std::time::Duration::from_millis(500);
    let mut backoff = std::time::Duration::from_millis(100);
    for _ in 0..20 {
        tokio::time::sleep(backoff).await;
        backoff = std::cmp::min(backoff * 2, max_backoff);

        let reason = match check_daemon_version(endpoint).await {
            VersionCheck::Ok | VersionCheck::DaemonNewer => return Ok(()),
            VersionCheck::Unreachable => continue,
            VersionCheck::DaemonOlder { daemon_ver } => {
                tracing::info!(
                    daemon_ver,
                    client_ver = crate::core::VERSION,
                    "daemon is older than client during startup, auto-recovering"
                );
                crate::core::lifecycle::REASON_REPLACED_STALE_VERSION
            }
            VersionCheck::CommError => crate::core::lifecycle::REASON_REPLACED_COMM_ERROR,
        };
        stop_stale_daemon(endpoint).await;
        return spawn_and_wait(endpoint, reason).await;
    }

    Err(format!(
        "daemon process {pid} exists but not accepting connections after retrying"
    ))
}
/// Locates the `zccache-daemon` executable.
///
/// The directory containing the current executable is checked first; if no
/// regular file with the expected name is there, `PATH` is searched via
/// `which_on_path`. Returns `None` when neither location has it.
fn find_daemon_binary() -> Option<NormalizedPath> {
    let name = if cfg!(windows) {
        "zccache-daemon.exe"
    } else {
        "zccache-daemon"
    };

    // Require a file (not merely an existing path) so a directory with the
    // daemon's name is never returned as the binary; this matches the check
    // `which_on_path` applies to PATH entries.
    let sibling = std::env::current_exe()
        .ok()
        .and_then(|exe| exe.parent().map(|dir| dir.join(name)))
        .filter(|candidate| candidate.is_file());
    if let Some(candidate) = sibling {
        return Some(candidate.into());
    }

    which_on_path(name)
}
fn which_on_path(name: &str) -> Option<NormalizedPath> {
let path_var = std::env::var_os("PATH")?;
for dir in std::env::split_paths(&path_var) {
let candidate = dir.join(name);
if candidate.is_file() {
return Some(candidate.into());
}
#[cfg(windows)]
if Path::new(name).extension().is_none() {
let with_exe = dir.join(format!("{name}.exe"));
if with_exe.is_file() {
return Some(with_exe.into());
}
}
}
None
}
/// Adds every variable produced by `cli_spawn_lineage_env` to the environment
/// of `cmd` (non-Windows builds only).
#[cfg(not(windows))]
fn apply_cli_spawn_lineage(cmd: &mut std::process::Command) {
    cmd.envs(cli_spawn_lineage_env());
}
/// Builds the environment variables that record which CLI process spawned the
/// daemon.
///
/// Returned pairs:
/// * `RUNNING_PROCESS_ORIGINATOR` = `zccache-cli:<pid>` — only when the
///   variable cannot currently be read (unset or not valid Unicode), so an
///   inherited value is left alone.
/// * `ZCCACHE_LINEAGE` — the inherited `>`-separated chain with this pid
///   appended, unless its last segment already is this pid. An unset,
///   unreadable, or empty inherited value starts a new chain containing only
///   this pid.
/// * `ZCCACHE_PARENT_PID` and `ZCCACHE_CLIENT_PID` — this process's pid.
fn cli_spawn_lineage_env() -> Vec<(String, String)> {
    const ENV_ORIGINATOR: &str = "RUNNING_PROCESS_ORIGINATOR";
    const ENV_LINEAGE: &str = "ZCCACHE_LINEAGE";
    const ENV_PARENT_PID: &str = "ZCCACHE_PARENT_PID";
    const ENV_CLIENT_PID: &str = "ZCCACHE_CLIENT_PID";

    let cli_pid = std::process::id().to_string();
    let mut out: Vec<(String, String)> = Vec::with_capacity(4);

    if std::env::var(ENV_ORIGINATOR).is_err() {
        out.push((ENV_ORIGINATOR.to_string(), format!("zccache-cli:{cli_pid}")));
    }

    let chain = match std::env::var(ENV_LINEAGE) {
        // An empty inherited value carries no ancestry; appending to it would
        // produce a chain with an empty leading segment (">pid").
        Ok(existing) if existing.is_empty() => cli_pid.clone(),
        // Already ends with this pid: do not append it a second time.
        Ok(existing) if existing.rsplit('>').next() == Some(cli_pid.as_str()) => existing,
        Ok(existing) => format!("{existing}>{cli_pid}"),
        Err(_) => cli_pid.clone(),
    };

    out.push((ENV_LINEAGE.to_string(), chain));
    out.push((ENV_PARENT_PID.to_string(), cli_pid.clone()));
    out.push((ENV_CLIENT_PID.to_string(), cli_pid));
    out
}
/// Name of the sub-directory, under the default cache directory, that holds
/// the daemon executable copies made by `prepare_daemon_exe`.
const RUNTIME_BINARIES_SUBDIR: &str = "runtime-binaries";

/// Returns `<default cache dir>/runtime-binaries`.
#[must_use]
pub fn runtime_binaries_dir() -> NormalizedPath {
    let cache_root = crate::core::config::default_cache_dir();
    cache_root.join(RUNTIME_BINARIES_SUBDIR)
}
/// Copies the daemon executable at `canonical` into the runtime-binaries
/// directory and returns the path of the copy.
///
/// # Errors
///
/// Propagates any I/O error from `prepare_daemon_exe_in`.
pub fn prepare_daemon_exe(canonical: &Path) -> Result<std::path::PathBuf, std::io::Error> {
    let staging_dir = runtime_binaries_dir();
    prepare_daemon_exe_in(canonical, staging_dir.as_path())
}
/// Copies the executable at `canonical` into `dir` (created if missing) under
/// a per-call file name and returns the destination path.
///
/// The file name is `zccache-daemon.<id>` followed by `.<ext>` when
/// `canonical` has a non-empty UTF-8 extension. `<id>` is the process id
/// XOR-ed with the sub-second nanoseconds of the current time.
///
/// # Errors
///
/// Returns the I/O error from creating `dir` or from copying the file.
pub fn prepare_daemon_exe_in(
    canonical: &Path,
    dir: &Path,
) -> Result<std::path::PathBuf, std::io::Error> {
    std::fs::create_dir_all(dir)?;

    let nanos = std::time::UNIX_EPOCH
        .elapsed()
        .unwrap_or_default()
        .subsec_nanos();
    let unique: u32 = std::process::id() ^ nanos;

    let staged_name = match canonical.extension().and_then(|ext| ext.to_str()) {
        Some(ext) if !ext.is_empty() => format!("zccache-daemon.{unique}.{ext}"),
        _ => format!("zccache-daemon.{unique}"),
    };

    let staged = dir.join(staged_name);
    std::fs::copy(canonical, &staged).map(|_| staged)
}
/// Runs `gc_runtime_binaries_in` on the directory returned by
/// `runtime_binaries_dir`.
pub fn gc_runtime_binaries() {
    let staging_dir = runtime_binaries_dir();
    gc_runtime_binaries_in(staging_dir.as_path());
}
/// Attempts to delete every entry directly inside `dir` with
/// `std::fs::remove_file`, ignoring individual failures.
///
/// Does nothing when `dir` cannot be read. Entries that cannot be listed are
/// skipped.
pub fn gc_runtime_binaries_in(dir: &Path) {
    let Ok(listing) = std::fs::read_dir(dir) else {
        return;
    };
    listing
        .flatten()
        .map(|entry| entry.path())
        .for_each(|path| {
            // Best effort: a failed removal is deliberately ignored.
            let _ = std::fs::remove_file(path);
        });
}
/// Name of the sub-directory, under the default cache directory, where
/// daemon spawn logs are written.
const DAEMON_SPAWN_LOGS_SUBDIR: &str = "logs";

/// Builds a log file path for the daemon about to be spawned.
///
/// The logs directory is created on a best-effort basis (failure ignored).
/// The file name is `daemon-spawn[-<namespace>]-<pid>-<nanos>.log`, where
/// `<nanos>` is nanoseconds since the Unix epoch (0 if the clock reads
/// earlier than the epoch) and the namespace part is present only when
/// `daemon_namespace()` returns one. The file itself is not created here.
fn allocate_daemon_spawn_log_path() -> std::path::PathBuf {
    let dir = crate::core::config::default_cache_dir().join(DAEMON_SPAWN_LOGS_SUBDIR);
    let _ = std::fs::create_dir_all(dir.as_path());

    let nanos = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    };
    let pid = std::process::id();

    let prefix = match crate::core::config::daemon_namespace() {
        Some(namespace) => format!("daemon-spawn-{namespace}"),
        None => "daemon-spawn".to_string(),
    };
    dir.as_path().join(format!("{prefix}-{pid}-{nanos}.log"))
}
/// Age (24 hours) beyond which `gc_log_directory` deletes a log file.
const LOG_GC_CUTOFF: std::time::Duration = std::time::Duration::from_secs(24 * 60 * 60);

/// Sweeps the logs directory under the default cache directory, deleting
/// eligible files older than `LOG_GC_CUTOFF`.
pub fn gc_log_directory() {
    gc_log_directory_in(
        crate::core::config::default_cache_dir()
            .join(DAEMON_SPAWN_LOGS_SUBDIR)
            .as_path(),
        LOG_GC_CUTOFF,
    );
}
/// Deletes regular files directly inside `dir` whose modification time is
/// more than `cutoff` in the past.
///
/// An entry is left untouched when:
/// * its name is not valid UTF-8,
/// * `is_live_lifecycle_log_name` returns true for its name,
/// * it is not a regular file (or its type cannot be determined),
/// * its modification time is unavailable or lies in the future.
///
/// Removal failures are ignored. Does nothing when `dir` cannot be read.
pub fn gc_log_directory_in(dir: &Path, cutoff: std::time::Duration) {
    let Ok(listing) = std::fs::read_dir(dir) else {
        return;
    };
    let now = std::time::SystemTime::now();

    for entry in listing.flatten() {
        let Some(name) = entry.file_name().to_str().map(str::to_owned) else {
            continue;
        };
        if crate::core::lifecycle::is_live_lifecycle_log_name(&name) {
            continue;
        }

        let is_regular_file = entry
            .file_type()
            .map(|kind| kind.is_file())
            .unwrap_or(false);
        if !is_regular_file {
            continue;
        }

        let Ok(mtime) = entry.metadata().and_then(|meta| meta.modified()) else {
            continue;
        };
        // `duration_since` fails when the mtime is later than `now`.
        let Ok(age) = now.duration_since(mtime) else {
            continue;
        };
        if age > cutoff {
            let _ = std::fs::remove_file(entry.path());
        }
    }
}
/// Deprecated alias retained for existing callers; forwards to
/// `gc_log_directory`.
#[deprecated(note = "use gc_log_directory instead — sweeps the full logs/ directory")]
pub fn gc_daemon_spawn_logs() {
    gc_log_directory()
}
/// Spawns the daemon executable for `endpoint` via
/// `running_process::spawn_daemon`.
///
/// Before spawning, the runtime-binaries and log directories are swept, then
/// `bin` is copied with `prepare_daemon_exe`. If that copy fails, `bin`
/// itself is launched instead. The daemon is started with
/// `--foreground --endpoint <endpoint> --log-file <path>` and with the
/// lineage variables from `cli_spawn_lineage_env` added to its environment.
///
/// # Errors
///
/// Returns an error string when the spawn call fails.
pub fn spawn_daemon(bin: &Path, endpoint: &str) -> Result<(), String> {
    gc_runtime_binaries();
    gc_log_directory();

    // Prefer the staged copy; fall back to the original binary if staging failed.
    let staged = prepare_daemon_exe(bin).ok();
    let spawn_bin: &Path = staged.as_deref().unwrap_or(bin);

    let log_arg = allocate_daemon_spawn_log_path()
        .to_string_lossy()
        .into_owned();

    let mut cmd = std::process::Command::new(spawn_bin);
    cmd.arg("--foreground")
        .arg("--endpoint")
        .arg(endpoint)
        .arg("--log-file")
        .arg(&log_arg);

    #[cfg(not(windows))]
    apply_cli_spawn_lineage(&mut cmd);
    #[cfg(windows)]
    {
        for (key, value) in cli_spawn_lineage_env() {
            cmd.env(key, value);
        }
    }

    match running_process::spawn_daemon(&mut cmd) {
        Ok(_child) => Ok(()),
        Err(e) => Err(format!("failed to spawn daemon (sanitized): {e}")),
    }
}