interprocess 2.4.0

Interprocess communication toolkit
Documentation
use {
    super::UnnamedPipeExt,
    crate::{
        os::unix::{unixprelude::*, FdOps},
        unnamed_pipe::{
            tokio::{Recver as PubRecver, Sender as PubSender},
            Recver as SyncRecver, Sender as SyncSender,
        },
    },
    std::{
        io,
        pin::Pin,
        task::{ready, Context, Poll},
    },
    tokio::io::{unix::AsyncFd, AsyncRead, AsyncWrite, Interest, ReadBuf, Ready},
};

// Both pipe ends are backed by the same concrete type: a Tokio `AsyncFd` wrapping the crate's
// `FdOps` descriptor wrapper. Two aliases are kept so each end can be referred to by role.
type RecverImpl = AsyncFd<FdOps>;
type SenderImpl = AsyncFd<FdOps>;

/// Creates an unnamed pipe and returns its two ends wrapped in the public Tokio types.
///
/// The pipe is obtained from `super::pipe(true)`; the flag presumably requests non-blocking
/// descriptors, which is what `try_from_nb` expects (TODO confirm against `super::pipe`).
///
/// # Errors
/// Propagates any error from creating the pipe or from wrapping either end.
pub(crate) fn pipe_impl() -> io::Result<(PubSender, PubRecver)> {
    let (sync_tx, sync_rx) = super::pipe(true)?;
    // The sending end is wrapped first, then the receiving end.
    let sender = Sender::try_from_nb(sync_tx)?;
    let recver = Recver::try_from_nb(sync_rx)?;
    Ok((PubSender(sender), PubRecver(recver)))
}

/// Receiving end of an unnamed pipe, backed by a Tokio `AsyncFd`.
#[derive(Debug)]
pub(crate) struct Recver(RecverImpl);
impl Recver {
    /// Wraps a synchronous receiver in an `AsyncFd` registered with read interest.
    ///
    /// This function does not change the descriptor's blocking mode itself; as the `_nb` suffix
    /// suggests, the caller is expected to pass a descriptor that is already non-blocking.
    ///
    /// # Errors
    /// Returns whatever error `AsyncFd::with_interest` reports.
    fn try_from_nb(rx: SyncRecver) -> io::Result<Self> {
        let ops = FdOps(rx.into());
        RecverImpl::with_interest(ops, Interest::READABLE).map(Self)
    }
}

/// Readiness-driven asynchronous reading for the receiving end of the pipe.
impl AsyncRead for Recver {
    /// Attempts to read from the pipe into the unfilled portion of `buf`.
    ///
    /// Waits for read readiness, performs one read attempt, and on `WouldBlock` clears the
    /// readiness flag and waits again. Any other outcome is returned to the caller: a successful
    /// read advances `buf` by the number of bytes reported, other errors are passed through.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let this = self.get_mut();
        // The raw descriptor is a plain copyable value, so it is fetched before a readiness guard
        // is taken out on `this.0`.
        let raw_fd = this.0.get_ref().as_raw_fd();
        loop {
            let mut guard = ready!(this.0.poll_read_ready_mut(cx))?;
            // SAFETY(borrow_raw): the descriptor comes from the `FdOps` owned by `this`, which is
            // not dropped while this call runs.
            // SAFETY(unfilled_mut): the slice is only handed to the read call; nothing here
            // de-initializes bytes of the buffer.
            let outcome = unsafe {
                let borrowed = BorrowedFd::borrow_raw(raw_fd);
                FdOps::read(borrowed, buf.unfilled_mut())
            };
            match outcome {
                Ok(count) => {
                    // SAFETY: relies on the read having initialized the first `count` bytes of
                    // the unfilled region it was given.
                    unsafe { buf.assume_init(count) };
                    buf.advance(count);
                    return Poll::Ready(Ok(()));
                }
                // Stale readiness: reset it and go back to waiting.
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                    guard.clear_ready_matching(Ready::READABLE);
                }
                Err(e) => return Poll::Ready(Err(e)),
            }
        }
    }
}

/// Converts a synchronous receiver into its Tokio counterpart.
impl TryFrom<SyncRecver> for Recver {
    type Error = io::Error;
    /// Switches the receiver to non-blocking mode and then wraps it via `try_from_nb`.
    ///
    /// # Errors
    /// Fails if the mode switch or the subsequent wrapping fails.
    fn try_from(rx: SyncRecver) -> io::Result<Self> {
        rx.set_nonblocking(true).and_then(|()| Self::try_from_nb(rx))
    }
}
impl TryFrom<Recver> for OwnedFd {
    type Error = io::Error;
    fn try_from(rx: Recver) -> io::Result<Self> { Ok(rx.0.into_inner().into()) }
}
/// Builds a Tokio receiver from an owned file descriptor.
impl TryFrom<OwnedFd> for Recver {
    type Error = io::Error;
    /// Wraps the descriptor in a synchronous receiver, then delegates to the
    /// `TryFrom<SyncRecver>` conversion.
    fn try_from(rx: OwnedFd) -> io::Result<Self> {
        let sync_rx = SyncRecver::from(rx);
        Self::try_from(sync_rx)
    }
}
// Expands the `forward_as_handle!` macro for `Recver`. The macro is defined elsewhere in the
// crate; presumably it forwards descriptor-borrowing traits to the wrapped value (TODO confirm).
forward_as_handle!(Recver);

/// Sending end of an unnamed pipe, backed by a Tokio `AsyncFd`.
#[derive(Debug)]
pub(crate) struct Sender(SenderImpl);
impl Sender {
    /// Wraps a synchronous sender in an `AsyncFd` registered with write interest.
    ///
    /// This function does not change the descriptor's blocking mode itself; as the `_nb` suffix
    /// suggests, the caller is expected to pass a descriptor that is already non-blocking.
    ///
    /// # Errors
    /// Returns whatever error `AsyncFd::with_interest` reports.
    fn try_from_nb(tx: SyncSender) -> io::Result<Self> {
        let ops = FdOps(tx.into());
        SenderImpl::with_interest(ops, Interest::WRITABLE).map(Self)
    }
}

/// Readiness-driven asynchronous writing for the sending end of the pipe.
impl AsyncWrite for Sender {
    /// Attempts to write bytes from `buf` into the pipe.
    ///
    /// Waits for write readiness, performs one write attempt, and on `WouldBlock` clears the
    /// readiness flag and waits again. Any other outcome (the byte count written, or a different
    /// error) is returned to the caller as-is.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut();
        // The raw descriptor is a plain copyable value, so it is fetched before a readiness guard
        // is taken out on `this.0`.
        let raw_fd = this.0.get_ref().as_raw_fd();
        loop {
            let mut guard = ready!(this.0.poll_write_ready_mut(cx))?;
            // SAFETY(borrow_raw): the descriptor comes from the `FdOps` owned by `this`, which is
            // not dropped while this call runs.
            let outcome = unsafe { FdOps::write(BorrowedFd::borrow_raw(raw_fd), buf) };
            match outcome {
                // Stale readiness: reset it and go back to waiting.
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                    guard.clear_ready_matching(Ready::WRITABLE);
                }
                // Success (bytes written) and every other error go straight to the caller.
                finished => return Poll::Ready(finished),
            }
        }
    }
    /// No-op: this type keeps no buffer of its own, so it reports success immediately.
    #[inline]
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
    /// No-op: performs no action on the descriptor and reports success immediately.
    #[inline]
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}

/// Converts a synchronous sender into its Tokio counterpart.
impl TryFrom<SyncSender> for Sender {
    type Error = io::Error;
    /// Switches the sender to non-blocking mode and then wraps it via `try_from_nb`.
    ///
    /// # Errors
    /// Fails if the mode switch or the subsequent wrapping fails.
    fn try_from(tx: SyncSender) -> io::Result<Self> {
        tx.set_nonblocking(true).and_then(|()| Self::try_from_nb(tx))
    }
}
impl TryFrom<Sender> for OwnedFd {
    type Error = io::Error;
    fn try_from(rx: Sender) -> io::Result<Self> { Ok(rx.0.into_inner().into()) }
}
/// Builds a Tokio sender from an owned file descriptor.
impl TryFrom<OwnedFd> for Sender {
    type Error = io::Error;
    /// Wraps the descriptor in a synchronous sender, then delegates to the
    /// `TryFrom<SyncSender>` conversion.
    fn try_from(tx: OwnedFd) -> io::Result<Self> {
        let sync_tx = SyncSender::from(tx);
        Self::try_from(sync_tx)
    }
}
// Expands the `forward_as_handle!` macro for `Sender`. The macro is defined elsewhere in the
// crate; presumably it forwards descriptor-borrowing traits to the wrapped value (TODO confirm).
forward_as_handle!(Sender);