Skip to main content

nlink/netlink/
diagnostics.rs

1//! Network diagnostics module.
2//!
3//! Provides a unified diagnostic API that combines data from multiple sources
4//! (links, TC, routes, addresses) to provide actionable insights about network issues.
5//!
6//! # Quick Start
7//!
8//! ```ignore
9//! use nlink::netlink::{Connection, Route};
10//! use nlink::netlink::diagnostics::Diagnostics;
11//!
12//! let conn = Connection::<Route>::new()?;
13//! let diag = Diagnostics::new(conn);
14//!
15//! // Full scan
16//! let report = diag.scan().await?;
17//! for issue in &report.issues {
18//!     println!("[{:?}] {}: {}", issue.severity, issue.category, issue.message);
19//! }
20//!
21//! // Interface diagnostics
22//! let eth0 = diag.scan_interface("eth0").await?;
23//! println!("eth0: {} bps, {} drops", eth0.rates.tx_bps, eth0.stats.tx_dropped());
24//!
25//! // Connectivity check
26//! let report = diag.check_connectivity("8.8.8.8".parse()?).await?;
27//! if !report.issues.is_empty() {
28//!     for issue in &report.issues {
29//!         println!("  - {}", issue.message);
30//!     }
31//! }
32//!
33//! // Find bottleneck
34//! if let Some(bottleneck) = diag.find_bottleneck().await? {
35//!     println!("Bottleneck: {}", bottleneck.location);
36//!     println!("  Drop rate: {:.2}%", bottleneck.drop_rate * 100.0);
37//!     println!("  Recommendation: {}", bottleneck.recommendation);
38//! }
39//! ```
40//!
41//! # Real-time Monitoring
42//!
43//! ```ignore
44//! use tokio_stream::StreamExt;
45//!
46//! let mut issues = diag.watch().await?;
47//! while let Some(issue) = issues.next().await {
48//!     let issue = issue?;
49//!     println!("[{:?}] {}", issue.severity, issue.message);
50//! }
51//! ```
52
53use std::collections::HashMap;
54use std::fmt;
55use std::net::IpAddr;
56use std::pin::Pin;
57use std::sync::Arc;
58use std::task::{Context, Poll};
59use std::time::Instant;
60
61use tokio::sync::Mutex;
62use tokio_stream::Stream;
63
64use crate::netlink::connection::Connection;
65use crate::netlink::error::Result;
66use crate::netlink::events::NetworkEvent;
67use crate::netlink::messages::{AddressMessage, LinkMessage, LinkStats, RouteMessage, TcMessage};
68use crate::netlink::protocol::Route;
69use crate::netlink::stream::OwnedEventStream;
70use crate::netlink::types::link::OperState;
71use crate::netlink::types::neigh::NeighborState;
72
73// ============================================================================
74// Core Types
75// ============================================================================
76
77/// Network diagnostic report containing all collected information.
78#[derive(Debug, Clone)]
79pub struct DiagnosticReport {
80    /// When the report was generated.
81    pub timestamp: Instant,
82    /// Interface diagnostics.
83    pub interfaces: Vec<InterfaceDiag>,
84    /// Route diagnostics.
85    pub routes: RouteDiag,
86    /// All detected issues across the system.
87    pub issues: Vec<Issue>,
88}
89
90/// Diagnostics for a single network interface.
91#[derive(Debug, Clone)]
92pub struct InterfaceDiag {
93    /// Interface name.
94    pub name: String,
95    /// Interface index.
96    pub ifindex: u32,
97    /// Operational state.
98    pub state: OperState,
99    /// Interface flags (IFF_UP, IFF_RUNNING, etc.).
100    pub flags: u32,
101    /// MTU.
102    pub mtu: Option<u32>,
103    /// Link statistics.
104    pub stats: LinkStats,
105    /// Calculated rates (requires previous sample).
106    pub rates: LinkRates,
107    /// Traffic control diagnostics.
108    pub tc: Option<TcDiag>,
109    /// Issues detected for this interface.
110    pub issues: Vec<Issue>,
111}
112
/// Link transfer rates derived from the difference between two statistics
/// samples of the same interface.
///
/// Every `*_bps` field is in **bytes** per second (not bits); `*_pps` fields
/// are packets per second.
#[derive(Debug, Clone, Copy, Default)]
pub struct LinkRates {
    /// Received bytes per second.
    pub rx_bps: u64,
    /// Transmitted bytes per second.
    pub tx_bps: u64,
    /// Received packets per second.
    pub rx_pps: u64,
    /// Transmitted packets per second.
    pub tx_pps: u64,
    /// Length of the sampling window in milliseconds (0 when no previous
    /// sample was available).
    pub sample_duration_ms: u64,
}

impl LinkRates {
    /// Combined receive + transmit throughput, in bytes per second.
    ///
    /// Saturates at `u64::MAX` instead of overflowing.
    pub fn total_bps(&self) -> u64 {
        self.rx_bps.saturating_add(self.tx_bps)
    }

    /// Combined receive + transmit packet rate, in packets per second.
    ///
    /// Saturates at `u64::MAX` instead of overflowing.
    pub fn total_pps(&self) -> u64 {
        self.rx_pps.saturating_add(self.tx_pps)
    }
}
139
140/// Traffic control diagnostics for an interface.
141#[derive(Debug, Clone)]
142pub struct TcDiag {
143    /// Qdisc type (e.g., "fq_codel", "htb", "netem").
144    pub qdisc: String,
145    /// Qdisc handle as string (e.g., "1:0").
146    pub handle: String,
147    /// Total drops from this qdisc.
148    pub drops: u64,
149    /// Overlimit count.
150    pub overlimits: u64,
151    /// Current backlog in bytes.
152    pub backlog: u32,
153    /// Current queue length in packets.
154    pub qlen: u32,
155    /// Current rate in bytes per second (from rate estimator).
156    pub rate_bps: u64,
157    /// Current packet rate (from rate estimator).
158    pub rate_pps: u64,
159    /// Total bytes processed.
160    pub bytes: u64,
161    /// Total packets processed.
162    pub packets: u64,
163}
164
165impl TcDiag {
166    /// Create TC diagnostics from a TcMessage.
167    pub fn from_tc_message(tc: &TcMessage) -> Self {
168        Self {
169            qdisc: tc.kind().unwrap_or("unknown").to_string(),
170            handle: tc.handle_str(),
171            drops: tc.drops() as u64,
172            overlimits: tc.overlimits() as u64,
173            backlog: tc.backlog(),
174            qlen: tc.qlen(),
175            rate_bps: tc.bps() as u64,
176            rate_pps: tc.pps() as u64,
177            bytes: tc.bytes(),
178            packets: tc.packets(),
179        }
180    }
181}
182
/// Summary of the routing table, split by address family.
#[derive(Clone, Debug, Default)]
pub struct RouteDiag {
    /// Number of IPv4 routes seen.
    pub ipv4_route_count: usize,
    /// Number of IPv6 routes seen.
    pub ipv6_route_count: usize,
    /// `true` when an IPv4 default route is present.
    pub has_default_ipv4: bool,
    /// `true` when an IPv6 default route is present.
    pub has_default_ipv6: bool,
    /// Gateway of the IPv4 default route, if any.
    pub default_gateway_v4: Option<IpAddr>,
    /// Gateway of the IPv6 default route, if any.
    pub default_gateway_v6: Option<IpAddr>,
}
199
200/// A detected issue.
201#[derive(Debug, Clone)]
202pub struct Issue {
203    /// Severity level.
204    pub severity: Severity,
205    /// Category of the issue.
206    pub category: IssueCategory,
207    /// Human-readable message.
208    pub message: String,
209    /// Additional details or recommendations.
210    pub details: Option<String>,
211    /// Interface name if issue is interface-specific.
212    pub interface: Option<String>,
213    /// When the issue was detected.
214    pub timestamp: Instant,
215}
216
217impl fmt::Display for Issue {
218    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219        if let Some(ref iface) = self.interface {
220            write!(f, "[{}] ", iface)?;
221        }
222        write!(f, "{}", self.message)?;
223        if let Some(ref details) = self.details {
224            write!(f, " ({})", details)?;
225        }
226        Ok(())
227    }
228}
229
/// Severity of an [`Issue`], declared from least to most serious.
///
/// Because the derived `Ord` follows declaration order, severities can be
/// compared directly, e.g. `issue.severity >= Severity::Warning`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[non_exhaustive]
pub enum Severity {
    /// Informational message.
    Info,
    /// Warning that may indicate a problem.
    Warning,
    /// Error that affects functionality.
    Error,
    /// Critical issue requiring immediate attention.
    Critical,
}

impl fmt::Display for Severity {
    /// Writes the short label (`INFO`, `WARN`, `ERROR`, `CRITICAL`).
    ///
    /// Uses [`fmt::Formatter::pad`] so width, fill and alignment flags are
    /// honoured (e.g. `format!("{:<8}", sev)` for column-aligned output);
    /// a plain `{}` produces exactly the bare label.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Severity::Info => "INFO",
            Severity::Warning => "WARN",
            Severity::Error => "ERROR",
            Severity::Critical => "CRITICAL",
        };
        f.pad(label)
    }
}
254
/// Classification of an [`Issue`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum IssueCategory {
    /// The interface is down.
    LinkDown,
    /// No carrier detected on the link.
    NoCarrier,
    /// Packet loss rate is high.
    HighPacketLoss,
    /// Receive/transmit errors were observed.
    LinkErrors,
    /// A qdisc is dropping packets.
    QdiscDrops,
    /// Buffer overflow or excessive backlog.
    BufferOverflow,
    /// No route to the destination.
    NoRoute,
    /// The destination (or its gateway) appears unreachable.
    Unreachable,
    /// High latency detected.
    HighLatency,
    /// The interface has no addresses.
    NoAddress,
    /// No default route is configured.
    NoDefaultRoute,
    /// MTU mismatch or other MTU problem.
    MtuIssue,
    /// Duplex or speed mismatch.
    DuplexMismatch,
}

impl fmt::Display for IssueCategory {
    /// Writes the variant name, e.g. `NoDefaultRoute`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match *self {
            Self::LinkDown => "LinkDown",
            Self::NoCarrier => "NoCarrier",
            Self::HighPacketLoss => "HighPacketLoss",
            Self::LinkErrors => "LinkErrors",
            Self::QdiscDrops => "QdiscDrops",
            Self::BufferOverflow => "BufferOverflow",
            Self::NoRoute => "NoRoute",
            Self::Unreachable => "Unreachable",
            Self::HighLatency => "HighLatency",
            Self::NoAddress => "NoAddress",
            Self::NoDefaultRoute => "NoDefaultRoute",
            Self::MtuIssue => "MtuIssue",
            Self::DuplexMismatch => "DuplexMismatch",
        };
        f.write_str(name)
    }
}
306
307/// Connectivity check result.
308#[derive(Debug, Clone)]
309pub struct ConnectivityReport {
310    /// Destination that was checked.
311    pub destination: IpAddr,
312    /// Route information if found.
313    pub route: Option<RouteInfo>,
314    /// Output interface for the route.
315    pub output_interface: Option<String>,
316    /// Gateway address if any.
317    pub gateway: Option<IpAddr>,
318    /// Whether the gateway is reachable (based on neighbor state).
319    pub gateway_reachable: bool,
320    /// Issues detected during connectivity check.
321    pub issues: Vec<Issue>,
322}
323
/// Minimal description of a route.
#[derive(Clone, Debug)]
pub struct RouteInfo {
    /// Destination prefix address as text, or `"default"` when the route has
    /// no destination.
    pub destination: String,
    /// Length of the destination prefix in bits.
    pub prefix_len: u8,
    /// Next-hop gateway, if any.
    pub gateway: Option<IpAddr>,
    /// Index of the output interface, if any.
    pub oif: Option<u32>,
    /// Route metric (priority), if set.
    pub metric: Option<u32>,
}
338
339/// Bottleneck analysis result.
340#[derive(Debug, Clone)]
341pub struct Bottleneck {
342    /// Location description (e.g., "eth0 egress qdisc").
343    pub location: String,
344    /// Type of bottleneck.
345    pub bottleneck_type: BottleneckType,
346    /// Current rate in bytes per second.
347    pub current_rate: u64,
348    /// Drop rate as fraction (0.0 to 1.0).
349    pub drop_rate: f64,
350    /// Total drops observed.
351    pub total_drops: u64,
352    /// Recommendation for fixing the bottleneck.
353    pub recommendation: String,
354}
355
/// Kind of a [`Bottleneck`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum BottleneckType {
    /// A qdisc is dropping packets.
    QdiscDrops,
    /// The interface itself is dropping packets.
    InterfaceDrops,
    /// Buffers are full.
    BufferFull,
    /// Traffic is being rate limited.
    RateLimited,
    /// Hardware errors are occurring.
    HardwareErrors,
}

impl fmt::Display for BottleneckType {
    /// Writes a space-separated label such as `Qdisc Drops`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match *self {
            Self::QdiscDrops => "Qdisc Drops",
            Self::InterfaceDrops => "Interface Drops",
            Self::BufferFull => "Buffer Full",
            Self::RateLimited => "Rate Limited",
            Self::HardwareErrors => "Hardware Errors",
        })
    }
}
383
384// ============================================================================
385// Configuration
386// ============================================================================
387
/// Thresholds and filters that control issue detection.
///
/// Start from [`DiagnosticsConfig::default`], override the fields of interest
/// and hand the result to [`Diagnostics::with_config`].
#[derive(Clone, Debug)]
#[must_use = "builders do nothing unless used"]
pub struct DiagnosticsConfig {
    /// Packet-loss fraction above which loss is flagged (default `0.01`, i.e. 1%).
    pub packet_loss_threshold: f64,
    /// Error-rate fraction above which errors are flagged (default `0.001`, i.e. 0.1%).
    pub error_rate_threshold: f64,
    /// Qdisc drops, as a fraction of packets, above which drops are flagged
    /// (default `0.01`, i.e. 1%).
    pub qdisc_drop_threshold: f64,
    /// Qdisc backlog in bytes beyond which buffering is flagged (default `100_000`).
    pub backlog_threshold: u32,
    /// Qdisc queue length in packets beyond which buffering is flagged (default `1000`).
    pub qlen_threshold: u32,
    /// Skip loopback interfaces during a scan (default `true`).
    pub skip_loopback: bool,
    /// Skip interfaces that are not up during a scan (default `false`).
    pub skip_down: bool,
    /// Minimum bytes transferred before a loss rate is calculated (default `1000`).
    pub min_bytes_for_rate: u64,
}

impl Default for DiagnosticsConfig {
    fn default() -> Self {
        DiagnosticsConfig {
            // Interface filters.
            skip_loopback: true,
            skip_down: false,
            // Rate thresholds (fractions).
            packet_loss_threshold: 0.01,
            error_rate_threshold: 0.001,
            qdisc_drop_threshold: 0.01,
            // Queue thresholds.
            backlog_threshold: 100_000,
            qlen_threshold: 1_000,
            min_bytes_for_rate: 1_000,
        }
    }
}
424
425// ============================================================================
426// Diagnostics Runner
427// ============================================================================
428
429/// Network diagnostics runner.
430///
431/// Provides methods to scan and analyze network configuration.
432pub struct Diagnostics {
433    conn: Connection<Route>,
434    config: DiagnosticsConfig,
435    /// Previous statistics for rate calculation.
436    prev_stats: Arc<Mutex<HashMap<u32, (Instant, LinkStats)>>>,
437    /// Previous TC stats for rate calculation (reserved for future use).
438    #[allow(dead_code, clippy::type_complexity)]
439    prev_tc_stats: Arc<Mutex<HashMap<(u32, u32), (Instant, u64, u64)>>>,
440}
441
442impl Diagnostics {
443    /// Create a new diagnostics runner with default configuration.
444    pub fn new(conn: Connection<Route>) -> Self {
445        Self {
446            conn,
447            config: DiagnosticsConfig::default(),
448            prev_stats: Arc::new(Mutex::new(HashMap::new())),
449            prev_tc_stats: Arc::new(Mutex::new(HashMap::new())),
450        }
451    }
452
453    /// Create a new diagnostics runner with custom configuration.
454    pub fn with_config(conn: Connection<Route>, config: DiagnosticsConfig) -> Self {
455        Self {
456            conn,
457            config,
458            prev_stats: Arc::new(Mutex::new(HashMap::new())),
459            prev_tc_stats: Arc::new(Mutex::new(HashMap::new())),
460        }
461    }
462
463    /// Get a reference to the configuration.
464    pub fn config(&self) -> &DiagnosticsConfig {
465        &self.config
466    }
467
468    /// Get a mutable reference to the configuration.
469    pub fn config_mut(&mut self) -> &mut DiagnosticsConfig {
470        &mut self.config
471    }
472
473    /// Run a full diagnostic scan of all interfaces and routes.
474    pub async fn scan(&self) -> Result<DiagnosticReport> {
475        let timestamp = Instant::now();
476        let mut all_issues = Vec::new();
477
478        // Get all links
479        let links = self.conn.get_links().await?;
480
481        // Get all addresses for checking
482        let addresses = self.conn.get_addresses().await?;
483        let addr_by_ifindex: HashMap<u32, Vec<_>> = {
484            let mut map: HashMap<u32, Vec<_>> = HashMap::new();
485            for addr in addresses {
486                map.entry(addr.ifindex()).or_default().push(addr);
487            }
488            map
489        };
490
491        // Get all qdiscs
492        let qdiscs = self.conn.get_qdiscs().await?;
493        let qdiscs_by_ifindex: HashMap<u32, Vec<_>> = {
494            let mut map: HashMap<u32, Vec<_>> = HashMap::new();
495            for qdisc in qdiscs {
496                map.entry(qdisc.ifindex()).or_default().push(qdisc);
497            }
498            map
499        };
500
501        // Get routes and split by family
502        let all_routes = self.conn.get_routes().await.unwrap_or_default();
503        let ipv4_routes: Vec<_> = all_routes.iter().filter(|r| r.is_ipv4()).collect();
504        let ipv6_routes: Vec<_> = all_routes.iter().filter(|r| r.is_ipv6()).collect();
505
506        // Build route diagnostics
507        let routes = self.build_route_diag(&ipv4_routes, &ipv6_routes);
508
509        // Check for missing default route
510        if !routes.has_default_ipv4 && !routes.has_default_ipv6 {
511            all_issues.push(Issue {
512                severity: Severity::Warning,
513                category: IssueCategory::NoDefaultRoute,
514                message: "No default route configured".to_string(),
515                details: Some("System may not have internet connectivity".to_string()),
516                interface: None,
517                timestamp,
518            });
519        }
520
521        // Scan each interface
522        let mut interfaces = Vec::new();
523        let mut prev_stats = self.prev_stats.lock().await;
524
525        for link in links {
526            // Skip loopback if configured
527            if self.config.skip_loopback && link.is_loopback() {
528                continue;
529            }
530
531            // Skip down interfaces if configured
532            if self.config.skip_down && !link.is_up() {
533                continue;
534            }
535
536            let ifindex = link.ifindex();
537            let name = link.name().unwrap_or("?").to_string();
538            let state = link.operstate().unwrap_or(OperState::Unknown);
539            let stats = link.stats().cloned().unwrap_or_default();
540
541            // Calculate rates
542            let rates = if let Some((prev_time, prev)) = prev_stats.get(&ifindex) {
543                let elapsed = prev_time.elapsed();
544                if elapsed.as_millis() > 0 {
545                    let ms = elapsed.as_millis() as u64;
546                    LinkRates {
547                        rx_bps: (stats.rx_bytes().saturating_sub(prev.rx_bytes())) * 1000 / ms,
548                        tx_bps: (stats.tx_bytes().saturating_sub(prev.tx_bytes())) * 1000 / ms,
549                        rx_pps: (stats.rx_packets().saturating_sub(prev.rx_packets())) * 1000 / ms,
550                        tx_pps: (stats.tx_packets().saturating_sub(prev.tx_packets())) * 1000 / ms,
551                        sample_duration_ms: ms,
552                    }
553                } else {
554                    LinkRates::default()
555                }
556            } else {
557                LinkRates::default()
558            };
559
560            // Store current stats for next calculation
561            prev_stats.insert(ifindex, (Instant::now(), stats));
562
563            // Detect issues for this interface
564            let mut issues = self.detect_link_issues(&link, &stats, &addr_by_ifindex, timestamp);
565
566            // Get TC diagnostics
567            let tc = qdiscs_by_ifindex.get(&ifindex).and_then(|qs| {
568                // Find the root qdisc
569                qs.iter()
570                    .find(|q| q.is_root())
571                    .or_else(|| qs.first())
572                    .map(|q| {
573                        let tc_diag = TcDiag::from_tc_message(q);
574                        // Check for TC issues
575                        issues.extend(self.detect_tc_issues(q, &name, timestamp));
576                        tc_diag
577                    })
578            });
579
580            // Add interface issues to global list
581            all_issues.extend(issues.iter().cloned());
582
583            interfaces.push(InterfaceDiag {
584                name,
585                ifindex,
586                state,
587                flags: link.flags(),
588                mtu: link.mtu(),
589                stats,
590                rates,
591                tc,
592                issues,
593            });
594        }
595
596        Ok(DiagnosticReport {
597            timestamp,
598            interfaces,
599            routes,
600            issues: all_issues,
601        })
602    }
603
604    /// Diagnose a specific interface.
605    pub async fn scan_interface(&self, dev: &str) -> Result<InterfaceDiag> {
606        let timestamp = Instant::now();
607
608        // Get the link
609        let link = self.conn.get_link_by_name(dev).await?;
610        let link = link.ok_or_else(|| crate::netlink::error::Error::interface_not_found(dev))?;
611
612        let ifindex = link.ifindex();
613        let name = dev.to_string();
614        let state = link.operstate().unwrap_or(OperState::Unknown);
615        let stats = link.stats().cloned().unwrap_or_default();
616
617        // Get addresses
618        let addresses = self.conn.get_addresses_by_name(dev).await?;
619        let addr_by_ifindex: HashMap<u32, Vec<_>> = {
620            let mut map: HashMap<u32, Vec<_>> = HashMap::new();
621            for addr in addresses {
622                map.entry(addr.ifindex()).or_default().push(addr);
623            }
624            map
625        };
626
627        // Calculate rates
628        let mut prev_stats = self.prev_stats.lock().await;
629        let rates = if let Some((prev_time, prev)) = prev_stats.get(&ifindex) {
630            let elapsed = prev_time.elapsed();
631            if elapsed.as_millis() > 0 {
632                let ms = elapsed.as_millis() as u64;
633                LinkRates {
634                    rx_bps: (stats.rx_bytes().saturating_sub(prev.rx_bytes())) * 1000 / ms,
635                    tx_bps: (stats.tx_bytes().saturating_sub(prev.tx_bytes())) * 1000 / ms,
636                    rx_pps: (stats.rx_packets().saturating_sub(prev.rx_packets())) * 1000 / ms,
637                    tx_pps: (stats.tx_packets().saturating_sub(prev.tx_packets())) * 1000 / ms,
638                    sample_duration_ms: ms,
639                }
640            } else {
641                LinkRates::default()
642            }
643        } else {
644            LinkRates::default()
645        };
646
647        prev_stats.insert(ifindex, (Instant::now(), stats));
648
649        // Detect issues
650        let mut issues = self.detect_link_issues(&link, &stats, &addr_by_ifindex, timestamp);
651
652        // Get TC diagnostics
653        let qdiscs = self.conn.get_qdiscs_by_name(dev).await?;
654        let tc = qdiscs
655            .iter()
656            .find(|q| q.is_root())
657            .or(qdiscs.first())
658            .map(|q| {
659                let tc_diag = TcDiag::from_tc_message(q);
660                issues.extend(self.detect_tc_issues(q, &name, timestamp));
661                tc_diag
662            });
663
664        Ok(InterfaceDiag {
665            name,
666            ifindex,
667            state,
668            flags: link.flags(),
669            mtu: link.mtu(),
670            stats,
671            rates,
672            tc,
673            issues,
674        })
675    }
676
677    /// Check connectivity to a destination IP address.
678    pub async fn check_connectivity(&self, dest: IpAddr) -> Result<ConnectivityReport> {
679        let timestamp = Instant::now();
680        let mut issues = Vec::new();
681
682        // Get routes and filter by address family
683        let all_routes = self.conn.get_routes().await?;
684        let routes: Vec<_> = match dest {
685            IpAddr::V4(_) => all_routes.iter().filter(|r| r.is_ipv4()).collect(),
686            IpAddr::V6(_) => all_routes.iter().filter(|r| r.is_ipv6()).collect(),
687        };
688
689        // Find matching route (simple longest prefix match)
690        let matching_route = routes.iter().find(|r| {
691            if let Some(dst) = &r.destination {
692                // For default route (0.0.0.0/0 or ::/0)
693                if r.dst_len() == 0 {
694                    return true;
695                }
696                // Check if destination matches (simplified)
697                match (dest, dst) {
698                    (IpAddr::V4(d), IpAddr::V4(p)) => {
699                        let prefix_len = r.dst_len();
700                        let mask = if prefix_len >= 32 {
701                            u32::MAX
702                        } else {
703                            u32::MAX << (32 - prefix_len)
704                        };
705                        (u32::from(d) & mask) == (u32::from(*p) & mask)
706                    }
707                    (IpAddr::V6(d), IpAddr::V6(p)) => {
708                        let d_bytes = d.octets();
709                        let p_bytes = p.octets();
710                        let prefix_len = r.dst_len();
711                        let full_bytes = (prefix_len / 8) as usize;
712                        let remaining_bits = prefix_len % 8;
713
714                        if d_bytes[..full_bytes] != p_bytes[..full_bytes] {
715                            return false;
716                        }
717
718                        if remaining_bits > 0 && full_bytes < 16 {
719                            let mask = 0xFF << (8 - remaining_bits);
720                            (d_bytes[full_bytes] & mask) == (p_bytes[full_bytes] & mask)
721                        } else {
722                            true
723                        }
724                    }
725                    _ => false,
726                }
727            } else {
728                // Default route
729                r.dst_len() == 0
730            }
731        });
732
733        let (route, gateway, output_interface, oif) = if let Some(r) = matching_route {
734            let gateway = r.gateway;
735            let oif = r.oif;
736            let output_interface = if let Some(idx) = oif {
737                self.conn
738                    .get_link_by_index(idx)
739                    .await?
740                    .and_then(|l| l.name().map(|s| s.to_string()))
741            } else {
742                None
743            };
744
745            let route_info = RouteInfo {
746                destination: r
747                    .destination
748                    .map(|d| d.to_string())
749                    .unwrap_or_else(|| "default".to_string()),
750                prefix_len: r.dst_len(),
751                gateway,
752                oif,
753                metric: r.priority(),
754            };
755
756            (Some(route_info), gateway, output_interface, oif)
757        } else {
758            issues.push(Issue {
759                severity: Severity::Error,
760                category: IssueCategory::NoRoute,
761                message: format!("No route to {}", dest),
762                details: None,
763                interface: None,
764                timestamp,
765            });
766            (None, None, None, None)
767        };
768
769        // Check if gateway is reachable via neighbor cache
770        let gateway_reachable = if let Some(gw) = gateway {
771            // Check neighbor cache
772            let neighbors = self.conn.get_neighbors().await.unwrap_or_default();
773            neighbors.iter().any(|n| {
774                n.destination == Some(gw)
775                    && n.state() != NeighborState::Incomplete
776                    && n.state() != NeighborState::Failed
777            })
778        } else {
779            true // No gateway means direct route
780        };
781
782        if gateway.is_some() && !gateway_reachable {
783            issues.push(Issue {
784                severity: Severity::Warning,
785                category: IssueCategory::Unreachable,
786                message: format!("Gateway {:?} may be unreachable", gateway),
787                details: Some("Not found in neighbor cache or in failed state".to_string()),
788                interface: output_interface.clone(),
789                timestamp,
790            });
791        }
792
793        // Check if output interface is up
794        if let Some(idx) = oif
795            && let Some(link) = self.conn.get_link_by_index(idx).await?
796            && !link.is_up()
797        {
798            issues.push(Issue {
799                severity: Severity::Error,
800                category: IssueCategory::LinkDown,
801                message: format!("Output interface {} is down", link.name().unwrap_or("?")),
802                details: None,
803                interface: link.name().map(|s| s.to_string()),
804                timestamp,
805            });
806        }
807
808        Ok(ConnectivityReport {
809            destination: dest,
810            route,
811            output_interface,
812            gateway,
813            gateway_reachable,
814            issues,
815        })
816    }
817
818    /// Find the most significant bottleneck in the system.
819    pub async fn find_bottleneck(&self) -> Result<Option<Bottleneck>> {
820        let mut bottlenecks = Vec::new();
821
822        // Check all interfaces
823        let links = self.conn.get_links().await?;
824
825        for link in &links {
826            if link.is_loopback() {
827                continue;
828            }
829
830            let name = link.name().unwrap_or("?");
831
832            if let Some(stats) = link.stats() {
833                let total_packets = stats.total_packets();
834                let total_dropped = stats.total_dropped();
835                let total_errors = stats.total_errors();
836
837                if total_packets > 0 {
838                    let drop_rate = total_dropped as f64 / total_packets as f64;
839
840                    if drop_rate > self.config.packet_loss_threshold {
841                        bottlenecks.push(Bottleneck {
842                            location: format!("{} interface", name),
843                            bottleneck_type: BottleneckType::InterfaceDrops,
844                            current_rate: 0,
845                            drop_rate,
846                            total_drops: total_dropped,
847                            recommendation: format!(
848                                "Check {} for hardware issues or increase buffer sizes",
849                                name
850                            ),
851                        });
852                    }
853
854                    if total_errors > 0 {
855                        let error_rate = total_errors as f64 / total_packets as f64;
856                        if error_rate > self.config.error_rate_threshold {
857                            bottlenecks.push(Bottleneck {
858                                location: format!("{} interface", name),
859                                bottleneck_type: BottleneckType::HardwareErrors,
860                                current_rate: 0,
861                                drop_rate: error_rate,
862                                total_drops: total_errors,
863                                recommendation: format!(
864                                    "Check cable, PHY settings, or NIC on {}",
865                                    name
866                                ),
867                            });
868                        }
869                    }
870                }
871            }
872        }
873
874        // Check all qdiscs
875        let qdiscs = self.conn.get_qdiscs().await?;
876        let names = self.conn.get_interface_names().await?;
877
878        for qdisc in &qdiscs {
879            if !qdisc.is_root() {
880                continue;
881            }
882
883            let name = names
884                .get(&qdisc.ifindex())
885                .map(|s| s.as_str())
886                .unwrap_or("?");
887
888            let drops = qdisc.drops() as u64;
889            let packets = qdisc.packets();
890
891            if packets > 0 {
892                let drop_rate = drops as f64 / packets as f64;
893
894                if drop_rate > self.config.qdisc_drop_threshold {
895                    bottlenecks.push(Bottleneck {
896                        location: format!(
897                            "{} egress qdisc ({})",
898                            name,
899                            qdisc.kind().unwrap_or("?")
900                        ),
901                        bottleneck_type: BottleneckType::QdiscDrops,
902                        current_rate: qdisc.bps() as u64,
903                        drop_rate,
904                        total_drops: drops,
905                        recommendation: format!(
906                            "Increase qdisc limit or rate on {}, or switch to a different qdisc",
907                            name
908                        ),
909                    });
910                }
911            }
912
913            // Check for buffer issues
914            let backlog = qdisc.backlog();
915            let qlen = qdisc.qlen();
916
917            if backlog > self.config.backlog_threshold || qlen > self.config.qlen_threshold {
918                bottlenecks.push(Bottleneck {
919                    location: format!("{} egress qdisc ({})", name, qdisc.kind().unwrap_or("?")),
920                    bottleneck_type: BottleneckType::BufferFull,
921                    current_rate: qdisc.bps() as u64,
922                    drop_rate: 0.0,
923                    total_drops: drops,
924                    recommendation: format!(
925                        "High queue depth on {} - consider reducing buffering or increasing rate",
926                        name
927                    ),
928                });
929            }
930        }
931
932        // Return the worst bottleneck (highest drop rate)
933        bottlenecks.sort_by(|a, b| {
934            b.drop_rate
935                .partial_cmp(&a.drop_rate)
936                .unwrap_or(std::cmp::Ordering::Equal)
937        });
938        Ok(bottlenecks.into_iter().next())
939    }
940
941    /// Watch for issues in real-time.
942    ///
943    /// Returns a stream of issues detected from network events.
944    pub async fn watch(&self) -> Result<IssueStream> {
945        let mut conn = Connection::<Route>::new()?;
946        conn.subscribe_all()?;
947        Ok(IssueStream {
948            events: conn.into_events(),
949            config: self.config.clone(),
950        })
951    }
952
953    // ========================================================================
954    // Private helpers
955    // ========================================================================
956
957    fn build_route_diag(
958        &self,
959        ipv4_routes: &[&RouteMessage],
960        ipv6_routes: &[&RouteMessage],
961    ) -> RouteDiag {
962        let mut diag = RouteDiag {
963            ipv4_route_count: ipv4_routes.len(),
964            ipv6_route_count: ipv6_routes.len(),
965            ..Default::default()
966        };
967
968        // Find default IPv4 route
969        for route in ipv4_routes {
970            if route.dst_len() == 0 {
971                diag.has_default_ipv4 = true;
972                diag.default_gateway_v4 = route.gateway;
973                break;
974            }
975        }
976
977        // Find default IPv6 route
978        for route in ipv6_routes {
979            if route.dst_len() == 0 {
980                diag.has_default_ipv6 = true;
981                diag.default_gateway_v6 = route.gateway;
982                break;
983            }
984        }
985
986        diag
987    }
988
989    fn detect_link_issues(
990        &self,
991        link: &LinkMessage,
992        stats: &LinkStats,
993        addr_by_ifindex: &HashMap<u32, Vec<AddressMessage>>,
994        timestamp: Instant,
995    ) -> Vec<Issue> {
996        let mut issues = Vec::new();
997        let name = link.name().unwrap_or("?").to_string();
998        let ifindex = link.ifindex();
999
1000        // Check if interface is down
1001        if !link.is_up() {
1002            issues.push(Issue {
1003                severity: Severity::Warning,
1004                category: IssueCategory::LinkDown,
1005                message: format!("Interface {} is down", name),
1006                details: None,
1007                interface: Some(name.clone()),
1008                timestamp,
1009            });
1010        }
1011
1012        // Check carrier
1013        if link.is_up() && !link.has_carrier() {
1014            issues.push(Issue {
1015                severity: Severity::Error,
1016                category: IssueCategory::NoCarrier,
1017                message: format!("No carrier on {}", name),
1018                details: Some("Check cable connection".to_string()),
1019                interface: Some(name.clone()),
1020                timestamp,
1021            });
1022        }
1023
1024        // Check for packet loss
1025        let total_packets = stats.total_packets();
1026        let total_dropped = stats.total_dropped();
1027
1028        if total_packets > self.config.min_bytes_for_rate && total_dropped > 0 {
1029            let drop_rate = total_dropped as f64 / total_packets as f64;
1030            if drop_rate > self.config.packet_loss_threshold {
1031                issues.push(Issue {
1032                    severity: Severity::Warning,
1033                    category: IssueCategory::HighPacketLoss,
1034                    message: format!("{:.2}% packet loss on {}", drop_rate * 100.0, name),
1035                    details: Some(format!(
1036                        "{} dropped out of {} packets",
1037                        total_dropped, total_packets
1038                    )),
1039                    interface: Some(name.clone()),
1040                    timestamp,
1041                });
1042            }
1043        }
1044
1045        // Check for errors
1046        let total_errors = stats.total_errors();
1047        if total_packets > self.config.min_bytes_for_rate && total_errors > 0 {
1048            let error_rate = total_errors as f64 / total_packets as f64;
1049            if error_rate > self.config.error_rate_threshold {
1050                issues.push(Issue {
1051                    severity: Severity::Warning,
1052                    category: IssueCategory::LinkErrors,
1053                    message: format!(
1054                        "{} errors on {} ({:.3}%)",
1055                        total_errors,
1056                        name,
1057                        error_rate * 100.0
1058                    ),
1059                    details: Some(format!(
1060                        "RX errors: {}, TX errors: {}",
1061                        stats.rx_errors(),
1062                        stats.tx_errors()
1063                    )),
1064                    interface: Some(name.clone()),
1065                    timestamp,
1066                });
1067            }
1068        }
1069
1070        // Check for missing addresses (skip loopback)
1071        if link.is_up() && !link.is_loopback() {
1072            let has_addrs = addr_by_ifindex
1073                .get(&ifindex)
1074                .map(|addrs| !addrs.is_empty())
1075                .unwrap_or(false);
1076            if !has_addrs {
1077                issues.push(Issue {
1078                    severity: Severity::Info,
1079                    category: IssueCategory::NoAddress,
1080                    message: format!("No IP addresses configured on {}", name),
1081                    details: None,
1082                    interface: Some(name.clone()),
1083                    timestamp,
1084                });
1085            }
1086        }
1087
1088        issues
1089    }
1090
1091    fn detect_tc_issues(&self, tc: &TcMessage, iface: &str, timestamp: Instant) -> Vec<Issue> {
1092        let mut issues = Vec::new();
1093
1094        let drops = tc.drops() as u64;
1095        let packets = tc.packets();
1096
1097        if packets > 0 {
1098            let drop_rate = drops as f64 / packets as f64;
1099            if drop_rate > self.config.qdisc_drop_threshold {
1100                issues.push(Issue {
1101                    severity: Severity::Warning,
1102                    category: IssueCategory::QdiscDrops,
1103                    message: format!(
1104                        "Qdisc {} dropping {:.2}% of packets on {}",
1105                        tc.kind().unwrap_or("?"),
1106                        drop_rate * 100.0,
1107                        iface
1108                    ),
1109                    details: Some(format!("{} drops out of {} packets", drops, packets)),
1110                    interface: Some(iface.to_string()),
1111                    timestamp,
1112                });
1113            }
1114        }
1115
1116        // Check backlog
1117        if tc.backlog() > self.config.backlog_threshold {
1118            issues.push(Issue {
1119                severity: Severity::Warning,
1120                category: IssueCategory::BufferOverflow,
1121                message: format!("High backlog ({} bytes) on {} qdisc", tc.backlog(), iface),
1122                details: Some(format!("Queue length: {} packets", tc.qlen())),
1123                interface: Some(iface.to_string()),
1124                timestamp,
1125            });
1126        }
1127
1128        issues
1129    }
1130}
1131
1132// ============================================================================
1133// Issue Stream
1134// ============================================================================
1135
1136/// Stream of issues from real-time monitoring.
1137pub struct IssueStream {
1138    events: OwnedEventStream<Route>,
1139    /// Configuration for issue thresholds (reserved for future filtering).
1140    #[allow(dead_code)]
1141    config: DiagnosticsConfig,
1142}
1143
1144impl Stream for IssueStream {
1145    type Item = Result<Issue>;
1146
1147    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1148        loop {
1149            let events = Pin::new(&mut self.events);
1150
1151            match events.poll_next(cx) {
1152                Poll::Ready(Some(Ok(event))) => {
1153                    if let Some(issue) = self.event_to_issue(&event) {
1154                        return Poll::Ready(Some(Ok(issue)));
1155                    }
1156                    // Continue polling for next event
1157                }
1158                Poll::Ready(Some(Err(e))) => {
1159                    return Poll::Ready(Some(Err(e)));
1160                }
1161                Poll::Ready(None) => {
1162                    return Poll::Ready(None);
1163                }
1164                Poll::Pending => {
1165                    return Poll::Pending;
1166                }
1167            }
1168        }
1169    }
1170}
1171
1172impl IssueStream {
1173    fn event_to_issue(&self, event: &NetworkEvent) -> Option<Issue> {
1174        let timestamp = Instant::now();
1175
1176        match event {
1177            NetworkEvent::DelLink(link) => {
1178                let name = link.name().unwrap_or("?").to_string();
1179                Some(Issue {
1180                    severity: Severity::Warning,
1181                    category: IssueCategory::LinkDown,
1182                    message: format!("Interface {} removed", name),
1183                    details: None,
1184                    interface: Some(name),
1185                    timestamp,
1186                })
1187            }
1188            NetworkEvent::NewLink(link) => {
1189                let name = link.name().unwrap_or("?").to_string();
1190
1191                // Check for carrier issues on new/changed links
1192                if link.is_up() && !link.has_carrier() {
1193                    return Some(Issue {
1194                        severity: Severity::Error,
1195                        category: IssueCategory::NoCarrier,
1196                        message: format!("No carrier on {}", name),
1197                        details: Some("Check cable connection".to_string()),
1198                        interface: Some(name),
1199                        timestamp,
1200                    });
1201                }
1202
1203                // Check for operstate changes
1204                if let Some(state) = link.operstate()
1205                    && (state == OperState::Down || state == OperState::LowerLayerDown)
1206                {
1207                    return Some(Issue {
1208                        severity: Severity::Warning,
1209                        category: IssueCategory::LinkDown,
1210                        message: format!("Interface {} is {:?}", name, state),
1211                        details: None,
1212                        interface: Some(name),
1213                        timestamp,
1214                    });
1215                }
1216
1217                None
1218            }
1219            NetworkEvent::DelAddress(addr) => {
1220                let name = crate::util::ifname::index_to_name(addr.ifindex())
1221                    .unwrap_or_else(|_| format!("if{}", addr.ifindex()));
1222                Some(Issue {
1223                    severity: Severity::Info,
1224                    category: IssueCategory::NoAddress,
1225                    message: format!("Address {:?} removed from {}", addr.address(), name),
1226                    details: None,
1227                    interface: Some(name),
1228                    timestamp,
1229                })
1230            }
1231            NetworkEvent::DelRoute(route) => {
1232                // Check if it's the default route
1233                if route.dst_len() == 0 {
1234                    return Some(Issue {
1235                        severity: Severity::Warning,
1236                        category: IssueCategory::NoDefaultRoute,
1237                        message: "Default route removed".to_string(),
1238                        details: None,
1239                        interface: None,
1240                        timestamp,
1241                    });
1242                }
1243                None
1244            }
1245            _ => None,
1246        }
1247    }
1248}
1249
#[cfg(test)]
mod tests {
    use super::*;

    /// Each severity level must compare strictly greater than the one before it.
    #[test]
    fn test_severity_ordering() {
        let ascending = [
            Severity::Info,
            Severity::Warning,
            Severity::Error,
            Severity::Critical,
        ];
        for pair in ascending.windows(2) {
            assert!(pair[0] < pair[1]);
        }
    }

    /// The `Display` output includes the interface, message and details.
    #[test]
    fn test_issue_display() {
        let sample = Issue {
            severity: Severity::Warning,
            category: IssueCategory::HighPacketLoss,
            message: "5% packet loss".to_string(),
            details: Some("Check cable".to_string()),
            interface: Some("eth0".to_string()),
            timestamp: Instant::now(),
        };

        let rendered = sample.to_string();
        for fragment in ["eth0", "5% packet loss", "Check cable"] {
            assert!(rendered.contains(fragment));
        }
    }

    /// Totals are the sum of the RX and TX directions.
    #[test]
    fn test_link_rates() {
        let sample = LinkRates {
            rx_bps: 1000,
            tx_bps: 2000,
            rx_pps: 10,
            tx_pps: 20,
            sample_duration_ms: 1000,
        };

        assert_eq!(sample.total_bps(), 3000);
        assert_eq!(sample.total_pps(), 30);
    }

    /// Spot-check a couple of documented defaults.
    #[test]
    fn test_config_defaults() {
        let defaults = DiagnosticsConfig::default();
        assert_eq!(defaults.packet_loss_threshold, 0.01);
        assert!(defaults.skip_loopback);
    }
}