use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::windows::named_pipe::{
ClientOptions, NamedPipeClient, NamedPipeServer, ServerOptions,
};
use crate::error::ServiceError;
pub enum TransportStream {
Server(NamedPipeServer),
Client(NamedPipeClient),
}
impl AsyncRead for TransportStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
match self.get_mut() {
TransportStream::Server(s) => Pin::new(s).poll_read(cx, buf),
TransportStream::Client(c) => Pin::new(c).poll_read(cx, buf),
}
}
}
impl AsyncWrite for TransportStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
match self.get_mut() {
TransportStream::Server(s) => Pin::new(s).poll_write(cx, buf),
TransportStream::Client(c) => Pin::new(c).poll_write(cx, buf),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
match self.get_mut() {
TransportStream::Server(s) => Pin::new(s).poll_flush(cx),
TransportStream::Client(c) => Pin::new(c).poll_flush(cx),
}
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
match self.get_mut() {
TransportStream::Server(s) => Pin::new(s).poll_shutdown(cx),
TransportStream::Client(c) => Pin::new(c).poll_shutdown(cx),
}
}
}
pub type TransportReadHalf = tokio::io::ReadHalf<TransportStream>;
pub type TransportWriteHalf = tokio::io::WriteHalf<TransportStream>;
pub struct TransportListener {
pipe_path: String,
pending: Option<NamedPipeServer>,
}
pub fn bind(path: PathBuf) -> Result<TransportListener, ServiceError> {
let pipe_path = path.to_string_lossy().into_owned();
let server = ServerOptions::new()
.first_pipe_instance(true)
.create(&pipe_path)
.map_err(|e| ServiceError::Ipc(format!("bind failed: {e}")))?;
Ok(TransportListener {
pipe_path,
pending: Some(server),
})
}
pub async fn accept(listener: &mut TransportListener) -> Result<TransportStream, std::io::Error> {
let server = listener.pending.take().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::NotConnected, "no pending pipe instance")
})?;
server.connect().await?;
let next = ServerOptions::new().create(&listener.pipe_path)?;
listener.pending = Some(next);
Ok(TransportStream::Server(server))
}
pub async fn connect(path: &PathBuf) -> Result<TransportStream, ServiceError> {
const ERROR_PIPE_BUSY: i32 = 231;
for attempt in 0u32..5 {
match ClientOptions::new().open(path) {
Ok(client) => return Ok(TransportStream::Client(client)),
Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY) => {
let delay = std::time::Duration::from_millis(50 * 2u64.pow(attempt));
tokio::time::sleep(delay).await;
}
Err(e) => return Err(ServiceError::Ipc(format!("connect failed: {e}"))),
}
}
Err(ServiceError::Ipc(
"connect failed: pipe busy after retries".into(),
))
}
pub fn peer_cred_check(_stream: &TransportStream) -> bool {
true
}
pub fn cleanup(_path: &PathBuf) {
}
pub fn split(stream: TransportStream) -> (TransportReadHalf, TransportWriteHalf) {
tokio::io::split(stream)
}