use bytes::{Buf, BytesMut};
use prost::Message as _;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use crate::ipc::error::IpcError;
use super::framing::ensure_buffered;
pub(super) async fn try_serve_backend_handle_probe<R, W>(
reader: &mut R,
writer: &mut W,
read_buf: &mut BytesMut,
daemon: &running_process::broker::backend_handle::DaemonProcess,
) -> Result<bool, IpcError>
where
R: AsyncRead + Unpin,
W: AsyncWrite + Unpin,
{
ensure_buffered(reader, read_buf, 8).await?;
if read_buf.is_empty() {
return Ok(false);
}
let running_process_version = running_process::broker::protocol::ENVELOPE_VERSION;
if read_buf[0] != running_process_version {
return Ok(false);
}
let zccache_len = u32::from_le_bytes([read_buf[0], read_buf[1], read_buf[2], read_buf[3]]);
let zccache_version = u32::from_le_bytes([read_buf[4], read_buf[5], read_buf[6], read_buf[7]]);
if zccache_len >= 4
&& matches!(
zccache_version,
crate::protocol::BINCODE_PROTOCOL_VERSION | crate::protocol::PROST_PROTOCOL_VERSION
)
{
return Ok(false);
}
let body_len =
u32::from_le_bytes([read_buf[1], read_buf[2], read_buf[3], read_buf[4]]) as usize;
if body_len > running_process::broker::protocol::MAX_FRAME_BYTES {
return Err(IpcError::Endpoint(format!(
"running-process BackendHandle probe frame too large: {body_len} bytes"
)));
}
ensure_buffered(reader, read_buf, 5 + body_len).await?;
let frame = running_process::broker::protocol::Frame::decode(&read_buf[5..5 + body_len])
.map_err(|err| IpcError::Endpoint(format!("BackendHandle probe decode failed: {err}")))?;
if !is_backend_handle_probe_request(&frame) {
if frame.payload_protocol == crate::protocol::wire_frame::ZCCACHE_FRAME_PAYLOAD_PROTOCOL {
return Ok(false);
}
return Err(IpcError::Endpoint(
"unexpected running-process frame on zccache daemon endpoint".to_string(),
));
}
read_buf.advance(5 + body_len);
let response = backend_handle_probe_response(&frame, daemon)?;
write_running_process_frame(writer, &response).await?;
Ok(true)
}
fn is_backend_handle_probe_request(frame: &running_process::broker::protocol::Frame) -> bool {
use running_process::broker::backend_lifecycle::probe::BACKEND_HANDLE_PROBE_PAYLOAD_PROTOCOL;
use running_process::broker::protocol::{FrameKind, PayloadEncoding};
frame.envelope_version == 1
&& FrameKind::try_from(frame.kind) == Ok(FrameKind::Request)
&& frame.payload_protocol == BACKEND_HANDLE_PROBE_PAYLOAD_PROTOCOL
&& PayloadEncoding::try_from(frame.payload_encoding) == Ok(PayloadEncoding::None)
&& frame.payload.len() == 32
}
fn backend_handle_probe_response(
request: &running_process::broker::protocol::Frame,
daemon: &running_process::broker::backend_handle::DaemonProcess,
) -> Result<running_process::broker::protocol::Frame, IpcError> {
use running_process::broker::backend_lifecycle::probe::BACKEND_HANDLE_PROBE_PAYLOAD_PROTOCOL;
use running_process::broker::protocol::{Frame, FrameKind, PayloadEncoding};
let mut payload = Vec::with_capacity(32 + 128);
payload.extend_from_slice(&request.payload);
daemon.to_proto().encode(&mut payload).map_err(|err| {
IpcError::Endpoint(format!("BackendHandle identity encode failed: {err}"))
})?;
Ok(Frame {
envelope_version: 1,
kind: FrameKind::Response as i32,
payload_protocol: BACKEND_HANDLE_PROBE_PAYLOAD_PROTOCOL,
payload,
request_id: request.request_id,
payload_encoding: PayloadEncoding::None as i32,
deadline_unix_ms: 0,
traceparent: request.traceparent.clone(),
tracestate: request.tracestate.clone(),
})
}
async fn write_running_process_frame<W>(
writer: &mut W,
frame: &running_process::broker::protocol::Frame,
) -> Result<(), IpcError>
where
W: AsyncWrite + Unpin,
{
let mut body = Vec::new();
frame.encode(&mut body).map_err(|err| {
IpcError::Endpoint(format!("BackendHandle response encode failed: {err}"))
})?;
if body.len() > running_process::broker::protocol::MAX_FRAME_BYTES {
return Err(IpcError::Endpoint(format!(
"BackendHandle response frame too large: {} bytes",
body.len()
)));
}
writer
.write_all(&[running_process::broker::protocol::ENVELOPE_VERSION])
.await?;
writer.write_all(&(body.len() as u32).to_le_bytes()).await?;
writer.write_all(&body).await?;
writer.flush().await?;
Ok(())
}