use std::convert::Infallible;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::rc::Rc;
use indexmap::{IndexMap, IndexSet};
use libc::{RTNLGRP_IPV4_IFADDR, RTNLGRP_IPV6_IFADDR};
use neli::consts::nl::NlmF;
use neli::consts::rtnl::{Arphrd, Ifa, Ifla, RtAddrFamily, RtScope, Rtm};
use neli::consts::socket::NlFamily;
use neli::err::RouterError;
use neli::genl::Genlmsghdr;
use neli::nl::{NlPayload, Nlmsghdr};
use neli::router::asynchronous::{NlRouter, NlRouterReceiverHandle};
use neli::rtnl::{Ifaddrmsg, IfaddrmsgBuilder, Ifinfomsg, IfinfomsgBuilder};
use neli::utils::Groups;
use tokio::sync::mpsc::{self, Receiver, Sender};
use super::NetlinkInterface;
use crate::error::Result;
/// A snapshot of network interfaces, keyed by the interface index reported in
/// the link dump (`ifi_index`).
pub type InterfaceUpdate = IndexMap<i32, NetlinkInterface>;
/// Shape of the value yielded by a router receiver's `next().await` for
/// route-family (`Rtm`) messages carrying a payload of type `T`: `None` when
/// the stream has ended, otherwise either a message header or a router error.
type RtNext<T> = Option<std::result::Result<Nlmsghdr<Rtm, T>, RouterError<Rtm, T>>>;
/// Connects to the rtnetlink (`NlFamily::Route`) socket and starts streaming
/// interface snapshots.
///
/// Two local tasks are spawned:
/// * one that publishes a fresh snapshot every time `manual_trigger` yields, and
/// * one that publishes a fresh snapshot whenever an IPv4/IPv6 address
///   multicast notification is received.
///
/// Both tasks share the same router (via `Rc`) and the same bounded channel
/// (capacity 8). Both are started with `tokio::task::spawn_local`, so this
/// function has to be called from a context in which local tasks can be
/// spawned (e.g. inside a `LocalSet`).
///
/// # Errors
///
/// Returns an error if connecting to the socket, enabling strict checking or
/// joining the address multicast groups fails. Errors that occur later inside
/// the spawned tasks are logged, not returned.
pub async fn netlink_ipaddr_listen(
    manual_trigger: mpsc::Receiver<()>,
) -> Result<Receiver<InterfaceUpdate>> {
    let (socket, multicast) = NlRouter::connect(NlFamily::Route, None, Groups::empty()).await?;
    socket.enable_strict_checking(true)?;
    // Joining the multicast groups is a fallible socket operation; report the
    // failure to the caller like every other setup step instead of panicking.
    socket.add_mcast_membership(Groups::new_groups(&[
        RTNLGRP_IPV4_IFADDR,
        RTNLGRP_IPV6_IFADDR,
    ]))?;

    let (tx, rx) = mpsc::channel(8);
    let socket = Rc::new(socket);

    // Task 1: snapshots requested explicitly through `manual_trigger`.
    tokio::task::spawn_local({
        let tx = tx.clone();
        let socket = socket.clone();
        async move {
            if let Err(e) = handle_manual_trigger(socket, manual_trigger, tx).await {
                log::error!("fatal error while handling manual network updates: {}", e);
            }
        }
    });

    // Task 2: snapshots triggered by kernel address-change notifications.
    tokio::task::spawn_local(async move {
        if let Err(e) = handle_netlink_route_messages(socket, multicast, tx).await {
            log::error!("fatal error handling netlink route messages: {}", e);
        }
    });

    Ok(rx)
}
/// Publishes a full interface snapshot on `tx` each time `manual_trigger`
/// yields a unit value.
///
/// This never returns `Ok`: it runs until querying the interfaces or sending
/// the snapshot fails, or until `manual_trigger` reports that no more values
/// will arrive, which is treated as an error.
async fn handle_manual_trigger(
    socket: Rc<NlRouter>,
    mut manual_trigger: mpsc::Receiver<()>,
    tx: Sender<InterfaceUpdate>,
) -> Result<Infallible> {
    loop {
        match manual_trigger.recv().await {
            Some(()) => {
                log::debug!("manual network update requested");
                let snapshot = get_all_interfaces(&socket).await?;
                tx.send(snapshot).await?;
            }
            // `recv` yielding `None` means there is nothing left to wait for.
            None => bail!("unexpected drop of manual trigger senders"),
        }
    }
}
async fn handle_netlink_route_messages(
socket: Rc<NlRouter>,
mut multicast: NlRouterReceiverHandle<u16, Genlmsghdr<u8, u16>>,
tx: Sender<InterfaceUpdate>,
) -> Result<Infallible> {
loop {
match multicast.next().await as RtNext<Ifaddrmsg> {
None => bail!("Unexpected end of netlink route stream"),
Some(response) => {
let response = match response {
Ok(response) => response,
Err(e) => {
log::error!("error receiving netlink message: {}", e);
continue;
}
};
match response.nl_payload() {
NlPayload::Payload(_ifaddrmsg) => {
tx.send(get_all_interfaces(&socket).await?).await?
}
payload => {
log::error!("unexpected nl message payload type: {:?}", payload);
continue;
}
}
}
}
}
}
async fn get_all_interfaces(socket: &Rc<NlRouter>) -> Result<InterfaceUpdate> {
let mut interface_map = IndexMap::<i32, NetlinkInterface>::new();
{
let ifinfomsg = IfinfomsgBuilder::default()
.ifi_family(RtAddrFamily::Unspecified)
.ifi_type(Arphrd::Netrom)
.ifi_index(0)
.build()?;
let mut recv = socket
.send::<Rtm, Ifinfomsg, Rtm, Ifinfomsg>(
Rtm::Getlink,
NlmF::REQUEST | NlmF::DUMP | NlmF::ACK,
NlPayload::Payload(ifinfomsg),
)
.await?;
while let Some(response) = recv.next().await as RtNext<Ifinfomsg> {
let header = match response {
Ok(header) => header,
Err(e) => {
log::error!("an error occurred receiving rtnetlink message: {}", e);
continue;
}
};
if let NlPayload::Payload(ifinfomsg) = header.nl_payload() {
let attr_handle = ifinfomsg.rtattrs().get_attr_handle();
let mut interface_info = NetlinkInterface {
index: *ifinfomsg.ifi_index(),
name: match attr_handle.get_attr_payload_as_with_len::<String>(Ifla::Ifname) {
Ok(interface) => interface.into(),
Err(e) => {
log::error!(
"failed to parse interface name from ifinfomsg: {} :: {:?}",
e,
ifinfomsg
);
continue;
}
},
mac_address: None,
ip_addresses: IndexSet::new(),
};
if let Ok(bytes) =
attr_handle.get_attr_payload_as_with_len_borrowed::<&[u8]>(Ifla::Address)
{
if let Ok(array) = bytes.try_into() {
interface_info.mac_address = Some(array);
}
}
interface_map.insert(*ifinfomsg.ifi_index(), interface_info);
}
}
}
{
for family in [RtAddrFamily::Inet, RtAddrFamily::Inet6] {
let ifaddrmsg = IfaddrmsgBuilder::default()
.ifa_family(family)
.ifa_index(0)
.ifa_prefixlen(0)
.ifa_scope(RtScope::Universe)
.build()?;
let mut recv = socket
.send::<Rtm, Ifaddrmsg, Rtm, Ifaddrmsg>(
Rtm::Getaddr,
NlmF::REQUEST | NlmF::DUMP | NlmF::ACK,
NlPayload::Payload(ifaddrmsg),
)
.await?;
while let Some(response) = recv.next().await as RtNext<Ifaddrmsg> {
let header = match response {
Ok(header) => header,
Err(e) => {
log::warn!("an error occurred receiving rtnetlink message: {}", e);
continue;
}
};
if let NlPayload::Payload(ifaddrmsg) = header.nl_payload() {
match interface_map.get_mut(ifaddrmsg.ifa_index()) {
Some(if_info) => {
let attr_handle = ifaddrmsg.rtattrs().get_attr_handle();
match ifaddrmsg.ifa_family() {
RtAddrFamily::Inet => {
if let Ok(addr) =
attr_handle.get_attr_payload_as::<u32>(Ifa::Address)
{
if_info
.ip_addresses
.insert(IpAddr::V4(Ipv4Addr::from(u32::from_be(addr))));
}
}
RtAddrFamily::Inet6 => {
if let Ok(addr) =
attr_handle.get_attr_payload_as::<u128>(Ifa::Address)
{
if_info.ip_addresses.insert(IpAddr::V6(Ipv6Addr::from(
u128::from_be(addr),
)));
}
}
_ => {
continue;
}
}
}
None => {
log::error!(
"received ifaddrmsg for unknown interface: {:?}",
ifaddrmsg
);
continue;
}
}
}
}
}
}
Ok(interface_map)
}