use std::{
ffi::CString,
fs::{self, File, OpenOptions},
io::{ErrorKind, Read},
os::{
fd::AsRawFd,
unix::{ffi::OsStrExt, fs::OpenOptionsExt},
},
path::{Path, PathBuf},
sync::{Arc, Mutex},
};
use mio::{Interest, Registry, Token, unix::SourceFd};
use crate::error::OhNo;
use super::tokens::Tokens;
/// Outcome of a single attempt to read a message from a [`Fifo`].
#[derive(Debug)]
pub enum FifoResult<'a> {
    /// No complete message is available.
    Unready,
    /// A complete message is available through the wrapped [`ReadyFifo`].
    Ready(ReadyFifo<'a>),
}

impl<'a> FifoResult<'a> {
    /// Converts the result into an `Option`: `Some` carrying the ready
    /// message for [`FifoResult::Ready`], `None` for [`FifoResult::Unready`].
    pub fn ready(self) -> Option<ReadyFifo<'a>> {
        if let Self::Ready(message) = self {
            Some(message)
        } else {
            None
        }
    }
}
/// A view over the byte range `start_index..end_index` of a borrowed buffer.
#[derive(Debug)]
pub struct ReadyFifo<'a> {
    /// Byte offset in `buf` where the view starts (inclusive).
    start_index: usize,
    /// Byte offset in `buf` where the view ends (exclusive).
    end_index: usize,
    /// The buffer the view points into.
    buf: &'a mut String,
}

impl ReadyFifo<'_> {
    /// Returns the text between `start_index` and `end_index` of the buffer.
    ///
    /// # Panics
    ///
    /// Panics if the range is out of bounds or does not fall on UTF-8
    /// character boundaries.
    pub fn as_str(&self) -> &str {
        let window = self.start_index..self.end_index;
        &self.buf.as_str()[window]
    }
}
/// A named pipe (FIFO) that is opened for non-blocking reads and registered
/// for readable events with a `mio` poll registry.
///
/// Field order is left untouched: it determines both the `Debug` output and
/// the order in which fields are dropped.
#[derive(Debug)]
pub struct Fifo {
/// Poll registry the FIFO's file descriptor is registered with.
registry: Arc<Registry>,
/// Shared token pool: a token is taken from it at registration and
/// recycled when the FIFO is unregistered.
tokens: Arc<Mutex<Tokens>>,
/// Filesystem path of the FIFO; the file at this path is removed on drop.
path: PathBuf,
/// Handle to the FIFO, opened read-only with `O_NONBLOCK`.
file: File,
/// Token under which `file` is registered with `registry`.
tkn: Token,
/// Randomly generated UUID string that `read` searches for in `buf`.
sentinel: String,
/// Accumulates the text read from `file`.
buf: String,
}
impl Fifo {
pub fn create(
registry: &Arc<Registry>,
tokens: &Arc<Mutex<Tokens>>,
path: impl Into<PathBuf>,
) -> Result<Self, OhNo> {
let path = path.into();
Self::create_fifo(&path)?;
let file = Self::open_nonblocking(&path)?;
let tkn = Self::register(registry, tokens, &file)?;
let sentinel = uuid::Uuid::new_v4().to_string();
Ok(Self {
registry: registry.clone(),
tokens: tokens.clone(),
path,
file,
tkn,
sentinel,
buf: String::default(),
})
}
fn create_fifo(path: &Path) -> Result<(), OhNo> {
if let Ok(true) = path.try_exists() {
log::debug!("FIFO already exists for path {}", path.display());
return Ok(());
}
let path_bytes = path.as_os_str().as_bytes();
let c_path = CString::new(path_bytes).map_err(|err| OhNo::CannotCreateFifo {
err: err.to_string(),
})?;
let c_err = unsafe { libc::mkfifo(c_path.as_ptr(), 0o644) };
if c_err != 0 {
return Err(OhNo::CannotCreateFifo {
err: format!("cannot create FIFO at path {path}", path = path.display()),
});
}
Ok(())
}
fn open_nonblocking(path: &Path) -> Result<File, OhNo> {
OpenOptions::new()
.read(true)
.custom_flags(libc::O_NONBLOCK)
.open(path)
.map_err(|err| OhNo::CannotOpenFifo { err })
}
fn register(
registry: &Arc<Registry>,
tokens: &Arc<Mutex<Tokens>>,
file: &File,
) -> Result<Token, OhNo> {
let tkn = tokens.lock().expect("tokens").create();
registry
.register(&mut SourceFd(&file.as_raw_fd()), tkn, Interest::READABLE)
.map_err(|err| OhNo::PollError { err })?;
Ok(tkn)
}
fn unregister(&self) {
if let Err(err) = self
.registry
.deregister(&mut SourceFd(&self.file.as_raw_fd()))
{
log::error!(
"cannot unregister FIFO {path} from poll registry: {err}",
path = self.path.display()
);
}
self.tokens.lock().expect("tokens").recycle(self.tkn);
}
pub fn token(&self) -> Token {
self.tkn
}
pub fn path(&self) -> &Path {
&self.path
}
pub fn sentinel(&self) -> &str {
&self.sentinel
}
pub fn read(&mut self) -> Result<FifoResult<'_>, OhNo> {
loop {
match self.file.read_to_string(&mut self.buf) {
Ok(0) => break, Ok(_) => continue,
Err(err) => match err.kind() {
ErrorKind::WouldBlock => break,
_ => {
self.buf.clear();
return Err(OhNo::CannotReadFifo { err });
}
},
}
}
let Some(index) = self.buf.find(&self.sentinel) else {
return Ok(FifoResult::Unready);
};
log::trace!(
"found sentinel {sentinel} in buffer {path}",
sentinel = self.sentinel,
path = self.path.display()
);
let end_index = index + self.sentinel.len();
if end_index != self.buf.len() {
log::warn!("receiving buffer updates too fast; dropping current data to prioritize next");
self.buf.drain(..end_index);
return Ok(FifoResult::Unready);
}
Ok(FifoResult::Ready(ReadyFifo {
start_index: 0,
end_index: index,
buf: &mut self.buf,
}))
}
pub fn clear(&mut self) {
self.buf.clear();
}
}
impl Drop for Fifo {
    /// Unregisters the FIFO and then removes its file from the filesystem.
    ///
    /// A failure to remove the file is logged as a warning and otherwise
    /// ignored.
    fn drop(&mut self) {
        self.unregister();
        match fs::remove_file(&self.path) {
            Ok(()) => {}
            Err(err) => {
                log::warn!("cannot remove FIFO at path {}: {err}", self.path.display());
            }
        }
    }
}