use std::time::Duration;
use bytes::{Buf, BytesMut};
use prost::Message as _;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use super::error::IpcError;
pub const DEFAULT_CLIENT_RECV_TIMEOUT: Duration = Duration::from_secs(300);
#[cfg(unix)]
type StreamType = tokio::net::UnixStream;
#[cfg(windows)]
type StreamType = tokio::net::windows::named_pipe::NamedPipeServer;
pub struct IpcConnection {
reader: tokio::io::ReadHalf<StreamType>,
writer: tokio::io::WriteHalf<StreamType>,
read_buf: BytesMut,
recv_timeout: Option<Duration>,
next_frame_request_id: u64,
}
#[cfg(windows)]
pub struct IpcClientConnection {
reader: tokio::io::ReadHalf<tokio::net::windows::named_pipe::NamedPipeClient>,
writer: tokio::io::WriteHalf<tokio::net::windows::named_pipe::NamedPipeClient>,
read_buf: BytesMut,
recv_timeout: Option<Duration>,
next_frame_request_id: u64,
}
impl IpcConnection {
pub async fn try_serve_backend_handle_probe(
&mut self,
daemon: &running_process::broker::backend_handle::DaemonProcess,
) -> Result<bool, IpcError> {
try_serve_backend_handle_probe(
&mut self.reader,
&mut self.writer,
&mut self.read_buf,
daemon,
)
.await
}
pub async fn send<T: serde::Serialize>(&mut self, msg: &T) -> Result<(), IpcError> {
let buf = crate::protocol::encode_message(msg)?;
self.writer.write_all(&buf).await?;
self.writer.flush().await?;
Ok(())
}
pub async fn send_prost<M: prost::Message>(&mut self, msg: &M) -> Result<(), IpcError> {
let buf = crate::protocol::wire_prost::encode_prost_message(msg)?;
self.writer.write_all(&buf).await?;
self.writer.flush().await?;
Ok(())
}
pub async fn send_frame_v1_request<M: prost::Message>(
&mut self,
msg: &M,
) -> Result<u64, IpcError> {
let request_id = self.next_frame_request_id;
self.next_frame_request_id = self.next_frame_request_id.wrapping_add(1);
let buf = crate::protocol::wire_frame::encode_frame_v1_request(msg, request_id)?;
self.writer.write_all(&buf).await?;
self.writer.flush().await?;
Ok(request_id)
}
pub async fn send_frame_v1_response<M: prost::Message>(
&mut self,
msg: &M,
request_id: u64,
) -> Result<(), IpcError> {
let buf = crate::protocol::wire_frame::encode_frame_v1_response(msg, request_id)?;
self.writer.write_all(&buf).await?;
self.writer.flush().await?;
Ok(())
}
pub fn set_recv_timeout(&mut self, timeout: Duration) {
self.recv_timeout = Some(timeout);
}
pub fn clear_recv_timeout(&mut self) {
self.recv_timeout = None;
}
pub fn recv_timeout(&self) -> Option<Duration> {
self.recv_timeout
}
pub async fn recv<T: serde::de::DeserializeOwned>(&mut self) -> Result<Option<T>, IpcError> {
match self.recv_timeout {
Some(t) => self.recv_with_timeout(t).await,
None => self.recv_loop().await,
}
}
pub async fn recv_with_timeout<T: serde::de::DeserializeOwned>(
&mut self,
timeout: Duration,
) -> Result<Option<T>, IpcError> {
match tokio::time::timeout(timeout, self.recv_loop()).await {
Ok(result) => result,
Err(_) => Err(IpcError::Timeout(timeout)),
}
}
pub async fn recv_wire<Bincode, Prost>(
&mut self,
) -> Result<Option<crate::protocol::DecodedWireMessage<Bincode, Prost>>, IpcError>
where
Bincode: serde::de::DeserializeOwned,
Prost: prost::Message + Default,
{
match self.recv_timeout {
Some(t) => self.recv_wire_with_timeout(t).await,
None => self.recv_wire_loop().await,
}
}
pub async fn recv_wire_with_timeout<Bincode, Prost>(
&mut self,
timeout: Duration,
) -> Result<Option<crate::protocol::DecodedWireMessage<Bincode, Prost>>, IpcError>
where
Bincode: serde::de::DeserializeOwned,
Prost: prost::Message + Default,
{
match tokio::time::timeout(timeout, self.recv_wire_loop()).await {
Ok(result) => result,
Err(_) => Err(IpcError::Timeout(timeout)),
}
}
pub async fn send_request(
&mut self,
request: &crate::protocol::Request,
wire: crate::protocol::wire_prost::WireFormat,
) -> Result<(), IpcError> {
match wire {
crate::protocol::wire_prost::WireFormat::BincodeV15 => self.send(request).await,
crate::protocol::wire_prost::WireFormat::ProstV16 => {
let request_id = crate::protocol::wire_prost::default_request_id(request);
let request = crate::protocol::wire_prost::request_to_prost(request, request_id);
self.send_prost(&request).await
}
crate::protocol::wire_prost::WireFormat::FrameV1 => {
let request_id = crate::protocol::wire_prost::default_request_id(request);
let request = crate::protocol::wire_prost::request_to_prost(request, request_id);
self.send_frame_v1_request(&request).await.map(|_| ())
}
}
}
pub async fn recv_response(&mut self) -> Result<Option<crate::protocol::Response>, IpcError> {
let message = self
.recv_wire::<crate::protocol::Response, crate::protocol::wire_prost::zccache_v1::Response>()
.await?;
decode_response_wire(message)
}
pub async fn recv_response_with_timeout(
&mut self,
timeout: Duration,
) -> Result<Option<crate::protocol::Response>, IpcError> {
let message = self
.recv_wire_with_timeout::<crate::protocol::Response, crate::protocol::wire_prost::zccache_v1::Response>(timeout)
.await?;
decode_response_wire(message)
}
async fn recv_loop<T: serde::de::DeserializeOwned>(&mut self) -> Result<Option<T>, IpcError> {
recv_bincode_loop(&mut self.reader, &mut self.read_buf).await
}
async fn recv_wire_loop<Bincode, Prost>(
&mut self,
) -> Result<Option<crate::protocol::DecodedWireMessage<Bincode, Prost>>, IpcError>
where
Bincode: serde::de::DeserializeOwned,
Prost: prost::Message + Default,
{
recv_wire_loop(&mut self.reader, &mut self.read_buf).await
}
}
#[cfg(windows)]
impl IpcClientConnection {
pub async fn send<T: serde::Serialize>(&mut self, msg: &T) -> Result<(), IpcError> {
let buf = crate::protocol::encode_message(msg)?;
self.writer.write_all(&buf).await?;
self.writer.flush().await?;
Ok(())
}
pub async fn send_prost<M: prost::Message>(&mut self, msg: &M) -> Result<(), IpcError> {
let buf = crate::protocol::wire_prost::encode_prost_message(msg)?;
self.writer.write_all(&buf).await?;
self.writer.flush().await?;
Ok(())
}
pub async fn send_frame_v1_request<M: prost::Message>(
&mut self,
msg: &M,
) -> Result<u64, IpcError> {
let request_id = self.next_frame_request_id;
self.next_frame_request_id = self.next_frame_request_id.wrapping_add(1);
let buf = crate::protocol::wire_frame::encode_frame_v1_request(msg, request_id)?;
self.writer.write_all(&buf).await?;
self.writer.flush().await?;
Ok(request_id)
}
pub fn set_recv_timeout(&mut self, timeout: Duration) {
self.recv_timeout = Some(timeout);
}
pub fn clear_recv_timeout(&mut self) {
self.recv_timeout = None;
}
pub fn recv_timeout(&self) -> Option<Duration> {
self.recv_timeout
}
pub async fn recv<T: serde::de::DeserializeOwned>(&mut self) -> Result<Option<T>, IpcError> {
match self.recv_timeout {
Some(t) => self.recv_with_timeout(t).await,
None => self.recv_loop().await,
}
}
pub async fn recv_with_timeout<T: serde::de::DeserializeOwned>(
&mut self,
timeout: Duration,
) -> Result<Option<T>, IpcError> {
match tokio::time::timeout(timeout, self.recv_loop()).await {
Ok(result) => result,
Err(_) => Err(IpcError::Timeout(timeout)),
}
}
pub async fn recv_wire<Bincode, Prost>(
&mut self,
) -> Result<Option<crate::protocol::DecodedWireMessage<Bincode, Prost>>, IpcError>
where
Bincode: serde::de::DeserializeOwned,
Prost: prost::Message + Default,
{
match self.recv_timeout {
Some(t) => self.recv_wire_with_timeout(t).await,
None => self.recv_wire_loop().await,
}
}
pub async fn recv_wire_with_timeout<Bincode, Prost>(
&mut self,
timeout: Duration,
) -> Result<Option<crate::protocol::DecodedWireMessage<Bincode, Prost>>, IpcError>
where
Bincode: serde::de::DeserializeOwned,
Prost: prost::Message + Default,
{
match tokio::time::timeout(timeout, self.recv_wire_loop()).await {
Ok(result) => result,
Err(_) => Err(IpcError::Timeout(timeout)),
}
}
pub async fn send_request(
&mut self,
request: &crate::protocol::Request,
wire: crate::protocol::wire_prost::WireFormat,
) -> Result<(), IpcError> {
match wire {
crate::protocol::wire_prost::WireFormat::BincodeV15 => self.send(request).await,
crate::protocol::wire_prost::WireFormat::ProstV16 => {
let request_id = crate::protocol::wire_prost::default_request_id(request);
let request = crate::protocol::wire_prost::request_to_prost(request, request_id);
self.send_prost(&request).await
}
crate::protocol::wire_prost::WireFormat::FrameV1 => {
let request_id = crate::protocol::wire_prost::default_request_id(request);
let request = crate::protocol::wire_prost::request_to_prost(request, request_id);
self.send_frame_v1_request(&request).await.map(|_| ())
}
}
}
pub async fn recv_response(&mut self) -> Result<Option<crate::protocol::Response>, IpcError> {
let message = self
.recv_wire::<crate::protocol::Response, crate::protocol::wire_prost::zccache_v1::Response>()
.await?;
decode_response_wire(message)
}
pub async fn recv_response_with_timeout(
&mut self,
timeout: Duration,
) -> Result<Option<crate::protocol::Response>, IpcError> {
let message = self
.recv_wire_with_timeout::<crate::protocol::Response, crate::protocol::wire_prost::zccache_v1::Response>(timeout)
.await?;
decode_response_wire(message)
}
async fn recv_loop<T: serde::de::DeserializeOwned>(&mut self) -> Result<Option<T>, IpcError> {
recv_bincode_loop(&mut self.reader, &mut self.read_buf).await
}
async fn recv_wire_loop<Bincode, Prost>(
&mut self,
) -> Result<Option<crate::protocol::DecodedWireMessage<Bincode, Prost>>, IpcError>
where
Bincode: serde::de::DeserializeOwned,
Prost: prost::Message + Default,
{
recv_wire_loop(&mut self.reader, &mut self.read_buf).await
}
}
fn decode_response_wire(
message: Option<
crate::protocol::DecodedWireMessage<
crate::protocol::Response,
crate::protocol::wire_prost::zccache_v1::Response,
>,
>,
) -> Result<Option<crate::protocol::Response>, IpcError> {
message
.map(crate::protocol::wire_prost::response_from_decoded_wire)
.transpose()
.map_err(IpcError::Protocol)
}
async fn recv_bincode_loop<R, T>(
reader: &mut R,
read_buf: &mut BytesMut,
) -> Result<Option<T>, IpcError>
where
R: AsyncRead + Unpin,
T: serde::de::DeserializeOwned,
{
loop {
if let Some(msg) = crate::protocol::decode_message::<T>(read_buf)? {
return Ok(Some(msg));
}
if !read_next_chunk(reader, read_buf).await? {
return Ok(None);
}
}
}
async fn recv_wire_loop<R, Bincode, Prost>(
reader: &mut R,
read_buf: &mut BytesMut,
) -> Result<Option<crate::protocol::DecodedWireMessage<Bincode, Prost>>, IpcError>
where
R: AsyncRead + Unpin,
Bincode: serde::de::DeserializeOwned,
Prost: prost::Message + Default,
{
loop {
if let Some(msg) = crate::protocol::decode_wire_message::<Bincode, Prost>(read_buf)? {
return Ok(Some(msg));
}
if !read_next_chunk(reader, read_buf).await? {
return Ok(None);
}
}
}
async fn read_next_chunk<R>(reader: &mut R, read_buf: &mut BytesMut) -> Result<bool, IpcError>
where
R: AsyncRead + Unpin,
{
let mut tmp = [0u8; 4096];
let n = reader.read(&mut tmp).await?;
if n == 0 {
if read_buf.is_empty() {
return Ok(false);
}
return Err(IpcError::ConnectionClosed);
}
read_buf.extend_from_slice(&tmp[..n]);
Ok(true)
}
async fn try_serve_backend_handle_probe<R, W>(
reader: &mut R,
writer: &mut W,
read_buf: &mut BytesMut,
daemon: &running_process::broker::backend_handle::DaemonProcess,
) -> Result<bool, IpcError>
where
R: AsyncRead + Unpin,
W: tokio::io::AsyncWrite + Unpin,
{
ensure_buffered(reader, read_buf, 8).await?;
if read_buf.is_empty() {
return Ok(false);
}
let running_process_version = running_process::broker::protocol::ENVELOPE_VERSION;
if read_buf[0] != running_process_version {
return Ok(false);
}
let zccache_len = u32::from_le_bytes([read_buf[0], read_buf[1], read_buf[2], read_buf[3]]);
let zccache_version = u32::from_le_bytes([read_buf[4], read_buf[5], read_buf[6], read_buf[7]]);
if zccache_len >= 4
&& matches!(
zccache_version,
crate::protocol::BINCODE_PROTOCOL_VERSION | crate::protocol::PROST_PROTOCOL_VERSION
)
{
return Ok(false);
}
let body_len =
u32::from_le_bytes([read_buf[1], read_buf[2], read_buf[3], read_buf[4]]) as usize;
if body_len > running_process::broker::protocol::MAX_FRAME_BYTES {
return Err(IpcError::Endpoint(format!(
"running-process BackendHandle probe frame too large: {body_len} bytes"
)));
}
ensure_buffered(reader, read_buf, 5 + body_len).await?;
let frame = running_process::broker::protocol::Frame::decode(&read_buf[5..5 + body_len])
.map_err(|err| IpcError::Endpoint(format!("BackendHandle probe decode failed: {err}")))?;
if !is_backend_handle_probe_request(&frame) {
if frame.payload_protocol == crate::protocol::wire_frame::ZCCACHE_FRAME_PAYLOAD_PROTOCOL {
return Ok(false);
}
return Err(IpcError::Endpoint(
"unexpected running-process frame on zccache daemon endpoint".to_string(),
));
}
read_buf.advance(5 + body_len);
let response = backend_handle_probe_response(&frame, daemon)?;
write_running_process_frame(writer, &response).await?;
Ok(true)
}
async fn ensure_buffered<R>(
reader: &mut R,
read_buf: &mut BytesMut,
min_len: usize,
) -> Result<(), IpcError>
where
R: AsyncRead + Unpin,
{
while read_buf.len() < min_len {
if !read_next_chunk(reader, read_buf).await? {
if read_buf.is_empty() {
return Ok(());
}
return Err(IpcError::ConnectionClosed);
}
}
Ok(())
}
fn is_backend_handle_probe_request(frame: &running_process::broker::protocol::Frame) -> bool {
use running_process::broker::backend_lifecycle::probe::BACKEND_HANDLE_PROBE_PAYLOAD_PROTOCOL;
use running_process::broker::protocol::{FrameKind, PayloadEncoding};
frame.envelope_version == 1
&& FrameKind::try_from(frame.kind) == Ok(FrameKind::Request)
&& frame.payload_protocol == BACKEND_HANDLE_PROBE_PAYLOAD_PROTOCOL
&& PayloadEncoding::try_from(frame.payload_encoding) == Ok(PayloadEncoding::None)
&& frame.payload.len() == 32
}
fn backend_handle_probe_response(
request: &running_process::broker::protocol::Frame,
daemon: &running_process::broker::backend_handle::DaemonProcess,
) -> Result<running_process::broker::protocol::Frame, IpcError> {
use running_process::broker::backend_lifecycle::probe::BACKEND_HANDLE_PROBE_PAYLOAD_PROTOCOL;
use running_process::broker::protocol::{Frame, FrameKind, PayloadEncoding};
let mut payload = Vec::with_capacity(32 + 128);
payload.extend_from_slice(&request.payload);
daemon.to_proto().encode(&mut payload).map_err(|err| {
IpcError::Endpoint(format!("BackendHandle identity encode failed: {err}"))
})?;
Ok(Frame {
envelope_version: 1,
kind: FrameKind::Response as i32,
payload_protocol: BACKEND_HANDLE_PROBE_PAYLOAD_PROTOCOL,
payload,
request_id: request.request_id,
payload_encoding: PayloadEncoding::None as i32,
deadline_unix_ms: 0,
traceparent: request.traceparent.clone(),
tracestate: request.tracestate.clone(),
})
}
async fn write_running_process_frame<W>(
writer: &mut W,
frame: &running_process::broker::protocol::Frame,
) -> Result<(), IpcError>
where
W: tokio::io::AsyncWrite + Unpin,
{
let mut body = Vec::new();
frame.encode(&mut body).map_err(|err| {
IpcError::Endpoint(format!("BackendHandle response encode failed: {err}"))
})?;
if body.len() > running_process::broker::protocol::MAX_FRAME_BYTES {
return Err(IpcError::Endpoint(format!(
"BackendHandle response frame too large: {} bytes",
body.len()
)));
}
writer
.write_all(&[running_process::broker::protocol::ENVELOPE_VERSION])
.await?;
writer.write_all(&(body.len() as u32).to_le_bytes()).await?;
writer.write_all(&body).await?;
writer.flush().await?;
Ok(())
}
pub struct IpcListener {
inner: ListenerInner,
}
#[cfg(unix)]
struct ListenerInner {
listener: tokio::net::UnixListener,
}
#[cfg(windows)]
struct ListenerInner {
endpoint: String,
pool: std::collections::VecDeque<tokio::net::windows::named_pipe::NamedPipeServer>,
}
impl IpcListener {
pub fn bind(endpoint: &str) -> Result<Self, IpcError> {
#[cfg(unix)]
{
let _ = std::fs::remove_file(endpoint);
if let Some(parent) = std::path::Path::new(endpoint).parent() {
std::fs::create_dir_all(parent)?;
}
let listener = tokio::net::UnixListener::bind(endpoint)?;
Ok(Self {
inner: ListenerInner { listener },
})
}
#[cfg(windows)]
{
use std::collections::VecDeque;
use tokio::net::windows::named_pipe::ServerOptions;
let pool_size = std::env::var("ZCCACHE_PIPE_POOL_SIZE")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or_else(|| {
std::thread::available_parallelism()
.map(|n| n.get().saturating_mul(4))
.unwrap_or(64)
.clamp(16, 128)
});
let mut pool = VecDeque::with_capacity(pool_size);
for i in 0..pool_size {
let pipe = ServerOptions::new()
.first_pipe_instance(i == 0)
.create(endpoint)?;
pool.push_back(pipe);
}
Ok(Self {
inner: ListenerInner {
endpoint: endpoint.to_string(),
pool,
},
})
}
}
pub async fn accept(&mut self) -> Result<IpcConnection, IpcError> {
#[cfg(unix)]
{
let (stream, _addr) = self.inner.listener.accept().await?;
let (reader, writer) = tokio::io::split(stream);
Ok(IpcConnection {
reader,
writer,
read_buf: BytesMut::with_capacity(4096),
recv_timeout: None,
next_frame_request_id: 1,
})
}
#[cfg(windows)]
{
self.accept_windows().await
}
}
#[cfg(windows)]
async fn accept_windows(&mut self) -> Result<IpcConnection, IpcError> {
loop {
let pipe = match self.inner.pool.pop_front() {
Some(p) => p,
None => {
tracing::warn!(
endpoint = %self.inner.endpoint,
"named-pipe pool exhausted — attempting emergency create (issue #666)"
);
create_replacement_pipe(&self.inner.endpoint)?
}
};
if let Err(e) = pipe.connect().await {
tracing::warn!(
endpoint = %self.inner.endpoint,
error = %e,
"named-pipe connect failed — replenishing and retrying"
);
if let Ok(replacement) = create_replacement_pipe(&self.inner.endpoint) {
self.inner.pool.push_back(replacement);
}
continue;
}
match create_replacement_pipe_with_retry(&self.inner.endpoint).await {
Some(replacement) => self.inner.pool.push_back(replacement),
None => tracing::error!(
endpoint = %self.inner.endpoint,
"named-pipe replacement create failed after retries — \
pool slot temporarily unfilled; next accept will recreate (issue #666)"
),
}
let (reader, writer) = tokio::io::split(pipe);
return Ok(IpcConnection {
reader,
writer,
read_buf: BytesMut::with_capacity(4096),
recv_timeout: None,
next_frame_request_id: 1,
});
}
}
#[cfg(all(windows, test))]
pub(crate) fn test_drain_pool(&mut self) -> usize {
let drained = self.inner.pool.len();
self.inner.pool.clear();
drained
}
}
#[cfg(windows)]
fn create_replacement_pipe(
endpoint: &str,
) -> Result<tokio::net::windows::named_pipe::NamedPipeServer, IpcError> {
use tokio::net::windows::named_pipe::ServerOptions;
Ok(ServerOptions::new()
.first_pipe_instance(false)
.create(endpoint)?)
}
#[cfg(windows)]
async fn create_replacement_pipe_with_retry(
endpoint: &str,
) -> Option<tokio::net::windows::named_pipe::NamedPipeServer> {
const ATTEMPTS: u32 = 5;
const INITIAL_DELAY_MS: u64 = 5;
const MAX_DELAY_MS: u64 = 80;
let mut delay_ms = INITIAL_DELAY_MS;
for attempt in 0..ATTEMPTS {
match create_replacement_pipe(endpoint) {
Ok(p) => return Some(p),
Err(e) => {
tracing::warn!(
attempt = attempt + 1,
max_attempts = ATTEMPTS,
error = %e,
endpoint = %endpoint,
"named-pipe replacement create retry"
);
if attempt + 1 < ATTEMPTS {
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
delay_ms = (delay_ms * 2).min(MAX_DELAY_MS);
}
}
}
}
None
}
#[cfg(unix)]
pub async fn connect(endpoint: &str) -> Result<IpcConnection, IpcError> {
let stream = tokio::net::UnixStream::connect(endpoint).await?;
let (reader, writer) = tokio::io::split(stream);
Ok(IpcConnection {
reader,
writer,
read_buf: BytesMut::with_capacity(4096),
recv_timeout: None,
next_frame_request_id: 1,
})
}
#[cfg(windows)]
pub async fn connect(endpoint: &str) -> Result<IpcClientConnection, IpcError> {
use tokio::net::windows::named_pipe::ClientOptions;
const MAX_PIPE_BUSY_RETRIES: u32 = 50;
const INITIAL_BACKOFF_MS: u64 = 10;
const MAX_BACKOFF_MS: u64 = 500;
let client = {
let mut attempts = 0u32;
let mut backoff_ms = INITIAL_BACKOFF_MS;
loop {
match ClientOptions::new().open(endpoint) {
Ok(client) => break client,
Err(e) if e.raw_os_error() == Some(231) => {
attempts += 1;
if attempts >= MAX_PIPE_BUSY_RETRIES {
return Err(IpcError::Io(std::io::Error::new(
std::io::ErrorKind::ConnectionRefused,
format!(
"cannot connect to daemon at {endpoint}: \
all pipe instances busy after {attempts} retries (~{:.0}s). \
The daemon may be overloaded — reduce parallel compilation jobs \
or restart the daemon with `zccache stop && zccache start`",
backoff_ms as f64 * attempts as f64 / 2000.0
),
)));
}
tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
backoff_ms = (backoff_ms * 2).min(MAX_BACKOFF_MS);
}
Err(e) => return Err(IpcError::Io(e)),
}
}
};
let (reader, writer) = tokio::io::split(client);
Ok(IpcClientConnection {
reader,
writer,
read_buf: BytesMut::with_capacity(4096),
recv_timeout: None,
next_frame_request_id: 1,
})
}
pub fn unique_test_endpoint() -> String {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(0);
let id = COUNTER.fetch_add(1, Ordering::Relaxed);
let pid = std::process::id();
#[cfg(unix)]
{
format!("/tmp/zccache-test-{pid}-{id}.sock")
}
#[cfg(windows)]
{
format!(r"\\.\pipe\zccache-test-{pid}-{id}")
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::protocol::{wire_prost::zccache_v1 as pb, DecodedWireMessage, Request, Response};
#[tokio::test]
async fn test_ping_pong() {
let endpoint = unique_test_endpoint();
let mut listener = IpcListener::bind(&endpoint).unwrap();
let server = tokio::spawn(async move {
let mut conn = listener.accept().await.unwrap();
let msg: Option<Request> = conn.recv().await.unwrap();
assert_eq!(msg, Some(Request::Ping));
conn.send(&Response::Pong).await.unwrap();
});
let mut client = connect(&endpoint).await.unwrap();
client.send(&Request::Ping).await.unwrap();
let resp: Option<Response> = client.recv().await.unwrap();
assert_eq!(resp, Some(Response::Pong));
server.await.unwrap();
}
#[tokio::test]
async fn test_multiple_messages() {
let endpoint = unique_test_endpoint();
let mut listener = IpcListener::bind(&endpoint).unwrap();
let server = tokio::spawn(async move {
let mut conn = listener.accept().await.unwrap();
for _ in 0..5 {
let msg: Option<Request> = conn.recv().await.unwrap();
assert_eq!(msg, Some(Request::Ping));
conn.send(&Response::Pong).await.unwrap();
}
});
let mut client = connect(&endpoint).await.unwrap();
for _ in 0..5 {
client.send(&Request::Ping).await.unwrap();
let resp: Option<Response> = client.recv().await.unwrap();
assert_eq!(resp, Some(Response::Pong));
}
server.await.unwrap();
}
#[tokio::test]
async fn recv_wire_accepts_bincode_request_on_live_ipc() {
let endpoint = unique_test_endpoint();
let mut listener = IpcListener::bind(&endpoint).unwrap();
let server = tokio::spawn(async move {
let mut conn = listener.accept().await.unwrap();
let msg: Option<DecodedWireMessage<Request, pb::Request>> =
conn.recv_wire().await.unwrap();
assert_eq!(msg, Some(DecodedWireMessage::BincodeV15(Request::Ping)));
conn.send(&Response::Pong).await.unwrap();
});
let mut client = connect(&endpoint).await.unwrap();
client.send(&Request::Ping).await.unwrap();
let resp: Option<Response> = client.recv().await.unwrap();
assert_eq!(resp, Some(Response::Pong));
server.await.unwrap();
}
#[tokio::test]
async fn recv_wire_accepts_prost_request_on_live_ipc() {
let endpoint = unique_test_endpoint();
let mut listener = IpcListener::bind(&endpoint).unwrap();
let server = tokio::spawn(async move {
let mut conn = listener.accept().await.unwrap();
let msg: Option<DecodedWireMessage<Request, pb::Request>> =
conn.recv_wire().await.unwrap();
match msg {
Some(DecodedWireMessage::ProstV16(request)) => {
assert_eq!(request.request_id, "live-prost");
assert!(matches!(request.body, Some(pb::request::Body::Ping(_))));
}
other => panic!("expected prost request, got {other:?}"),
}
conn.send(&Response::Pong).await.unwrap();
});
let mut client = connect(&endpoint).await.unwrap();
let request = pb::Request {
body: Some(pb::request::Body::Ping(pb::Empty {})),
request_id: "live-prost".to_string(),
};
client.send_prost(&request).await.unwrap();
let resp: Option<Response> = client.recv().await.unwrap();
assert_eq!(resp, Some(Response::Pong));
server.await.unwrap();
}
#[tokio::test]
async fn backend_handle_probe_detector_preserves_zccache_requests() {
let endpoint = unique_test_endpoint();
let daemon = crate::ipc::current_backend_identity(&endpoint).unwrap();
let mut listener = IpcListener::bind(&endpoint).unwrap();
let server = tokio::spawn(async move {
let mut conn = listener.accept().await.unwrap();
assert!(!conn.try_serve_backend_handle_probe(&daemon).await.unwrap());
let msg: Option<Request> = conn.recv().await.unwrap();
assert_eq!(msg, Some(Request::Ping));
conn.send(&Response::Pong).await.unwrap();
});
let mut client = connect(&endpoint).await.unwrap();
client.send(&Request::Ping).await.unwrap();
let resp: Option<Response> = client.recv().await.unwrap();
assert_eq!(resp, Some(Response::Pong));
server.await.unwrap();
}
#[tokio::test]
async fn backend_handle_probe_succeeds_on_direct_endpoint() {
let endpoint = unique_test_endpoint();
let daemon = crate::ipc::current_backend_identity(&endpoint).unwrap();
let probe_endpoint = daemon.ipc_endpoint.clone();
let expected_daemon = daemon.clone();
let mut listener = IpcListener::bind(&endpoint).unwrap();
let server = tokio::spawn(async move {
let mut conn = listener.accept().await.unwrap();
assert!(conn.try_serve_backend_handle_probe(&daemon).await.unwrap());
});
let (service_name, handle_endpoint) = tokio::task::spawn_blocking(move || {
let handle =
running_process::broker::backend_handle::BackendHandle::probe_with_service(
"zccache",
crate::core::VERSION,
&probe_endpoint,
&expected_daemon,
)
.unwrap();
(
handle.service_name.clone(),
handle.daemon_process.ipc_endpoint.path.clone(),
)
})
.await
.unwrap();
assert_eq!(service_name, "zccache");
assert_eq!(
handle_endpoint,
crate::ipc::running_process_endpoint(&endpoint).path
);
server.await.unwrap();
}
#[tokio::test]
async fn test_connection_closed() {
let endpoint = unique_test_endpoint();
let mut listener = IpcListener::bind(&endpoint).unwrap();
let server = tokio::spawn(async move {
let _conn = listener.accept().await.unwrap();
});
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
let mut client = connect(&endpoint).await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let resp: Result<Option<Response>, _> = client.recv().await;
match resp {
Ok(None) => {}
Err(IpcError::ConnectionClosed) => {}
Err(IpcError::Io(_)) => {}
other => panic!("unexpected result: {other:?}"),
}
server.await.unwrap();
}
#[cfg(windows)]
#[tokio::test]
async fn pool_recovers_from_full_depletion() {
let endpoint = unique_test_endpoint();
let mut listener = IpcListener::bind(&endpoint).unwrap();
let drained = listener.test_drain_pool();
assert!(drained > 0, "fresh listener should have pre-created pipes");
let server = tokio::spawn(async move {
let mut conn = listener.accept().await.expect("accept after drain");
let msg: Option<Request> = conn.recv().await.unwrap();
assert_eq!(msg, Some(Request::Ping));
conn.send(&Response::Pong).await.unwrap();
});
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let mut client = connect(&endpoint).await.unwrap();
client.send(&Request::Ping).await.unwrap();
let resp: Option<Response> = client.recv().await.unwrap();
assert_eq!(resp, Some(Response::Pong));
server.await.unwrap();
}
#[tokio::test]
async fn test_parallel_connections() {
let endpoint = unique_test_endpoint();
let mut listener = IpcListener::bind(&endpoint).unwrap();
let n = 8;
let server = tokio::spawn(async move {
for _ in 0..n {
let mut conn = listener.accept().await.unwrap();
let msg: Option<Request> = conn.recv().await.unwrap();
assert_eq!(msg, Some(Request::Ping));
conn.send(&Response::Pong).await.unwrap();
}
});
let mut handles = Vec::new();
let ep = endpoint.clone();
for _ in 0..n {
let ep = ep.clone();
handles.push(tokio::spawn(async move {
let mut client = connect(&ep).await.unwrap();
client.send(&Request::Ping).await.unwrap();
let resp: Option<Response> = client.recv().await.unwrap();
assert_eq!(resp, Some(Response::Pong));
}));
}
for h in handles {
h.await.unwrap();
}
server.await.unwrap();
}
}