use std::{
future::Future,
io,
os::fd::{FromRawFd, IntoRawFd},
path::Path,
};
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
use compio_driver::{
AsRawFd, ToSharedFd, impl_raw_fd,
op::{BufResultExt, Recv, RecvVectored, Send, SendVectored},
syscall,
};
use compio_io::{AsyncRead, AsyncWrite};
use crate::File;
pub fn anonymous() -> io::Result<(Receiver, Sender)> {
let (receiver, sender) = os_pipe::pipe()?;
let receiver = Receiver::from_file(File::from_std(unsafe {
std::fs::File::from_raw_fd(receiver.into_raw_fd())
})?)?;
let sender = Sender::from_file(File::from_std(unsafe {
std::fs::File::from_raw_fd(sender.into_raw_fd())
})?)?;
Ok((receiver, sender))
}
#[derive(Clone, Debug)]
pub struct OpenOptions {
#[cfg(target_os = "linux")]
read_write: bool,
unchecked: bool,
}
impl OpenOptions {
pub fn new() -> OpenOptions {
OpenOptions {
#[cfg(target_os = "linux")]
read_write: false,
unchecked: false,
}
}
#[cfg(target_os = "linux")]
#[cfg_attr(docsrs, doc(cfg(target_os = "linux")))]
pub fn read_write(&mut self, value: bool) -> &mut Self {
self.read_write = value;
self
}
pub fn unchecked(&mut self, value: bool) -> &mut Self {
self.unchecked = value;
self
}
pub async fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> {
let file = self.open(path.as_ref(), PipeEnd::Receiver).await?;
Receiver::from_file(file)
}
pub async fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> {
let file = self.open(path.as_ref(), PipeEnd::Sender).await?;
Sender::from_file(file)
}
async fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> {
let mut options = crate::OpenOptions::new();
options
.read(pipe_end == PipeEnd::Receiver)
.write(pipe_end == PipeEnd::Sender);
#[cfg(target_os = "linux")]
if self.read_write {
options.read(true).write(true);
}
let file = options.open(path).await?;
if !self.unchecked && !is_fifo(&file).await? {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
}
Ok(file)
}
}
impl Default for OpenOptions {
fn default() -> OpenOptions {
OpenOptions::new()
}
}
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum PipeEnd {
Sender,
Receiver,
}
#[derive(Debug, Clone)]
pub struct Sender {
file: File,
}
impl Sender {
pub(crate) fn from_file(file: File) -> io::Result<Sender> {
set_nonblocking(&file)?;
Ok(Sender { file })
}
pub fn close(self) -> impl Future<Output = io::Result<()>> {
self.file.close()
}
}
impl AsyncWrite for Sender {
#[inline]
async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
(&*self).write(buf).await
}
#[inline]
async fn write_vectored<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<usize, T> {
(&*self).write_vectored(buf).await
}
#[inline]
async fn flush(&mut self) -> io::Result<()> {
(&*self).flush().await
}
#[inline]
async fn shutdown(&mut self) -> io::Result<()> {
(&*self).shutdown().await
}
}
impl AsyncWrite for &Sender {
async fn write<T: IoBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
let fd = self.to_shared_fd();
let op = Send::new(fd, buffer);
compio_runtime::submit(op).await.into_inner()
}
async fn write_vectored<T: IoVectoredBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
let fd = self.to_shared_fd();
let op = SendVectored::new(fd, buffer);
compio_runtime::submit(op).await.into_inner()
}
#[inline]
async fn flush(&mut self) -> io::Result<()> {
Ok(())
}
#[inline]
async fn shutdown(&mut self) -> io::Result<()> {
Ok(())
}
}
impl_raw_fd!(Sender, std::fs::File, file, file);
#[derive(Debug, Clone)]
pub struct Receiver {
file: File,
}
impl Receiver {
pub(crate) fn from_file(file: File) -> io::Result<Receiver> {
set_nonblocking(&file)?;
Ok(Receiver { file })
}
pub fn close(self) -> impl Future<Output = io::Result<()>> {
self.file.close()
}
}
impl AsyncRead for Receiver {
async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
(&*self).read(buf).await
}
async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
(&*self).read_vectored(buf).await
}
}
impl AsyncRead for &Receiver {
async fn read<B: IoBufMut>(&mut self, buffer: B) -> BufResult<usize, B> {
let fd = self.to_shared_fd();
let op = Recv::new(fd, buffer);
compio_runtime::submit(op).await.into_inner().map_advanced()
}
async fn read_vectored<V: IoVectoredBufMut>(&mut self, buffer: V) -> BufResult<usize, V> {
let fd = self.to_shared_fd();
let op = RecvVectored::new(fd, buffer);
compio_runtime::submit(op).await.into_inner().map_advanced()
}
}
impl_raw_fd!(Receiver, std::fs::File, file, file);
async fn is_fifo(file: &File) -> io::Result<bool> {
use std::os::unix::prelude::FileTypeExt;
Ok(file.metadata().await?.file_type().is_fifo())
}
fn set_nonblocking(file: &impl AsRawFd) -> io::Result<()> {
if cfg!(not(all(target_os = "linux", feature = "io-uring")))
|| compio_driver::DriverType::is_polling()
{
let fd = file.as_raw_fd();
let current_flags = syscall!(libc::fcntl(fd, libc::F_GETFL))?;
let flags = current_flags | libc::O_NONBLOCK;
if flags != current_flags {
syscall!(libc::fcntl(fd, libc::F_SETFL, flags))?;
}
}
Ok(())
}