use widestring::U16CString;
use super::*;
use crate::os::windows::{named_pipe::WaitTimeout, path_conversion::*, NeedsFlushVal};
use std::{borrow::Cow, mem::take};
impl RawPipeStream {
pub(super) fn new(inner: InnerTokio, nfv: NeedsFlushVal) -> Self {
Self {
inner: Some(inner),
needs_flush: NeedsFlush::from(nfv),
}
}
pub(crate) fn new_server(server: TokioNPServer) -> Self {
Self::new(InnerTokio::Server(server), NeedsFlushVal::No)
}
fn new_client(client: TokioNPClient) -> Self {
Self::new(InnerTokio::Client(client), NeedsFlushVal::No)
}
async fn wait_for_server(path: U16CString) -> io::Result<U16CString> {
tokio::task::spawn_blocking(move || {
c_wrappers::block_for_server(&path, WaitTimeout::DEFAULT)?;
Ok(path)
})
.await
.expect("waiting for server panicked")
}
async fn connect(
mut path: U16CString,
recv: Option<PipeMode>,
send: Option<PipeMode>,
) -> io::Result<Self> {
let client = loop {
match c_wrappers::connect_without_waiting(&path, recv, send, true) {
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
let path_take = Self::wait_for_server(take(&mut path)).await?;
path = path_take;
}
not_waiting => break not_waiting?,
}
};
let client = unsafe { TokioNPClient::from_raw_handle(client.into_raw_handle())? };
Ok(Self::new_client(client))
}
}
impl<Rm: PipeModeTag, Sm: PipeModeTag> PipeStream<Rm, Sm> {
#[inline]
pub async fn connect_by_path<'s>(path: impl ToWtf16<'s>) -> io::Result<Self> {
RawPipeStream::connect(
path.to_wtf_16().map(Cow::into_owned).map_err(to_io_error)?,
Rm::MODE,
Sm::MODE,
)
.await
.map(Self::new)
}
pub(crate) fn new(raw: RawPipeStream) -> Self {
Self {
raw: MaybeArc::Inline(raw),
flusher: Sm::TokioFlusher::default(),
_phantom: PhantomData,
}
}
}