1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
//! A library wrapping various IPC mechanisms with a datagram-oriented
//! messaging layer. This is how CCP communicates with the datapath.
use super::Error;
use super::Result;
use std::rc::{Rc, Weak};
use std::sync::{atomic, Arc};
use tracing::{info, trace};
/// Thread-channel implementation
pub mod chan;
#[cfg(all(target_os = "linux"))]
/// Character device implementation
pub mod kp;
#[cfg(all(target_os = "linux"))]
/// Netlink socket implementation
pub mod netlink;
/// Unix domain socket implementation
pub mod unix;
/// IPC mechanisms must implement this trait.
///
/// This API enables both connection-oriented (send/recv) and connectionless (sendto/recvfrom)
/// sockets, but currently only unix sockets support connectionless sockets. When using unix
/// sockets, you must provide a valid `Addr` to `send()` and you will also receive a valid
/// `Addr` as a return value from `recv`. When using connection-oriented ipc mechanisms, these
/// values are ignored and should just be the nil value `()`.
pub trait Ipc: 'static + Send {
/// Peer address: the destination passed to `send` and the sender returned by `recv`.
///
/// Connection-oriented mechanisms should use the nil value `()`.
type Addr: Clone + Default + std::cmp::Eq + std::hash::Hash + std::fmt::Debug;
/// Returns the name of this IPC mechanism (e.g. "netlink" for Linux netlink sockets)
fn name() -> String;
/// Blocking send of `msg` to the peer identified by `to`.
fn send(&self, msg: &[u8], to: &Self::Addr) -> Result<()>;
/// Blocking listen.
///
/// Fills `msg` with the received bytes.
/// Returns how many bytes were read, and (if using unix sockets) the address of the sender.
///
/// Important: should not allocate!
fn recv(&self, msg: &mut [u8]) -> Result<(usize, Self::Addr)>;
/// Close the underlying sockets
fn close(&mut self) -> Result<()>;
}
/// Marker type specifying that the IPC socket should make blocking calls to the underlying socket.
///
/// This is a unit struct: it carries no data and exists only to be named as a type.
pub struct Blocking;
/// Marker type specifying that the IPC socket should make nonblocking calls to the underlying socket.
///
/// This is a unit struct: it carries no data and exists only to be named as a type.
pub struct Nonblocking;
/// Backend builder contains the objects
/// needed to build a new backend.
///
/// Call `build()` to turn it into a `Backend`.
pub struct BackendBuilder<T: Ipc> {
/// The IPC socket the resulting `Backend` will take ownership of.
pub sock: T,
}
impl<T: Ipc> BackendBuilder<T> {
pub fn build<'a>(
self,
atomic_bool: Arc<atomic::AtomicBool>,
receive_buf: &'a mut [u8],
) -> Backend<'a, T> {
Backend::new(self.sock, atomic_bool, receive_buf)
}
}
/// A send-only handle to the underlying IPC socket.
///
/// Field `0` is a weak reference to the socket, so this handle does not keep the socket alive.
/// Field `1` is the destination address passed to `Ipc::send` on every `send_msg` call.
pub struct BackendSender<T: Ipc>(Weak<T>, T::Addr);
impl<T: Ipc> BackendSender<T> {
/// Blocking send.
pub fn send_msg(&self, msg: &[u8]) -> Result<()> {
let s = Weak::upgrade(&self.0)
.ok_or_else(|| Error(String::from("Send on closed IPC socket!")))?;
s.send(msg, &self.1).map_err(Error::from)
}
pub fn clone_with_dest(&self, to: T::Addr) -> Self {
BackendSender(self.0.clone(), to)
}
}
impl<T: Ipc> Clone for BackendSender<T> {
fn clone(&self) -> Self {
BackendSender(self.0.clone(), self.1.clone())
}
}
/// Backend will yield incoming IPC messages forever via `next()`.
/// It owns the socket; `BackendSender` holds weak references.
/// The atomic bool is a way to stop iterating.
pub struct Backend<'a, T: Ipc> {
// The socket itself; `sender()` hands out `Weak` references to it.
sock: Rc<T>,
// Checked before every receive; once it reads `false` the backend stops listening.
continue_listening: Arc<atomic::AtomicBool>,
// Caller-provided buffer that `recv` writes into and messages are parsed from.
receive_buf: &'a mut [u8],
// Number of bytes returned by the most recent successful `recv`.
tot_read: usize,
// Offset into `receive_buf` up to which messages have already been parsed.
read_until: usize,
// Address returned by the most recent successful `recv`.
last_recv_addr: T::Addr,
}
use crate::serialize::Msg;
impl<'a, T: Ipc> Backend<'a, T> {
    /// Wrap `sock` in a new `Backend`.
    ///
    /// * `continue_listening` - checked before every receive; when it reads `false`,
    ///   `next()` stops yielding messages.
    /// * `receive_buf` - buffer that incoming data is received into and parsed from.
    pub fn new(
        sock: T,
        continue_listening: Arc<atomic::AtomicBool>,
        receive_buf: &'a mut [u8],
    ) -> Backend<'a, T> {
        Backend {
            sock: Rc::new(sock),
            continue_listening,
            receive_buf,
            tot_read: 0,
            read_until: 0,
            last_recv_addr: Default::default(),
        }
    }

    /// Create a send-only handle whose messages are addressed to `to`.
    ///
    /// The handle holds only a weak reference to the socket, so it does not keep it alive.
    pub fn sender(&self, to: T::Addr) -> BackendSender<T> {
        BackendSender(Rc::downgrade(&self.sock), to)
    }

    /// Return a copy of the flag variable that indicates that the
    /// `Backend` should continue listening (i.e., not exit).
    pub fn clone_atomic_bool(&self) -> Arc<atomic::AtomicBool> {
        Arc::clone(&self.continue_listening)
    }

    /// Get the next IPC message, together with the address it was received from.
    ///
    /// Returns `None` when the listening flag has been cleared or when the buffered
    /// bytes cannot be parsed as a message.
    // This is similar to `impl Iterator`, but the returned value is tied to the lifetime
    // of `self`, so we cannot implement that trait.
    pub fn next(&mut self) -> Option<(Msg<'_>, T::Addr)> {
        // Nothing left over from the last read: block until the socket yields more data.
        if self.read_until >= self.tot_read {
            self.tot_read = self.get_next_read().ok()?;
            self.read_until = 0;
        }

        // Only the bytes below `tot_read` belong to the latest read. Anything past that
        // point is stale data from an earlier, longer read and must never reach the parser.
        let unread = &self.receive_buf[self.read_until..self.tot_read];
        let (msg, consumed) = Msg::from_buf(unread).ok()?;
        self.read_until += consumed;
        Some((msg, self.last_recv_addr.clone()))
    }

    /// Calls the IPC socket repeatedly until it yields a non-empty read.
    ///
    /// On success, returns the number of bytes now valid at the start of
    /// `self.receive_buf` and records the sender in `self.last_recv_addr`.
    ///
    /// # Errors
    ///
    /// Returns an error ("Done") once the listening flag reads `false`. Errors from
    /// `recv` itself are traced and retried, not returned.
    fn get_next_read(&mut self) -> Result<usize> {
        loop {
            // if continue_listening has been set to false, stop iterating
            if !self.continue_listening.load(atomic::Ordering::SeqCst) {
                info!("recieved kill signal");
                return Err(Error(String::from("Done")));
            }

            let (read, addr) = match self.sock.recv(self.receive_buf) {
                Ok(r) => r,
                Err(Error(e)) => {
                    trace!(err = %format!("{:#?}", e), "recv failed" );
                    continue;
                }
            };

            // NOTE This may seem precarious, but is safe
            // In the case that `recv` returns a buffer containing multiple messages,
            // `next()` will keep parsing from the leftover bytes (and thus will not
            // call `get_next_read()` again) until all of the messages from that buffer
            // have been returned. So it is not possible for recvs to interleave and
            // interfere with the last_recv_addr value.
            self.last_recv_addr = addr;

            // An empty read carries no message; try again.
            if read == 0 {
                continue;
            }

            return Ok(read);
        }
    }
}
impl<'a, T: Ipc> Drop for Backend<'a, T> {
    /// Best-effort close of the socket when the `Backend` goes away.
    fn drop(&mut self) {
        // `Rc::get_mut` only yields a reference when no other `Rc` or `Weak` points at the
        // socket, so `close` is skipped while any `BackendSender` is still alive.
        if let Some(sock) = Rc::get_mut(&mut self.sock) {
            // Nothing useful can be done with a close error during drop; discard it.
            let _ = Ipc::close(sock);
        }
    }
}
#[cfg(test)]
pub mod test;