use crate::error::SysError;
use crate::shim::{self, SelectFd};
use rustix::io::{Errno, retry_on_intr};
use rustix::pipe;
use std::io::{Error, Read};
use std::os::fd::{AsFd, OwnedFd};
use std::sync::{Arc, Mutex};
use std::time::Duration;
#[derive(PartialEq)]
enum ReaderMode {
Timeout(Duration),
NoTimeout,
Closed,
}
pub struct InterruptibleReader<Fd: AsFd> {
mode: Mutex<ReaderMode>,
fd: Fd,
pipe_rd: OwnedFd,
pipe_wr: OwnedFd,
}
impl<Fd: AsFd> InterruptibleReader<Fd> {
pub fn open(fd: Fd) -> Result<Self, SysError> {
let (pipe_rd, pipe_wr) = match retry_on_intr(|| pipe::pipe()) {
Ok(fds) => fds,
Err(err) => return Err(SysError("pipe()", err)),
};
shim::fcntl_nonblock(&fd, true).map_err(|err| SysError("fcntl(fd)", err))?;
shim::fcntl_nonblock(&pipe_rd, true).map_err(|err| SysError("fcntl(pipe)", err))?;
shim::fcntl_nonblock(&pipe_wr, true).map_err(|err| SysError("fcntl(pipe)", err))?;
Ok(InterruptibleReader {
mode: Mutex::new(ReaderMode::NoTimeout),
fd,
pipe_rd,
pipe_wr,
})
}
pub fn close(&self) -> Result<(), SysError> {
{
let mut locked_mode = self.mode.lock().unwrap();
if *locked_mode == ReaderMode::Closed {
return Ok(());
}
*locked_mode = ReaderMode::Closed;
}
if let Err(err) = shim::write(&self.pipe_wr, &[0u8]) {
if err != Errno::AGAIN {
return Err(SysError("write(pipe)", err));
}
}
Ok(())
}
pub fn set_timeout(&self, duration: Duration) -> Result<(), SysError> {
{
let mut locked_mode = self.mode.lock().unwrap();
if *locked_mode == ReaderMode::Closed {
return Ok(());
}
*locked_mode = ReaderMode::Timeout(duration);
}
if let Err(err) = shim::write(&self.pipe_wr, &[0u8]) {
if err != Errno::AGAIN {
return Err(SysError("write(pipe)", err));
}
}
Ok(())
}
pub fn blocking_reader(self: &Arc<Self>) -> ArcTimeoutReader<Fd> {
ArcTimeoutReader(Arc::clone(self))
}
fn read_imp(&self, buf: &mut [u8]) -> Result<usize, Error> {
loop {
let timeout = {
let locked_mode = self.mode.lock().unwrap();
match *locked_mode {
ReaderMode::Timeout(d) => Some(d),
ReaderMode::NoTimeout => None,
ReaderMode::Closed => {
return Ok(0);
}
}
};
let mut pipe_fd = SelectFd {
fd: self.pipe_rd.as_fd(),
mask: SelectFd::READABLE,
};
let mut data_fd = SelectFd {
fd: self.fd.as_fd(),
mask: SelectFd::READABLE,
};
shim::select(&mut [&mut pipe_fd, &mut data_fd], timeout)?;
if pipe_fd.mask != 0 {
_ = shim::read(&self.pipe_rd, &mut [0u8; 128]);
}
if data_fd.mask != 0 {
break;
}
if pipe_fd.mask == 0 && data_fd.mask == 0 && timeout.is_some() {
return Ok(0);
}
}
match shim::read(&self.fd, buf) {
Ok(n) => Ok(n),
Err(err) => Err(Error::new(err.kind(), err)),
}
}
}
pub struct ArcTimeoutReader<Fd: AsFd>(Arc<InterruptibleReader<Fd>>);
impl<Fd: AsFd> Read for ArcTimeoutReader<Fd> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
self.0.read_imp(buf)
}
}