use std::error::Error;
use std::fmt::Debug;
use std::fs::OpenOptions;
use std::io::{Read, Write};
use std::marker::PhantomData;
use std::path::Path;
use fs4::fs_std::FileExt;
use fs4::lock_contended_error;
use serde::de::DeserializeOwned;
use serde::ser::Serialize;
/// Writes values of type `T` to a file as length-prefixed JSON frames.
#[derive(Debug)]
struct FileMessageWriter<T> {
/// File the frames are written to.
file: std::fs::File,
/// Scratch buffer holding the JSON encoding of the message being sent;
/// it is cleared and reused on every send.
buf: Vec<u8>,
/// Records the message type without storing a value of it.
_phantom: PhantomData<T>,
}
impl<T: Serialize> FileMessageWriter<T> {
    /// Wraps `file` in a writer whose scratch buffer starts out empty.
    pub fn new(file: std::fs::File) -> Self {
        let buf = Vec::new();
        Self {
            file,
            buf,
            _phantom: PhantomData,
        }
    }

    /// Writes `msg` to the file as one frame: a 4-byte big-endian length
    /// followed by that many bytes of JSON.
    ///
    /// # Errors
    ///
    /// Fails if `msg` cannot be serialized, if its encoding does not fit in a
    /// `u32` length, or if writing to the file fails. In the first two cases
    /// nothing has been written to the file.
    pub fn send(&mut self, msg: T) -> Result<(), Box<dyn Error + Send + Sync>> {
        let Self { file, buf, .. } = self;

        // Encode into the reusable scratch buffer first so the length is
        // known before the header is emitted.
        buf.clear();
        serde_json::to_writer(&mut *buf, &msg)?;

        let header = u32::try_from(buf.len())?.to_be_bytes();
        file.write_all(&header)?;
        file.write_all(buf.as_slice())?;
        Ok(())
    }
}
/// Reads length-prefixed JSON frames from a file and decodes them as `T`.
#[derive(Debug)]
struct FileMessageReader<T> {
/// File the frames are read from.
file: std::fs::File,
/// Scratch buffer that receives the JSON bytes of the frame being read;
/// it is reused between reads.
buf: Vec<u8>,
/// Records the message type without storing a value of it.
_phantom: PhantomData<T>,
}
impl<T: DeserializeOwned> FileMessageReader<T> {
    /// Wraps `file` in a reader whose scratch buffer starts out empty.
    pub fn new(file: std::fs::File) -> Self {
        Self {
            file,
            buf: Vec::new(),
            _phantom: PhantomData,
        }
    }

    /// Reads the next frame from the file and decodes it.
    ///
    /// A frame is a 4-byte big-endian length followed by that many bytes of
    /// JSON.
    ///
    /// # Errors
    ///
    /// Fails if the header or body cannot be read in full (the I/O error kind
    /// is `UnexpectedEof` when the file ends early), if the length does not
    /// fit in `usize`, or if the body is not valid JSON for `T`.
    pub fn recv(&mut self) -> Result<T, Box<dyn Error + Send + Sync>> {
        let mut len_bytes = [0u8; 4];
        self.file.read_exact(&mut len_bytes)?;
        let msg_len = u32::from_be_bytes(len_bytes);
        // Propagate instead of panicking on targets where `usize` is narrower
        // than `u32`.
        let expected = usize::try_from(msg_len)?;

        // Read at most `msg_len` bytes and let the buffer grow with the data
        // that actually arrives, so a corrupt length prefix cannot force a
        // multi-gigabyte zero-filled allocation up front.
        self.buf.clear();
        let received =
            Read::take(&mut self.file, u64::from(msg_len)).read_to_end(&mut self.buf)?;
        if received != expected {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "message body is shorter than its length prefix",
            )
            .into());
        }

        Ok(serde_json::from_slice(&self.buf)?)
    }
}
/// Receiving end of a file-backed message channel.
///
/// Messages are read from a message file; two lock files, used alternately,
/// gate when the next message may be read.
#[derive(Debug)]
pub struct Receiver<T> {
/// Reads frames from the message file.
reader: FileMessageReader<T>,
/// Lock file that is acquired before the next message is read.
current_lock: std::fs::File,
/// Lock file that takes over as `current_lock` after a message has been
/// received.
next_lock: std::fs::File,
}
impl<T: DeserializeOwned> Receiver<T> {
    /// Opens the message file at `path` together with its two lock files,
    /// which share `path`'s name but use the extensions `lock1` and `lock2`.
    ///
    /// When `create` is true the files are created if missing and opened for
    /// reading and writing; otherwise they must already exist and are opened
    /// read-only. No lock is taken here.
    ///
    /// # Errors
    ///
    /// Returns the I/O error of the first file that fails to open.
    pub fn open(path: &Path, create: bool) -> std::io::Result<Self> {
        let mut options = OpenOptions::new();
        options.create(create).write(create).read(true);
        let msgs_file = options.open(path)?;
        let current_lock = options.open(path.with_extension("lock1"))?;
        let next_lock = options.open(path.with_extension("lock2"))?;
        Ok(Self {
            reader: FileMessageReader::new(msgs_file),
            current_lock,
            next_lock,
        })
    }

    /// Blocks, polling every 50 ms, until some other holder owns one of the
    /// two lock files.
    ///
    /// Both lock files are probed because a sender that has already sent a
    /// message holds the second lock file rather than the first; probing only
    /// one of them could miss a sender that connected and sent between polls.
    ///
    /// # Errors
    ///
    /// Returns any lock or unlock error other than "interrupted" or
    /// "lock contended".
    fn poll_until_other_side_exists(&self) -> std::io::Result<()> {
        loop {
            if Self::is_locked_elsewhere(&self.current_lock)?
                || Self::is_locked_elsewhere(&self.next_lock)?
            {
                return Ok(());
            }
            std::thread::sleep(std::time::Duration::from_millis(50));
        }
    }

    /// Reports whether `lock_file` is currently locked by another holder.
    ///
    /// The probe briefly takes the exclusive lock itself and releases it
    /// again when nobody else holds it.
    fn is_locked_elsewhere(lock_file: &std::fs::File) -> std::io::Result<bool> {
        match lock_file.try_lock_exclusive() {
            Ok(()) => {
                // Nobody else holds it; give the lock back right away and
                // surface a failed unlock instead of panicking.
                FileExt::unlock(lock_file)?;
                Ok(false)
            }
            // Undetermined this round; the caller polls again.
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => Ok(false),
            Err(e) if e.raw_os_error() == lock_contended_error().raw_os_error() => Ok(true),
            Err(e) => Err(e),
        }
    }

    /// Waits for the current lock file to become available, reads one
    /// message, releases the lock and switches to the other lock file for the
    /// next call.
    ///
    /// # Errors
    ///
    /// Fails if the lock cannot be acquired or released, or if reading or
    /// decoding the message fails. The lock is released on the read-failure
    /// path as well, and the lock files are only swapped after a fully
    /// successful receive.
    pub fn recv_blocking(&mut self) -> Result<T, Box<dyn Error + Send + Sync>> {
        self.current_lock.lock_exclusive()?;

        // Always release the lock, even when the read failed; otherwise the
        // lock would stay held for as long as this receiver is alive.
        let received = self.reader.recv();
        let released = FileExt::unlock(&self.current_lock);

        // A read error takes precedence over an unlock error.
        let msg = received?;
        released?;

        std::mem::swap(&mut self.current_lock, &mut self.next_lock);
        Ok(msg)
    }
}
/// Sending end of a file-backed message channel.
///
/// Messages are written to a message file; two lock files, used alternately,
/// signal to the other side when a message has been written.
#[derive(Debug)]
pub struct Sender<T> {
/// Writes frames to the message file.
writer: FileMessageWriter<T>,
/// Lock file held exclusively between sends.
current_lock: std::fs::File,
/// Lock file that is acquired on the next send and then becomes
/// `current_lock`.
next_lock: std::fs::File,
}
impl<T: Serialize> Sender<T> {
    /// Opens the message file at `path` for writing together with its two
    /// lock files (same name, extensions `lock1` and `lock2`), then blocks
    /// until the exclusive lock on the `lock1` file has been acquired.
    ///
    /// When `create` is true, missing files are created; otherwise they must
    /// already exist.
    ///
    /// # Errors
    ///
    /// Returns the I/O error of the first file that fails to open, or of the
    /// lock acquisition.
    pub fn open(path: &Path, create: bool) -> std::io::Result<Self> {
        let mut open_opts = OpenOptions::new();
        open_opts.write(true).create(create);

        let writer = FileMessageWriter::new(open_opts.open(path)?);
        let current_lock = open_opts.open(path.with_extension("lock1"))?;
        let next_lock = open_opts.open(path.with_extension("lock2"))?;

        // Hold the first lock from the start.
        current_lock.lock_exclusive()?;

        Ok(Self {
            writer,
            current_lock,
            next_lock,
        })
    }

    /// Writes `msg` to the message file, then hands over the locks: the
    /// other lock file is acquired before the previously held one is
    /// released.
    ///
    /// # Errors
    ///
    /// Fails if writing the message fails or if a lock cannot be acquired or
    /// released.
    pub fn send(&mut self, msg: T) -> Result<(), Box<dyn Error + Send + Sync>> {
        self.writer.send(msg)?;

        // Take the second lock before letting go of the first, so this
        // sender always holds at least one of them.
        self.next_lock.lock_exclusive()?;
        std::mem::swap(&mut self.current_lock, &mut self.next_lock);

        // After the swap, `next_lock` is the lock that was held during the
        // write.
        FileExt::unlock(&self.next_lock)?;
        Ok(())
    }
}
/// Parent-side handle for a bidirectional channel whose child end has not
/// been confirmed to exist yet.
///
/// It bundles the parent's receiver for child-to-parent messages with its
/// sender for parent-to-child messages.
#[derive(Debug)]
pub struct BidiChannelCreator<ParentToChildMsg, ChildToParentMsg> {
    /// Receives messages the child sends to the parent.
    receiver: Receiver<ChildToParentMsg>,
    /// Sends messages from the parent to the child.
    sender: Sender<ParentToChildMsg>,
}
impl<ParentToChildMsg, ChildToParentMsg> BidiChannelCreator<ParentToChildMsg, ChildToParentMsg>
where
    ParentToChildMsg: Serialize + DeserializeOwned,
    ChildToParentMsg: Serialize + DeserializeOwned,
{
    /// Creates both channel files (and their lock files) inside `ipc_dir`
    /// from the parent's side.
    ///
    /// The sender for `msgs_to_child.txt` is opened first, then the receiver
    /// for `msgs_to_parent.txt`; both are opened with creation enabled.
    ///
    /// # Errors
    ///
    /// Returns the first I/O error hit while opening either end.
    pub fn create_in_parent(ipc_dir: &Path) -> std::io::Result<Self> {
        let sender = Sender::open(&ipc_dir.join("msgs_to_child.txt"), true)?;
        let receiver = Receiver::open(&ipc_dir.join("msgs_to_parent.txt"), true)?;
        Ok(Self { receiver, sender })
    }

    /// Opens the already existing channel files inside `ipc_dir` from the
    /// child's side.
    ///
    /// The receiver for `msgs_to_child.txt` is opened first, then the sender
    /// for `msgs_to_parent.txt`; neither call creates files.
    ///
    /// # Errors
    ///
    /// Returns the first I/O error hit while opening either end.
    pub fn open_in_child(
        ipc_dir: &Path,
    ) -> std::io::Result<(Receiver<ParentToChildMsg>, Sender<ChildToParentMsg>)> {
        let receiver = Receiver::open(&ipc_dir.join("msgs_to_child.txt"), false)?;
        let sender = Sender::open(&ipc_dir.join("msgs_to_parent.txt"), false)?;
        Ok((receiver, sender))
    }

    /// Blocks until the receiver's poll reports that the other side exists,
    /// then hands out the parent's receiver and sender.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by the poll.
    pub fn wait_for_child_to_connect(
        self,
    ) -> std::io::Result<(Receiver<ChildToParentMsg>, Sender<ParentToChildMsg>)> {
        let Self { receiver, sender } = self;
        receiver.poll_until_other_side_exists()?;
        Ok((receiver, sender))
    }
}