1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
use futures::stream;
use futures::sync::mpsc::{unbounded, UnboundedSender};
use futures::Stream;
use rtnetlink::NetlinkMessage;
use LinkHandle;

use errors::NetlinkIpError;

/// Sending half of the channel that carries requests. Each request is a pair made of the sender
/// on which the response messages are delivered, and the message to send.
type RequestsTx = UnboundedSender<(UnboundedSender<NetlinkMessage>, NetlinkMessage)>;

/// A handle to pass requests to a [`Connection`](struct.Connection.html).
#[derive(Clone, Debug)]
pub struct ConnectionHandle {
    /// Channel on which requests are forwarded, along with the sender for their responses.
    requests_tx: RequestsTx,
}

impl ConnectionHandle {
    pub(crate) fn new(requests_tx: RequestsTx) -> Self {
        ConnectionHandle { requests_tx }
    }

    /// Send a new request and get the response as a stream of messages. Note that some messages
    /// are not part of the response stream:
    ///
    /// - **acknowledgements**: when an acknowledgement is received, the stream is closed
    /// - **end of dump messages**: similarly, upon receiving an "end of dump" message, the stream is
    /// closed
    pub fn request(
        &mut self,
        message: NetlinkMessage,
    ) -> impl Stream<Item = NetlinkMessage, Error = NetlinkIpError> {
        let (tx, rx) = unbounded::<NetlinkMessage>();
        // Ignore the result. If this failed, `tx` will be dropped when this funtion returns, and
        // polling rx with fail, carrying the error.
        debug!("handle: forwarding new request to connection");
        let _ = UnboundedSender::unbounded_send(&self.requests_tx, (tx, message));
        rx.map_err(|()| {
            error!("could not forward new request to connection: the connection is closed");
            NetlinkIpError::ConnectionClosed
        })
    }

    /// Create a new handle, specifically for link requests (equivalent to `ip link` commands)
    pub fn link(&self) -> LinkHandle {
        LinkHandle::new(self.clone())
    }
}