1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
use futures::sync::mpsc::{unbounded, UnboundedSender};
use futures::Stream;
use rtnetlink::NetlinkMessage;
use LinkHandle;
use errors::NetlinkIpError;
/// Sending half of the channel used to submit requests.
///
/// Each item is a pair made of an `UnboundedSender<NetlinkMessage>` and the
/// `NetlinkMessage` request itself; `ConnectionHandle::request` creates that
/// sender together with the receiver it returns to the caller as a stream.
type RequestsTx = UnboundedSender<(UnboundedSender<NetlinkMessage>, NetlinkMessage)>;
#[derive(Clone, Debug)]
pub struct ConnectionHandle {
requests_tx: UnboundedSender<(UnboundedSender<NetlinkMessage>, NetlinkMessage)>,
}
impl ConnectionHandle {
    /// Builds a handle that submits its requests through `requests_tx`.
    pub(crate) fn new(requests_tx: RequestsTx) -> Self {
        ConnectionHandle { requests_tx }
    }

    /// Submits `message` as a new request and returns the stream of messages
    /// received in response.
    ///
    /// A fresh unbounded channel is created for every request: its sending
    /// half is forwarded together with `message` over the requests channel,
    /// and its receiving half is returned to the caller.
    ///
    /// # Errors
    ///
    /// Errors reported by the receiving half are mapped to
    /// `NetlinkIpError::ConnectionClosed`.
    ///
    /// If the request cannot be forwarded, the failure is logged and the
    /// response sender is dropped along with the rejected request.
    /// NOTE(review): in that case the returned stream presumably just ends
    /// without yielding any message rather than yielding an error -- confirm
    /// against the receiver semantics of the `futures` version in use.
    pub fn request(
        &mut self,
        message: NetlinkMessage,
    ) -> impl Stream<Item = NetlinkMessage, Error = NetlinkIpError> {
        let (tx, rx) = unbounded::<NetlinkMessage>();
        debug!("handle: forwarding new request to connection");
        // Do not swallow a failed send silently: this is the only place where
        // the failure to hand the request over is actually observable.
        if self.requests_tx.unbounded_send((tx, message)).is_err() {
            error!("could not forward new request to connection: the connection is closed");
        }
        rx.map_err(|()| {
            error!("could not forward new request to connection: the connection is closed");
            NetlinkIpError::ConnectionClosed
        })
    }

    /// Returns a `LinkHandle` built from a clone of this handle.
    pub fn link(&self) -> LinkHandle {
        LinkHandle::new(self.clone())
    }
}