use std::future::poll_fn;
use std::io::{self, IoSlice};
use std::mem::{ManuallyDrop, MaybeUninit};
use std::os::fd::OwnedFd;
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::pin::Pin;
use std::task::{Context, Poll};
use mio::Interest;
use tokio::io::{AsyncRead as TokioAsyncRead, AsyncWrite as TokioAsyncWrite, ReadBuf};
use crate::io::{
AsInnerRawHandle, IoBuf, IoBufMut, IoBufTemporaryPoll, IoVectoredBuf, IoVectoredBufMut,
IoVectoredBufTemporaryPoll,
};
use crate::op::{ReadOp, ReadvOp, WriteOp, WritevOp};
use crate::{
driver::RegistrationMode,
fd_inner::InnerRawHandle,
io::{AsyncRead, AsyncWrite},
};
fn pipe_inner() -> std::io::Result<(OwnedFd, OwnedFd)> {
let mut fds: Box<[MaybeUninit<RawFd>]> = Box::new_uninit_slice(2);
let fds = unsafe {
libc::pipe(fds.as_mut_ptr().cast::<RawFd>());
fds.assume_init()
};
Ok((unsafe { OwnedFd::from_raw_fd(fds[0]) }, unsafe {
OwnedFd::from_raw_fd(fds[1])
}))
}
/// Creates a pipe and wraps both ends in [`Pipe`] values.
///
/// The returned tuple keeps the order produced by `pipe_inner`. Each end is created with
/// [`RegistrationMode::Completion`].
///
/// # Errors
///
/// Fails if the OS pipe cannot be created or if wrapping either end fails.
pub fn pipe() -> std::io::Result<(Pipe, Pipe)> {
    let (read_fd, write_fd) = pipe_inner()?;
    // Wrap the first end before the second, matching left-to-right tuple evaluation.
    let reader = Pipe::from_std_with_mode(read_fd, RegistrationMode::Completion)?;
    let writer = Pipe::from_std_with_mode(write_fd, RegistrationMode::Completion)?;
    Ok((reader, writer))
}
/// One end of an OS pipe.
///
/// Holds the owned file descriptor together with the `InnerRawHandle` that was created for it.
pub struct Pipe {
    /// The owned file descriptor of this pipe end.
    inner: OwnedFd,
    /// Handle created from `inner`'s raw fd. It is wrapped in `ManuallyDrop` and released
    /// explicitly by `Drop for Pipe` and by `into_raw_fd`.
    handle: ManuallyDrop<InnerRawHandle>,
}
/// Wrapper around a [`Pipe`] that implements tokio's `AsyncRead` and `AsyncWrite` traits.
///
/// Obtained from [`Pipe::into_poll`].
pub struct PollPipe {
    /// The wrapped pipe end.
    stream: Pipe,
}
impl Pipe {
    /// Wraps an owned file descriptor in a [`Pipe`] using the requested registration `mode`.
    ///
    /// A handle is created for the descriptor with both readable and writable interest, and the
    /// descriptor's `O_NONBLOCK` flag is then aligned with the mode the handle reports.
    ///
    /// # Errors
    ///
    /// Returns an error if creating the handle fails or if the descriptor's status flags cannot
    /// be read or updated. `inner` is closed in either case because it is dropped.
    #[inline]
    pub(crate) fn from_std_with_mode(
        inner: OwnedFd,
        mode: RegistrationMode,
    ) -> Result<Self, io::Error> {
        let handle = ManuallyDrop::new(InnerRawHandle::new_with_mode(
            inner.as_raw_fd(),
            Interest::READABLE | Interest::WRITABLE,
            mode,
        )?);
        // Build the `Pipe` before the fallible `fcntl` calls so that an early return runs
        // `Drop for Pipe`, which releases the manually-dropped handle instead of leaking it.
        let pipe = Self { inner, handle };
        pipe.sync_nonblocking_flag()?;
        Ok(pipe)
    }

    /// Rebinds the handle to [`RegistrationMode::Poll`] and returns the pipe as a [`PollPipe`].
    ///
    /// # Errors
    ///
    /// Returns an error if rebinding fails or if the descriptor's status flags cannot be read or
    /// updated. The pipe is dropped (and its descriptor closed) on error.
    #[inline]
    pub fn into_poll(self) -> Result<PollPipe, io::Error> {
        let mut stream = self;
        stream.handle.rebind_mode(RegistrationMode::Poll)?;
        stream.sync_nonblocking_flag()?;
        Ok(PollPipe { stream })
    }

    /// Makes the descriptor's `O_NONBLOCK` flag match the handle's current mode: cleared when
    /// the handle reports `uses_completion()`, set otherwise.
    fn sync_nonblocking_flag(&self) -> io::Result<()> {
        let fd = self.inner.as_raw_fd();
        // SAFETY: `fd` is an open descriptor owned by `self.inner`; `F_GETFL` takes no pointers.
        let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
        if flags == -1 {
            return Err(io::Error::last_os_error());
        }
        let wanted = if self.handle.uses_completion() {
            flags & !libc::O_NONBLOCK
        } else {
            flags | libc::O_NONBLOCK
        };
        if wanted != flags {
            // SAFETY: same descriptor as above; `F_SETFL` takes a plain integer argument.
            let rc = unsafe { libc::fcntl(fd, libc::F_SETFL, wanted) };
            if rc == -1 {
                return Err(io::Error::last_os_error());
            }
        }
        Ok(())
    }
}
impl PollPipe {
    /// Unwraps the underlying [`Pipe`] without touching its registration or descriptor flags.
    #[inline]
    pub fn into_adaptive(self) -> Pipe {
        self.stream
    }

    /// Rebinds the handle to [`RegistrationMode::Completion`] and returns the underlying
    /// [`Pipe`].
    ///
    /// After rebinding, the descriptor's `O_NONBLOCK` flag is cleared when the handle reports
    /// `uses_completion()` and set otherwise.
    ///
    /// # Errors
    ///
    /// Returns an error if rebinding fails or if the descriptor's status flags cannot be read or
    /// updated. The pipe is dropped (and its descriptor closed) on error.
    #[inline]
    pub fn into_completion(self) -> Result<Pipe, io::Error> {
        let mut stream = self.stream;
        stream.handle.rebind_mode(RegistrationMode::Completion)?;

        let fd = stream.inner.as_raw_fd();
        // SAFETY: `fd` is an open descriptor owned by `stream.inner`; `F_GETFL` takes no
        // pointers.
        let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
        if flags == -1 {
            return Err(io::Error::last_os_error());
        }
        let wanted = if stream.handle.uses_completion() {
            flags & !libc::O_NONBLOCK
        } else {
            flags | libc::O_NONBLOCK
        };
        if wanted != flags {
            // SAFETY: same descriptor as above; `F_SETFL` takes a plain integer argument.
            let rc = unsafe { libc::fcntl(fd, libc::F_SETFL, wanted) };
            if rc == -1 {
                return Err(io::Error::last_os_error());
            }
        }
        Ok(stream)
    }
}
impl AsRawFd for Pipe {
    /// Returns the raw descriptor of this pipe end; ownership stays with the `Pipe`.
    #[inline]
    fn as_raw_fd(&self) -> RawFd {
        AsRawFd::as_raw_fd(&self.inner)
    }
}
impl AsRawFd for PollPipe {
    /// Returns the raw descriptor of the wrapped pipe end; ownership is not transferred.
    #[inline]
    fn as_raw_fd(&self) -> RawFd {
        let owned = &self.stream.inner;
        owned.as_raw_fd()
    }
}
impl IntoRawFd for Pipe {
    /// Consumes the pipe, releases its handle and returns the raw descriptor without closing it.
    #[inline]
    fn into_raw_fd(self) -> RawFd {
        // Suppress `Drop for Pipe` so the handle is not dropped a second time and the
        // descriptor is not closed when this function returns.
        let mut pipe = ManuallyDrop::new(self);
        // SAFETY: the handle has not been dropped yet, and `pipe` is never used again after this
        // function, so this is the only drop of the handle.
        unsafe { ManuallyDrop::drop(&mut pipe.handle) };
        // SAFETY: `pipe` is wrapped in `ManuallyDrop`, so `inner` will not be dropped in place;
        // reading it out moves ownership of the descriptor into `fd` exactly once.
        let fd = unsafe { std::ptr::read(&pipe.inner) };
        fd.into_raw_fd()
    }
}
impl IntoRawFd for PollPipe {
#[inline]
fn into_raw_fd(self) -> RawFd {
self.stream.into_raw_fd()
}
}
impl<'a> AsInnerRawHandle<'a> for Pipe {
    /// Borrows the handle stored in this pipe.
    #[inline]
    fn as_inner_raw_handle(&'a self) -> &'a InnerRawHandle {
        // The field is a `ManuallyDrop<InnerRawHandle>`; the annotation derefs through it.
        let handle: &'a InnerRawHandle = &self.handle;
        handle
    }
}
impl<'a> AsInnerRawHandle<'a> for PollPipe {
    /// Borrows the handle of the wrapped pipe by delegating to its implementation.
    #[inline]
    fn as_inner_raw_handle(&'a self) -> &'a InnerRawHandle {
        let pipe = &self.stream;
        pipe.as_inner_raw_handle()
    }
}
impl AsyncRead for Pipe {
    /// Runs a `ReadOp` for `buf` on this pipe's handle and waits for it to finish.
    ///
    /// Returns the operation's result together with the buffer handed back by the op.
    #[inline]
    async fn read<B: IoBufMut>(&mut self, buf: B) -> (Result<usize, io::Error>, B) {
        let raw = &self.handle;
        let mut read_op = ReadOp::new(raw, buf);
        let outcome = poll_fn(|cx| raw.poll_op(cx, &mut read_op)).await;
        (outcome, read_op.take_bufs())
    }

    /// Runs a `ReadvOp` for `bufs` on this pipe's handle and waits for it to finish.
    ///
    /// When `bufs` is empty no operation is created and `Ok(0)` is returned immediately.
    /// Otherwise returns the operation's result together with the buffers handed back by the op.
    #[inline]
    async fn read_vectored<B: IoVectoredBufMut>(
        &mut self,
        bufs: B,
    ) -> (Result<usize, io::Error>, B) {
        // Nothing to read into: answer without creating an operation.
        if bufs.is_empty() {
            return (Ok(0), bufs);
        }

        let raw = &self.handle;
        let mut readv_op = ReadvOp::new(raw, bufs);
        let outcome = poll_fn(|cx| raw.poll_op(cx, &mut readv_op)).await;
        (outcome, readv_op.take_bufs())
    }
}
impl TokioAsyncRead for PollPipe {
    /// Polls a `ReadOp` that targets the unfilled part of `buf`.
    ///
    /// Returns `Ready(Ok(()))` immediately when `buf` has no remaining capacity. On a successful
    /// read of `read` bytes, those bytes are marked initialised and the filled cursor is advanced
    /// by `read`. Errors and `Pending` are forwarded unchanged.
    #[inline]
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<Result<(), io::Error>> {
        if buf.remaining() == 0 {
            return Poll::Ready(Ok(()));
        }
        let this = self.get_mut();

        // Take the pointer and length straight from the `MaybeUninit` slice. Building a
        // `&mut [u8]` over bytes that may be uninitialised is avoided on purpose.
        // SAFETY: nothing de-initialises the region here; it is only exposed as a raw pointer.
        let unfilled = unsafe { buf.unfilled_mut() };
        let (ptr, len) = (unfilled.as_mut_ptr().cast::<u8>(), unfilled.len());

        // SAFETY: `ptr` is valid for writes of `len` bytes for the duration of this call.
        // NOTE(review): assumes the temporary buffer is not retained by the op after
        // `poll_op_poll` returns — confirm against `IoBufTemporaryPoll`'s contract.
        let buf_temp = unsafe { IoBufTemporaryPoll::new(ptr, len) };
        let mut op = ReadOp::new(&this.stream.handle, buf_temp);
        match this.stream.handle.poll_op_poll(cx, &mut op) {
            Poll::Ready(Ok(read)) => {
                // SAFETY: assumes the op wrote `read` bytes at the start of the unfilled
                // region, as implied by it reporting `read` — TODO confirm in `ReadOp`.
                unsafe {
                    buf.assume_init(read);
                }
                buf.advance(read);
                Poll::Ready(Ok(()))
            }
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Pending => Poll::Pending,
        }
    }
}
impl AsyncWrite for Pipe {
    /// Runs a `WriteOp` for `buf` on this pipe's handle and waits for it to finish.
    ///
    /// Returns the operation's result together with the buffer handed back by the op.
    #[inline]
    async fn write<B: IoBuf>(&mut self, buf: B) -> (Result<usize, io::Error>, B) {
        let raw = &self.handle;
        let mut write_op = WriteOp::new(raw, buf);
        let outcome = poll_fn(|cx| raw.poll_op(cx, &mut write_op)).await;
        (outcome, write_op.take_bufs())
    }

    /// Always succeeds immediately; this implementation performs no work on flush.
    #[inline]
    async fn flush(&mut self) -> Result<(), io::Error> {
        Ok(())
    }

    /// Runs a `WritevOp` for `bufs` on this pipe's handle and waits for it to finish.
    ///
    /// When `bufs` is empty no operation is created and `Ok(0)` is returned immediately.
    /// Otherwise returns the operation's result together with the buffers handed back by the op.
    #[inline]
    async fn write_vectored<B: IoVectoredBuf>(&mut self, bufs: B) -> (Result<usize, io::Error>, B) {
        // Nothing to write: answer without creating an operation.
        if bufs.is_empty() {
            return (Ok(0), bufs);
        }

        let raw = &self.handle;
        let mut writev_op = WritevOp::new(raw, bufs);
        let outcome = poll_fn(|cx| raw.poll_op(cx, &mut writev_op)).await;
        (outcome, writev_op.take_bufs())
    }
}
impl TokioAsyncWrite for PollPipe {
    /// Polls a `WriteOp` over the bytes of `buf` on the wrapped pipe's handle.
    #[inline]
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        let me = self.get_mut();
        let data_ptr = buf.as_ptr().cast_mut();
        // SAFETY: `buf` is borrowed for the whole call, so the pointer and length stay valid.
        // NOTE(review): presumably the write op only reads through this pointer — confirm.
        let temp = unsafe { IoBufTemporaryPoll::new(data_ptr, buf.len()) };
        let mut write_op = WriteOp::new(&me.stream.handle, temp);
        me.stream.handle.poll_op_poll(cx, &mut write_op)
    }

    /// Polls a `WritevOp` over `bufs` on the wrapped pipe's handle.
    ///
    /// An empty `bufs` slice completes immediately with `Ok(0)`.
    #[inline]
    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[IoSlice<'_>],
    ) -> Poll<Result<usize, io::Error>> {
        if bufs.is_empty() {
            return Poll::Ready(Ok(0));
        }

        let me = self.get_mut();
        // SAFETY: `bufs` is borrowed for the whole call.
        // NOTE(review): assumes the temporary wrapper is not used after this call — confirm.
        let temp = unsafe { IoVectoredBufTemporaryPoll::new(bufs) };
        let mut writev_op = WritevOp::new(&me.stream.handle, temp);
        me.stream.handle.poll_op_poll(cx, &mut writev_op)
    }

    /// Reports that `poll_write_vectored` is implemented natively.
    #[inline]
    fn is_write_vectored(&self) -> bool {
        true
    }

    /// Completes immediately; this implementation performs no work on flush.
    #[inline]
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Completes immediately; this implementation performs no work on shutdown.
    #[inline]
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }
}
impl Drop for Pipe {
    /// Releases the manually-managed handle; `inner` is dropped afterwards as a regular field.
    #[inline]
    fn drop(&mut self) {
        let handle = &mut self.handle;
        // SAFETY: the handle is dropped only here for a `Pipe` that reaches its destructor;
        // `into_raw_fd` wraps the pipe in `ManuallyDrop`, so this destructor does not run after
        // that path has already dropped the handle.
        unsafe { ManuallyDrop::drop(handle) };
    }
}