use std::{future::Future, io, os::fd::AsFd, path::Path};
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
use compio_driver::{
BufferRef, impl_raw_fd,
op::{OFlags, Pipe},
};
use compio_io::{AsyncRead, AsyncReadManaged, AsyncReadMulti, AsyncWrite};
use futures_util::Stream;
use rustix::fs::{fcntl_getfl, fcntl_setfl};
use crate::File;
#[cfg(linux_all)]
mod splice;
#[cfg(linux_all)]
pub use splice::*;
pub async fn anonymous() -> io::Result<(Receiver, Sender)> {
let op = Pipe::new();
let BufResult(res, op) = compio_runtime::submit(op).await;
res?;
let (receiver, sender) = op.into_inner();
let receiver = Receiver {
file: File::from_std(std::fs::File::from(receiver))?,
};
let sender = Sender {
file: File::from_std(std::fs::File::from(sender))?,
};
Ok((receiver, sender))
}
#[derive(Clone, Debug)]
pub struct OpenOptions {
#[cfg(target_os = "linux")]
read_write: bool,
unchecked: bool,
}
impl OpenOptions {
pub fn new() -> OpenOptions {
OpenOptions {
#[cfg(target_os = "linux")]
read_write: false,
unchecked: false,
}
}
#[cfg(target_os = "linux")]
#[cfg_attr(docsrs, doc(cfg(target_os = "linux")))]
pub fn read_write(&mut self, value: bool) -> &mut Self {
self.read_write = value;
self
}
pub fn unchecked(&mut self, value: bool) -> &mut Self {
self.unchecked = value;
self
}
pub async fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> {
let file = self.open(path.as_ref(), PipeEnd::Receiver).await?;
Receiver::from_file(file)
}
pub async fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> {
let file = self.open(path.as_ref(), PipeEnd::Sender).await?;
Sender::from_file(file)
}
async fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> {
let mut options = crate::OpenOptions::new();
options
.read(pipe_end == PipeEnd::Receiver)
.write(pipe_end == PipeEnd::Sender);
#[cfg(target_os = "linux")]
if self.read_write {
options.read(true).write(true);
}
let file = options.open(path).await?;
if !self.unchecked && !is_fifo(&file).await? {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
}
Ok(file)
}
}
impl Default for OpenOptions {
fn default() -> OpenOptions {
OpenOptions::new()
}
}
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum PipeEnd {
Sender,
Receiver,
}
#[derive(Debug, Clone)]
pub struct Sender {
file: File,
}
impl Sender {
pub(crate) fn from_file(file: File) -> io::Result<Sender> {
set_nonblocking(&file)?;
Ok(Sender { file })
}
pub fn close(self) -> impl Future<Output = io::Result<()>> {
self.file.close()
}
}
impl AsyncWrite for Sender {
#[inline]
async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
(&*self).write(buf).await
}
#[inline]
async fn write_vectored<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<usize, T> {
(&*self).write_vectored(buf).await
}
#[inline]
async fn flush(&mut self) -> io::Result<()> {
(&*self).flush().await
}
#[inline]
async fn shutdown(&mut self) -> io::Result<()> {
(&*self).shutdown().await
}
}
impl AsyncWrite for &Sender {
async fn write<T: IoBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
(&self.file.inner).write(buffer).await
}
async fn write_vectored<T: IoVectoredBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
(&self.file.inner).write_vectored(buffer).await
}
#[inline]
async fn flush(&mut self) -> io::Result<()> {
Ok(())
}
#[inline]
async fn shutdown(&mut self) -> io::Result<()> {
Ok(())
}
}
impl_raw_fd!(Sender, std::fs::File, file, file);
#[derive(Debug, Clone)]
pub struct Receiver {
file: File,
}
impl Receiver {
pub(crate) fn from_file(file: File) -> io::Result<Receiver> {
set_nonblocking(&file)?;
Ok(Receiver { file })
}
pub fn close(self) -> impl Future<Output = io::Result<()>> {
self.file.close()
}
}
impl AsyncRead for Receiver {
async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
(&*self).read(buf).await
}
async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
(&*self).read_vectored(buf).await
}
}
impl AsyncRead for &Receiver {
async fn read<B: IoBufMut>(&mut self, buffer: B) -> BufResult<usize, B> {
(&self.file.inner).read(buffer).await
}
async fn read_vectored<V: IoVectoredBufMut>(&mut self, buffer: V) -> BufResult<usize, V> {
(&self.file.inner).read_vectored(buffer).await
}
}
impl AsyncReadManaged for Receiver {
type Buffer = BufferRef;
async fn read_managed(&mut self, len: usize) -> io::Result<Option<Self::Buffer>> {
(&*self).read_managed(len).await
}
}
impl AsyncReadManaged for &Receiver {
type Buffer = BufferRef;
async fn read_managed(&mut self, len: usize) -> io::Result<Option<Self::Buffer>> {
(&self.file.inner).read_managed(len).await
}
}
impl AsyncReadMulti for Receiver {
fn read_multi(&mut self, len: usize) -> impl Stream<Item = io::Result<Self::Buffer>> {
self.file.inner.read_multi(len)
}
}
impl AsyncReadMulti for &Receiver {
fn read_multi(&mut self, len: usize) -> impl Stream<Item = io::Result<Self::Buffer>> {
self.file.inner.read_multi_shared(len)
}
}
impl_raw_fd!(Receiver, std::fs::File, file, file);
async fn is_fifo(file: &File) -> io::Result<bool> {
use std::os::unix::prelude::FileTypeExt;
Ok(file.metadata().await?.file_type().is_fifo())
}
fn set_nonblocking(file: &impl AsFd) -> io::Result<()> {
if compio_runtime::Runtime::with_current(|r| r.driver_type()).is_polling() {
let current_flags = fcntl_getfl(file)?;
let flags = current_flags | OFlags::NONBLOCK;
if flags != current_flags {
fcntl_setfl(file, flags)?;
}
}
Ok(())
}