1use futures::{
4 channel::mpsc::{unbounded, UnboundedSender},
5 Stream,
6};
7use netlink_packet_core::NetlinkMessage;
8use std::fmt::Debug;
9
10use crate::{errors::Error, sys::SocketAddr, Request};
11
12#[derive(Clone, Debug)]
14pub struct ConnectionHandle<T>
15where
16 T: Debug,
17{
18 requests_tx: UnboundedSender<Request<T>>,
19}
20
21impl<T> ConnectionHandle<T>
22where
23 T: Debug,
24{
25 pub(crate) fn new(requests_tx: UnboundedSender<Request<T>>) -> Self {
26 ConnectionHandle { requests_tx }
27 }
28
29 pub fn request(
37 &self,
38 message: NetlinkMessage<T>,
39 destination: SocketAddr,
40 ) -> Result<impl Stream<Item = NetlinkMessage<T>>, Error<T>> {
41 let (tx, rx) = unbounded::<NetlinkMessage<T>>();
42 let request = Request::from((message, destination, tx));
43 trace!("handle: forwarding new request to connection");
44 UnboundedSender::unbounded_send(&self.requests_tx, request).map_err(
45 |e| {
46 if e.is_full() {
49 panic!("internal error: unbounded channel full?!");
50 } else if e.is_disconnected() {
51 Error::ConnectionClosed
52 } else {
53 panic!("unknown error: {:?}", e);
54 }
55 },
56 )?;
57 Ok(rx)
58 }
59
60 pub fn notify(
61 &self,
62 message: NetlinkMessage<T>,
63 destination: SocketAddr,
64 ) -> Result<(), Error<T>> {
65 let (tx, _rx) = unbounded::<NetlinkMessage<T>>();
66 let request = Request::from((message, destination, tx));
67 trace!("handle: forwarding new request to connection");
68 UnboundedSender::unbounded_send(&self.requests_tx, request)
69 .map_err(|_| Error::ConnectionClosed)
70 }
71}