use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio_stream::Stream;
use super::{connection::Connection, error::Result, message::MessageIter, protocol::ProtocolState};
mod private {
pub trait Sealed {}
}
pub trait EventSource: ProtocolState + private::Sealed {
type Event: Send + 'static;
fn parse_events(data: &[u8]) -> Vec<Self::Event>;
}
pub struct EventSubscription<'a, P: EventSource> {
conn: &'a Connection<P>,
buffer: Vec<u8>,
pending: Vec<P::Event>,
}
impl<'a, P: EventSource> EventSubscription<'a, P> {
pub(crate) fn new(conn: &'a Connection<P>) -> Self {
Self {
conn,
buffer: Vec::new(),
pending: Vec::new(),
}
}
}
impl<P: EventSource> Stream for EventSubscription<'_, P> {
type Item = Result<P::Event>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if let Some(event) = this.pending.pop() {
return Poll::Ready(Some(Ok(event)));
}
loop {
match this.conn.socket().poll_recv(cx) {
Poll::Ready(Ok(data)) => {
this.buffer = data;
this.pending = P::parse_events(&this.buffer);
tracing::trace!(
protocol = std::any::type_name::<P>(),
events = this.pending.len(),
"delivered multicast batch"
);
this.pending.reverse();
if let Some(event) = this.pending.pop() {
return Poll::Ready(Some(Ok(event)));
}
continue;
}
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
Poll::Pending => return Poll::Pending,
}
}
}
}
impl<P: EventSource> Unpin for EventSubscription<'_, P> {}
pub struct OwnedEventStream<P: EventSource> {
conn: Connection<P>,
buffer: Vec<u8>,
pending: Vec<P::Event>,
}
impl<P: EventSource> OwnedEventStream<P> {
pub(crate) fn new(conn: Connection<P>) -> Self {
Self {
conn,
buffer: Vec::new(),
pending: Vec::new(),
}
}
pub fn connection(&self) -> &Connection<P> {
&self.conn
}
pub fn into_connection(self) -> Connection<P> {
self.conn
}
}
impl<P: EventSource> Stream for OwnedEventStream<P> {
type Item = Result<P::Event>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if let Some(event) = this.pending.pop() {
return Poll::Ready(Some(Ok(event)));
}
loop {
match this.conn.socket().poll_recv(cx) {
Poll::Ready(Ok(data)) => {
this.buffer = data;
this.pending = P::parse_events(&this.buffer);
tracing::trace!(
protocol = std::any::type_name::<P>(),
events = this.pending.len(),
"delivered multicast batch (owned stream)"
);
this.pending.reverse();
if let Some(event) = this.pending.pop() {
return Poll::Ready(Some(Ok(event)));
}
continue;
}
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
Poll::Pending => return Poll::Pending,
}
}
}
}
impl<P: EventSource> Unpin for OwnedEventStream<P> {}
impl<P: EventSource> Connection<P> {
pub fn events(&self) -> EventSubscription<'_, P> {
EventSubscription::new(self)
}
pub fn into_events(self) -> OwnedEventStream<P> {
OwnedEventStream::new(self)
}
}
use super::{
connector::ProcEvent,
events::NetworkEvent,
message::NlMsgType,
messages::{AddressMessage, LinkMessage, NeighborMessage, RouteMessage, TcMessage},
parse::FromNetlink,
protocol::{Connector, Devlink, Ethtool, KobjectUevent, Nl80211, Route, SELinux},
selinux::SELinuxEvent,
uevent::Uevent,
};
impl private::Sealed for Route {}
impl EventSource for Route {
type Event = NetworkEvent;
fn parse_events(data: &[u8]) -> Vec<NetworkEvent> {
let mut events = Vec::new();
for (header, payload) in MessageIter::new(data).flatten() {
if let Some(event) = parse_route_event(header.nlmsg_type, payload) {
events.push(event);
}
}
events
}
}
fn parse_route_event(msg_type: u16, payload: &[u8]) -> Option<NetworkEvent> {
match msg_type {
t if t == NlMsgType::RTM_NEWLINK => LinkMessage::from_bytes(payload)
.ok()
.map(NetworkEvent::NewLink),
t if t == NlMsgType::RTM_DELLINK => LinkMessage::from_bytes(payload)
.ok()
.map(NetworkEvent::DelLink),
t if t == NlMsgType::RTM_NEWADDR => AddressMessage::from_bytes(payload)
.ok()
.map(NetworkEvent::NewAddress),
t if t == NlMsgType::RTM_DELADDR => AddressMessage::from_bytes(payload)
.ok()
.map(NetworkEvent::DelAddress),
t if t == NlMsgType::RTM_NEWROUTE => RouteMessage::from_bytes(payload)
.ok()
.map(NetworkEvent::NewRoute),
t if t == NlMsgType::RTM_DELROUTE => RouteMessage::from_bytes(payload)
.ok()
.map(NetworkEvent::DelRoute),
t if t == NlMsgType::RTM_NEWNEIGH => NeighborMessage::from_bytes(payload).ok().map(|msg| {
if msg.family() == 7 {
super::fdb::FdbEntry::from_neighbor(&msg)
.map(NetworkEvent::NewFdb)
.unwrap_or(NetworkEvent::NewNeighbor(msg))
} else {
NetworkEvent::NewNeighbor(msg)
}
}),
t if t == NlMsgType::RTM_DELNEIGH => NeighborMessage::from_bytes(payload).ok().map(|msg| {
if msg.family() == 7 {
super::fdb::FdbEntry::from_neighbor(&msg)
.map(NetworkEvent::DelFdb)
.unwrap_or(NetworkEvent::DelNeighbor(msg))
} else {
NetworkEvent::DelNeighbor(msg)
}
}),
t if t == NlMsgType::RTM_NEWQDISC => TcMessage::from_bytes(payload)
.ok()
.map(NetworkEvent::NewQdisc),
t if t == NlMsgType::RTM_DELQDISC => TcMessage::from_bytes(payload)
.ok()
.map(NetworkEvent::DelQdisc),
t if t == NlMsgType::RTM_NEWTCLASS => TcMessage::from_bytes(payload)
.ok()
.map(NetworkEvent::NewClass),
t if t == NlMsgType::RTM_DELTCLASS => TcMessage::from_bytes(payload)
.ok()
.map(NetworkEvent::DelClass),
t if t == NlMsgType::RTM_NEWTFILTER => TcMessage::from_bytes(payload)
.ok()
.map(NetworkEvent::NewFilter),
t if t == NlMsgType::RTM_DELTFILTER => TcMessage::from_bytes(payload)
.ok()
.map(NetworkEvent::DelFilter),
t if t == NlMsgType::RTM_NEWACTION => TcMessage::from_bytes(payload)
.ok()
.map(NetworkEvent::NewAction),
t if t == NlMsgType::RTM_DELACTION => TcMessage::from_bytes(payload)
.ok()
.map(NetworkEvent::DelAction),
_ => None,
}
}
impl private::Sealed for KobjectUevent {}
impl EventSource for KobjectUevent {
type Event = Uevent;
fn parse_events(data: &[u8]) -> Vec<Uevent> {
Uevent::parse(data).into_iter().collect()
}
}
impl private::Sealed for Connector {}
impl EventSource for Connector {
type Event = ProcEvent;
fn parse_events(data: &[u8]) -> Vec<ProcEvent> {
parse_connector_event(data).into_iter().collect()
}
}
fn parse_connector_event(data: &[u8]) -> Option<ProcEvent> {
const NLMSG_HDRLEN: usize = 16;
const CN_MSG_HDRLEN: usize = 20;
if data.len() < NLMSG_HDRLEN + CN_MSG_HDRLEN {
return None;
}
let payload = &data[NLMSG_HDRLEN + CN_MSG_HDRLEN..];
ProcEvent::parse_from_bytes(payload)
}
impl private::Sealed for SELinux {}
impl EventSource for SELinux {
type Event = SELinuxEvent;
fn parse_events(data: &[u8]) -> Vec<SELinuxEvent> {
parse_selinux_event(data).into_iter().collect()
}
}
fn parse_selinux_event(data: &[u8]) -> Option<SELinuxEvent> {
use zerocopy::FromBytes;
use super::selinux::{SelnlMsgPolicyload, SelnlMsgSetenforce};
const NLMSG_HDRLEN: usize = 16;
const SELNL_MSG_SETENFORCE: u16 = 0x10;
const SELNL_MSG_POLICYLOAD: u16 = 0x11;
if data.len() < NLMSG_HDRLEN {
return None;
}
let nlmsg_type = u16::from_ne_bytes([data[4], data[5]]);
let payload = &data[NLMSG_HDRLEN..];
match nlmsg_type {
SELNL_MSG_SETENFORCE => {
let (msg, _) = SelnlMsgSetenforce::ref_from_prefix(payload).ok()?;
Some(SELinuxEvent::SetEnforce {
enforcing: msg.val != 0,
})
}
SELNL_MSG_POLICYLOAD => {
let (msg, _) = SelnlMsgPolicyload::ref_from_prefix(payload).ok()?;
Some(SELinuxEvent::PolicyLoad { seqno: msg.seqno })
}
_ => None,
}
}
impl private::Sealed for Devlink {}
impl EventSource for Devlink {
type Event = super::genl::devlink::DevlinkEvent;
fn parse_events(data: &[u8]) -> Vec<Self::Event> {
parse_devlink_events(data)
}
}
fn parse_devlink_events(data: &[u8]) -> Vec<super::genl::devlink::DevlinkEvent> {
use super::genl::{
GENL_HDRLEN, GenlMsgHdr,
devlink::{
DEVLINK_ATTR_BUS_NAME, DEVLINK_ATTR_DEV_NAME, DEVLINK_ATTR_FLASH_UPDATE_COMPONENT,
DEVLINK_ATTR_FLASH_UPDATE_STATUS_DONE, DEVLINK_ATTR_FLASH_UPDATE_STATUS_MSG,
DEVLINK_ATTR_FLASH_UPDATE_STATUS_TOTAL, DEVLINK_ATTR_HEALTH_REPORTER,
DEVLINK_ATTR_HEALTH_REPORTER_NAME, DEVLINK_ATTR_PORT_INDEX,
DEVLINK_ATTR_PORT_NETDEV_NAME, DEVLINK_CMD_FLASH_UPDATE_STATUS, DEVLINK_CMD_GET,
DEVLINK_CMD_HEALTH_REPORTER_RECOVER, DEVLINK_CMD_PORT_DEL, DEVLINK_CMD_PORT_NEW,
DevlinkEvent, FlashProgress,
},
};
let mut events = Vec::new();
for msg_result in MessageIter::new(data) {
let Ok((header, payload)) = msg_result else {
continue;
};
if header.is_error() || header.is_done() {
continue;
}
if payload.len() < GENL_HDRLEN {
continue;
}
let Some(genl_hdr) = GenlMsgHdr::from_bytes(payload) else {
continue;
};
let cmd = genl_hdr.cmd;
let attrs_data = &payload[GENL_HDRLEN..];
let mut bus = String::new();
let mut device = String::new();
let mut port_index = 0u32;
let mut netdev_name: Option<String> = None;
let mut reporter_name: Option<String> = None;
let mut flash_msg: Option<String> = None;
let mut flash_component: Option<String> = None;
let mut flash_done: u64 = 0;
let mut flash_total: u64 = 0;
for (attr_type, attr_payload) in super::attr::AttrIter::new(attrs_data) {
match attr_type {
DEVLINK_ATTR_BUS_NAME => {
bus = std::str::from_utf8(attr_payload)
.unwrap_or("")
.trim_end_matches('\0')
.to_string();
}
DEVLINK_ATTR_DEV_NAME => {
device = std::str::from_utf8(attr_payload)
.unwrap_or("")
.trim_end_matches('\0')
.to_string();
}
DEVLINK_ATTR_PORT_INDEX if attr_payload.len() >= 4 => {
port_index = u32::from_ne_bytes(attr_payload[..4].try_into().unwrap());
}
DEVLINK_ATTR_PORT_NETDEV_NAME => {
netdev_name = Some(
std::str::from_utf8(attr_payload)
.unwrap_or("")
.trim_end_matches('\0')
.to_string(),
);
}
DEVLINK_ATTR_HEALTH_REPORTER => {
for (inner_type, inner_payload) in super::attr::AttrIter::new(attr_payload) {
if inner_type == DEVLINK_ATTR_HEALTH_REPORTER_NAME {
reporter_name = Some(
std::str::from_utf8(inner_payload)
.unwrap_or("")
.trim_end_matches('\0')
.to_string(),
);
}
}
}
DEVLINK_ATTR_FLASH_UPDATE_STATUS_MSG => {
flash_msg = Some(
std::str::from_utf8(attr_payload)
.unwrap_or("")
.trim_end_matches('\0')
.to_string(),
);
}
DEVLINK_ATTR_FLASH_UPDATE_COMPONENT => {
flash_component = Some(
std::str::from_utf8(attr_payload)
.unwrap_or("")
.trim_end_matches('\0')
.to_string(),
);
}
DEVLINK_ATTR_FLASH_UPDATE_STATUS_DONE if attr_payload.len() >= 8 => {
flash_done = u64::from_ne_bytes(attr_payload[..8].try_into().unwrap());
}
DEVLINK_ATTR_FLASH_UPDATE_STATUS_TOTAL if attr_payload.len() >= 8 => {
flash_total = u64::from_ne_bytes(attr_payload[..8].try_into().unwrap());
}
_ => {}
}
}
let event = match cmd {
DEVLINK_CMD_GET => {
DevlinkEvent::NewDevice { bus, device }
}
DEVLINK_CMD_PORT_NEW => DevlinkEvent::NewPort {
bus,
device,
port_index,
netdev_name,
},
DEVLINK_CMD_PORT_DEL => DevlinkEvent::DelPort {
bus,
device,
port_index,
},
DEVLINK_CMD_HEALTH_REPORTER_RECOVER => DevlinkEvent::HealthEvent {
bus,
device,
reporter: reporter_name,
},
DEVLINK_CMD_FLASH_UPDATE_STATUS => DevlinkEvent::FlashUpdate(FlashProgress {
message: flash_msg,
component: flash_component,
done: flash_done,
total: flash_total,
}),
_ => continue,
};
events.push(event);
}
events
}
impl private::Sealed for Nl80211 {}
impl EventSource for Nl80211 {
type Event = super::genl::nl80211::Nl80211Event;
fn parse_events(data: &[u8]) -> Vec<Self::Event> {
parse_nl80211_events(data)
}
}
fn parse_nl80211_events(data: &[u8]) -> Vec<super::genl::nl80211::Nl80211Event> {
use super::genl::{
GENL_HDRLEN, GenlMsgHdr,
nl80211::{
InterfaceType, NL80211_ATTR_IFINDEX, NL80211_ATTR_IFNAME, NL80211_ATTR_IFTYPE,
NL80211_ATTR_MAC, NL80211_ATTR_REASON_CODE, NL80211_ATTR_REG_ALPHA2,
NL80211_ATTR_STATUS_CODE, NL80211_CMD_CONNECT, NL80211_CMD_DEL_INTERFACE,
NL80211_CMD_DISCONNECT, NL80211_CMD_NEW_INTERFACE, NL80211_CMD_NEW_SCAN_RESULTS,
NL80211_CMD_REG_CHANGE, NL80211_CMD_SCAN_ABORTED, Nl80211Event,
},
};
let mut events = Vec::new();
for msg_result in MessageIter::new(data) {
let Ok((header, payload)) = msg_result else {
continue;
};
if header.is_error() || header.is_done() {
continue;
}
if payload.len() < GENL_HDRLEN {
continue;
}
let Some(genl_hdr) = GenlMsgHdr::from_bytes(payload) else {
continue;
};
let cmd = genl_hdr.cmd;
let attrs_data = &payload[GENL_HDRLEN..];
let mut ifindex = 0u32;
let mut ifname: Option<String> = None;
let mut iftype = InterfaceType::Unspecified;
let mut mac: Option<[u8; 6]> = None;
let mut reason_code = 0u16;
let mut status_code = 0u16;
let mut country: Option<String> = None;
for (attr_type, attr_payload) in super::attr::AttrIter::new(attrs_data) {
match attr_type {
NL80211_ATTR_IFINDEX if attr_payload.len() >= 4 => {
ifindex = u32::from_ne_bytes(attr_payload[..4].try_into().unwrap());
}
NL80211_ATTR_IFNAME => {
ifname = std::str::from_utf8(attr_payload)
.ok()
.map(|s| s.trim_end_matches('\0').to_string())
.filter(|s| !s.is_empty());
}
NL80211_ATTR_IFTYPE if attr_payload.len() >= 4 => {
let val = u32::from_ne_bytes(attr_payload[..4].try_into().unwrap());
iftype = InterfaceType::try_from(val).unwrap_or(InterfaceType::Unspecified);
}
NL80211_ATTR_MAC if attr_payload.len() >= 6 => {
let mut m = [0u8; 6];
m.copy_from_slice(&attr_payload[..6]);
mac = Some(m);
}
NL80211_ATTR_REASON_CODE if attr_payload.len() >= 2 => {
reason_code = u16::from_ne_bytes(attr_payload[..2].try_into().unwrap());
}
NL80211_ATTR_STATUS_CODE if attr_payload.len() >= 2 => {
status_code = u16::from_ne_bytes(attr_payload[..2].try_into().unwrap());
}
NL80211_ATTR_REG_ALPHA2 => {
country = std::str::from_utf8(attr_payload)
.ok()
.map(|s| s.trim_end_matches('\0').to_string())
.filter(|s| !s.is_empty());
}
_ => {}
}
}
let event = match cmd {
NL80211_CMD_NEW_SCAN_RESULTS => Nl80211Event::ScanComplete { ifindex },
NL80211_CMD_SCAN_ABORTED => Nl80211Event::ScanAborted { ifindex },
NL80211_CMD_CONNECT => Nl80211Event::Connect {
ifindex,
bssid: mac.unwrap_or([0; 6]),
status_code,
},
NL80211_CMD_DISCONNECT => Nl80211Event::Disconnect {
ifindex,
bssid: mac,
reason_code,
},
NL80211_CMD_NEW_INTERFACE => Nl80211Event::NewInterface {
ifindex,
name: ifname,
iftype,
},
NL80211_CMD_DEL_INTERFACE => Nl80211Event::DelInterface { ifindex },
NL80211_CMD_REG_CHANGE => Nl80211Event::RegChange { country },
_ => continue,
};
events.push(event);
}
events
}
impl private::Sealed for Ethtool {}
impl EventSource for Ethtool {
type Event = super::genl::ethtool::EthtoolEvent;
fn parse_events(data: &[u8]) -> Vec<Self::Event> {
parse_ethtool_events(data)
}
}
fn parse_ethtool_events(data: &[u8]) -> Vec<super::genl::ethtool::EthtoolEvent> {
use super::genl::{GENL_HDRLEN, GenlMsgHdr};
let mut events = Vec::new();
for msg_result in MessageIter::new(data) {
let Ok((header, payload)) = msg_result else {
continue;
};
if header.is_error() || header.is_done() {
continue;
}
if payload.len() < GENL_HDRLEN {
continue;
}
let Some(genl_hdr) = GenlMsgHdr::from_bytes(payload) else {
continue;
};
let cmd = genl_hdr.cmd;
let attrs_data = &payload[GENL_HDRLEN..];
if let Some(event) = parse_ethtool_event(cmd, attrs_data) {
events.push(event);
}
}
events
}
fn parse_ethtool_event(cmd: u8, data: &[u8]) -> Option<super::genl::ethtool::EthtoolEvent> {
use super::{
attr::AttrIter,
genl::ethtool::{
Channels, Coalesce, Duplex, EthtoolChannelsAttr, EthtoolCmd, EthtoolCoalesceAttr,
EthtoolEvent, EthtoolFeaturesAttr, EthtoolHeaderAttr, EthtoolLinkinfoAttr,
EthtoolLinkmodesAttr, EthtoolLinkstateAttr, EthtoolPauseAttr, EthtoolRingsAttr,
Features, LinkExtState, LinkInfo, LinkModes, LinkState, MdiX, Pause, Port, Rings,
Transceiver,
},
};
fn parse_header(data: &[u8]) -> (Option<String>, Option<u32>) {
let mut ifname = None;
let mut ifindex = None;
for (attr_type, payload) in AttrIter::new(data) {
if attr_type == EthtoolHeaderAttr::DevName as u16 {
ifname = Some(
std::str::from_utf8(payload)
.unwrap_or("")
.trim_end_matches('\0')
.to_string(),
);
} else if attr_type == EthtoolHeaderAttr::DevIndex as u16 && payload.len() >= 4 {
ifindex = Some(u32::from_ne_bytes(payload[..4].try_into().unwrap()));
}
}
(ifname, ifindex)
}
match cmd {
c if c == EthtoolCmd::LinkinfoGet as u8 => {
let mut info = LinkInfo::default();
for (attr_type, payload) in AttrIter::new(data) {
match attr_type {
t if t == EthtoolLinkinfoAttr::Header as u16 => {
let (name, idx) = parse_header(payload);
info.ifname = name;
info.ifindex = idx;
}
t if t == EthtoolLinkinfoAttr::Port as u16 && !payload.is_empty() => {
info.port = Some(Port::from_u8(payload[0]));
}
t if t == EthtoolLinkinfoAttr::Phyaddr as u16 && !payload.is_empty() => {
info.phyaddr = Some(payload[0]);
}
t if t == EthtoolLinkinfoAttr::TpMdiCtrl as u16 && !payload.is_empty() => {
info.tp_mdix_ctrl = Some(MdiX::from_u8(payload[0]));
}
t if t == EthtoolLinkinfoAttr::TpMdix as u16 && !payload.is_empty() => {
info.tp_mdix = Some(MdiX::from_u8(payload[0]));
}
t if t == EthtoolLinkinfoAttr::Transceiver as u16 && !payload.is_empty() => {
info.transceiver = Some(Transceiver::from_u8(payload[0]));
}
_ => {}
}
}
Some(EthtoolEvent::LinkInfoChanged {
ifname: info.ifname.clone(),
info,
})
}
c if c == EthtoolCmd::LinkmodesGet as u8 => {
let mut modes = LinkModes::default();
for (attr_type, payload) in AttrIter::new(data) {
match attr_type {
t if t == EthtoolLinkmodesAttr::Header as u16 => {
let (name, idx) = parse_header(payload);
modes.ifname = name;
modes.ifindex = idx;
}
t if t == EthtoolLinkmodesAttr::Autoneg as u16 && !payload.is_empty() => {
modes.autoneg = payload[0] != 0;
}
t if t == EthtoolLinkmodesAttr::Speed as u16 && payload.len() >= 4 => {
let speed = u32::from_ne_bytes(payload[..4].try_into().unwrap());
if speed != 0xFFFFFFFF {
modes.speed = Some(speed);
}
}
t if t == EthtoolLinkmodesAttr::Duplex as u16 && !payload.is_empty() => {
modes.duplex = Some(Duplex::from_u8(payload[0]));
}
_ => {}
}
}
Some(EthtoolEvent::LinkModesChanged {
ifname: modes.ifname.clone(),
modes,
})
}
c if c == EthtoolCmd::LinkstateGet as u8 => {
let mut state = LinkState::default();
for (attr_type, payload) in AttrIter::new(data) {
match attr_type {
t if t == EthtoolLinkstateAttr::Header as u16 => {
let (name, idx) = parse_header(payload);
state.ifname = name;
state.ifindex = idx;
}
t if t == EthtoolLinkstateAttr::Link as u16 && !payload.is_empty() => {
state.link = payload[0] != 0;
}
t if t == EthtoolLinkstateAttr::Sqi as u16 && payload.len() >= 4 => {
state.sqi = Some(u32::from_ne_bytes(payload[..4].try_into().unwrap()));
}
t if t == EthtoolLinkstateAttr::SqiMax as u16 && payload.len() >= 4 => {
state.sqi_max = Some(u32::from_ne_bytes(payload[..4].try_into().unwrap()));
}
t if t == EthtoolLinkstateAttr::ExtState as u16 && !payload.is_empty() => {
state.ext_state = Some(LinkExtState::from_u8(payload[0]));
}
_ => {}
}
}
Some(EthtoolEvent::LinkStateChanged {
ifname: state.ifname.clone(),
state,
})
}
c if c == EthtoolCmd::FeaturesGet as u8 => {
let mut features = Features::default();
for (attr_type, payload) in AttrIter::new(data) {
if attr_type == EthtoolFeaturesAttr::Header as u16 {
let (name, idx) = parse_header(payload);
features.ifname = name;
features.ifindex = idx;
}
}
Some(EthtoolEvent::FeaturesChanged {
ifname: features.ifname.clone(),
features,
})
}
c if c == EthtoolCmd::RingsGet as u8 => {
let mut rings = Rings::default();
for (attr_type, payload) in AttrIter::new(data) {
match attr_type {
t if t == EthtoolRingsAttr::Header as u16 => {
let (name, idx) = parse_header(payload);
rings.ifname = name;
rings.ifindex = idx;
}
t if t == EthtoolRingsAttr::Rx as u16 && payload.len() >= 4 => {
rings.rx = Some(u32::from_ne_bytes(payload[..4].try_into().unwrap()));
}
t if t == EthtoolRingsAttr::Tx as u16 && payload.len() >= 4 => {
rings.tx = Some(u32::from_ne_bytes(payload[..4].try_into().unwrap()));
}
_ => {}
}
}
Some(EthtoolEvent::RingsChanged {
ifname: rings.ifname.clone(),
rings,
})
}
c if c == EthtoolCmd::ChannelsGet as u8 => {
let mut channels = Channels::default();
for (attr_type, payload) in AttrIter::new(data) {
match attr_type {
t if t == EthtoolChannelsAttr::Header as u16 => {
let (name, idx) = parse_header(payload);
channels.ifname = name;
channels.ifindex = idx;
}
t if t == EthtoolChannelsAttr::CombinedCount as u16 && payload.len() >= 4 => {
channels.combined_count =
Some(u32::from_ne_bytes(payload[..4].try_into().unwrap()));
}
_ => {}
}
}
Some(EthtoolEvent::ChannelsChanged {
ifname: channels.ifname.clone(),
channels,
})
}
c if c == EthtoolCmd::CoalesceGet as u8 => {
let mut coalesce = Coalesce::default();
for (attr_type, payload) in AttrIter::new(data) {
match attr_type {
t if t == EthtoolCoalesceAttr::Header as u16 => {
let (name, idx) = parse_header(payload);
coalesce.ifname = name;
coalesce.ifindex = idx;
}
t if t == EthtoolCoalesceAttr::RxUsecs as u16 && payload.len() >= 4 => {
coalesce.rx_usecs =
Some(u32::from_ne_bytes(payload[..4].try_into().unwrap()));
}
t if t == EthtoolCoalesceAttr::TxUsecs as u16 && payload.len() >= 4 => {
coalesce.tx_usecs =
Some(u32::from_ne_bytes(payload[..4].try_into().unwrap()));
}
_ => {}
}
}
Some(EthtoolEvent::CoalesceChanged {
ifname: coalesce.ifname.clone(),
coalesce,
})
}
c if c == EthtoolCmd::PauseGet as u8 => {
let mut pause = Pause::default();
for (attr_type, payload) in AttrIter::new(data) {
match attr_type {
t if t == EthtoolPauseAttr::Header as u16 => {
let (name, idx) = parse_header(payload);
pause.ifname = name;
pause.ifindex = idx;
}
t if t == EthtoolPauseAttr::Autoneg as u16 && !payload.is_empty() => {
pause.autoneg = Some(payload[0] != 0);
}
t if t == EthtoolPauseAttr::Rx as u16 && !payload.is_empty() => {
pause.rx = Some(payload[0] != 0);
}
t if t == EthtoolPauseAttr::Tx as u16 && !payload.is_empty() => {
pause.tx = Some(payload[0] != 0);
}
_ => {}
}
}
Some(EthtoolEvent::PauseChanged {
ifname: pause.ifname.clone(),
pause,
})
}
_ => Some(EthtoolEvent::Unknown { cmd }),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn event_subscription_is_unpin() {
fn assert_unpin<T: Unpin>() {}
assert_unpin::<EventSubscription<'_, KobjectUevent>>();
assert_unpin::<EventSubscription<'_, SELinux>>();
}
#[test]
fn owned_event_stream_is_unpin() {
fn assert_unpin<T: Unpin>() {}
assert_unpin::<OwnedEventStream<KobjectUevent>>();
assert_unpin::<OwnedEventStream<SELinux>>();
}
}