1use futures::channel::mpsc::UnboundedReceiver;
2use futures::{
3 stream::{StreamExt, TryStreamExt},
4 Stream,
5};
6use netlink_packet_core::{NetlinkMessage, NetlinkPayload};
7use netlink_packet_route::{
8 rtnl::{address::Nla, RtnlMessage::*},
9 AddressMessage, RtnlMessage,
10};
11use netlink_proto::{
12 sys::{AsyncSocket, SocketAddr},
13 Connection as RtConnection,
14};
15use rtnetlink::{constants::*, new_connection, AddressHandle, Handle as RtHandle};
16use std::io::{Error, ErrorKind, Result};
17use std::net::Ipv4Addr;
18
19#[derive(Debug, Clone)]
21pub struct Address {
22 addr: Ipv4Addr,
23 label: String,
24}
25
26impl Address {
27 pub fn addr(&self) -> &Ipv4Addr {
29 &self.addr
30 }
31
32 pub fn label(&self) -> &str {
34 &self.label
35 }
36}
37
38impl TryFrom<AddressMessage> for Address {
39 type Error = Error;
40
41 fn try_from(am: AddressMessage) -> Result<Address> {
42 let mut the_addr = None;
43 let mut the_label = None;
44 for nla in am.nlas {
45 match nla {
46 Nla::Address(a) => {
47 let c: [u8; 4] = match a.try_into() {
48 Ok(c) => c,
49 _ => continue,
50 };
51 let addr = Ipv4Addr::from(c);
52 if let Some(label) = the_label {
53 return Ok(Address { addr, label });
54 }
55 the_addr = Some(addr);
56 }
57 Nla::Label(label) => {
58 if let Some(addr) = the_addr {
59 return Ok(Address { addr, label });
60 }
61 the_label = Some(label);
62 }
63 _ => {}
64 }
65 }
66 Err(Error::from(ErrorKind::NotFound))
67 }
68}
69
70#[derive(Debug, Clone)]
72pub struct Addresses {
73 handle: RtHandle,
74}
75
76impl Addresses {
77 pub fn stream(self) -> impl Stream<Item = Address> {
79 let inner = AddressHandle::new(self.handle)
80 .get()
81 .execute()
82 .into_stream();
83 inner.filter_map(|item| async move { item.ok().and_then(|am| am.try_into().ok()) })
84 }
85}
86
87#[derive(Debug, Clone)]
89pub struct Message {
90 addr: Address,
91 new: bool,
92}
93
94impl Message {
95 fn new(addr: Address, new: bool) -> Self {
96 Message { addr, new }
97 }
98
99 pub fn addr(&self) -> &Address {
101 &self.addr
102 }
103
104 pub fn is_new(&self) -> bool {
106 self.new
107 }
108}
109
110impl TryFrom<RtnlMessage> for Message {
111 type Error = Error;
112
113 fn try_from(item: RtnlMessage) -> Result<Message> {
114 Ok(match item {
115 NewAddress(a) => Message::new(a.try_into()?, true),
116 DelAddress(a) => Message::new(a.try_into()?, false),
117 _ => {
118 return Err(Error::from(ErrorKind::InvalidData));
119 }
120 })
121 }
122}
123
124impl TryFrom<NetlinkMessage<RtnlMessage>> for Message {
125 type Error = Error;
126
127 fn try_from(item: NetlinkMessage<RtnlMessage>) -> Result<Message> {
128 if let NetlinkPayload::InnerMessage(m) = item.payload {
129 m.try_into()
130 } else {
131 Err(Error::from(ErrorKind::InvalidData))
132 }
133 }
134}
135
136#[derive(Debug)]
138pub struct Monitor {
139 messages: UnboundedReceiver<(NetlinkMessage<RtnlMessage>, SocketAddr)>,
140}
141
142impl Monitor {
143 pub fn stream(self) -> impl Stream<Item = Message> {
145 self.messages
146 .filter_map(|item| async { item.0.try_into().ok() })
147 }
148}
149
150pub struct Handle {
152 pub addresses: Addresses,
153 pub monitor: Monitor,
154}
155
156pub struct Connection {
158 pub conn: RtConnection<RtnlMessage>,
159 pub handle: Handle,
161}
162
163impl Connection {
164 pub fn new() -> Result<Self> {
166 let (mut conn, handle, messages) = new_connection()?;
167 conn.socket_mut()
168 .socket_mut()
169 .bind(&SocketAddr::new(0, RTMGRP_IPV4_IFADDR))?;
170 Ok(Connection {
171 conn,
172 handle: Handle {
173 addresses: Addresses { handle },
174 monitor: Monitor { messages },
175 },
176 })
177 }
178}
179
180#[cfg(test)]
181mod tests {
182 use super::Connection;
183 use futures::stream::StreamExt;
184
185 #[tokio::test]
186 async fn has_loopback() {
187 let c = Connection::new().unwrap();
188 let rt = tokio::spawn(c.conn);
189 let s = c.handle.addresses.stream();
190 let r = s.any(|m| async move { m.addr.is_loopback() }).await;
191 assert!(r);
192 rt.abort();
193 }
194}