use futures_util::{SinkExt, StreamExt};
use interprocess::os::windows::named_pipe::{pipe_mode, tokio::DuplexPipeStream};
use lhm_shared::PIPE_NAME;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll, ready},
};
use tokio::sync::mpsc;
use tokio_util::codec::Framed;
use crate::codec::{LHMFrame, LHMFrameCodec};
/// Byte-mode duplex named-pipe stream wrapped in [`Framed`] with [`LHMFrameCodec`],
/// so the pipe is read and written through the codec rather than as raw bytes.
pub type Pipe = Framed<DuplexPipeStream<pipe_mode::Bytes>, LHMFrameCodec>;
/// Future that drives a [`Pipe`]: when polled it forwards messages read from the
/// pipe into an inbound channel and writes messages received on an outbound
/// channel to the pipe.
pub struct PipeFuture {
/// The framed pipe that is read from and written to.
pipe: Pipe,
/// Sender for messages read from the pipe. Set to `None` after a send on it
/// fails, at which point the pipe is no longer polled for incoming messages.
inbound_tx: Option<mpsc::UnboundedSender<PipeMessage>>,
/// Receiver supplying the messages that should be written to the pipe.
outbound_rx: mpsc::UnboundedReceiver<PipeMessage>,
/// Message already taken from `outbound_rx` but not yet handed to the pipe;
/// it is held here until the pipe reports it is ready to accept it.
buffered_item: Option<PipeMessage>,
}
/// Unbounded sender of [`PipeMessage`]s; `PipeFuture::new` returns one as the
/// handle for queueing messages to be written to the pipe.
pub type PipeTx = mpsc::UnboundedSender<PipeMessage>;
/// Unbounded receiver of [`PipeMessage`]s; `PipeFuture::new` returns one as the
/// handle for receiving messages read from the pipe.
pub type PipeRx = mpsc::UnboundedReceiver<PipeMessage>;
/// Message type carried over the pipe channels (an alias for [`LHMFrame`]).
pub type PipeMessage = LHMFrame;
impl PipeFuture {
    /// Connects to the named pipe at [`PIPE_NAME`], wraps the stream with a default
    /// [`LHMFrameCodec`], and builds the future together with its channel handles.
    ///
    /// The returned tuple has the same layout as [`PipeFuture::new`].
    ///
    /// # Errors
    ///
    /// Returns the I/O error produced by the connection attempt.
    pub async fn connect() -> std::io::Result<(PipeFuture, PipeRx, PipeTx)> {
        let stream = DuplexPipeStream::<pipe_mode::Bytes>::connect_by_path(PIPE_NAME).await?;
        let codec = LHMFrameCodec::default();
        Ok(Self::new(Framed::new(stream, codec)))
    }

    /// Builds a [`PipeFuture`] around an already-framed pipe.
    ///
    /// Returns, in order:
    /// 1. the future that must be polled to move messages,
    /// 2. the receiver on which messages read from the pipe are delivered,
    /// 3. the sender used to queue messages for writing to the pipe.
    pub fn new(pipe: Pipe) -> (PipeFuture, PipeRx, PipeTx) {
        // Pipe -> consumer direction.
        let (to_consumer, from_pipe) = mpsc::unbounded_channel();
        // Producer -> pipe direction.
        let (to_pipe, from_producer) = mpsc::unbounded_channel();

        (
            PipeFuture {
                pipe,
                inbound_tx: Some(to_consumer),
                outbound_rx: from_producer,
                buffered_item: None,
            },
            from_pipe,
            to_pipe,
        )
    }
}
impl Future for PipeFuture {
type Output = Result<(), std::io::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
while let Some(inbound_tx) = &mut this.inbound_tx {
let msg = match this.pipe.poll_next_unpin(cx) {
Poll::Ready(Some(result)) => result?,
Poll::Ready(None) => {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"connection closed unexpectedly",
)));
}
Poll::Pending => break,
};
if inbound_tx.send(msg).is_err() {
this.inbound_tx.take();
break;
}
}
loop {
if this.buffered_item.is_some() {
ready!(this.pipe.poll_ready_unpin(cx))?;
let packet = this
.buffered_item
.take()
.expect("unexpected write state without a packet");
this.pipe.start_send_unpin(packet)?;
}
match this.outbound_rx.poll_recv(cx) {
Poll::Ready(Some(item)) => {
this.buffered_item = Some(item);
}
Poll::Ready(None) => {
ready!(this.pipe.poll_close_unpin(cx))?;
return Poll::Ready(Ok(()));
}
Poll::Pending => {
ready!(this.pipe.poll_flush_unpin(cx))?;
return Poll::Pending;
}
}
}
}
}