use super::*;
use crate::{
os::windows::{named_pipe::PmtNotNone, winprelude::*},
UnpinExt,
};
use tokio::io::AsyncWrite;
impl RawPipeStream {
fn poll_write(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
loop {
ready!(same_clsrv!(x in self.inner() => x.poll_write_ready(cx)))?;
match same_clsrv!(x in self.inner() => x.try_write(buf)) {
Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue,
els => {
self.needs_flush.mark_dirty();
return Poll::Ready(els);
}
}
}
}
}
impl<Rm: PipeModeTag, Sm: PipeModeTag + PmtNotNone> PipeStream<Rm, Sm> {
#[inline]
pub async fn flush(&self) -> io::Result<()> {
self.flusher
.flush_atomic(self.as_handle(), &self.raw.needs_flush)
.await
}
#[inline]
pub fn poll_flush(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.flusher
.poll_flush_atomic(self.as_handle(), &self.raw.needs_flush, cx)
}
#[inline]
pub fn mark_dirty(&self) {
self.raw.needs_flush.mark_dirty();
}
#[inline]
pub fn assume_flushed(&self) {
self.raw.needs_flush.take();
}
#[inline]
pub fn evade_limbo(self) {
self.assume_flushed();
}
}
impl<Rm: PipeModeTag> PipeStream<Rm, pipe_mode::Messages> {
#[inline]
pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
struct Write<'a>(&'a RawPipeStream, &'a [u8]);
impl Future for Write<'_> {
type Output = io::Result<usize>;
#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let slf = self.get_mut();
slf.0.poll_write(cx, slf.1)
}
}
Write(&self.raw, buf).await
}
}
impl<Rm: PipeModeTag> AsyncWrite for &PipeStream<Rm, pipe_mode::Bytes> {
#[inline(always)]
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
self.get_mut().raw.poll_write(cx, buf)
}
#[inline(always)]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.get_mut().poll_flush(cx)
}
#[inline(always)]
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
AsyncWrite::poll_flush(self, cx)
}
}
impl<Rm: PipeModeTag> AsyncWrite for PipeStream<Rm, pipe_mode::Bytes> {
#[inline]
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
AsyncWrite::poll_write((&mut &*self).pin(), cx, buf)
}
#[inline]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
AsyncWrite::poll_flush((&mut &*self).pin(), cx)
}
#[inline]
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
AsyncWrite::poll_shutdown((&mut &*self).pin(), cx)
}
}