use crate::io::interest::Interest;
use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf, Ready};
use mio::unix::pipe as mio_pipe;
use std::fs::File;
use std::io::{self, Read, Write};
use std::os::unix::fs::OpenOptionsExt;
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
cfg_io_util! {
use bytes::BufMut;
}
/// Creates a new pipe and returns its two ends as a `(Sender, Receiver)` pair.
///
/// The underlying pair is obtained from `mio_pipe::new()`; each end is then
/// wrapped through its private `from_mio` constructor (sender first, then
/// receiver).
///
/// # Errors
///
/// Returns an error if the pipe cannot be created or if wrapping either end
/// fails.
pub fn pipe() -> io::Result<(Sender, Receiver)> {
    let (mio_tx, mio_rx) = mio_pipe::new()?;
    let sender = Sender::from_mio(mio_tx)?;
    let receiver = Receiver::from_mio(mio_rx)?;
    Ok((sender, receiver))
}
/// Options that control how a pipe file at a filesystem path is opened.
///
/// Build a value with `new()`, adjust it with the setter methods, then call
/// `open_receiver` or `open_sender`.
#[derive(Clone, Debug)]
pub struct OpenOptions {
/// Linux/Android only. When `true`, the file is opened with both read and
/// write access, regardless of which pipe end is requested.
#[cfg(any(target_os = "linux", target_os = "android"))]
read_write: bool,
/// When `true`, the "is this file a pipe?" check after opening is skipped.
unchecked: bool,
}
impl OpenOptions {
    /// Creates a blank set of options with every flag set to `false`.
    pub fn new() -> OpenOptions {
        Self {
            #[cfg(any(target_os = "linux", target_os = "android"))]
            read_write: false,
            unchecked: false,
        }
    }

    /// Sets whether the file is opened with both read and write access.
    ///
    /// Only available on Linux and Android. When enabled, the access mode is
    /// read-write no matter which end (`open_receiver` / `open_sender`) is
    /// requested.
    #[cfg(any(target_os = "linux", target_os = "android"))]
    #[cfg_attr(docsrs, doc(cfg(any(target_os = "linux", target_os = "android"))))]
    pub fn read_write(&mut self, value: bool) -> &mut Self {
        self.read_write = value;
        self
    }

    /// Sets whether the check that the opened file is a pipe is skipped.
    ///
    /// With `false` (the default), opening anything that is not a pipe fails
    /// with `InvalidInput`.
    pub fn unchecked(&mut self, value: bool) -> &mut Self {
        self.unchecked = value;
        self
    }

    /// Opens the file at `path` for reading and wraps it in a [`Receiver`].
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be opened, if it is not a pipe (unless
    /// `unchecked` is set), or if the receiver cannot be constructed.
    pub fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> {
        self.open(path.as_ref(), PipeEnd::Receiver)
            .and_then(Receiver::from_file_unchecked)
    }

    /// Opens the file at `path` for writing and wraps it in a [`Sender`].
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be opened, if it is not a pipe (unless
    /// `unchecked` is set), or if the sender cannot be constructed.
    pub fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> {
        self.open(path.as_ref(), PipeEnd::Sender)
            .and_then(Sender::from_file_unchecked)
    }

    /// Opens `path` in non-blocking mode with the access mode implied by
    /// `pipe_end`, then (unless `unchecked`) verifies that it is a pipe.
    fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> {
        let wants_read = matches!(pipe_end, PipeEnd::Receiver);
        let wants_write = matches!(pipe_end, PipeEnd::Sender);

        let mut builder = std::fs::OpenOptions::new();
        builder
            .read(wants_read)
            .write(wants_write)
            .custom_flags(libc::O_NONBLOCK);

        // On Linux/Android the caller may request read-write access, which
        // overrides the single-direction mode chosen above.
        #[cfg(any(target_os = "linux", target_os = "android"))]
        if self.read_write {
            builder.read(true).write(true);
        }

        let file = builder.open(path)?;

        // `||` short-circuits, so no `fstat` is issued when unchecked.
        if self.unchecked || is_pipe(file.as_fd())? {
            Ok(file)
        } else {
            Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"))
        }
    }
}
impl Default for OpenOptions {
fn default() -> OpenOptions {
OpenOptions::new()
}
}
/// Which end of a pipe is being opened; selects the access mode used by
/// `OpenOptions::open`.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum PipeEnd {
/// The writing end: the file is opened with write access.
Sender,
/// The reading end: the file is opened with read access.
Receiver,
}
/// Writing end of a Unix pipe.
///
/// Implements `AsyncWrite` and also offers readiness-based, non-blocking
/// `try_*` methods.
#[derive(Debug)]
pub struct Sender {
/// The mio pipe sender wrapped in a `PollEvented` created with `WRITABLE`
/// interest.
io: PollEvented<mio_pipe::Sender>,
}
impl Sender {
    /// Wraps a mio pipe sender in a `PollEvented` with `WRITABLE` interest.
    fn from_mio(mio_tx: mio_pipe::Sender) -> io::Result<Sender> {
        PollEvented::new_with_interest(mio_tx, Interest::WRITABLE).map(|io| Sender { io })
    }

    /// Creates a `Sender` from a [`File`], validating it first.
    ///
    /// See [`Sender::from_owned_fd`] for the checks performed.
    pub fn from_file(file: File) -> io::Result<Sender> {
        Sender::from_owned_fd(OwnedFd::from(file))
    }

    /// Creates a `Sender` from an [`OwnedFd`], validating it first.
    ///
    /// The descriptor must refer to a pipe and be open in `O_WRONLY` or
    /// `O_RDWR` mode. `O_NONBLOCK` is set on it if not already present.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` if the descriptor is not a pipe or lacks write
    /// access, and propagates any OS error from the underlying system calls.
    pub fn from_owned_fd(owned_fd: OwnedFd) -> io::Result<Sender> {
        if !is_pipe(owned_fd.as_fd())? {
            return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
        }

        let flags = get_file_flags(owned_fd.as_fd())?;
        if !has_write_access(flags) {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "not in O_WRONLY or O_RDWR access mode",
            ));
        }

        set_nonblocking(owned_fd.as_fd(), flags)?;
        Sender::from_owned_fd_unchecked(owned_fd)
    }

    /// Creates a `Sender` from a [`File`] without any validation.
    ///
    /// No pipe check, access-mode check, or `O_NONBLOCK` adjustment is done.
    pub fn from_file_unchecked(file: File) -> io::Result<Sender> {
        Sender::from_owned_fd_unchecked(OwnedFd::from(file))
    }

    /// Creates a `Sender` from an [`OwnedFd`] without any validation.
    ///
    /// No pipe check, access-mode check, or `O_NONBLOCK` adjustment is done.
    pub fn from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result<Sender> {
        let raw_fd = owned_fd.into_raw_fd();
        // SAFETY: `raw_fd` was just released from an `OwnedFd`, so it is an
        // open descriptor whose ownership is transferred to the mio sender.
        let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(raw_fd) };
        Sender::from_mio(mio_tx)
    }

    /// Waits for any of the requested readiness states and returns the
    /// readiness that was observed.
    pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
        self.io
            .registration()
            .readiness(interest)
            .await
            .map(|event| event.ready)
    }

    /// Waits for the pipe to become writable.
    ///
    /// Shorthand for `ready(Interest::WRITABLE)` with the readiness value
    /// discarded.
    pub async fn writable(&self) -> io::Result<()> {
        self.ready(Interest::WRITABLE).await.map(|_| ())
    }

    /// Polls for write readiness, discarding the readiness event on success.
    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        match self.io.registration().poll_write_ready(cx) {
            Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Pending => Poll::Pending,
        }
    }

    /// Attempts to write `buf` to the pipe, returning the number of bytes
    /// written.
    ///
    /// The write is routed through the registration's `try_io` with
    /// `WRITABLE` interest.
    pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
        let registration = self.io.registration();
        registration.try_io(Interest::WRITABLE, || {
            // `Write` is implemented for `&mio_pipe::Sender`, so a mutable
            // binding of the shared reference is all that is needed.
            let mut pipe: &mio_pipe::Sender = &self.io;
            pipe.write(buf)
        })
    }

    /// Attempts to write several buffers to the pipe in one vectored write,
    /// returning the number of bytes written.
    pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
        let registration = self.io.registration();
        registration.try_io(Interest::WRITABLE, || {
            let mut pipe: &mio_pipe::Sender = &self.io;
            pipe.write_vectored(buf)
        })
    }

    /// Runs the user-supplied I/O closure `f` under `WRITABLE` interest.
    ///
    /// NOTE(review): the handling of a `WouldBlock` result is defined by
    /// `Registration::try_io` and the mio `try_io`; confirm there.
    pub fn try_io<R>(&self, f: impl FnOnce() -> io::Result<R>) -> io::Result<R> {
        let registration = self.io.registration();
        registration.try_io(Interest::WRITABLE, || self.io.try_io(f))
    }

    /// Converts the sender into an [`OwnedFd`] with `O_NONBLOCK` cleared.
    ///
    /// # Errors
    ///
    /// Fails if the sender cannot be unwrapped or if changing the file status
    /// flags fails; in the latter case the descriptor is dropped.
    pub fn into_blocking_fd(self) -> io::Result<OwnedFd> {
        let fd = self.into_nonblocking_fd()?;
        set_blocking(&fd).map(|()| fd)
    }

    /// Converts the sender into an [`OwnedFd`], leaving its file status flags
    /// untouched.
    pub fn into_nonblocking_fd(self) -> io::Result<OwnedFd> {
        let raw_fd = self.io.into_inner()?.into_raw_fd();
        // SAFETY: `raw_fd` was just released by the mio sender we owned, so
        // nothing else owns it. NOTE(review): presumably `into_inner` also
        // deregisters the pipe from the event loop; confirm.
        Ok(unsafe { OwnedFd::from_raw_fd(raw_fd) })
    }
}
impl AsyncWrite for Sender {
    /// Delegates to the wrapped `PollEvented`'s `poll_write`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let evented = &self.io;
        evented.poll_write(cx, buf)
    }

    /// Delegates to the wrapped `PollEvented`'s `poll_write_vectored`.
    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[io::IoSlice<'_>],
    ) -> Poll<io::Result<usize>> {
        let evented = &self.io;
        evented.poll_write_vectored(cx, bufs)
    }

    /// Vectored writes are always reported as supported.
    fn is_write_vectored(&self) -> bool {
        true
    }

    /// Flushing does nothing here: this always completes immediately with
    /// `Ok(())`.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    /// Shutdown does nothing here: this always completes immediately with
    /// `Ok(())`.
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}
impl AsRawFd for Sender {
fn as_raw_fd(&self) -> RawFd {
self.io.as_raw_fd()
}
}
impl AsFd for Sender {
    /// Borrows the sender's file descriptor for the lifetime of `&self`.
    fn as_fd(&self) -> BorrowedFd<'_> {
        let raw = self.as_raw_fd();
        // SAFETY: `raw` comes from the pipe end held in `self.io`, and the
        // returned borrow cannot outlive `&self`. Presumably the descriptor
        // stays open for as long as `self` is alive.
        unsafe { BorrowedFd::borrow_raw(raw) }
    }
}
/// Reading end of a Unix pipe.
///
/// Implements `AsyncRead` and also offers readiness-based, non-blocking
/// `try_*` methods.
#[derive(Debug)]
pub struct Receiver {
/// The mio pipe receiver wrapped in a `PollEvented` created with `READABLE`
/// interest.
io: PollEvented<mio_pipe::Receiver>,
}
impl Receiver {
    /// Wraps a mio pipe receiver in a `PollEvented` with `READABLE` interest.
    fn from_mio(mio_rx: mio_pipe::Receiver) -> io::Result<Receiver> {
        PollEvented::new_with_interest(mio_rx, Interest::READABLE).map(|io| Receiver { io })
    }

    /// Creates a `Receiver` from a [`File`], validating it first.
    ///
    /// See [`Receiver::from_owned_fd`] for the checks performed.
    pub fn from_file(file: File) -> io::Result<Receiver> {
        Receiver::from_owned_fd(OwnedFd::from(file))
    }

    /// Creates a `Receiver` from an [`OwnedFd`], validating it first.
    ///
    /// The descriptor must refer to a pipe and be open in `O_RDONLY` or
    /// `O_RDWR` mode. `O_NONBLOCK` is set on it if not already present.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` if the descriptor is not a pipe or lacks read
    /// access, and propagates any OS error from the underlying system calls.
    pub fn from_owned_fd(owned_fd: OwnedFd) -> io::Result<Receiver> {
        if !is_pipe(owned_fd.as_fd())? {
            return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
        }

        let flags = get_file_flags(owned_fd.as_fd())?;
        if !has_read_access(flags) {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "not in O_RDONLY or O_RDWR access mode",
            ));
        }

        set_nonblocking(owned_fd.as_fd(), flags)?;
        Receiver::from_owned_fd_unchecked(owned_fd)
    }

    /// Creates a `Receiver` from a [`File`] without any validation.
    ///
    /// No pipe check, access-mode check, or `O_NONBLOCK` adjustment is done.
    pub fn from_file_unchecked(file: File) -> io::Result<Receiver> {
        Receiver::from_owned_fd_unchecked(OwnedFd::from(file))
    }

    /// Creates a `Receiver` from an [`OwnedFd`] without any validation.
    ///
    /// No pipe check, access-mode check, or `O_NONBLOCK` adjustment is done.
    pub fn from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result<Receiver> {
        let raw_fd = owned_fd.into_raw_fd();
        // SAFETY: `raw_fd` was just released from an `OwnedFd`, so it is an
        // open descriptor whose ownership is transferred to the mio receiver.
        let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(raw_fd) };
        Receiver::from_mio(mio_rx)
    }

    /// Waits for any of the requested readiness states and returns the
    /// readiness that was observed.
    pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
        self.io
            .registration()
            .readiness(interest)
            .await
            .map(|event| event.ready)
    }

    /// Waits for the pipe to become readable.
    ///
    /// Shorthand for `ready(Interest::READABLE)` with the readiness value
    /// discarded.
    pub async fn readable(&self) -> io::Result<()> {
        self.ready(Interest::READABLE).await.map(|_| ())
    }

    /// Polls for read readiness, discarding the readiness event on success.
    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        match self.io.registration().poll_read_ready(cx) {
            Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Pending => Poll::Pending,
        }
    }

    /// Attempts to read from the pipe into `buf`, returning the number of
    /// bytes read.
    ///
    /// The read is routed through the registration's `try_io` with
    /// `READABLE` interest.
    pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
        let registration = self.io.registration();
        registration.try_io(Interest::READABLE, || {
            // `Read` is implemented for `&mio_pipe::Receiver`, so a mutable
            // binding of the shared reference is all that is needed.
            let mut pipe: &mio_pipe::Receiver = &self.io;
            pipe.read(buf)
        })
    }

    /// Attempts to read from the pipe into several buffers in one vectored
    /// read, returning the number of bytes read.
    pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
        let registration = self.io.registration();
        registration.try_io(Interest::READABLE, || {
            let mut pipe: &mio_pipe::Receiver = &self.io;
            pipe.read_vectored(bufs)
        })
    }

    /// Runs the user-supplied I/O closure `f` under `READABLE` interest.
    ///
    /// NOTE(review): the handling of a `WouldBlock` result is defined by
    /// `Registration::try_io` and the mio `try_io`; confirm there.
    pub fn try_io<R>(&self, f: impl FnOnce() -> io::Result<R>) -> io::Result<R> {
        let registration = self.io.registration();
        registration.try_io(Interest::READABLE, || self.io.try_io(f))
    }

    cfg_io_util! {
        /// Attempts to read from the pipe into the spare capacity of `buf`,
        /// advancing the buffer by the number of bytes read and returning
        /// that count.
        pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
            self.io.registration().try_io(Interest::READABLE, || {
                use std::io::Read;

                let chunk = buf.chunk_mut();
                // SAFETY: reinterprets the (possibly uninitialized) spare
                // chunk as `[u8]` so it can be handed to `read`.
                // NOTE(review): relies on the underlying read only writing to
                // `dst` and never reading from it; confirm for the mio pipe.
                let dst =
                    unsafe { &mut *(chunk as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };

                let filled = (&*self.io).read(dst)?;

                // SAFETY: `filled` is the byte count the read above reported
                // having written into the chunk.
                unsafe {
                    buf.advance_mut(filled);
                }

                Ok(filled)
            })
        }
    }

    /// Converts the receiver into an [`OwnedFd`] with `O_NONBLOCK` cleared.
    ///
    /// # Errors
    ///
    /// Fails if the receiver cannot be unwrapped or if changing the file
    /// status flags fails; in the latter case the descriptor is dropped.
    pub fn into_blocking_fd(self) -> io::Result<OwnedFd> {
        let fd = self.into_nonblocking_fd()?;
        set_blocking(&fd).map(|()| fd)
    }

    /// Converts the receiver into an [`OwnedFd`], leaving its file status
    /// flags untouched.
    pub fn into_nonblocking_fd(self) -> io::Result<OwnedFd> {
        let raw_fd = self.io.into_inner()?.into_raw_fd();
        // SAFETY: `raw_fd` was just released by the mio receiver we owned, so
        // nothing else owns it. NOTE(review): presumably `into_inner` also
        // deregisters the pipe from the event loop; confirm.
        Ok(unsafe { OwnedFd::from_raw_fd(raw_fd) })
    }
}
impl AsyncRead for Receiver {
    /// Delegates to the wrapped `PollEvented`'s `poll_read`.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let evented = &self.io;
        // SAFETY: NOTE(review): `PollEvented::poll_read` is an unsafe fn;
        // presumably it requires that the underlying reader never reads from
        // the uninitialized part of `buf`. Confirm that the mio pipe receiver
        // upholds this.
        unsafe { evented.poll_read(cx, buf) }
    }
}
impl AsRawFd for Receiver {
fn as_raw_fd(&self) -> RawFd {
self.io.as_raw_fd()
}
}
impl AsFd for Receiver {
    /// Borrows the receiver's file descriptor for the lifetime of `&self`.
    fn as_fd(&self) -> BorrowedFd<'_> {
        let raw = self.as_raw_fd();
        // SAFETY: `raw` comes from the pipe end held in `self.io`, and the
        // returned borrow cannot outlive `&self`. Presumably the descriptor
        // stays open for as long as `self` is alive.
        unsafe { BorrowedFd::borrow_raw(raw) }
    }
}
/// Reports whether `fd` refers to a FIFO, as determined by `fstat`.
///
/// # Errors
///
/// Returns the last OS error if `fstat` fails.
fn is_pipe(fd: BorrowedFd<'_>) -> io::Result<bool> {
    // SAFETY: `libc::stat` is a plain C struct that `fstat` fills in; the
    // all-zero bit pattern is used only as its initial value.
    let mut stat: libc::stat = unsafe { std::mem::zeroed() };

    // SAFETY: `fd` is a borrowed, open descriptor and `stat` is a valid,
    // writable `libc::stat`.
    let rc = unsafe { libc::fstat(fd.as_raw_fd(), &mut stat) };
    if rc == -1 {
        return Err(io::Error::last_os_error());
    }

    // Mask out everything but the file-type bits before comparing.
    let file_type = stat.st_mode as libc::mode_t & libc::S_IFMT;
    Ok(file_type == libc::S_IFIFO)
}
/// Returns the file status flags of `fd`, as reported by `fcntl(F_GETFL)`.
///
/// # Errors
///
/// Returns the last OS error if `fcntl` reports a negative value.
fn get_file_flags(fd: BorrowedFd<'_>) -> io::Result<libc::c_int> {
    // SAFETY: `fd` is a borrowed, open descriptor and `F_GETFL` takes no
    // further arguments.
    match unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_GETFL) } {
        status if status < 0 => Err(io::Error::last_os_error()),
        status => Ok(status),
    }
}
/// Reports whether the access mode encoded in `flags` is `O_RDONLY` or
/// `O_RDWR`.
fn has_read_access(flags: libc::c_int) -> bool {
    matches!(flags & libc::O_ACCMODE, libc::O_RDONLY | libc::O_RDWR)
}
/// Reports whether the access mode encoded in `flags` is `O_WRONLY` or
/// `O_RDWR`.
fn has_write_access(flags: libc::c_int) -> bool {
    matches!(flags & libc::O_ACCMODE, libc::O_WRONLY | libc::O_RDWR)
}
/// Ensures `O_NONBLOCK` is set on `fd`.
///
/// `current_flags` must be the descriptor's current file status flags. When
/// they already contain `O_NONBLOCK`, no system call is made.
///
/// # Errors
///
/// Returns the last OS error if `fcntl(F_SETFL)` fails.
fn set_nonblocking(fd: BorrowedFd<'_>, current_flags: libc::c_int) -> io::Result<()> {
    let wanted = current_flags | libc::O_NONBLOCK;
    if wanted == current_flags {
        // Already non-blocking: nothing to change.
        return Ok(());
    }

    // SAFETY: `fd` is a borrowed, open descriptor and `wanted` is a plain
    // integer flag set.
    let rc = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, wanted) };
    if rc < 0 {
        Err(io::Error::last_os_error())
    } else {
        Ok(())
    }
}
/// Clears `O_NONBLOCK` on `fd`, leaving all other file status flags as they
/// were.
///
/// The current flags are read with `fcntl(F_GETFL)` and written back with
/// `fcntl(F_SETFL)` unconditionally.
///
/// # Errors
///
/// Returns the last OS error if either `fcntl` call returns `-1`.
fn set_blocking<T: AsRawFd>(fd: &T) -> io::Result<()> {
    // SAFETY: `F_GETFL` takes no further arguments. NOTE(review): assumes the
    // caller passes a handle whose descriptor is open; confirm at call sites.
    let current = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_GETFL) };
    if current == -1 {
        return Err(io::Error::last_os_error());
    }

    let blocking_flags = current & !libc::O_NONBLOCK;

    // SAFETY: same descriptor as above; `blocking_flags` is a plain integer
    // flag set.
    match unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, blocking_flags) } {
        -1 => Err(io::Error::last_os_error()),
        _ => Ok(()),
    }
}