ip_roam/
lib.rs

1use futures::channel::mpsc::UnboundedReceiver;
2use futures::{
3    stream::{StreamExt, TryStreamExt},
4    Stream,
5};
6use netlink_packet_core::{NetlinkMessage, NetlinkPayload};
7use netlink_packet_route::{
8    rtnl::{address::Nla, RtnlMessage::*},
9    AddressMessage, RtnlMessage,
10};
11use netlink_proto::{
12    sys::{AsyncSocket, SocketAddr},
13    Connection as RtConnection,
14};
15use rtnetlink::{constants::*, new_connection, AddressHandle, Handle as RtHandle};
16use std::io::{Error, ErrorKind, Result};
17use std::net::Ipv4Addr;
18
/// A retrieved address entry: an IPv4 address and the interface label
/// reported alongside it.
///
/// Entries compare and hash by both fields, so they can be deduplicated or
/// used as keys in a set or map.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Address {
    /// The IPv4 address itself.
    addr: Ipv4Addr,
    /// The label of the interface the address belongs to.
    label: String,
}

impl Address {
    /// Gets the IPv4 address.
    pub fn addr(&self) -> &Ipv4Addr {
        &self.addr
    }

    /// Gets the label of the interface.
    pub fn label(&self) -> &str {
        &self.label
    }
}
37
38impl TryFrom<AddressMessage> for Address {
39    type Error = Error;
40
41    fn try_from(am: AddressMessage) -> Result<Address> {
42        let mut the_addr = None;
43        let mut the_label = None;
44        for nla in am.nlas {
45            match nla {
46                Nla::Address(a) => {
47                    let c: [u8; 4] = match a.try_into() {
48                        Ok(c) => c,
49                        _ => continue,
50                    };
51                    let addr = Ipv4Addr::from(c);
52                    if let Some(label) = the_label {
53                        return Ok(Address { addr, label });
54                    }
55                    the_addr = Some(addr);
56                }
57                Nla::Label(label) => {
58                    if let Some(addr) = the_addr {
59                        return Ok(Address { addr, label });
60                    }
61                    the_label = Some(label);
62                }
63                _ => {}
64            }
65        }
66        Err(Error::from(ErrorKind::NotFound))
67    }
68}
69
70/// A handle to get current local addresses.
71#[derive(Debug, Clone)]
72pub struct Addresses {
73    handle: RtHandle,
74}
75
76impl Addresses {
77    /// Streams the current local addresses.
78    pub fn stream(self) -> impl Stream<Item = Address> {
79        let inner = AddressHandle::new(self.handle)
80            .get()
81            .execute()
82            .into_stream();
83        inner.filter_map(|item| async move { item.ok().and_then(|am| am.try_into().ok()) })
84    }
85}
86
87/// A message from the monitor, denoting a new or deleted address.
88#[derive(Debug, Clone)]
89pub struct Message {
90    addr: Address,
91    new: bool,
92}
93
94impl Message {
95    fn new(addr: Address, new: bool) -> Self {
96        Message { addr, new }
97    }
98
99    /// Gets the address.
100    pub fn addr(&self) -> &Address {
101        &self.addr
102    }
103
104    /// Checks whether the address is new or deleted.
105    pub fn is_new(&self) -> bool {
106        self.new
107    }
108}
109
110impl TryFrom<RtnlMessage> for Message {
111    type Error = Error;
112
113    fn try_from(item: RtnlMessage) -> Result<Message> {
114        Ok(match item {
115            NewAddress(a) => Message::new(a.try_into()?, true),
116            DelAddress(a) => Message::new(a.try_into()?, false),
117            _ => {
118                return Err(Error::from(ErrorKind::InvalidData));
119            }
120        })
121    }
122}
123
124impl TryFrom<NetlinkMessage<RtnlMessage>> for Message {
125    type Error = Error;
126
127    fn try_from(item: NetlinkMessage<RtnlMessage>) -> Result<Message> {
128        if let NetlinkPayload::InnerMessage(m) = item.payload {
129            m.try_into()
130        } else {
131            Err(Error::from(ErrorKind::InvalidData))
132        }
133    }
134}
135
136/// A monitor to watch the changes of local addresses.
137#[derive(Debug)]
138pub struct Monitor {
139    messages: UnboundedReceiver<(NetlinkMessage<RtnlMessage>, SocketAddr)>,
140}
141
142impl Monitor {
143    /// Streams the monitor messages.
144    pub fn stream(self) -> impl Stream<Item = Message> {
145        self.messages
146            .filter_map(|item| async { item.0.try_into().ok() })
147    }
148}
149
150/// Handles to get the current local addresses and their changes.
151pub struct Handle {
152    pub addresses: Addresses,
153    pub monitor: Monitor,
154}
155
156/// A pending connection to the netlink socket.
157pub struct Connection {
158    pub conn: RtConnection<RtnlMessage>,
159    /// The `conn` future must be spawned before the `handle` could work.
160    pub handle: Handle,
161}
162
163impl Connection {
164    /// Creates a pending connection to the netlink socket.
165    pub fn new() -> Result<Self> {
166        let (mut conn, handle, messages) = new_connection()?;
167        conn.socket_mut()
168            .socket_mut()
169            .bind(&SocketAddr::new(0, RTMGRP_IPV4_IFADDR))?;
170        Ok(Connection {
171            conn,
172            handle: Handle {
173                addresses: Addresses { handle },
174                monitor: Monitor { messages },
175            },
176        })
177    }
178}
179
#[cfg(test)]
mod tests {
    use super::Connection;
    use futures::stream::StreamExt;

    /// The current address list is expected to contain a loopback address.
    #[tokio::test]
    async fn has_loopback() {
        let Connection { conn, handle } = Connection::new().unwrap();
        // The connection future has to run for the handle to get replies.
        let driver = tokio::spawn(conn);
        let found_loopback = handle
            .addresses
            .stream()
            .any(|entry| async move { entry.addr().is_loopback() })
            .await;
        assert!(found_loopback);
        driver.abort();
    }
}