use crate::core::NormalizedPath;
use std::io::Write;
use std::path::Path;
use std::process::ExitCode;
use super::super::super::{link_retry_budget, wedge_recv_timeout};
use super::super::daemon::{ensure_daemon, stop_stale_daemon};
use super::super::util::{connect, exit_code_from_i32, slurp_stdin_if_piped, LOST_CONNECTION_MSG};
/// Compiles through the daemon session at `endpoint` and relays the result to
/// this process's stdout/stderr.
///
/// Flow:
/// 1. Capture piped stdin (if any) and connect to the daemon.
/// 2. Send a `Request::Compile` for `session_id`.
/// 3. Wait for the reply under the `wedge_recv_timeout()` budget.
///
/// Outcomes of step 3:
/// * `Done`   – the reply (or a missing reply) is handed to
///   `relay_compile_response`, whose exit code is returned.
/// * `Wedged` – the connection is dropped and the daemon is probed with a
///   `Ping` (skipped when `wedge_probe_budget()` is `None`). If the probe
///   answers in time the compile is redone via `cmd_compile_ephemeral`
///   without touching the daemon; otherwise the daemon is stopped first.
/// * `Failed` – a client-disconnected lifecycle event is written, the message
///   is printed, and `ExitCode::FAILURE` is returned.
///
/// Connect and send errors are printed to stderr and return
/// `ExitCode::FAILURE` without any recovery attempt.
pub(super) async fn cmd_compile(
    endpoint: &str,
    session_id: &str,
    args: Vec<String>,
    cwd: NormalizedPath,
    compiler: NormalizedPath,
    client_env: Vec<(String, String)>,
) -> ExitCode {
    let stdin_bytes = slurp_stdin_if_piped();
    let mut conn = match connect(endpoint).await {
        Ok(c) => c,
        Err(e) => {
            eprintln!("zccache[err][C]: cannot connect to daemon at {endpoint}: {e}");
            return ExitCode::FAILURE;
        }
    };
    let wire = crate::protocol::wire_prost::full_family_wire_format_from_env();
    let request = crate::protocol::Request::Compile {
        session_id: session_id.to_string(),
        // Cloned on purpose: the wedge-recovery path below still needs the
        // originals to re-run the compile through the ephemeral path.
        args: args.clone(),
        cwd: cwd.clone(),
        compiler: compiler.clone(),
        env: Some(client_env.clone()),
        // Moved, not cloned: nothing after this point reads the captured
        // stdin again, so copying the buffer would be wasted work.
        stdin: stdin_bytes,
    };
    if let Err(e) = conn.send_request(&request, wire).await {
        eprintln!("zccache[err][S]: failed to send to daemon: {e}");
        return ExitCode::FAILURE;
    }
    match compile_recv_with_wedge_detection(&mut conn, wedge_recv_timeout()).await {
        CompileRecvOutcome::Done(recv_result) => {
            relay_compile_response(recv_result, &mut std::io::stdout(), &mut std::io::stderr())
        }
        CompileRecvOutcome::Wedged => {
            // Release the stuck connection before probing on a fresh one.
            drop(conn);
            let action = match wedge_probe_budget() {
                Some(budget) => {
                    classify_probe_outcome(probe_daemon_responsive(endpoint, budget).await)
                }
                // Probe disabled: go straight to the kill-and-recover path.
                None => WedgeAction::EscalateKill,
            };
            // NOTE(review): `cmd_compile_ephemeral` calls `slurp_stdin_if_piped`
            // again; whether that second read still yields the piped input
            // depends on the helper — confirm.
            match action {
                WedgeAction::DowngradeNoKill => {
                    eprintln!(
                        "zccache[warn][W]: daemon at {endpoint} answered probe within \
                         budget but missed the per-request wedge budget — burst load, \
                         not a hung daemon. Recovering via ephemeral without killing — \
                         issue #753"
                    );
                    cmd_compile_ephemeral(endpoint, compiler.as_path(), args, cwd, client_env).await
                }
                WedgeAction::EscalateKill | WedgeAction::EscalateKillProbeError => {
                    // NOTE(review): this message is also printed when the probe
                    // was disabled and therefore never ran.
                    eprintln!(
                        "zccache[warn][W]: daemon at {endpoint} appears wedged \
                         (probe failed within budget); recovering — issue #666"
                    );
                    stop_stale_daemon(endpoint).await;
                    cmd_compile_ephemeral(endpoint, compiler.as_path(), args, cwd, client_env).await
                }
            }
        }
        CompileRecvOutcome::Failed(msg) => {
            emit_client_disconnected_event(
                endpoint,
                crate::core::lifecycle::CAUSE_COMM_ERROR,
                &msg,
            );
            eprintln!("zccache[err][R]: {msg}");
            ExitCode::FAILURE
        }
    }
}
/// Result of waiting for the daemon's reply to a single request.
// `Done` carries a whole `Response` while the other variants are small, which
// is what the silenced lint flags.
#[allow(clippy::large_enum_variant)]
enum CompileRecvOutcome {
/// The receive call returned without error. `None` means no response was
/// delivered; the relay functions report that as a lost connection.
Done(Option<crate::protocol::Response>),
/// The receive hit its timeout (`IpcError::Timeout`) while a budget was set.
Wedged,
/// Any other failure; the string is the message shown to the user.
Failed(String),
}
/// Waits for the daemon's reply on `conn` and classifies how the wait ended.
///
/// With `Some(budget)` the receive is bounded by that duration and an
/// `IpcError::Timeout` is reported as [`CompileRecvOutcome::Wedged`]. With
/// `None` the unbounded `recv` is used and every error, including a timeout,
/// is reported as [`CompileRecvOutcome::Failed`].
async fn compile_recv_with_wedge_detection<C: ConnRecv>(
    conn: &mut C,
    budget: Option<std::time::Duration>,
) -> CompileRecvOutcome {
    let received = if let Some(limit) = budget {
        conn.recv_with_timeout(limit).await
    } else {
        conn.recv().await
    };
    match received {
        Ok(opt) => CompileRecvOutcome::Done(opt),
        // A timeout only counts as a wedge when a budget was actually in force.
        Err(crate::ipc::IpcError::Timeout(_)) if budget.is_some() => CompileRecvOutcome::Wedged,
        Err(e) => CompileRecvOutcome::Failed(format!("broken connection to daemon: {e}")),
    }
}
/// Receive side of a daemon connection, abstracted so that
/// `compile_recv_with_wedge_detection` can be driven by a fake in tests.
trait ConnRecv {
/// Receives the next response with no deadline applied by the caller.
async fn recv(&mut self) -> Result<Option<crate::protocol::Response>, crate::ipc::IpcError>;
/// Receives the next response, bounded by `timeout`. Callers treat an
/// `IpcError::Timeout` error from this method as a wedged daemon.
async fn recv_with_timeout(
&mut self,
timeout: std::time::Duration,
) -> Result<Option<crate::protocol::Response>, crate::ipc::IpcError>;
}
/// Runs `attempt`, retrying after `recover` while the outcome is `Failed`.
///
/// At most `max_recoveries` recover-then-retry rounds are performed, so
/// `attempt` runs at most `max_recoveries + 1` times. `Done` and `Wedged`
/// outcomes are returned immediately and never retried. The last outcome
/// observed is returned.
async fn link_with_retry<A, AF, R, RF>(
    mut attempt: A,
    mut recover: R,
    max_recoveries: u32,
) -> CompileRecvOutcome
where
    A: FnMut() -> AF,
    AF: std::future::Future<Output = CompileRecvOutcome>,
    R: FnMut() -> RF,
    RF: std::future::Future<Output = ()>,
{
    let mut recoveries_left = max_recoveries;
    loop {
        let outcome = attempt().await;
        let retryable = matches!(outcome, CompileRecvOutcome::Failed(_));
        if !retryable || recoveries_left == 0 {
            return outcome;
        }
        recoveries_left -= 1;
        recover().await;
    }
}
/// What to do with the daemon after a request missed its wedge budget.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum WedgeAction {
/// The probe answered within its budget: recover without killing the daemon.
DowngradeNoKill,
/// The probe timed out (also chosen by `cmd_compile` when the probe is
/// disabled): stop the daemon before recovering.
EscalateKill,
/// The probe finished within its budget but with an IPC error: stop the
/// daemon before recovering.
EscalateKillProbeError,
}
/// Maps the result of a liveness probe to the recovery action to take.
///
/// * outer `Err` (probe timed out)        -> [`WedgeAction::EscalateKill`]
/// * `Ok(Err(_))` (probe hit an IPC error) -> [`WedgeAction::EscalateKillProbeError`]
/// * `Ok(Ok(()))` (probe answered)         -> [`WedgeAction::DowngradeNoKill`]
pub(super) fn classify_probe_outcome(
    probe: Result<Result<(), crate::ipc::IpcError>, tokio::time::error::Elapsed>,
) -> WedgeAction {
    let Ok(round_trip) = probe else {
        return WedgeAction::EscalateKill;
    };
    match round_trip {
        Ok(()) => WedgeAction::DowngradeNoKill,
        Err(_) => WedgeAction::EscalateKillProbeError,
    }
}
/// Checks whether the daemon at `endpoint` answers a `Ping` within `budget`.
///
/// Opens a fresh connection, sends `Request::Ping` and waits for one
/// response; the content of the response is ignored. The outer `Err` means
/// the whole round trip exceeded `budget`; the inner `Err` is an IPC failure
/// that happened within it.
async fn probe_daemon_responsive(
    endpoint: &str,
    budget: std::time::Duration,
) -> Result<Result<(), crate::ipc::IpcError>, tokio::time::error::Elapsed> {
    let ping_round_trip = async {
        let mut conn = connect(endpoint).await?;
        let wire = crate::protocol::wire_prost::full_family_wire_format_from_env();
        conn.send_request(&crate::protocol::Request::Ping, wire)
            .await?;
        // Any reply at all proves the daemon is still servicing requests.
        let _ = conn.recv::<crate::protocol::Response>().await?;
        Ok::<(), crate::ipc::IpcError>(())
    };
    tokio::time::timeout(budget, ping_round_trip).await
}
/// Probe budget in milliseconds that `wedge_probe_budget` falls back to when
/// `ZCCACHE_WEDGE_PROBE_BUDGET_MS` is unset or does not parse as a `u64`.
pub(super) const WEDGE_PROBE_DEFAULT_MS: u64 = 3_000;
/// Returns how long the liveness probe may take, or `None` when probing is
/// disabled.
///
/// Reads `ZCCACHE_WEDGE_PROBE_BUDGET_MS` (surrounding whitespace is ignored).
/// A missing or unparsable value falls back to [`WEDGE_PROBE_DEFAULT_MS`]; a
/// value of `0` disables the probe.
pub(super) fn wedge_probe_budget() -> Option<std::time::Duration> {
    let configured_ms = match std::env::var("ZCCACHE_WEDGE_PROBE_BUDGET_MS") {
        Ok(raw) => raw
            .trim()
            .parse::<u64>()
            .unwrap_or(WEDGE_PROBE_DEFAULT_MS),
        Err(_) => WEDGE_PROBE_DEFAULT_MS,
    };
    (configured_ms != 0).then(|| std::time::Duration::from_millis(configured_ms))
}
// Unix builds: adapts `IpcConnection` to `ConnRecv` by forwarding to its
// `recv_response*` functions. The calls are spelled with the full type path
// rather than method syntax.
#[cfg(unix)]
impl ConnRecv for crate::ipc::IpcConnection {
// Forwards to `IpcConnection::recv_response`.
async fn recv(&mut self) -> Result<Option<crate::protocol::Response>, crate::ipc::IpcError> {
crate::ipc::IpcConnection::recv_response(self).await
}
// Forwards to `IpcConnection::recv_response_with_timeout` with the same `timeout`.
async fn recv_with_timeout(
&mut self,
timeout: std::time::Duration,
) -> Result<Option<crate::protocol::Response>, crate::ipc::IpcError> {
crate::ipc::IpcConnection::recv_response_with_timeout(self, timeout).await
}
}
// Windows builds: adapts `IpcClientConnection` to `ConnRecv` by forwarding to
// its `recv_response*` functions. The calls are spelled with the full type
// path rather than method syntax.
#[cfg(windows)]
impl ConnRecv for crate::ipc::IpcClientConnection {
// Forwards to `IpcClientConnection::recv_response`.
async fn recv(&mut self) -> Result<Option<crate::protocol::Response>, crate::ipc::IpcError> {
crate::ipc::IpcClientConnection::recv_response(self).await
}
// Forwards to `IpcClientConnection::recv_response_with_timeout` with the same `timeout`.
async fn recv_with_timeout(
&mut self,
timeout: std::time::Duration,
) -> Result<Option<crate::protocol::Response>, crate::ipc::IpcError> {
crate::ipc::IpcClientConnection::recv_response_with_timeout(self, timeout).await
}
}
/// Runs one compile as a `Request::CompileEphemeral` and relays the result.
///
/// Piped stdin is captured first, then the request is attempted through
/// `run_ephemeral_attempt`, with up to `link_retry_budget()` retries after a
/// jittered backoff when an attempt ends in `Failed`.
///
/// Returns the exit code produced by `relay_compile_response` when a reply
/// (or a missing reply) was received. On `Wedged` the daemon is stopped and
/// `ExitCode::FAILURE` is returned; on `Failed` the message is printed and
/// `ExitCode::FAILURE` is returned.
pub(super) async fn cmd_compile_ephemeral(
    endpoint: &str,
    compiler: &Path,
    args: Vec<String>,
    cwd: NormalizedPath,
    client_env: Vec<(String, String)>,
) -> ExitCode {
    let piped_stdin = slurp_stdin_if_piped();
    let request = crate::protocol::Request::CompileEphemeral {
        client_pid: std::process::id(),
        working_dir: cwd.clone(),
        compiler: compiler.into(),
        args,
        cwd,
        env: Some(client_env),
        stdin: piped_stdin,
    };
    let outcome = link_with_retry(
        || run_ephemeral_attempt(endpoint, &request),
        retry_backoff_with_jitter,
        link_retry_budget(),
    )
    .await;

    // Only `Done` can produce a non-failure exit code; the other outcomes
    // report and fall through to the shared failure return.
    match outcome {
        CompileRecvOutcome::Done(recv_result) => {
            return relay_compile_response(
                recv_result,
                &mut std::io::stdout(),
                &mut std::io::stderr(),
            );
        }
        CompileRecvOutcome::Wedged => {
            eprintln!(
                "zccache[err][W]: daemon at {endpoint} stopped responding within \
                 the wedge budget; killing it so the next compile starts fresh — issue #666"
            );
            stop_stale_daemon(endpoint).await;
        }
        CompileRecvOutcome::Failed(msg) => {
            eprintln!("zccache[err][R]: {msg}");
        }
    }
    ExitCode::FAILURE
}
/// Runs one link step as a `Request::LinkEphemeral` and relays the result.
///
/// The request is attempted through `run_ephemeral_attempt`, with up to
/// `link_retry_budget()` retries after a jittered backoff when an attempt
/// ends in `Failed`.
///
/// Returns the exit code produced by `relay_link_response` when a reply (or a
/// missing reply) was received. On `Wedged` the daemon is stopped and
/// `ExitCode::FAILURE` is returned; on `Failed` the message is printed and
/// `ExitCode::FAILURE` is returned.
pub(super) async fn cmd_link_ephemeral(
    endpoint: &str,
    tool: &Path,
    args: Vec<String>,
    cwd: NormalizedPath,
    client_env: Vec<(String, String)>,
) -> ExitCode {
    let request = crate::protocol::Request::LinkEphemeral {
        client_pid: std::process::id(),
        tool: tool.into(),
        args,
        cwd,
        env: Some(client_env),
    };
    let outcome = link_with_retry(
        || run_ephemeral_attempt(endpoint, &request),
        retry_backoff_with_jitter,
        link_retry_budget(),
    )
    .await;

    // Only `Done` can produce a non-failure exit code; the other outcomes
    // report and fall through to the shared failure return.
    match outcome {
        CompileRecvOutcome::Done(recv_result) => {
            return relay_link_response(
                recv_result,
                &mut std::io::stdout(),
                &mut std::io::stderr(),
            );
        }
        CompileRecvOutcome::Wedged => {
            eprintln!(
                "zccache[err][W]: daemon at {endpoint} stopped responding within \
                 the wedge budget on a Link; killing it so the next request starts \
                 fresh — issue #666"
            );
            stop_stale_daemon(endpoint).await;
        }
        CompileRecvOutcome::Failed(msg) => {
            eprintln!("zccache[err][R]: {msg}");
        }
    }
    ExitCode::FAILURE
}
/// Sleeps for a pseudo-random 50..=250 ms before a retry.
///
/// The jitter is taken from the sub-second nanoseconds of the current wall
/// clock; if the clock reads earlier than the Unix epoch the delay is the
/// 50 ms floor.
async fn retry_backoff_with_jitter() {
    let subsec_nanos = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.subsec_nanos(),
        Err(_) => 0,
    };
    let delay = std::time::Duration::from_millis(50 + u64::from(subsec_nanos % 201));
    tokio::time::sleep(delay).await;
}
/// Performs one full attempt at sending `request` to the daemon.
///
/// Steps: make sure a daemon is running at `endpoint`, connect, send the
/// request, then wait for the reply under the `wedge_recv_timeout()` budget.
/// Every failure along the way writes a client-disconnected lifecycle event
/// and is returned as `CompileRecvOutcome::Failed`; a send failure is tagged
/// with `CAUSE_PIPE_CLOSED_MID_WRITE`, all others with `CAUSE_COMM_ERROR`.
/// `Done` and `Wedged` outcomes are returned unchanged.
async fn run_ephemeral_attempt(
    endpoint: &str,
    request: &crate::protocol::Request,
) -> CompileRecvOutcome {
    use crate::core::lifecycle::{CAUSE_COMM_ERROR, CAUSE_PIPE_CLOSED_MID_WRITE};

    match ensure_daemon(endpoint).await {
        Ok(_) => {}
        Err(e) => {
            let msg = format!("cannot start daemon at {endpoint}: {e}");
            return failed_with_disconnect_event(endpoint, CAUSE_COMM_ERROR, msg);
        }
    }

    let mut conn = match connect(endpoint).await {
        Err(e) => {
            let msg = format!("cannot connect to daemon at {endpoint}: {e}");
            return failed_with_disconnect_event(endpoint, CAUSE_COMM_ERROR, msg);
        }
        Ok(c) => c,
    };

    let wire = crate::protocol::wire_prost::full_family_wire_format_from_env();
    let sent = conn.send_request(request, wire).await;
    if let Err(e) = sent {
        let msg = format!("failed to send to daemon: {e}");
        return failed_with_disconnect_event(endpoint, CAUSE_PIPE_CLOSED_MID_WRITE, msg);
    }

    match compile_recv_with_wedge_detection(&mut conn, wedge_recv_timeout()).await {
        // A receive failure is recorded as a disconnect event before being
        // handed back with the same message.
        CompileRecvOutcome::Failed(msg) => {
            failed_with_disconnect_event(endpoint, CAUSE_COMM_ERROR, msg)
        }
        other => other,
    }
}
/// Writes a client-disconnected lifecycle event for `msg`, then wraps the same
/// message in `CompileRecvOutcome::Failed`.
fn failed_with_disconnect_event(endpoint: &str, cause: &str, msg: String) -> CompileRecvOutcome {
    emit_client_disconnected_event(endpoint, cause, msg.as_str());
    CompileRecvOutcome::Failed(msg)
}
fn emit_client_disconnected_event(endpoint: &str, cause: &str, detail: &str) {
let meta = crate::core::lifecycle::client_meta(crate::core::VERSION);
crate::core::lifecycle::write_event(
crate::core::lifecycle::EVENT_CLIENT_DISCONNECTED,
serde_json::json!({
"endpoint": endpoint,
"client_pid": std::process::id(),
"client_version": meta["client_version"],
"client_binary_path": meta["client_binary_path"],
"cause": cause,
"detail": detail,
}),
);
}
fn relay_compile_response<W: Write, E: Write>(
recv_result: Option<crate::protocol::Response>,
stdout: &mut W,
stderr: &mut E,
) -> ExitCode {
match recv_result {
Some(crate::protocol::Response::CompileResult {
exit_code,
stdout: out,
stderr: err,
..
}) => {
let _ = stdout.write_all(&out);
let _ = stderr.write_all(&err);
exit_code_from_i32(exit_code)
}
Some(crate::protocol::Response::Error { message }) => {
let _ = writeln!(stderr, "zccache[err][E]: daemon error: {message}");
ExitCode::FAILURE
}
None => {
let _ = writeln!(stderr, "{LOST_CONNECTION_MSG}");
ExitCode::FAILURE
}
Some(other) => {
let _ = writeln!(
stderr,
"zccache[err][U]: unexpected response from daemon: {other:?}"
);
ExitCode::FAILURE
}
}
}
fn relay_link_response<W: Write, E: Write>(
recv_result: Option<crate::protocol::Response>,
stdout: &mut W,
stderr: &mut E,
) -> ExitCode {
match recv_result {
Some(crate::protocol::Response::LinkResult {
exit_code,
stdout: out,
stderr: err,
warning,
..
}) => {
let _ = stdout.write_all(&out);
let _ = stderr.write_all(&err);
if let Some(w) = warning {
let _ = writeln!(stderr, "zccache warning: {w}");
}
exit_code_from_i32(exit_code)
}
Some(crate::protocol::Response::Error { message }) => {
let _ = writeln!(stderr, "zccache[err][E]: daemon error: {message}");
ExitCode::FAILURE
}
None => {
let _ = writeln!(stderr, "{LOST_CONNECTION_MSG}");
ExitCode::FAILURE
}
Some(other) => {
let _ = writeln!(
stderr,
"zccache[err][U]: unexpected response from daemon: {other:?}"
);
ExitCode::FAILURE
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    /// Shape of the value handed to `classify_probe_outcome`.
    type ProbeResult = Result<Result<(), crate::ipc::IpcError>, tokio::time::error::Elapsed>;

    /// Receive budget used by the wedge-detection tests.
    const TEST_BUDGET: Option<Duration> = Some(Duration::from_secs(1));

    /// Scripted behavior of the fake connection.
    #[allow(clippy::large_enum_variant)]
    enum FakeBehavior {
        Ok(crate::protocol::Response),
        TimesOut,
        BrokenPipe,
    }

    /// Stand-in for a daemon connection that replays one `FakeBehavior`.
    struct FakeConn {
        behavior: FakeBehavior,
    }

    impl FakeConn {
        fn new(behavior: FakeBehavior) -> Self {
            Self { behavior }
        }
    }

    impl ConnRecv for FakeConn {
        async fn recv(
            &mut self,
        ) -> Result<Option<crate::protocol::Response>, crate::ipc::IpcError> {
            match &self.behavior {
                FakeBehavior::Ok(r) => Ok(Some(r.clone())),
                // Without a deadline a silent daemon never answers.
                FakeBehavior::TimesOut => std::future::pending().await,
                FakeBehavior::BrokenPipe => Err(crate::ipc::IpcError::ConnectionClosed),
            }
        }

        async fn recv_with_timeout(
            &mut self,
            timeout: Duration,
        ) -> Result<Option<crate::protocol::Response>, crate::ipc::IpcError> {
            match &self.behavior {
                FakeBehavior::Ok(r) => Ok(Some(r.clone())),
                FakeBehavior::TimesOut => {
                    tokio::time::sleep(timeout).await;
                    Err(crate::ipc::IpcError::Timeout(timeout))
                }
                FakeBehavior::BrokenPipe => Err(crate::ipc::IpcError::ConnectionClosed),
            }
        }
    }

    /// A successful outcome carrying a `Pong`.
    fn pong_done() -> CompileRecvOutcome {
        CompileRecvOutcome::Done(Some(crate::protocol::Response::Pong))
    }

    /// True when `outcome` is `Done(Some(Pong))`.
    fn is_pong_done(outcome: &CompileRecvOutcome) -> bool {
        matches!(
            outcome,
            CompileRecvOutcome::Done(Some(crate::protocol::Response::Pong))
        )
    }

    /// Drives `link_with_retry` with a scripted attempt.
    ///
    /// `script` receives the 1-based attempt number and returns that attempt's
    /// outcome. Returns the final outcome together with how many attempts and
    /// how many recoveries were executed.
    async fn drive_retry<F>(script: F, max_recoveries: u32) -> (CompileRecvOutcome, u32, u32)
    where
        F: Fn(u32) -> CompileRecvOutcome,
    {
        let attempts = AtomicU32::new(0);
        let recoveries = AtomicU32::new(0);
        let outcome = link_with_retry(
            || {
                let n = attempts.fetch_add(1, Ordering::SeqCst) + 1;
                let scripted = script(n);
                async move { scripted }
            },
            || {
                recoveries.fetch_add(1, Ordering::SeqCst);
                async {}
            },
            max_recoveries,
        )
        .await;
        (
            outcome,
            attempts.load(Ordering::SeqCst),
            recoveries.load(Ordering::SeqCst),
        )
    }

    /// Produces a real `Elapsed` error by timing out a never-ready future.
    fn elapsed_error() -> tokio::time::error::Elapsed {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .unwrap();
        rt.block_on(async {
            tokio::time::timeout(Duration::from_nanos(1), std::future::pending::<()>())
                .await
                .unwrap_err()
        })
    }

    #[test]
    fn compile_response_relay_writes_stdout_stderr_and_exit_code() {
        let response = crate::protocol::Response::CompileResult {
            exit_code: 7,
            stdout: Arc::new(b"compiler-out".to_vec()),
            stderr: Arc::new(b"compiler-err".to_vec()),
            cached: false,
        };
        let mut out = Vec::<u8>::new();
        let mut err = Vec::<u8>::new();
        let exit = relay_compile_response(Some(response), &mut out, &mut err);
        assert_eq!(exit, ExitCode::from(7));
        assert_eq!(out, b"compiler-out");
        assert_eq!(err, b"compiler-err");
    }

    #[tokio::test]
    async fn wedge_detection_returns_done_on_normal_response() {
        let mut conn = FakeConn::new(FakeBehavior::Ok(crate::protocol::Response::Pong));
        let outcome = compile_recv_with_wedge_detection(&mut conn, TEST_BUDGET).await;
        assert!(is_pong_done(&outcome));
    }

    #[tokio::test(start_paused = true)]
    async fn wedge_detection_returns_wedged_on_recv_timeout() {
        let mut conn = FakeConn::new(FakeBehavior::TimesOut);
        let started = tokio::time::Instant::now();
        let outcome = compile_recv_with_wedge_detection(&mut conn, TEST_BUDGET).await;
        let elapsed = started.elapsed();
        assert!(matches!(outcome, CompileRecvOutcome::Wedged));
        let accepted = Duration::from_secs(1)..Duration::from_millis(1100);
        assert!(
            accepted.contains(&elapsed),
            "wedge detection took {elapsed:?} against a never-responding fake; \
             issue #666 expects fail-fast at the configured budget"
        );
    }

    #[tokio::test]
    async fn wedge_detection_does_not_misclassify_broken_pipe_as_wedge() {
        let mut conn = FakeConn::new(FakeBehavior::BrokenPipe);
        let outcome = compile_recv_with_wedge_detection(&mut conn, TEST_BUDGET).await;
        assert!(matches!(outcome, CompileRecvOutcome::Failed(_)));
    }

    #[tokio::test]
    async fn link_retry_returns_done_when_first_attempt_succeeds() {
        let (outcome, attempts, recoveries) = drive_retry(|_| pong_done(), 1).await;
        assert!(is_pong_done(&outcome));
        assert_eq!(attempts, 1);
        assert_eq!(recoveries, 0);
    }

    #[tokio::test]
    async fn link_retry_recovers_after_one_transport_failure() {
        let (outcome, attempts, recoveries) = drive_retry(
            |n| {
                if n == 1 {
                    CompileRecvOutcome::Failed("lost connection to daemon".to_string())
                } else {
                    pong_done()
                }
            },
            1,
        )
        .await;
        assert!(
            is_pong_done(&outcome),
            "retry should drive a transport-flaky link to a Done outcome (#752)"
        );
        assert_eq!(attempts, 2);
        assert_eq!(recoveries, 1);
    }

    #[tokio::test]
    async fn link_retry_surfaces_failure_after_exhausting_budget() {
        let (outcome, attempts, recoveries) = drive_retry(
            |_| CompileRecvOutcome::Failed("daemon really gone".to_string()),
            1,
        )
        .await;
        assert!(matches!(outcome, CompileRecvOutcome::Failed(_)));
        assert_eq!(
            attempts, 2,
            "exactly the initial attempt plus one retry — no infinite loop"
        );
        assert_eq!(recoveries, 1);
    }

    #[tokio::test]
    async fn link_retry_does_not_retry_on_wedge() {
        let (outcome, attempts, recoveries) =
            drive_retry(|_| CompileRecvOutcome::Wedged, 5).await;
        assert!(matches!(outcome, CompileRecvOutcome::Wedged));
        assert_eq!(attempts, 1);
        assert_eq!(recoveries, 0);
    }

    #[tokio::test]
    async fn link_retry_disabled_when_budget_is_zero() {
        let (outcome, attempts, recoveries) =
            drive_retry(|_| CompileRecvOutcome::Failed("once".to_string()), 0).await;
        assert!(matches!(outcome, CompileRecvOutcome::Failed(_)));
        assert_eq!(attempts, 1);
        assert_eq!(recoveries, 0);
    }

    #[test]
    fn classify_probe_outcome_pong_within_budget_means_no_kill() {
        let probe: ProbeResult = Ok(Ok(()));
        assert_eq!(classify_probe_outcome(probe), WedgeAction::DowngradeNoKill);
    }

    #[test]
    fn classify_probe_outcome_probe_error_escalates_to_kill() {
        let probe: ProbeResult = Ok(Err(crate::ipc::IpcError::ConnectionClosed));
        assert_eq!(
            classify_probe_outcome(probe),
            WedgeAction::EscalateKillProbeError
        );
    }

    #[test]
    fn classify_probe_outcome_probe_timeout_escalates_to_kill() {
        let probe: ProbeResult = Err(elapsed_error());
        assert_eq!(classify_probe_outcome(probe), WedgeAction::EscalateKill);
    }

    #[test]
    fn wedge_probe_budget_default_is_three_seconds() {
        assert_eq!(
            WEDGE_PROBE_DEFAULT_MS, 3_000,
            "schema commits to 3s default — tooling docs reference this number"
        );
    }

    #[tokio::test]
    async fn wedge_detection_disabled_when_budget_is_none() {
        let mut conn = FakeConn::new(FakeBehavior::BrokenPipe);
        let outcome = compile_recv_with_wedge_detection(&mut conn, None).await;
        assert!(matches!(outcome, CompileRecvOutcome::Failed(_)));
    }

    #[test]
    fn link_response_relay_preserves_warning_after_tool_stderr() {
        let response = crate::protocol::Response::LinkResult {
            exit_code: 0,
            stdout: Arc::new(b"link-out".to_vec()),
            stderr: Arc::new(b"link-err\n".to_vec()),
            cached: true,
            warning: Some("non-deterministic archive flags".to_string()),
        };
        let mut out = Vec::<u8>::new();
        let mut err = Vec::<u8>::new();
        let exit = relay_link_response(Some(response), &mut out, &mut err);
        assert_eq!(exit, ExitCode::SUCCESS);
        assert_eq!(out, b"link-out");
        assert_eq!(
            String::from_utf8(err).unwrap(),
            "link-err\nzccache warning: non-deterministic archive flags\n"
        );
    }
}