use std::{
collections::HashMap,
fmt,
net::IpAddr,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Instant,
};
use tokio::sync::Mutex;
use tokio_stream::Stream;
use crate::netlink::{
connection::Connection,
error::Result,
events::NetworkEvent,
messages::{AddressMessage, LinkMessage, LinkStats, RouteMessage, TcMessage},
protocol::Route,
stream::OwnedEventStream,
types::{link::OperState, neigh::NeighborState},
};
/// Snapshot of overall network health produced by [`Diagnostics::scan`].
#[derive(Debug, Clone)]
pub struct DiagnosticReport {
/// When the scan started; all contained issues share this timestamp.
pub timestamp: Instant,
/// Per-interface diagnostics for every link that was not skipped.
pub interfaces: Vec<InterfaceDiag>,
/// Summary of the IPv4/IPv6 routing tables.
pub routes: RouteDiag,
/// Every detected issue, both global and per-interface.
pub issues: Vec<Issue>,
}
/// Diagnostics gathered for a single network interface.
#[derive(Debug, Clone)]
pub struct InterfaceDiag {
/// Interface name ("?" when the kernel did not report one).
pub name: String,
/// Kernel interface index.
pub ifindex: u32,
/// Operational state as reported by the kernel.
pub state: OperState,
/// Raw interface flag bits.
pub flags: u32,
/// Configured MTU, when reported.
pub mtu: Option<u32>,
/// Cumulative link statistics at scan time.
pub stats: LinkStats,
/// Throughput rates derived from the previous scan (all zero on the first scan).
pub rates: LinkRates,
/// Root (or first) qdisc diagnostics, when a qdisc exists.
pub tc: Option<TcDiag>,
/// Issues detected on this interface.
pub issues: Vec<Issue>,
}
/// Throughput rates computed from two consecutive link-statistics samples.
///
/// All rates are per-second values derived over `sample_duration_ms`.
/// A default (all-zero) value indicates no previous sample was available.
#[derive(Debug, Clone, Copy, Default)]
pub struct LinkRates {
    /// Receive throughput in bytes per second.
    pub rx_bps: u64,
    /// Transmit throughput in bytes per second.
    pub tx_bps: u64,
    /// Receive rate in packets per second.
    pub rx_pps: u64,
    /// Transmit rate in packets per second.
    pub tx_pps: u64,
    /// Length of the sampling window, in milliseconds.
    pub sample_duration_ms: u64,
}

impl LinkRates {
    /// Combined RX+TX throughput in bytes per second.
    ///
    /// Saturates at `u64::MAX` instead of overflowing when both directions
    /// carry extreme values (e.g. after a kernel counter reset produced a
    /// huge delta).
    pub fn total_bps(&self) -> u64 {
        self.rx_bps.saturating_add(self.tx_bps)
    }

    /// Combined RX+TX rate in packets per second (saturating).
    pub fn total_pps(&self) -> u64 {
        self.rx_pps.saturating_add(self.tx_pps)
    }
}
/// Snapshot of a qdisc's identity and counters.
#[derive(Debug, Clone)]
pub struct TcDiag {
/// Qdisc kind string ("unknown" when unreported).
pub qdisc: String,
/// Human-readable qdisc handle.
pub handle: String,
/// Total packets dropped by the qdisc.
pub drops: u64,
/// Times the qdisc exceeded its limits.
pub overlimits: u64,
/// Bytes currently queued.
pub backlog: u32,
/// Packets currently queued.
pub qlen: u32,
/// Current rate in bytes per second.
pub rate_bps: u64,
/// Current rate in packets per second.
pub rate_pps: u64,
/// Total bytes handled.
pub bytes: u64,
/// Total packets handled.
pub packets: u64,
}
impl TcDiag {
    /// Builds a [`TcDiag`] snapshot from a raw netlink [`TcMessage`].
    ///
    /// Narrower counter accessors are widened to `u64` so the snapshot
    /// uses a uniform counter width.
    pub fn from_tc_message(tc: &TcMessage) -> Self {
        let (drops, overlimits) = (tc.drops() as u64, tc.overlimits() as u64);
        let (rate_bps, rate_pps) = (tc.bps() as u64, tc.pps() as u64);
        Self {
            qdisc: tc.kind().unwrap_or("unknown").to_string(),
            handle: tc.handle_str(),
            drops,
            overlimits,
            backlog: tc.backlog(),
            qlen: tc.qlen(),
            rate_bps,
            rate_pps,
            bytes: tc.bytes(),
            packets: tc.packets(),
        }
    }
}
/// Summary of the system routing tables.
#[derive(Debug, Clone, Default)]
pub struct RouteDiag {
/// Number of IPv4 routes seen.
pub ipv4_route_count: usize,
/// Number of IPv6 routes seen.
pub ipv6_route_count: usize,
/// Whether a default (prefix length 0) IPv4 route exists.
pub has_default_ipv4: bool,
/// Whether a default (prefix length 0) IPv6 route exists.
pub has_default_ipv6: bool,
/// Gateway of the first IPv4 default route, when present.
pub default_gateway_v4: Option<IpAddr>,
/// Gateway of the first IPv6 default route, when present.
pub default_gateway_v6: Option<IpAddr>,
}
/// A single detected network problem.
///
/// Rendered via `Display` as `[iface] message (details)`, with absent
/// parts omitted.
#[derive(Debug, Clone)]
pub struct Issue {
/// How serious the problem is.
pub severity: Severity,
/// Machine-readable classification.
pub category: IssueCategory,
/// Human-readable one-line description.
pub message: String,
/// Optional extra context, appended in parentheses when displayed.
pub details: Option<String>,
/// Interface the issue pertains to, if any.
pub interface: Option<String>,
/// When the issue was detected.
pub timestamp: Instant,
}
impl fmt::Display for Issue {
    /// Renders as `[iface] message (details)`, omitting absent parts.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match &self.interface {
            Some(iface) => write!(f, "[{}] {}", iface, self.message)?,
            None => write!(f, "{}", self.message)?,
        }
        match &self.details {
            Some(details) => write!(f, " ({})", details),
            None => Ok(()),
        }
    }
}
/// How serious a detected issue is.
///
/// Variants are declared from least (`Info`) to most (`Critical`) severe,
/// so the derived `Ord` makes comparisons and sorting meaningful.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[non_exhaustive]
pub enum Severity {
    Info,
    Warning,
    Error,
    Critical,
}

impl fmt::Display for Severity {
    /// Writes the conventional log-level label for the severity.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Severity::Info => "INFO",
            Severity::Warning => "WARN",
            Severity::Error => "ERROR",
            Severity::Critical => "CRITICAL",
        };
        f.write_str(label)
    }
}
/// Machine-readable classification of a detected issue.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum IssueCategory {
    LinkDown,
    NoCarrier,
    HighPacketLoss,
    LinkErrors,
    QdiscDrops,
    BufferOverflow,
    NoRoute,
    Unreachable,
    HighLatency,
    NoAddress,
    NoDefaultRoute,
    MtuIssue,
    DuplexMismatch,
}

impl fmt::Display for IssueCategory {
    /// Writes the category's canonical name (identical to the variant identifier).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            IssueCategory::LinkDown => "LinkDown",
            IssueCategory::NoCarrier => "NoCarrier",
            IssueCategory::HighPacketLoss => "HighPacketLoss",
            IssueCategory::LinkErrors => "LinkErrors",
            IssueCategory::QdiscDrops => "QdiscDrops",
            IssueCategory::BufferOverflow => "BufferOverflow",
            IssueCategory::NoRoute => "NoRoute",
            IssueCategory::Unreachable => "Unreachable",
            IssueCategory::HighLatency => "HighLatency",
            IssueCategory::NoAddress => "NoAddress",
            IssueCategory::NoDefaultRoute => "NoDefaultRoute",
            IssueCategory::MtuIssue => "MtuIssue",
            IssueCategory::DuplexMismatch => "DuplexMismatch",
        })
    }
}
/// Result of a [`Diagnostics::check_connectivity`] probe toward one destination.
#[derive(Debug, Clone)]
pub struct ConnectivityReport {
/// Address whose reachability was checked.
pub destination: IpAddr,
/// Matching route, or `None` when no route covers the destination.
pub route: Option<RouteInfo>,
/// Name of the route's output interface, when resolvable.
pub output_interface: Option<String>,
/// Next-hop gateway, when the route has one.
pub gateway: Option<IpAddr>,
/// Whether the gateway has a usable neighbor-cache entry
/// (`true` when the route needs no gateway).
pub gateway_reachable: bool,
/// Problems found along the path.
pub issues: Vec<Issue>,
}
/// Key attributes of the route selected during a connectivity check.
#[derive(Debug, Clone)]
pub struct RouteInfo {
/// Destination prefix as text ("default" for the default route).
pub destination: String,
/// Destination prefix length in bits.
pub prefix_len: u8,
/// Next-hop gateway, if any.
pub gateway: Option<IpAddr>,
/// Output interface index, if any.
pub oif: Option<u32>,
/// Route priority/metric, if reported.
pub metric: Option<u32>,
}
/// A detected traffic bottleneck together with a suggested remediation.
#[derive(Debug, Clone)]
pub struct Bottleneck {
/// Human-readable location, e.g. "eth0 egress qdisc (fq_codel)".
pub location: String,
/// What kind of bottleneck this is.
pub bottleneck_type: BottleneckType,
/// Current throughput in bytes per second (0 when not available).
pub current_rate: u64,
/// Fraction of packets dropped (or errored) at this point.
pub drop_rate: f64,
/// Absolute drop (or error) count backing `drop_rate`.
pub total_drops: u64,
/// Suggested action to relieve the bottleneck.
pub recommendation: String,
}
/// The mechanism causing a detected bottleneck.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum BottleneckType {
    QdiscDrops,
    InterfaceDrops,
    BufferFull,
    RateLimited,
    HardwareErrors,
}

impl fmt::Display for BottleneckType {
    /// Writes a human-readable, space-separated label.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            BottleneckType::QdiscDrops => "Qdisc Drops",
            BottleneckType::InterfaceDrops => "Interface Drops",
            BottleneckType::BufferFull => "Buffer Full",
            BottleneckType::RateLimited => "Rate Limited",
            BottleneckType::HardwareErrors => "Hardware Errors",
        })
    }
}
/// Thresholds and filters controlling what [`Diagnostics`] reports.
// The original `must_use` reason described this struct as a "builder",
// which it is not — corrected to describe the actual contract.
#[derive(Debug, Clone)]
#[must_use = "a DiagnosticsConfig has no effect until passed to Diagnostics"]
pub struct DiagnosticsConfig {
    /// Dropped/total packet ratio above which packet loss is reported.
    pub packet_loss_threshold: f64,
    /// Errored/total packet ratio above which link errors are reported.
    pub error_rate_threshold: f64,
    /// Qdisc drop/packet ratio above which qdisc drops are reported.
    pub qdisc_drop_threshold: f64,
    /// Qdisc backlog (bytes) above which a buffer issue is reported.
    pub backlog_threshold: u32,
    /// Qdisc queue length (packets) above which a buffer issue is reported.
    pub qlen_threshold: u32,
    /// Skip loopback interfaces during scans.
    pub skip_loopback: bool,
    /// Skip interfaces that are not up during scans.
    pub skip_down: bool,
    /// Minimum sample size before loss/error ratios are evaluated.
    pub min_bytes_for_rate: u64,
}

impl Default for DiagnosticsConfig {
    fn default() -> Self {
        Self {
            packet_loss_threshold: 0.01,  // 1% drops
            error_rate_threshold: 0.001,  // 0.1% errors
            qdisc_drop_threshold: 0.01,   // 1% qdisc drops
            backlog_threshold: 100_000,   // bytes
            qlen_threshold: 1000,         // packets
            skip_loopback: true,
            skip_down: false,
            min_bytes_for_rate: 1000,
        }
    }
}
/// Network diagnostics engine backed by a route-netlink connection.
pub struct Diagnostics {
// Query connection used for all link/address/route/qdisc dumps.
conn: Connection<Route>,
// Thresholds controlling issue detection.
config: DiagnosticsConfig,
// Last-seen link statistics per ifindex; used to derive per-second
// rates between consecutive scans.
prev_stats: Arc<Mutex<HashMap<u32, (Instant, LinkStats)>>>,
// Previous qdisc byte/packet counters keyed by a (u32, u32) pair —
// presumably (ifindex, qdisc handle), TODO confirm. Currently unused
// (dead_code); kept for future qdisc rate computation.
#[allow(dead_code, clippy::type_complexity)]
prev_tc_stats: Arc<Mutex<HashMap<(u32, u32), (Instant, u64, u64)>>>,
}
impl Diagnostics {
pub fn new(conn: Connection<Route>) -> Self {
Self {
conn,
config: DiagnosticsConfig::default(),
prev_stats: Arc::new(Mutex::new(HashMap::new())),
prev_tc_stats: Arc::new(Mutex::new(HashMap::new())),
}
}
/// Creates a diagnostics engine over `conn` with custom thresholds.
pub fn with_config(conn: Connection<Route>, config: DiagnosticsConfig) -> Self {
Self {
conn,
config,
prev_stats: Arc::new(Mutex::new(HashMap::new())),
prev_tc_stats: Arc::new(Mutex::new(HashMap::new())),
}
}
/// Returns the active diagnostics thresholds.
pub fn config(&self) -> &DiagnosticsConfig {
&self.config
}
/// Returns a mutable handle for adjusting thresholds in place.
pub fn config_mut(&mut self) -> &mut DiagnosticsConfig {
&mut self.config
}
/// Scans all links, addresses, routes, and qdiscs and compiles a
/// [`DiagnosticReport`] of current network health.
///
/// Throughput rates are derived from statistics cached by the previous
/// call on this instance, so the first scan reports all-zero rates.
///
/// # Errors
///
/// Fails if the link, address, or qdisc netlink dumps fail. The route
/// dump is best-effort: on failure the routing summary is empty.
pub async fn scan(&self) -> Result<DiagnosticReport> {
let timestamp = Instant::now();
let mut all_issues = Vec::new();
let links = self.conn.get_links().await?;
let addresses = self.conn.get_addresses().await?;
// Index addresses by interface so each per-link lookup is O(1).
let addr_by_ifindex: HashMap<u32, Vec<_>> = {
let mut map: HashMap<u32, Vec<_>> = HashMap::new();
for addr in addresses {
map.entry(addr.ifindex()).or_default().push(addr);
}
map
};
let qdiscs = self.conn.get_qdiscs().await?;
// Index qdiscs by interface as well.
let qdiscs_by_ifindex: HashMap<u32, Vec<_>> = {
let mut map: HashMap<u32, Vec<_>> = HashMap::new();
for qdisc in qdiscs {
map.entry(qdisc.ifindex()).or_default().push(qdisc);
}
map
};
// Best-effort: a failed route dump degrades to an empty summary.
let all_routes = self.conn.get_routes().await.unwrap_or_default();
let ipv4_routes: Vec<_> = all_routes.iter().filter(|r| r.is_ipv4()).collect();
let ipv6_routes: Vec<_> = all_routes.iter().filter(|r| r.is_ipv6()).collect();
let routes = self.build_route_diag(&ipv4_routes, &ipv6_routes);
// No default route in either family is a global (non-interface) issue.
if !routes.has_default_ipv4 && !routes.has_default_ipv6 {
all_issues.push(Issue {
severity: Severity::Warning,
category: IssueCategory::NoDefaultRoute,
message: "No default route configured".to_string(),
details: Some("System may not have internet connectivity".to_string()),
interface: None,
timestamp,
});
}
let mut interfaces = Vec::new();
// Hold the stats cache for the whole loop so rate samples are taken
// consistently within one scan.
let mut prev_stats = self.prev_stats.lock().await;
for link in links {
if self.config.skip_loopback && link.is_loopback() {
continue;
}
if self.config.skip_down && !link.is_up() {
continue;
}
let ifindex = link.ifindex();
let name = link.name().unwrap_or("?").to_string();
let state = link.operstate().unwrap_or(OperState::Unknown);
let stats = link.stats().cloned().unwrap_or_default();
// Rates need a previous sample: counter deltas are scaled by the
// elapsed milliseconds to per-second values.
let rates = if let Some((prev_time, prev)) = prev_stats.get(&ifindex) {
let elapsed = prev_time.elapsed();
if elapsed.as_millis() > 0 {
let ms = elapsed.as_millis() as u64;
LinkRates {
rx_bps: (stats.rx_bytes().saturating_sub(prev.rx_bytes())) * 1000 / ms,
tx_bps: (stats.tx_bytes().saturating_sub(prev.tx_bytes())) * 1000 / ms,
rx_pps: (stats.rx_packets().saturating_sub(prev.rx_packets())) * 1000 / ms,
tx_pps: (stats.tx_packets().saturating_sub(prev.tx_packets())) * 1000 / ms,
sample_duration_ms: ms,
}
} else {
LinkRates::default()
}
} else {
LinkRates::default()
};
prev_stats.insert(ifindex, (Instant::now(), stats));
let mut issues = self.detect_link_issues(&link, &stats, &addr_by_ifindex, timestamp);
// Diagnose the root qdisc when present, otherwise the first one.
let tc = qdiscs_by_ifindex.get(&ifindex).and_then(|qs| {
qs.iter()
.find(|q| q.is_root())
.or_else(|| qs.first())
.map(|q| {
let tc_diag = TcDiag::from_tc_message(q);
issues.extend(self.detect_tc_issues(q, &name, timestamp));
tc_diag
})
});
// Interface-level issues are mirrored into the global list too.
all_issues.extend(issues.iter().cloned());
interfaces.push(InterfaceDiag {
name,
ifindex,
state,
flags: link.flags(),
mtu: link.mtu(),
stats,
rates,
tc,
issues,
});
}
Ok(DiagnosticReport {
timestamp,
interfaces,
routes,
issues: all_issues,
})
}
/// Scans a single interface by name and returns its diagnostics.
///
/// Shares the rate cache with [`Self::scan`], so alternating calls use
/// whichever sample was recorded most recently for the interface.
///
/// # Errors
///
/// Fails if the interface does not exist or any netlink query fails.
pub async fn scan_interface(&self, dev: &str) -> Result<InterfaceDiag> {
let timestamp = Instant::now();
let link = self.conn.get_link_by_name(dev).await?;
let link = link.ok_or_else(|| crate::netlink::error::Error::interface_not_found(dev))?;
let ifindex = link.ifindex();
let name = dev.to_string();
let state = link.operstate().unwrap_or(OperState::Unknown);
let stats = link.stats().cloned().unwrap_or_default();
let addresses = self.conn.get_addresses_by_name(dev).await?;
// detect_link_issues expects addresses grouped by interface index.
let addr_by_ifindex: HashMap<u32, Vec<_>> = {
let mut map: HashMap<u32, Vec<_>> = HashMap::new();
for addr in addresses {
map.entry(addr.ifindex()).or_default().push(addr);
}
map
};
let mut prev_stats = self.prev_stats.lock().await;
// Per-second rates from counter deltas since the cached sample;
// all zero when no previous sample exists.
let rates = if let Some((prev_time, prev)) = prev_stats.get(&ifindex) {
let elapsed = prev_time.elapsed();
if elapsed.as_millis() > 0 {
let ms = elapsed.as_millis() as u64;
LinkRates {
rx_bps: (stats.rx_bytes().saturating_sub(prev.rx_bytes())) * 1000 / ms,
tx_bps: (stats.tx_bytes().saturating_sub(prev.tx_bytes())) * 1000 / ms,
rx_pps: (stats.rx_packets().saturating_sub(prev.rx_packets())) * 1000 / ms,
tx_pps: (stats.tx_packets().saturating_sub(prev.tx_packets())) * 1000 / ms,
sample_duration_ms: ms,
}
} else {
LinkRates::default()
}
} else {
LinkRates::default()
};
prev_stats.insert(ifindex, (Instant::now(), stats));
let mut issues = self.detect_link_issues(&link, &stats, &addr_by_ifindex, timestamp);
let qdiscs = self.conn.get_qdiscs_by_name(dev).await?;
// Prefer the root qdisc; fall back to the first one reported.
let tc = qdiscs
.iter()
.find(|q| q.is_root())
.or(qdiscs.first())
.map(|q| {
let tc_diag = TcDiag::from_tc_message(q);
issues.extend(self.detect_tc_issues(q, &name, timestamp));
tc_diag
});
Ok(InterfaceDiag {
name,
ifindex,
state,
flags: link.flags(),
mtu: link.mtu(),
stats,
rates,
tc,
issues,
})
}
/// Diagnoses reachability of `dest` using the routing table and the
/// neighbor cache.
///
/// The route is chosen by longest-prefix match among all routes of the
/// destination's address family, so a more specific route wins over a
/// default route regardless of dump order (the previous first-match
/// scan could return a default route that happened to come first).
/// Issues are recorded for a missing route, a gateway absent from (or
/// failed in) the neighbor cache, and a downed output interface.
///
/// # Errors
///
/// Fails if the route dump or link lookups fail; the neighbor dump is
/// best-effort.
pub async fn check_connectivity(&self, dest: IpAddr) -> Result<ConnectivityReport> {
    let timestamp = Instant::now();
    let mut issues = Vec::new();
    let all_routes = self.conn.get_routes().await?;
    // Only routes in the destination's address family are candidates.
    let routes: Vec<_> = match dest {
        IpAddr::V4(_) => all_routes.iter().filter(|r| r.is_ipv4()).collect(),
        IpAddr::V6(_) => all_routes.iter().filter(|r| r.is_ipv6()).collect(),
    };
    // Longest-prefix match: of all routes whose prefix covers `dest`,
    // keep the most specific one.
    let matching_route = routes
        .iter()
        .filter(|r| {
            if let Some(dst) = &r.destination {
                if r.dst_len() == 0 {
                    return true; // default route covers everything
                }
                match (dest, dst) {
                    (IpAddr::V4(d), IpAddr::V4(p)) => {
                        // Compare network bits under the prefix mask.
                        let prefix_len = r.dst_len();
                        let mask = if prefix_len >= 32 {
                            u32::MAX
                        } else {
                            u32::MAX << (32 - prefix_len)
                        };
                        (u32::from(d) & mask) == (u32::from(*p) & mask)
                    }
                    (IpAddr::V6(d), IpAddr::V6(p)) => {
                        // Compare whole bytes first, then the partial byte.
                        let d_bytes = d.octets();
                        let p_bytes = p.octets();
                        let prefix_len = r.dst_len();
                        let full_bytes = (prefix_len / 8) as usize;
                        let remaining_bits = prefix_len % 8;
                        if d_bytes[..full_bytes] != p_bytes[..full_bytes] {
                            return false;
                        }
                        if remaining_bits > 0 && full_bytes < 16 {
                            let mask = 0xFF << (8 - remaining_bits);
                            (d_bytes[full_bytes] & mask) == (p_bytes[full_bytes] & mask)
                        } else {
                            true
                        }
                    }
                    _ => false, // mixed-family entry cannot match
                }
            } else {
                // Routes without a destination attribute are defaults.
                r.dst_len() == 0
            }
        })
        .max_by_key(|r| r.dst_len());
    let (route, gateway, output_interface, oif) = if let Some(r) = matching_route {
        let gateway = r.gateway;
        let oif = r.oif;
        // Resolve the output interface index to a name when possible.
        let output_interface = if let Some(idx) = oif {
            self.conn
                .get_link_by_index(idx)
                .await?
                .and_then(|l| l.name().map(|s| s.to_string()))
        } else {
            None
        };
        let route_info = RouteInfo {
            destination: r
                .destination
                .map(|d| d.to_string())
                .unwrap_or_else(|| "default".to_string()),
            prefix_len: r.dst_len(),
            gateway,
            oif,
            metric: r.priority(),
        };
        (Some(route_info), gateway, output_interface, oif)
    } else {
        issues.push(Issue {
            severity: Severity::Error,
            category: IssueCategory::NoRoute,
            message: format!("No route to {}", dest),
            details: None,
            interface: None,
            timestamp,
        });
        (None, None, None, None)
    };
    // A gateway counts as reachable when the neighbor cache has a usable
    // entry for it; routes without a gateway (on-link) pass trivially.
    let gateway_reachable = if let Some(gw) = gateway {
        let neighbors = self.conn.get_neighbors().await.unwrap_or_default();
        neighbors.iter().any(|n| {
            n.destination == Some(gw)
                && n.state() != NeighborState::Incomplete
                && n.state() != NeighborState::Failed
        })
    } else {
        true
    };
    if gateway.is_some() && !gateway_reachable {
        issues.push(Issue {
            severity: Severity::Warning,
            category: IssueCategory::Unreachable,
            message: format!("Gateway {:?} may be unreachable", gateway),
            details: Some("Not found in neighbor cache or in failed state".to_string()),
            interface: output_interface.clone(),
            timestamp,
        });
    }
    // Flag a route whose output interface is down.
    if let Some(idx) = oif
        && let Some(link) = self.conn.get_link_by_index(idx).await?
        && !link.is_up()
    {
        issues.push(Issue {
            severity: Severity::Error,
            category: IssueCategory::LinkDown,
            message: format!("Output interface {} is down", link.name().unwrap_or("?")),
            details: None,
            interface: link.name().map(|s| s.to_string()),
            timestamp,
        });
    }
    Ok(ConnectivityReport {
        destination: dest,
        route,
        output_interface,
        gateway,
        gateway_reachable,
        issues,
    })
}
/// Searches interfaces and root qdiscs for the most significant
/// bottleneck, ranked by drop rate (highest first).
///
/// Returns `Ok(None)` when nothing exceeds the configured thresholds.
///
/// # Errors
///
/// Fails if the link, qdisc, or interface-name netlink queries fail.
pub async fn find_bottleneck(&self) -> Result<Option<Bottleneck>> {
let mut bottlenecks = Vec::new();
let links = self.conn.get_links().await?;
// Pass 1: interface-level drop and error counters.
for link in &links {
if link.is_loopback() {
continue;
}
let name = link.name().unwrap_or("?");
if let Some(stats) = link.stats() {
let total_packets = stats.total_packets();
let total_dropped = stats.total_dropped();
let total_errors = stats.total_errors();
// Ratios are only meaningful once traffic has flowed.
if total_packets > 0 {
let drop_rate = total_dropped as f64 / total_packets as f64;
if drop_rate > self.config.packet_loss_threshold {
bottlenecks.push(Bottleneck {
location: format!("{} interface", name),
bottleneck_type: BottleneckType::InterfaceDrops,
current_rate: 0,
drop_rate,
total_drops: total_dropped,
recommendation: format!(
"Check {} for hardware issues or increase buffer sizes",
name
),
});
}
if total_errors > 0 {
// Error rate reuses the drop_rate/total_drops fields.
let error_rate = total_errors as f64 / total_packets as f64;
if error_rate > self.config.error_rate_threshold {
bottlenecks.push(Bottleneck {
location: format!("{} interface", name),
bottleneck_type: BottleneckType::HardwareErrors,
current_rate: 0,
drop_rate: error_rate,
total_drops: total_errors,
recommendation: format!(
"Check cable, PHY settings, or NIC on {}",
name
),
});
}
}
}
}
}
// Pass 2: root-qdisc drop ratios and queue depth.
let qdiscs = self.conn.get_qdiscs().await?;
let names = self.conn.get_interface_names().await?;
for qdisc in &qdiscs {
if !qdisc.is_root() {
continue;
}
let name = names
.get(&qdisc.ifindex())
.map(|s| s.as_str())
.unwrap_or("?");
let drops = qdisc.drops() as u64;
let packets = qdisc.packets();
if packets > 0 {
let drop_rate = drops as f64 / packets as f64;
if drop_rate > self.config.qdisc_drop_threshold {
bottlenecks.push(Bottleneck {
location: format!(
"{} egress qdisc ({})",
name,
qdisc.kind().unwrap_or("?")
),
bottleneck_type: BottleneckType::QdiscDrops,
current_rate: qdisc.bps() as u64,
drop_rate,
total_drops: drops,
recommendation: format!(
"Increase qdisc limit or rate on {}, or switch to a different qdisc",
name
),
});
}
}
// Deep queues indicate sustained buffering even without drops.
let backlog = qdisc.backlog();
let qlen = qdisc.qlen();
if backlog > self.config.backlog_threshold || qlen > self.config.qlen_threshold {
bottlenecks.push(Bottleneck {
location: format!("{} egress qdisc ({})", name, qdisc.kind().unwrap_or("?")),
bottleneck_type: BottleneckType::BufferFull,
current_rate: qdisc.bps() as u64,
drop_rate: 0.0,
total_drops: drops,
recommendation: format!(
"High queue depth on {} - consider reducing buffering or increasing rate",
name
),
});
}
}
// Highest drop rate first; NaN-safe comparison falls back to Equal.
bottlenecks.sort_by(|a, b| {
b.drop_rate
.partial_cmp(&a.drop_rate)
.unwrap_or(std::cmp::Ordering::Equal)
});
Ok(bottlenecks.into_iter().next())
}
/// Starts a live [`IssueStream`] fed by netlink events.
///
/// Opens a dedicated netlink connection subscribed to all event groups;
/// the stream is independent of this instance's query connection.
///
/// # Errors
///
/// Fails if the new netlink connection cannot be created or subscribed.
pub async fn watch(&self) -> Result<IssueStream> {
let mut conn = Connection::<Route>::new()?;
conn.subscribe_all()?;
Ok(IssueStream {
events: conn.into_events(),
config: self.config.clone(),
})
}
/// Summarizes the routing tables into a [`RouteDiag`].
///
/// Counts come straight from the slice lengths; the default-route flag
/// and gateway for each family are taken from the first route whose
/// destination prefix length is zero.
fn build_route_diag(
    &self,
    ipv4_routes: &[&RouteMessage],
    ipv6_routes: &[&RouteMessage],
) -> RouteDiag {
    let mut diag = RouteDiag {
        ipv4_route_count: ipv4_routes.len(),
        ipv6_route_count: ipv6_routes.len(),
        ..Default::default()
    };
    if let Some(default_v4) = ipv4_routes.iter().find(|r| r.dst_len() == 0) {
        diag.has_default_ipv4 = true;
        diag.default_gateway_v4 = default_v4.gateway;
    }
    if let Some(default_v6) = ipv6_routes.iter().find(|r| r.dst_len() == 0) {
        diag.has_default_ipv6 = true;
        diag.default_gateway_v6 = default_v6.gateway;
    }
    diag
}
/// Checks one link for down/no-carrier/packet-loss/error/no-address issues.
///
/// `addr_by_ifindex` maps interface index to its configured addresses;
/// `timestamp` stamps every produced issue so they share the scan time.
fn detect_link_issues(
&self,
link: &LinkMessage,
stats: &LinkStats,
addr_by_ifindex: &HashMap<u32, Vec<AddressMessage>>,
timestamp: Instant,
) -> Vec<Issue> {
let mut issues = Vec::new();
let name = link.name().unwrap_or("?").to_string();
let ifindex = link.ifindex();
// Link is not up at all.
if !link.is_up() {
issues.push(Issue {
severity: Severity::Warning,
category: IssueCategory::LinkDown,
message: format!("Interface {} is down", name),
details: None,
interface: Some(name.clone()),
timestamp,
});
}
// Up but without carrier — typically a physical-layer problem.
if link.is_up() && !link.has_carrier() {
issues.push(Issue {
severity: Severity::Error,
category: IssueCategory::NoCarrier,
message: format!("No carrier on {}", name),
details: Some("Check cable connection".to_string()),
interface: Some(name.clone()),
timestamp,
});
}
let total_packets = stats.total_packets();
let total_dropped = stats.total_dropped();
// NOTE(review): this compares a *packet* count against
// `min_bytes_for_rate`; it appears to be used as a generic
// minimum-sample threshold — confirm the field's intended unit.
if total_packets > self.config.min_bytes_for_rate && total_dropped > 0 {
let drop_rate = total_dropped as f64 / total_packets as f64;
if drop_rate > self.config.packet_loss_threshold {
issues.push(Issue {
severity: Severity::Warning,
category: IssueCategory::HighPacketLoss,
message: format!("{:.2}% packet loss on {}", drop_rate * 100.0, name),
details: Some(format!(
"{} dropped out of {} packets",
total_dropped, total_packets
)),
interface: Some(name.clone()),
timestamp,
});
}
}
let total_errors = stats.total_errors();
// Same minimum-sample guard for the error ratio.
if total_packets > self.config.min_bytes_for_rate && total_errors > 0 {
let error_rate = total_errors as f64 / total_packets as f64;
if error_rate > self.config.error_rate_threshold {
issues.push(Issue {
severity: Severity::Warning,
category: IssueCategory::LinkErrors,
message: format!(
"{} errors on {} ({:.3}%)",
total_errors,
name,
error_rate * 100.0
),
details: Some(format!(
"RX errors: {}, TX errors: {}",
stats.rx_errors(),
stats.tx_errors()
)),
interface: Some(name.clone()),
timestamp,
});
}
}
// An up, non-loopback interface with no addresses is worth noting.
if link.is_up() && !link.is_loopback() {
let has_addrs = addr_by_ifindex
.get(&ifindex)
.map(|addrs| !addrs.is_empty())
.unwrap_or(false);
if !has_addrs {
issues.push(Issue {
severity: Severity::Info,
category: IssueCategory::NoAddress,
message: format!("No IP addresses configured on {}", name),
details: None,
interface: Some(name.clone()),
timestamp,
});
}
}
issues
}
/// Inspects one qdisc's counters and reports drop-rate and backlog issues.
fn detect_tc_issues(&self, tc: &TcMessage, iface: &str, timestamp: Instant) -> Vec<Issue> {
    let mut issues = Vec::new();
    let drop_count = tc.drops() as u64;
    let packet_count = tc.packets();
    // A drop ratio is only meaningful once the qdisc has moved traffic.
    if packet_count > 0 {
        let drop_rate = drop_count as f64 / packet_count as f64;
        if drop_rate > self.config.qdisc_drop_threshold {
            issues.push(Issue {
                severity: Severity::Warning,
                category: IssueCategory::QdiscDrops,
                message: format!(
                    "Qdisc {} dropping {:.2}% of packets on {}",
                    tc.kind().unwrap_or("?"),
                    drop_rate * 100.0,
                    iface
                ),
                details: Some(format!(
                    "{} drops out of {} packets",
                    drop_count, packet_count
                )),
                interface: Some(iface.to_string()),
                timestamp,
            });
        }
    }
    // A deep backlog indicates sustained queuing even without drops.
    if tc.backlog() > self.config.backlog_threshold {
        issues.push(Issue {
            severity: Severity::Warning,
            category: IssueCategory::BufferOverflow,
            message: format!("High backlog ({} bytes) on {} qdisc", tc.backlog(), iface),
            details: Some(format!("Queue length: {} packets", tc.qlen())),
            interface: Some(iface.to_string()),
            timestamp,
        });
    }
    issues
}
}
/// Stream of [`Issue`]s derived from live netlink events.
///
/// Produced by [`Diagnostics::watch`]; owns its own netlink connection.
pub struct IssueStream {
// Underlying netlink event stream.
events: OwnedEventStream<Route>,
// Diagnostics thresholds; currently unused — presumably retained for
// future event filtering.
#[allow(dead_code)]
config: DiagnosticsConfig,
}
impl Stream for IssueStream {
    type Item = Result<Issue>;

    /// Polls the underlying event stream, skipping events that do not map
    /// to an [`Issue`], and forwards errors and end-of-stream unchanged.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match std::task::ready!(Pin::new(&mut self.events).poll_next(cx)) {
                // Benign events yield no issue; keep polling.
                Some(Ok(event)) => {
                    if let Some(issue) = self.event_to_issue(&event) {
                        return Poll::Ready(Some(Ok(issue)));
                    }
                }
                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                None => return Poll::Ready(None),
            }
        }
    }
}
impl IssueStream {
/// Translates a netlink event into an [`Issue`], or `None` for events
/// that do not indicate a problem.
fn event_to_issue(&self, event: &NetworkEvent) -> Option<Issue> {
let timestamp = Instant::now();
match event {
// An interface was removed entirely.
NetworkEvent::DelLink(link) => {
let name = link.name().unwrap_or("?").to_string();
Some(Issue {
severity: Severity::Warning,
category: IssueCategory::LinkDown,
message: format!("Interface {} removed", name),
details: None,
interface: Some(name),
timestamp,
})
}
// Link updates: report lost carrier or a down operstate.
NetworkEvent::NewLink(link) => {
let name = link.name().unwrap_or("?").to_string();
if link.is_up() && !link.has_carrier() {
return Some(Issue {
severity: Severity::Error,
category: IssueCategory::NoCarrier,
message: format!("No carrier on {}", name),
details: Some("Check cable connection".to_string()),
interface: Some(name),
timestamp,
});
}
if let Some(state) = link.operstate()
&& (state == OperState::Down || state == OperState::LowerLayerDown)
{
return Some(Issue {
severity: Severity::Warning,
category: IssueCategory::LinkDown,
message: format!("Interface {} is {:?}", name, state),
details: None,
interface: Some(name),
timestamp,
});
}
None
}
// An address was removed; resolve the interface name best-effort.
NetworkEvent::DelAddress(addr) => {
let name = crate::util::ifname::index_to_name(addr.ifindex())
.unwrap_or_else(|_| format!("if{}", addr.ifindex()));
Some(Issue {
severity: Severity::Info,
category: IssueCategory::NoAddress,
message: format!("Address {:?} removed from {}", addr.address(), name),
details: None,
interface: Some(name),
timestamp,
})
}
// Losing the default route is connectivity-affecting.
NetworkEvent::DelRoute(route) => {
if route.dst_len() == 0 {
return Some(Issue {
severity: Severity::Warning,
category: IssueCategory::NoDefaultRoute,
message: "Default route removed".to_string(),
details: None,
interface: None,
timestamp,
});
}
None
}
// All other events (new addresses, new routes, ...) are benign.
_ => None,
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `Ord` is derived, so severities sort in declaration order.
    #[test]
    fn test_severity_ordering() {
        let mut severities = [
            Severity::Critical,
            Severity::Info,
            Severity::Error,
            Severity::Warning,
        ];
        severities.sort();
        assert_eq!(
            severities,
            [
                Severity::Info,
                Severity::Warning,
                Severity::Error,
                Severity::Critical,
            ]
        );
    }

    /// Interface prefix, message, and parenthesized details all render.
    #[test]
    fn test_issue_display() {
        let issue = Issue {
            severity: Severity::Warning,
            category: IssueCategory::HighPacketLoss,
            message: "5% packet loss".to_string(),
            details: Some("Check cable".to_string()),
            interface: Some("eth0".to_string()),
            timestamp: Instant::now(),
        };
        assert_eq!(issue.to_string(), "[eth0] 5% packet loss (Check cable)");
    }

    /// Totals combine RX and TX components.
    #[test]
    fn test_link_rates() {
        let rates = LinkRates {
            rx_bps: 1000,
            tx_bps: 2000,
            rx_pps: 10,
            tx_pps: 20,
            sample_duration_ms: 1000,
        };
        assert_eq!((rates.total_bps(), rates.total_pps()), (3000, 30));
    }

    /// Spot-checks the documented default thresholds.
    #[test]
    fn test_config_defaults() {
        let config = DiagnosticsConfig::default();
        assert!((config.packet_loss_threshold - 0.01).abs() < f64::EPSILON);
        assert!(config.skip_loopback);
    }
}