use {
super::*,
crate::{os::windows::named_pipe::PmtNotNone, UnpinExt},
tokio::io::AsyncWrite,
};
impl RawPipeStream {
fn poll_write(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
loop {
ready!(same_clsrv!(x in self.inner => x.poll_write_ready(cx)))?;
match same_clsrv!(x in self.inner => x.try_write(buf)) {
Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue,
els => {
self.needs_flush.mark_dirty();
return Poll::Ready(els);
}
}
}
}
}
impl<Rm: PipeModeTag, Sm: PipeModeTag + PmtNotNone> PipeStream<Rm, Sm> {
#[inline]
pub async fn flush(&self) -> io::Result<()> {
self.flusher.flush_atomic(Sm::cast_oai(&self.raw), &self.raw.get().needs_flush).await
}
#[inline]
pub fn poll_flush(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.flusher.poll_flush_atomic(Sm::cast_oai(&self.raw), &self.raw.get().needs_flush, cx)
}
#[inline]
pub fn mark_dirty(&self) { self.raw.get().needs_flush.mark_dirty(); }
#[inline]
pub fn assume_flushed(&self) { self.raw.get().needs_flush.take(); }
#[inline]
pub fn evade_limbo(self) { self.assume_flushed(); }
}
impl<Rm: PipeModeTag> PipeStream<Rm, pipe_mode::Messages> {
#[inline]
pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
struct Write<'a>(&'a RawPipeStream, &'a [u8]);
impl Future for Write<'_> {
type Output = io::Result<usize>;
#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let slf = self.get_mut();
slf.0.poll_write(cx, slf.1)
}
}
Write(self.raw.get(), buf).await
}
}
impl<Rm: PipeModeTag> AsyncWrite for &PipeStream<Rm, pipe_mode::Bytes> {
#[inline(always)]
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
self.get_mut().raw.get().poll_write(cx, buf)
}
#[inline(always)]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.get_mut().poll_flush(cx)
}
#[inline(always)]
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
AsyncWrite::poll_flush(self, cx)
}
}
impl<Rm: PipeModeTag> AsyncWrite for PipeStream<Rm, pipe_mode::Bytes> {
#[inline]
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
AsyncWrite::poll_write((&mut &*self).pin(), cx, buf)
}
#[inline]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
AsyncWrite::poll_flush((&mut &*self).pin(), cx)
}
#[inline]
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
AsyncWrite::poll_shutdown((&mut &*self).pin(), cx)
}
}