1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
//! A library wrapping various IPC mechanisms with a datagram-oriented
//! messaging layer. This is how CCP communicates with the datapath.

use super::Error;
use super::Result;
use std::rc::{Rc, Weak};
use std::sync::{atomic, Arc};
use tracing::{info, trace};

/// Thread-channel implementation
pub mod chan;
#[cfg(all(target_os = "linux"))]
/// Character device implementation
pub mod kp;
#[cfg(all(target_os = "linux"))]
/// Netlink socket implementation
pub mod netlink;
/// Unix domain socket implementation
pub mod unix;

/// IPC mechanisms must implement this trait.
///
/// This API enables both connection-oriented (send/recv) and connectionless (sendto/recvfrom)
/// sockets, but currently only unix sockets support connectionless sockets. When using unix
/// sockets, you must provide a valid `Addr` to `send()` and you will also receive a valid
/// `Addr` as a return value from `recv`. When using connection-oriented ipc mechanisms, these
/// values are ignored and should just be the nil value `()`.
pub trait Ipc: 'static + Send {
    type Addr: Clone + Default + std::cmp::Eq + std::hash::Hash + std::fmt::Debug;
    /// Returns the name of this IPC mechanism (e.g. "netlink" for Linux netlink sockets)
    fn name() -> String;
    /// Blocking send
    fn send(&self, msg: &[u8], to: &Self::Addr) -> Result<()>;
    /// Blocking listen.
    ///
    /// Returns how many bytes were read, and (if using unix sockets) the address of the sender.
    ///
    /// Important: should not allocate!
    fn recv(&self, msg: &mut [u8]) -> Result<(usize, Self::Addr)>;
    /// Close the underlying sockets
    fn close(&mut self) -> Result<()>;
}

/// Marker type specifying that the IPC socket should make blocking calls to the underlying socket
pub struct Blocking;
/// Marker type specifying that the IPC socket should make nonblocking calls to the underlying socket
pub struct Nonblocking;

/// Backend builder contains the objects
/// needed to build a new backend.
pub struct BackendBuilder<T: Ipc> {
    pub sock: T,
}

impl<T: Ipc> BackendBuilder<T> {
    pub fn build<'a>(
        self,
        atomic_bool: Arc<atomic::AtomicBool>,
        receive_buf: &'a mut [u8],
    ) -> Backend<'a, T> {
        Backend::new(self.sock, atomic_bool, receive_buf)
    }
}

/// A send-only handle to the underlying IPC socket.
pub struct BackendSender<T: Ipc>(Weak<T>, T::Addr);

impl<T: Ipc> BackendSender<T> {
    /// Blocking send.
    pub fn send_msg(&self, msg: &[u8]) -> Result<()> {
        let s = Weak::upgrade(&self.0)
            .ok_or_else(|| Error(String::from("Send on closed IPC socket!")))?;
        s.send(msg, &self.1).map_err(Error::from)
    }
    pub fn clone_with_dest(&self, to: T::Addr) -> Self {
        BackendSender(self.0.clone(), to)
    }
}

impl<T: Ipc> Clone for BackendSender<T> {
    fn clone(&self) -> Self {
        BackendSender(self.0.clone(), self.1.clone())
    }
}

/// Backend will yield incoming IPC messages forever via `next()`.
/// It owns the socket; `BackendSender` holds weak references.
/// The atomic bool is a way to stop iterating.
pub struct Backend<'a, T: Ipc> {
    sock: Rc<T>,
    continue_listening: Arc<atomic::AtomicBool>,
    receive_buf: &'a mut [u8],
    tot_read: usize,
    read_until: usize,
    last_recv_addr: T::Addr,
}

use crate::serialize::Msg;
impl<'a, T: Ipc> Backend<'a, T> {
    pub fn new(
        sock: T,
        continue_listening: Arc<atomic::AtomicBool>,
        receive_buf: &'a mut [u8],
    ) -> Backend<'a, T> {
        Backend {
            sock: Rc::new(sock),
            continue_listening,
            receive_buf,
            tot_read: 0,
            read_until: 0,
            last_recv_addr: Default::default(),
        }
    }

    pub fn sender(&self, to: T::Addr) -> BackendSender<T> {
        BackendSender(Rc::downgrade(&self.sock), to)
    }

    /// Return a copy of the flag variable that indicates that the
    /// `Backend` should continue listening (i.e., not exit).
    pub fn clone_atomic_bool(&self) -> Arc<atomic::AtomicBool> {
        Arc::clone(&(self.continue_listening))
    }

    /// Get the next IPC message.
    // This is similar to `impl Iterator`, but the returned value is tied to the lifetime
    // of `self`, so we cannot implement that trait.
    pub fn next(&mut self) -> Option<(Msg<'_>, T::Addr)> {
        // if we have leftover buffer from the last read, parse another message.
        if self.read_until < self.tot_read {
            let (msg, consumed) = Msg::from_buf(&self.receive_buf[self.read_until..]).ok()?;
            self.read_until += consumed;
            Some((msg, self.last_recv_addr.clone()))
        } else {
            self.tot_read = self.get_next_read().ok()?;
            self.read_until = 0;
            let (msg, consumed) =
                Msg::from_buf(&self.receive_buf[self.read_until..self.tot_read]).ok()?;
            self.read_until += consumed;

            Some((msg, self.last_recv_addr.clone()))
        }
    }

    // calls IPC repeatedly to read one or more messages.
    // Returns a slice into self.receive_buf covering the read data
    fn get_next_read(&mut self) -> Result<usize> {
        loop {
            // if continue_loop has been set to false, stop iterating
            if !self.continue_listening.load(atomic::Ordering::SeqCst) {
                info!("recieved kill signal");
                return Err(Error(String::from("Done")));
            }

            let (read, addr) = match self.sock.recv(self.receive_buf) {
                Ok(r) => r,
                Err(Error(e)) => {
                    trace!(err = %format!("{:#?}", e), "recv failed" );
                    continue;
                }
            };

            // NOTE This may seem precarious, but is safe
            // In the case that `recv` returns a buffer containing multiple messages,
            // `next()` will continue to hit the first `if` branch (and thus will not
            // call `get_next_read()` again) until all of the messages from that buffer
            // have been returned. So it is not possible for recvs to interleave and
            // interfere with the last_recv_addr value.
            self.last_recv_addr = addr;

            if read == 0 {
                continue;
            }

            return Ok(read);
        }
    }
}

impl<'a, T: Ipc> Drop for Backend<'a, T> {
    fn drop(&mut self) {
        Rc::get_mut(&mut self.sock)
            .ok_or_else(|| {
                Error(String::from(
                    "Could not get exclusive ref to socket to close",
                ))
            })
            .and_then(Ipc::close)
            .unwrap_or_else(|_| ());
    }
}

#[cfg(test)]
pub mod test;