#[cfg(windows)]
mod imp {
use std::ffi::OsString;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::windows::named_pipe::{NamedPipeServer, ServerOptions};
use tokio::sync::watch;
use tokio_util::bytes::{Bytes, BytesMut};
use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
use crate::comms::daemon::Broker;
use crate::comms::protocol::{CommsOut, CommsRequest};
use crate::comms::transport::{
CommsFrontend, CommsLink, MAX_FRAME_BYTES, PeerCred, serve_link,
};
const READ_CHUNK: usize = 8 * 1024;
pub struct NamedPipeLink {
server: NamedPipeServer,
codec: LengthDelimitedCodec,
read_buf: BytesMut,
}
impl NamedPipeLink {
pub fn new(server: NamedPipeServer) -> Self {
let mut codec = LengthDelimitedCodec::new();
codec.set_max_frame_length(MAX_FRAME_BYTES);
Self {
server,
codec,
read_buf: BytesMut::with_capacity(READ_CHUNK),
}
}
}
impl CommsLink for NamedPipeLink {
async fn recv(&mut self) -> std::io::Result<Option<CommsRequest>> {
loop {
if let Some(frame) = self.codec.decode(&mut self.read_buf)? {
let req = rmp_serde::from_slice(&frame).map_err(|e| {
std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string())
})?;
return Ok(Some(req));
}
let n = self.server.read_buf(&mut self.read_buf).await?;
if n == 0 {
if self.read_buf.is_empty() {
return Ok(None);
}
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"peer closed mid-frame",
));
}
}
}
async fn send(&mut self, out: CommsOut) -> std::io::Result<()> {
let body = rmp_serde::to_vec_named(&out)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
let mut framed = BytesMut::new();
self.codec.encode(Bytes::from(body), &mut framed)?;
self.server.write_all(&framed).await?;
self.server.flush().await
}
fn peer_cred(&self) -> PeerCred {
PeerCred::default()
}
}
pub struct NamedPipeFrontend {
first: NamedPipeServer,
pipe_name: OsString,
}
impl NamedPipeFrontend {
pub fn from_first_instance(first: NamedPipeServer, pipe_name: OsString) -> Self {
Self { first, pipe_name }
}
}
impl CommsFrontend for NamedPipeFrontend {
async fn serve(
self: Box<Self>,
broker: Arc<Broker>,
mut shutdown: watch::Receiver<bool>,
) -> std::io::Result<()> {
broker.mark_active().await;
let mut server = self.first;
loop {
tokio::select! {
conn = server.connect() => {
if let Err(e) = conn {
tracing::warn!(error = %e, "comms: pipe connect failed");
server = ServerOptions::new().create(&self.pipe_name)?;
continue;
}
let connected = server;
server = ServerOptions::new().create(&self.pipe_name)?;
let broker = broker.clone();
tokio::spawn(serve_link(broker, NamedPipeLink::new(connected)));
}
_ = shutdown.changed() => {
if *shutdown.borrow() {
break;
}
}
}
}
Ok(())
}
}
}
#[cfg(windows)]
pub use imp::{NamedPipeFrontend, NamedPipeLink};