use std::time::Duration;
use bytes::BytesMut;
use tokio::io::{AsyncWriteExt, ReadHalf, WriteHalf};
use tokio::net::windows::named_pipe::{NamedPipeClient, NamedPipeServer};
use crate::ipc::error::IpcError;
use super::framing::{decode_response_wire, recv_bincode_loop, recv_wire_loop};
use super::{IpcConnection, IpcListener};
pub struct IpcClientConnection {
pub(super) reader: ReadHalf<NamedPipeClient>,
pub(super) writer: WriteHalf<NamedPipeClient>,
pub(super) read_buf: BytesMut,
pub(super) recv_timeout: Option<Duration>,
pub(super) next_frame_request_id: u64,
}
impl IpcClientConnection {
pub async fn send<T: serde::Serialize>(&mut self, msg: &T) -> Result<(), IpcError> {
let buf = crate::protocol::encode_message(msg)?;
self.writer.write_all(&buf).await?;
self.writer.flush().await?;
Ok(())
}
pub async fn send_prost<M: prost::Message>(&mut self, msg: &M) -> Result<(), IpcError> {
let buf = crate::protocol::wire_prost::encode_prost_message(msg)?;
self.writer.write_all(&buf).await?;
self.writer.flush().await?;
Ok(())
}
pub async fn send_frame_v1_request<M: prost::Message>(
&mut self,
msg: &M,
) -> Result<u64, IpcError> {
let request_id = self.next_frame_request_id;
self.next_frame_request_id = self.next_frame_request_id.wrapping_add(1);
let buf = crate::protocol::wire_frame::encode_frame_v1_request(msg, request_id)?;
self.writer.write_all(&buf).await?;
self.writer.flush().await?;
Ok(request_id)
}
pub fn set_recv_timeout(&mut self, timeout: Duration) {
self.recv_timeout = Some(timeout);
}
pub fn clear_recv_timeout(&mut self) {
self.recv_timeout = None;
}
pub fn recv_timeout(&self) -> Option<Duration> {
self.recv_timeout
}
pub async fn recv<T: serde::de::DeserializeOwned>(&mut self) -> Result<Option<T>, IpcError> {
match self.recv_timeout {
Some(t) => self.recv_with_timeout(t).await,
None => self.recv_loop().await,
}
}
pub async fn recv_with_timeout<T: serde::de::DeserializeOwned>(
&mut self,
timeout: Duration,
) -> Result<Option<T>, IpcError> {
match tokio::time::timeout(timeout, self.recv_loop()).await {
Ok(result) => result,
Err(_) => Err(IpcError::Timeout(timeout)),
}
}
pub async fn recv_wire<Bincode, Prost>(
&mut self,
) -> Result<Option<crate::protocol::DecodedWireMessage<Bincode, Prost>>, IpcError>
where
Bincode: serde::de::DeserializeOwned,
Prost: prost::Message + Default,
{
match self.recv_timeout {
Some(t) => self.recv_wire_with_timeout(t).await,
None => self.recv_wire_loop().await,
}
}
pub async fn recv_wire_with_timeout<Bincode, Prost>(
&mut self,
timeout: Duration,
) -> Result<Option<crate::protocol::DecodedWireMessage<Bincode, Prost>>, IpcError>
where
Bincode: serde::de::DeserializeOwned,
Prost: prost::Message + Default,
{
match tokio::time::timeout(timeout, self.recv_wire_loop()).await {
Ok(result) => result,
Err(_) => Err(IpcError::Timeout(timeout)),
}
}
pub async fn send_request(
&mut self,
request: &crate::protocol::Request,
wire: crate::protocol::wire_prost::WireFormat,
) -> Result<(), IpcError> {
match wire {
crate::protocol::wire_prost::WireFormat::BincodeV15 => self.send(request).await,
crate::protocol::wire_prost::WireFormat::ProstV16 => {
let request_id = crate::protocol::wire_prost::default_request_id(request);
let request = crate::protocol::wire_prost::request_to_prost(request, request_id);
self.send_prost(&request).await
}
crate::protocol::wire_prost::WireFormat::FrameV1 => {
let request_id = crate::protocol::wire_prost::default_request_id(request);
let request = crate::protocol::wire_prost::request_to_prost(request, request_id);
self.send_frame_v1_request(&request).await.map(|_| ())
}
}
}
pub async fn recv_response(&mut self) -> Result<Option<crate::protocol::Response>, IpcError> {
let message = self
.recv_wire::<crate::protocol::Response, crate::protocol::wire_prost::zccache_v1::Response>()
.await?;
decode_response_wire(message)
}
pub async fn recv_response_with_timeout(
&mut self,
timeout: Duration,
) -> Result<Option<crate::protocol::Response>, IpcError> {
let message = self
.recv_wire_with_timeout::<crate::protocol::Response, crate::protocol::wire_prost::zccache_v1::Response>(timeout)
.await?;
decode_response_wire(message)
}
async fn recv_loop<T: serde::de::DeserializeOwned>(&mut self) -> Result<Option<T>, IpcError> {
recv_bincode_loop(&mut self.reader, &mut self.read_buf).await
}
async fn recv_wire_loop<Bincode, Prost>(
&mut self,
) -> Result<Option<crate::protocol::DecodedWireMessage<Bincode, Prost>>, IpcError>
where
Bincode: serde::de::DeserializeOwned,
Prost: prost::Message + Default,
{
recv_wire_loop(&mut self.reader, &mut self.read_buf).await
}
}
impl IpcListener {
pub(super) async fn accept_windows(&mut self) -> Result<IpcConnection, IpcError> {
loop {
let pipe = match self.inner.pool.pop_front() {
Some(p) => p,
None => {
tracing::warn!(
endpoint = %self.inner.endpoint,
"named-pipe pool exhausted — attempting emergency create (issue #666)"
);
create_replacement_pipe(&self.inner.endpoint)?
}
};
if let Err(e) = pipe.connect().await {
tracing::warn!(
endpoint = %self.inner.endpoint,
error = %e,
"named-pipe connect failed — replenishing and retrying"
);
if let Ok(replacement) = create_replacement_pipe(&self.inner.endpoint) {
self.inner.pool.push_back(replacement);
}
continue;
}
match create_replacement_pipe_with_retry(&self.inner.endpoint).await {
Some(replacement) => self.inner.pool.push_back(replacement),
None => tracing::error!(
endpoint = %self.inner.endpoint,
"named-pipe replacement create failed after retries — \
pool slot temporarily unfilled; next accept will recreate (issue #666)"
),
}
let (reader, writer) = tokio::io::split(pipe);
return Ok(IpcConnection {
reader,
writer,
read_buf: BytesMut::with_capacity(4096),
recv_timeout: None,
next_frame_request_id: 1,
});
}
}
#[cfg(test)]
pub(crate) fn test_drain_pool(&mut self) -> usize {
let drained = self.inner.pool.len();
self.inner.pool.clear();
drained
}
}
pub(super) fn create_replacement_pipe(endpoint: &str) -> Result<NamedPipeServer, IpcError> {
use tokio::net::windows::named_pipe::ServerOptions;
Ok(ServerOptions::new()
.first_pipe_instance(false)
.create(endpoint)?)
}
async fn create_replacement_pipe_with_retry(endpoint: &str) -> Option<NamedPipeServer> {
const ATTEMPTS: u32 = 5;
const INITIAL_DELAY_MS: u64 = 5;
const MAX_DELAY_MS: u64 = 80;
let mut delay_ms = INITIAL_DELAY_MS;
for attempt in 0..ATTEMPTS {
match create_replacement_pipe(endpoint) {
Ok(p) => return Some(p),
Err(e) => {
tracing::warn!(
attempt = attempt + 1,
max_attempts = ATTEMPTS,
error = %e,
endpoint = %endpoint,
"named-pipe replacement create retry"
);
if attempt + 1 < ATTEMPTS {
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
delay_ms = (delay_ms * 2).min(MAX_DELAY_MS);
}
}
}
}
None
}
pub async fn connect(endpoint: &str) -> Result<IpcClientConnection, IpcError> {
use tokio::net::windows::named_pipe::ClientOptions;
const MAX_PIPE_BUSY_RETRIES: u32 = 50;
const INITIAL_BACKOFF_MS: u64 = 10;
const MAX_BACKOFF_MS: u64 = 500;
let client = {
let mut attempts = 0u32;
let mut backoff_ms = INITIAL_BACKOFF_MS;
loop {
match ClientOptions::new().open(endpoint) {
Ok(client) => break client,
Err(e) if e.raw_os_error() == Some(231) => {
attempts += 1;
if attempts >= MAX_PIPE_BUSY_RETRIES {
return Err(IpcError::Io(std::io::Error::new(
std::io::ErrorKind::ConnectionRefused,
format!(
"cannot connect to daemon at {endpoint}: \
all pipe instances busy after {attempts} retries (~{:.0}s). \
The daemon may be overloaded — reduce parallel compilation jobs \
or restart the daemon with `zccache stop && zccache start`",
backoff_ms as f64 * attempts as f64 / 2000.0
),
)));
}
tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
backoff_ms = (backoff_ms * 2).min(MAX_BACKOFF_MS);
}
Err(e) => return Err(IpcError::Io(e)),
}
}
};
let (reader, writer) = tokio::io::split(client);
Ok(IpcClientConnection {
reader,
writer,
read_buf: BytesMut::with_capacity(4096),
recv_timeout: None,
next_frame_request_id: 1,
})
}