use crate::core::NormalizedPath;
use std::io::Write;
use std::path::Path;
use std::process::ExitCode;
use super::super::super::wedge_recv_timeout;
use super::super::daemon::{ensure_daemon, stop_stale_daemon};
use super::super::util::{connect, exit_code_from_i32, slurp_stdin_if_piped, LOST_CONNECTION_MSG};
pub(super) async fn cmd_compile(
endpoint: &str,
session_id: &str,
args: Vec<String>,
cwd: NormalizedPath,
compiler: NormalizedPath,
client_env: Vec<(String, String)>,
) -> ExitCode {
let stdin_bytes = slurp_stdin_if_piped();
let mut conn = match connect(endpoint).await {
Ok(c) => c,
Err(e) => {
eprintln!("zccache[err][C]: cannot connect to daemon at {endpoint}: {e}");
return ExitCode::FAILURE;
}
};
let wire = crate::protocol::wire_prost::full_family_wire_format_from_env();
let request = crate::protocol::Request::Compile {
session_id: session_id.to_string(),
args: args.clone(),
cwd: cwd.clone(),
compiler: compiler.clone(),
env: Some(client_env.clone()),
stdin: stdin_bytes.clone(),
};
if let Err(e) = conn.send_request(&request, wire).await {
eprintln!("zccache[err][S]: failed to send to daemon: {e}");
return ExitCode::FAILURE;
}
match compile_recv_with_wedge_detection(&mut conn).await {
CompileRecvOutcome::Done(recv_result) => {
relay_compile_response(recv_result, &mut std::io::stdout(), &mut std::io::stderr())
}
CompileRecvOutcome::Wedged => {
eprintln!(
"zccache[warn][W]: daemon at {endpoint} appears wedged \
(no response within wedge budget); recovering — issue #666"
);
drop(conn);
stop_stale_daemon(endpoint).await;
cmd_compile_ephemeral(endpoint, compiler.as_path(), args, cwd, client_env).await
}
CompileRecvOutcome::Failed(msg) => {
eprintln!("zccache[err][R]: {msg}");
ExitCode::FAILURE
}
}
}
#[allow(clippy::large_enum_variant)]
enum CompileRecvOutcome {
Done(Option<crate::protocol::Response>),
Wedged,
Failed(String),
}
async fn compile_recv_with_wedge_detection<C: ConnRecv>(conn: &mut C) -> CompileRecvOutcome {
match wedge_recv_timeout() {
Some(budget) => match conn.recv_with_timeout(budget).await {
Ok(opt) => CompileRecvOutcome::Done(opt),
Err(crate::ipc::IpcError::Timeout(_)) => CompileRecvOutcome::Wedged,
Err(e) => CompileRecvOutcome::Failed(format!("broken connection to daemon: {e}")),
},
None => match conn.recv().await {
Ok(opt) => CompileRecvOutcome::Done(opt),
Err(e) => CompileRecvOutcome::Failed(format!("broken connection to daemon: {e}")),
},
}
}
trait ConnRecv {
async fn recv(&mut self) -> Result<Option<crate::protocol::Response>, crate::ipc::IpcError>;
async fn recv_with_timeout(
&mut self,
timeout: std::time::Duration,
) -> Result<Option<crate::protocol::Response>, crate::ipc::IpcError>;
}
#[cfg(unix)]
impl ConnRecv for crate::ipc::IpcConnection {
async fn recv(&mut self) -> Result<Option<crate::protocol::Response>, crate::ipc::IpcError> {
crate::ipc::IpcConnection::recv_response(self).await
}
async fn recv_with_timeout(
&mut self,
timeout: std::time::Duration,
) -> Result<Option<crate::protocol::Response>, crate::ipc::IpcError> {
crate::ipc::IpcConnection::recv_response_with_timeout(self, timeout).await
}
}
#[cfg(windows)]
impl ConnRecv for crate::ipc::IpcClientConnection {
async fn recv(&mut self) -> Result<Option<crate::protocol::Response>, crate::ipc::IpcError> {
crate::ipc::IpcClientConnection::recv_response(self).await
}
async fn recv_with_timeout(
&mut self,
timeout: std::time::Duration,
) -> Result<Option<crate::protocol::Response>, crate::ipc::IpcError> {
crate::ipc::IpcClientConnection::recv_response_with_timeout(self, timeout).await
}
}
pub(super) async fn cmd_compile_ephemeral(
endpoint: &str,
compiler: &Path,
args: Vec<String>,
cwd: NormalizedPath,
client_env: Vec<(String, String)>,
) -> ExitCode {
if let Err(e) = ensure_daemon(endpoint).await {
eprintln!("zccache[err][D]: cannot start daemon at {endpoint}: {e}");
return ExitCode::FAILURE;
}
let mut conn = match connect(endpoint).await {
Ok(c) => c,
Err(e) => {
eprintln!("zccache[err][C]: cannot connect to daemon at {endpoint}: {e}");
return ExitCode::FAILURE;
}
};
let stdin_bytes = slurp_stdin_if_piped();
let wire = crate::protocol::wire_prost::full_family_wire_format_from_env();
let request = crate::protocol::Request::CompileEphemeral {
client_pid: std::process::id(),
working_dir: cwd.clone(),
compiler: compiler.into(),
args,
cwd,
env: Some(client_env),
stdin: stdin_bytes,
};
if let Err(e) = conn.send_request(&request, wire).await {
eprintln!("zccache[err][S]: failed to send to daemon: {e}");
return ExitCode::FAILURE;
}
match compile_recv_with_wedge_detection(&mut conn).await {
CompileRecvOutcome::Done(recv_result) => {
relay_compile_response(recv_result, &mut std::io::stdout(), &mut std::io::stderr())
}
CompileRecvOutcome::Wedged => {
eprintln!(
"zccache[err][W]: daemon at {endpoint} stopped responding within \
the wedge budget; killing it so the next compile starts fresh — issue #666"
);
drop(conn);
stop_stale_daemon(endpoint).await;
ExitCode::FAILURE
}
CompileRecvOutcome::Failed(msg) => {
eprintln!("zccache[err][R]: {msg}");
ExitCode::FAILURE
}
}
}
pub(super) async fn cmd_link_ephemeral(
endpoint: &str,
tool: &Path,
args: Vec<String>,
cwd: NormalizedPath,
client_env: Vec<(String, String)>,
) -> ExitCode {
if let Err(e) = ensure_daemon(endpoint).await {
eprintln!("zccache[err][D]: cannot start daemon at {endpoint}: {e}");
return ExitCode::FAILURE;
}
let mut conn = match connect(endpoint).await {
Ok(c) => c,
Err(e) => {
eprintln!("zccache[err][C]: cannot connect to daemon at {endpoint}: {e}");
return ExitCode::FAILURE;
}
};
let wire = crate::protocol::wire_prost::full_family_wire_format_from_env();
let request = crate::protocol::Request::LinkEphemeral {
client_pid: std::process::id(),
tool: tool.into(),
args,
cwd,
env: Some(client_env),
};
if let Err(e) = conn.send_request(&request, wire).await {
eprintln!("zccache[err][S]: failed to send to daemon: {e}");
return ExitCode::FAILURE;
}
match compile_recv_with_wedge_detection(&mut conn).await {
CompileRecvOutcome::Done(recv_result) => {
relay_link_response(recv_result, &mut std::io::stdout(), &mut std::io::stderr())
}
CompileRecvOutcome::Wedged => {
eprintln!(
"zccache[err][W]: daemon at {endpoint} stopped responding within \
the wedge budget on a Link; killing it so the next request starts \
fresh — issue #666"
);
drop(conn);
stop_stale_daemon(endpoint).await;
ExitCode::FAILURE
}
CompileRecvOutcome::Failed(msg) => {
eprintln!("zccache[err][R]: {msg}");
ExitCode::FAILURE
}
}
}
fn relay_compile_response<W: Write, E: Write>(
recv_result: Option<crate::protocol::Response>,
stdout: &mut W,
stderr: &mut E,
) -> ExitCode {
match recv_result {
Some(crate::protocol::Response::CompileResult {
exit_code,
stdout: out,
stderr: err,
..
}) => {
let _ = stdout.write_all(&out);
let _ = stderr.write_all(&err);
exit_code_from_i32(exit_code)
}
Some(crate::protocol::Response::Error { message }) => {
let _ = writeln!(stderr, "zccache[err][E]: daemon error: {message}");
ExitCode::FAILURE
}
None => {
let _ = writeln!(stderr, "{LOST_CONNECTION_MSG}");
ExitCode::FAILURE
}
Some(other) => {
let _ = writeln!(
stderr,
"zccache[err][U]: unexpected response from daemon: {other:?}"
);
ExitCode::FAILURE
}
}
}
fn relay_link_response<W: Write, E: Write>(
recv_result: Option<crate::protocol::Response>,
stdout: &mut W,
stderr: &mut E,
) -> ExitCode {
match recv_result {
Some(crate::protocol::Response::LinkResult {
exit_code,
stdout: out,
stderr: err,
warning,
..
}) => {
let _ = stdout.write_all(&out);
let _ = stderr.write_all(&err);
if let Some(w) = warning {
let _ = writeln!(stderr, "zccache warning: {w}");
}
exit_code_from_i32(exit_code)
}
Some(crate::protocol::Response::Error { message }) => {
let _ = writeln!(stderr, "zccache[err][E]: daemon error: {message}");
ExitCode::FAILURE
}
None => {
let _ = writeln!(stderr, "{LOST_CONNECTION_MSG}");
ExitCode::FAILURE
}
Some(other) => {
let _ = writeln!(
stderr,
"zccache[err][U]: unexpected response from daemon: {other:?}"
);
ExitCode::FAILURE
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
#[test]
fn compile_response_relay_writes_stdout_stderr_and_exit_code() {
let mut stdout = Vec::new();
let mut stderr = Vec::new();
let exit = relay_compile_response(
Some(crate::protocol::Response::CompileResult {
exit_code: 7,
stdout: Arc::new(b"compiler-out".to_vec()),
stderr: Arc::new(b"compiler-err".to_vec()),
cached: false,
}),
&mut stdout,
&mut stderr,
);
assert_eq!(exit, ExitCode::from(7));
assert_eq!(stdout, b"compiler-out");
assert_eq!(stderr, b"compiler-err");
}
struct FakeConn {
behavior: FakeBehavior,
}
#[allow(clippy::large_enum_variant)]
enum FakeBehavior {
Ok(crate::protocol::Response),
TimesOut,
BrokenPipe,
}
impl ConnRecv for FakeConn {
async fn recv(
&mut self,
) -> Result<Option<crate::protocol::Response>, crate::ipc::IpcError> {
match &self.behavior {
FakeBehavior::Ok(r) => Ok(Some(r.clone())),
FakeBehavior::TimesOut => {
futures::future::pending::<()>().await;
unreachable!()
}
FakeBehavior::BrokenPipe => Err(crate::ipc::IpcError::ConnectionClosed),
}
}
async fn recv_with_timeout(
&mut self,
timeout: std::time::Duration,
) -> Result<Option<crate::protocol::Response>, crate::ipc::IpcError> {
match &self.behavior {
FakeBehavior::Ok(r) => Ok(Some(r.clone())),
FakeBehavior::TimesOut => {
tokio::time::sleep(timeout).await;
Err(crate::ipc::IpcError::Timeout(timeout))
}
FakeBehavior::BrokenPipe => Err(crate::ipc::IpcError::ConnectionClosed),
}
}
}
#[tokio::test]
async fn wedge_detection_returns_done_on_normal_response() {
std::env::set_var("ZCCACHE_WEDGE_RECV_TIMEOUT_SECS", "1");
let mut conn = FakeConn {
behavior: FakeBehavior::Ok(crate::protocol::Response::Pong),
};
let outcome = compile_recv_with_wedge_detection(&mut conn).await;
std::env::remove_var("ZCCACHE_WEDGE_RECV_TIMEOUT_SECS");
assert!(matches!(
outcome,
CompileRecvOutcome::Done(Some(crate::protocol::Response::Pong))
));
}
#[tokio::test]
async fn wedge_detection_returns_wedged_on_recv_timeout() {
std::env::set_var("ZCCACHE_WEDGE_RECV_TIMEOUT_SECS", "1");
let mut conn = FakeConn {
behavior: FakeBehavior::TimesOut,
};
let started = std::time::Instant::now();
let outcome = compile_recv_with_wedge_detection(&mut conn).await;
let elapsed = started.elapsed();
std::env::remove_var("ZCCACHE_WEDGE_RECV_TIMEOUT_SECS");
assert!(matches!(outcome, CompileRecvOutcome::Wedged));
assert!(
elapsed < std::time::Duration::from_secs(3),
"wedge detection took {elapsed:?} against a never-responding fake — \
issue #666 expects bounded fail-fast at the configured budget"
);
}
#[tokio::test]
async fn wedge_detection_does_not_misclassify_broken_pipe_as_wedge() {
std::env::set_var("ZCCACHE_WEDGE_RECV_TIMEOUT_SECS", "1");
let mut conn = FakeConn {
behavior: FakeBehavior::BrokenPipe,
};
let outcome = compile_recv_with_wedge_detection(&mut conn).await;
std::env::remove_var("ZCCACHE_WEDGE_RECV_TIMEOUT_SECS");
assert!(matches!(outcome, CompileRecvOutcome::Failed(_)));
}
#[tokio::test]
async fn wedge_detection_disabled_when_env_is_zero() {
std::env::set_var("ZCCACHE_WEDGE_RECV_TIMEOUT_SECS", "0");
let mut conn = FakeConn {
behavior: FakeBehavior::BrokenPipe,
};
let outcome = compile_recv_with_wedge_detection(&mut conn).await;
std::env::remove_var("ZCCACHE_WEDGE_RECV_TIMEOUT_SECS");
assert!(matches!(outcome, CompileRecvOutcome::Failed(_)));
}
#[test]
fn link_response_relay_preserves_warning_after_tool_stderr() {
let mut stdout = Vec::new();
let mut stderr = Vec::new();
let exit = relay_link_response(
Some(crate::protocol::Response::LinkResult {
exit_code: 0,
stdout: Arc::new(b"link-out".to_vec()),
stderr: Arc::new(b"link-err\n".to_vec()),
cached: true,
warning: Some("non-deterministic archive flags".to_string()),
}),
&mut stdout,
&mut stderr,
);
assert_eq!(exit, ExitCode::SUCCESS);
assert_eq!(stdout, b"link-out");
assert_eq!(
String::from_utf8(stderr).unwrap(),
"link-err\nzccache warning: non-deterministic archive flags\n"
);
}
}