pub use super::shared::*;
use anyhow::bail;
use nix::libc::O_NONBLOCK;
use nix::sys::stat;
use nix::unistd::{self, unlink};
use std::fs::{File, OpenOptions};
use std::io::Read;
use std::os::unix::fs::OpenOptionsExt;
use std::path::{Path, PathBuf};
pub struct BenchGuard {
ctl_fifo: FifoIpc,
ack_fifo: FifoIpc,
}
impl BenchGuard {
pub fn new(ctl_fifo: &str, ack_fifo: &str) -> anyhow::Result<Self> {
let mut instance = Self {
ctl_fifo: FifoIpc::connect(ctl_fifo)?.with_writer()?,
ack_fifo: FifoIpc::connect(ack_fifo)?.with_reader()?,
};
instance.send_cmd(Command::SetIntegration {
name: "codspeed-rust".into(),
version: env!("CARGO_PKG_VERSION").into(),
})?; instance.send_cmd(Command::StartBenchmark)?;
Ok(instance)
}
pub fn new_with_runner_fifo() -> anyhow::Result<Self> {
Self::new(RUNNER_CTL_FIFO, RUNNER_ACK_FIFO)
}
fn send_cmd(&mut self, cmd: Command) -> anyhow::Result<()> {
self.ctl_fifo.send_cmd(cmd)?;
self.ack_fifo.wait_for_ack();
Ok(())
}
}
impl Drop for BenchGuard {
fn drop(&mut self) {
self.send_cmd(Command::StopBenchmark)
.expect("Failed to send stop command");
}
}
pub fn send_cmd(cmd: Command) -> anyhow::Result<()> {
let mut writer = FifoIpc::connect(RUNNER_CTL_FIFO)?.with_writer()?;
writer.send_cmd(cmd).unwrap();
let mut reader = FifoIpc::connect(RUNNER_ACK_FIFO)?.with_reader()?;
reader.wait_for_ack();
Ok(())
}
pub struct FifoIpc {
path: PathBuf,
reader: Option<File>,
writer: Option<File>,
}
impl FifoIpc {
pub fn create<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
let _ = unlink(path.as_ref());
unistd::mkfifo(path.as_ref(), stat::Mode::S_IRWXU)?;
Self::connect(path.as_ref())
}
pub fn connect<P: Into<PathBuf>>(path: P) -> anyhow::Result<Self> {
let path = path.into();
if !path.exists() {
bail!("FIFO does not exist: {}", path.display());
}
Ok(Self {
path,
reader: None,
writer: None,
})
}
pub fn with_reader(mut self) -> anyhow::Result<Self> {
self.reader = Some(
OpenOptions::new()
.write(true)
.read(true)
.custom_flags(O_NONBLOCK)
.open(&self.path)?,
);
Ok(self)
}
pub fn with_writer(mut self) -> anyhow::Result<Self> {
self.writer = Some(
OpenOptions::new()
.write(true)
.custom_flags(O_NONBLOCK)
.open(&self.path)?,
);
Ok(self)
}
pub fn recv_cmd(&mut self) -> anyhow::Result<Command> {
let mut len_buffer = [0u8; 4];
self.read_exact(&mut len_buffer)?;
let message_len = u32::from_le_bytes(len_buffer) as usize;
let mut buffer = vec![0u8; message_len];
loop {
if self.read_exact(&mut buffer).is_ok() {
break;
}
}
let decoded = bincode::deserialize(&buffer)?;
Ok(decoded)
}
pub fn send_cmd(&mut self, cmd: Command) -> anyhow::Result<()> {
use std::io::Write;
let encoded = bincode::serialize(&cmd)?;
self.write_all(&(encoded.len() as u32).to_le_bytes())?;
self.write_all(&encoded)?;
Ok(())
}
pub fn wait_for_ack(&mut self) {
loop {
if let Ok(Command::Ack) = self.recv_cmd() {
break;
}
}
}
}
impl std::io::Write for FifoIpc {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
if let Some(writer) = self.writer.as_mut() {
writer.write(buf)
} else {
Err(std::io::Error::new(
std::io::ErrorKind::NotConnected,
"Writer not initialized",
))
}
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
impl std::io::Read for FifoIpc {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
if let Some(reader) = self.reader.as_mut() {
reader.read(buf)
} else {
Err(std::io::Error::new(
std::io::ErrorKind::NotConnected,
"Reader not initialized",
))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
#[test]
fn test_ipc_write_read() {
let mut fifo = FifoIpc::create("/tmp/test1.fifo")
.unwrap()
.with_reader()
.unwrap()
.with_writer()
.unwrap();
fifo.write_all(b"Hello").unwrap();
let mut buffer = [0; 5];
fifo.read_exact(&mut buffer).unwrap();
assert_eq!(&buffer, b"Hello");
}
#[test]
fn test_ipc_send_recv_cmd() {
let mut fifo = FifoIpc::create("/tmp/test2.fifo")
.unwrap()
.with_reader()
.unwrap()
.with_writer()
.unwrap();
fifo.send_cmd(Command::StartBenchmark).unwrap();
let cmd = fifo.recv_cmd().unwrap();
assert_eq!(cmd, Command::StartBenchmark);
}
}