use super::Error;
use super::Result;
use std::rc::{Rc, Weak};
use std::sync::{Arc, atomic};
use tracing::{info, trace};
pub mod chan;
#[cfg(all(target_os = "linux"))]
pub mod kp;
#[cfg(all(target_os = "linux"))]
pub mod netlink;
pub mod unix;
/// Byte-oriented transport that `Backend` receives from and `BackendSender` sends on.
///
/// Implementors must be `'static + Send`.
pub trait Ipc: 'static + Send {
/// Address of a peer: given to `send` as the destination and returned by `recv`
/// together with the number of bytes received.
type Addr: Clone + Default + std::cmp::Eq + std::hash::Hash + std::fmt::Debug;
/// Returns a name for this implementation.
// NOTE(review): presumably identifies the IPC mechanism; the values are chosen by implementors.
fn name() -> String;
/// Sends the bytes in `msg` to the address `to`.
fn send(&self, msg: &[u8], to: &Self::Addr) -> Result<()>;
/// Receives into `msg`, returning the number of bytes read and the address
/// associated with the received data.
fn recv(&self, msg: &mut [u8]) -> Result<(usize, Self::Addr)>;
/// Closes the transport. `Backend` calls this from its `Drop` implementation.
fn close(&mut self) -> Result<()>;
}
/// Field-less marker type.
// NOTE(review): not referenced in this file; presumably used by the transport submodules
// to select blocking socket behavior — confirm against `unix`/`netlink`/`kp`.
pub struct Blocking;
/// Field-less marker type.
// NOTE(review): not referenced in this file; presumably used by the transport submodules
// to select non-blocking socket behavior — confirm against `unix`/`netlink`/`kp`.
pub struct Nonblocking;
/// Holds an [`Ipc`] socket until [`BackendBuilder::build`] turns it into a [`Backend`].
pub struct BackendBuilder<T: Ipc> {
/// The socket that the built `Backend` will take ownership of.
pub sock: T,
}
impl<T: Ipc> BackendBuilder<T> {
pub fn build<'a>(
self,
atomic_bool: Arc<atomic::AtomicBool>,
receive_buf: &'a mut [u8],
) -> Backend<'a, T> {
Backend::new(self.sock, atomic_bool, receive_buf)
}
}
/// Handle for sending on a `Backend`'s socket to one fixed destination.
///
/// Field `0` is a weak reference to the socket (created with `Rc::downgrade` in
/// `Backend::sender`), so a sender does not keep the socket alive. Field `1` is the
/// destination address passed to `Ipc::send`.
pub struct BackendSender<T: Ipc>(Weak<T>, T::Addr);
impl<T: Ipc> BackendSender<T> {
pub fn send_msg(&self, msg: &[u8]) -> Result<()> {
let s = Weak::upgrade(&self.0)
.ok_or_else(|| Error(String::from("Send on closed IPC socket!")))?;
s.send(msg, &self.1).map_err(Error::from)
}
pub fn clone_with_dest(&self, to: T::Addr) -> Self {
BackendSender(self.0.clone(), to)
}
}
impl<T: Ipc> Clone for BackendSender<T> {
fn clone(&self) -> Self {
BackendSender(self.0.clone(), self.1.clone())
}
}
/// Receiving side of an [`Ipc`] socket: reads into a caller-provided buffer and parses
/// messages out of it, and hands out [`BackendSender`]s for the same socket.
pub struct Backend<'a, T: Ipc> {
/// The socket, reference-counted so that `BackendSender`s can hold weak references to it.
sock: Rc<T>,
/// Listening flag; the receive loop returns an error once this reads `false`.
continue_listening: Arc<atomic::AtomicBool>,
/// Caller-provided buffer that `Ipc::recv` fills and messages are parsed from.
receive_buf: &'a mut [u8],
/// Number of bytes placed in `receive_buf` by the most recent successful receive.
tot_read: usize,
/// Offset into `receive_buf` up to which messages have already been parsed.
read_until: usize,
/// Address returned by the most recent receive; returned alongside each parsed message.
last_recv_addr: T::Addr,
}
use crate::serialize::Msg;
impl<'a, T: Ipc> Backend<'a, T> {
    /// Creates a backend that owns `sock` and receives into `receive_buf`.
    ///
    /// * `continue_listening` - the receive loop stops with an error once this flag
    ///   reads `false`.
    /// * `receive_buf` - caller-owned buffer borrowed for the backend's lifetime.
    pub fn new(
        sock: T,
        continue_listening: Arc<atomic::AtomicBool>,
        receive_buf: &'a mut [u8],
    ) -> Backend<'a, T> {
        Backend {
            sock: Rc::new(sock),
            continue_listening,
            receive_buf,
            tot_read: 0,
            read_until: 0,
            last_recv_addr: Default::default(),
        }
    }

    /// Returns a `BackendSender` for destination `to` that holds only a weak reference
    /// to this backend's socket.
    pub fn sender(&self, to: T::Addr) -> BackendSender<T> {
        BackendSender(Rc::downgrade(&self.sock), to)
    }

    /// Returns another handle to the listening flag. Storing `false` through it makes
    /// the receive loop give up the next time it checks the flag.
    pub fn clone_atomic_bool(&self) -> Arc<atomic::AtomicBool> {
        Arc::clone(&self.continue_listening)
    }

    /// Returns the next message together with the address it was received from.
    ///
    /// Messages still pending from the previous receive are handed out first; once
    /// those are exhausted a new receive is performed.
    ///
    /// Returns `None` when the listening flag has been cleared, or when the bytes at
    /// the current position do not parse as a message. In the latter case the rest
    /// of that receive is discarded, so a later call starts from a fresh receive
    /// instead of failing on the same bytes forever.
    pub fn next(&mut self) -> Option<(Msg<'_>, T::Addr)> {
        // Nothing left over from the previous receive: fetch more data.
        if self.read_until >= self.tot_read {
            self.tot_read = self.get_next_read().ok()?;
            self.read_until = 0;
        }

        // Only the first `tot_read` bytes are valid. Anything past that is left over
        // from an earlier, longer receive and must never be handed to the parser.
        match Msg::from_buf(&self.receive_buf[self.read_until..self.tot_read]) {
            Ok((msg, consumed)) => {
                self.read_until += consumed;
                Some((msg, self.last_recv_addr.clone()))
            }
            Err(_) => {
                // Each receive overwrites the buffer from the start, so the unparsed
                // tail can never be completed; drop it.
                self.read_until = self.tot_read;
                None
            }
        }
    }

    /// Receives into the buffer until at least one byte arrives, returning the byte
    /// count and recording the address the data came with.
    ///
    /// The listening flag is checked before every receive attempt. Failed and empty
    /// receives are retried immediately.
    ///
    /// # Errors
    ///
    /// Returns an error once the listening flag reads `false`.
    fn get_next_read(&mut self) -> Result<usize> {
        loop {
            if !self.continue_listening.load(atomic::Ordering::SeqCst) {
                info!("recieved kill signal");
                return Err(Error(String::from("Done")));
            }

            let (read, addr) = match self.sock.recv(self.receive_buf) {
                Ok(r) => r,
                Err(Error(e)) => {
                    trace!(err = %format!("{:#?}", e), "recv failed");
                    continue;
                }
            };

            self.last_recv_addr = addr;
            if read == 0 {
                continue;
            }

            return Ok(read);
        }
    }
}
impl<'a, T: Ipc> Drop for Backend<'a, T> {
    /// Makes a best-effort attempt to close the socket when the backend goes away.
    ///
    /// `Rc::get_mut` only grants access while no other `Rc` or `Weak` pointer to the
    /// socket exists, so the socket is not closed here if any `BackendSender` is
    /// still alive. An error from `close` is ignored.
    fn drop(&mut self) {
        if let Some(sock) = Rc::get_mut(&mut self.sock) {
            let _ = sock.close();
        }
    }
}
#[cfg(test)]
pub mod test;