1use std::collections::HashMap;
54use std::fmt;
55use std::net::IpAddr;
56use std::pin::Pin;
57use std::sync::Arc;
58use std::task::{Context, Poll};
59use std::time::Instant;
60
61use tokio::sync::Mutex;
62use tokio_stream::Stream;
63
64use crate::netlink::connection::Connection;
65use crate::netlink::error::Result;
66use crate::netlink::events::NetworkEvent;
67use crate::netlink::messages::{AddressMessage, LinkMessage, LinkStats, RouteMessage, TcMessage};
68use crate::netlink::protocol::Route;
69use crate::netlink::stream::OwnedEventStream;
70use crate::netlink::types::link::OperState;
71use crate::netlink::types::neigh::NeighborState;
72
73#[derive(Debug, Clone)]
79pub struct DiagnosticReport {
80 pub timestamp: Instant,
82 pub interfaces: Vec<InterfaceDiag>,
84 pub routes: RouteDiag,
86 pub issues: Vec<Issue>,
88}
89
90#[derive(Debug, Clone)]
92pub struct InterfaceDiag {
93 pub name: String,
95 pub ifindex: u32,
97 pub state: OperState,
99 pub flags: u32,
101 pub mtu: Option<u32>,
103 pub stats: LinkStats,
105 pub rates: LinkRates,
107 pub tc: Option<TcDiag>,
109 pub issues: Vec<Issue>,
111}
112
/// Per-second throughput of one interface, derived from two counter samples.
///
/// The scanners in this file compute the `*_bps` fields from the byte counters,
/// so they are expressed in bytes per second.
#[derive(Debug, Clone, Copy, Default)]
pub struct LinkRates {
    /// Received bytes per second.
    pub rx_bps: u64,
    /// Transmitted bytes per second.
    pub tx_bps: u64,
    /// Received packets per second.
    pub rx_pps: u64,
    /// Transmitted packets per second.
    pub tx_pps: u64,
    /// Time between the two samples, in milliseconds (0 when no rate is available).
    pub sample_duration_ms: u64,
}

impl LinkRates {
    /// Combined receive + transmit byte rate.
    ///
    /// Saturates at `u64::MAX` instead of overflowing.
    pub fn total_bps(&self) -> u64 {
        self.rx_bps.saturating_add(self.tx_bps)
    }

    /// Combined receive + transmit packet rate.
    ///
    /// Saturates at `u64::MAX` instead of overflowing.
    pub fn total_pps(&self) -> u64 {
        self.rx_pps.saturating_add(self.tx_pps)
    }
}
139
/// Owned snapshot of the statistics of one qdisc.
#[derive(Debug, Clone)]
pub struct TcDiag {
    /// Qdisc kind (`"unknown"` when the message carried none).
    pub qdisc: String,
    /// Qdisc handle rendered as text.
    pub handle: String,
    /// Drop counter.
    pub drops: u64,
    /// Overlimit counter.
    pub overlimits: u64,
    /// Current backlog as reported by the qdisc.
    pub backlog: u32,
    /// Current queue length as reported by the qdisc.
    pub qlen: u32,
    /// Byte rate reported by the qdisc.
    pub rate_bps: u64,
    /// Packet rate reported by the qdisc.
    pub rate_pps: u64,
    /// Byte counter.
    pub bytes: u64,
    /// Packet counter.
    pub packets: u64,
}
164
165impl TcDiag {
166 pub fn from_tc_message(tc: &TcMessage) -> Self {
168 Self {
169 qdisc: tc.kind().unwrap_or("unknown").to_string(),
170 handle: tc.handle_str(),
171 drops: tc.drops() as u64,
172 overlimits: tc.overlimits() as u64,
173 backlog: tc.backlog(),
174 qlen: tc.qlen(),
175 rate_bps: tc.bps() as u64,
176 rate_pps: tc.pps() as u64,
177 bytes: tc.bytes(),
178 packets: tc.packets(),
179 }
180 }
181}
182
/// Summary of the routing tables seen during a scan.
#[derive(Debug, Clone, Default)]
pub struct RouteDiag {
    /// Number of IPv4 routes.
    pub ipv4_route_count: usize,
    /// Number of IPv6 routes.
    pub ipv6_route_count: usize,
    /// Whether an IPv4 route with prefix length 0 exists.
    pub has_default_ipv4: bool,
    /// Whether an IPv6 route with prefix length 0 exists.
    pub has_default_ipv6: bool,
    /// Gateway of the first IPv4 default route, if it has one.
    pub default_gateway_v4: Option<IpAddr>,
    /// Gateway of the first IPv6 default route, if it has one.
    pub default_gateway_v6: Option<IpAddr>,
}
199
200#[derive(Debug, Clone)]
202pub struct Issue {
203 pub severity: Severity,
205 pub category: IssueCategory,
207 pub message: String,
209 pub details: Option<String>,
211 pub interface: Option<String>,
213 pub timestamp: Instant,
215}
216
217impl fmt::Display for Issue {
218 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219 if let Some(ref iface) = self.interface {
220 write!(f, "[{}] ", iface)?;
221 }
222 write!(f, "{}", self.message)?;
223 if let Some(ref details) = self.details {
224 write!(f, " ({})", details)?;
225 }
226 Ok(())
227 }
228}
229
/// Severity of an issue, ordered from least (`Info`) to most (`Critical`) severe.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[non_exhaustive]
pub enum Severity {
    /// Informational only.
    Info,
    /// Something that may need attention.
    Warning,
    /// A definite problem.
    Error,
    /// The most severe level.
    Critical,
}

/// Prints the upper-case label of the severity (`INFO`, `WARN`, `ERROR`, `CRITICAL`).
impl fmt::Display for Severity {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Severity::Info => "INFO",
            Severity::Warning => "WARN",
            Severity::Error => "ERROR",
            Severity::Critical => "CRITICAL",
        };
        f.write_str(label)
    }
}
254
/// Classification of a detected issue.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum IssueCategory {
    /// Interface is down or was removed.
    LinkDown,
    /// Interface is up but has no carrier.
    NoCarrier,
    /// Dropped-packet ratio above the configured threshold.
    HighPacketLoss,
    /// Error ratio above the configured threshold.
    LinkErrors,
    /// Qdisc drop ratio above the configured threshold.
    QdiscDrops,
    /// Qdisc backlog above the configured threshold.
    BufferOverflow,
    /// No route matches the destination.
    NoRoute,
    /// Gateway could not be confirmed in the neighbor cache.
    Unreachable,
    /// Not produced by the code visible in this file.
    HighLatency,
    /// Interface has no address, or an address was removed.
    NoAddress,
    /// No default route, or a default route was removed.
    NoDefaultRoute,
    /// Not produced by the code visible in this file.
    MtuIssue,
    /// Not produced by the code visible in this file.
    DuplexMismatch,
}

/// Prints the variant name exactly as written in the enum.
impl fmt::Display for IssueCategory {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            IssueCategory::LinkDown => "LinkDown",
            IssueCategory::NoCarrier => "NoCarrier",
            IssueCategory::HighPacketLoss => "HighPacketLoss",
            IssueCategory::LinkErrors => "LinkErrors",
            IssueCategory::QdiscDrops => "QdiscDrops",
            IssueCategory::BufferOverflow => "BufferOverflow",
            IssueCategory::NoRoute => "NoRoute",
            IssueCategory::Unreachable => "Unreachable",
            IssueCategory::HighLatency => "HighLatency",
            IssueCategory::NoAddress => "NoAddress",
            IssueCategory::NoDefaultRoute => "NoDefaultRoute",
            IssueCategory::MtuIssue => "MtuIssue",
            IssueCategory::DuplexMismatch => "DuplexMismatch",
        };
        f.write_str(label)
    }
}
306
307#[derive(Debug, Clone)]
309pub struct ConnectivityReport {
310 pub destination: IpAddr,
312 pub route: Option<RouteInfo>,
314 pub output_interface: Option<String>,
316 pub gateway: Option<IpAddr>,
318 pub gateway_reachable: bool,
320 pub issues: Vec<Issue>,
322}
323
/// Description of a route selected for a destination.
#[derive(Debug, Clone)]
pub struct RouteInfo {
    /// Route destination as text, or `"default"` when the route has none.
    pub destination: String,
    /// Prefix length of the destination.
    pub prefix_len: u8,
    /// Next-hop gateway, if any.
    pub gateway: Option<IpAddr>,
    /// Index of the output interface, if any.
    pub oif: Option<u32>,
    /// Route priority, if the route carries one.
    pub metric: Option<u32>,
}
338
339#[derive(Debug, Clone)]
341pub struct Bottleneck {
342 pub location: String,
344 pub bottleneck_type: BottleneckType,
346 pub current_rate: u64,
348 pub drop_rate: f64,
350 pub total_drops: u64,
352 pub recommendation: String,
354}
355
/// Kind of bottleneck reported in a [`Bottleneck`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum BottleneckType {
    /// A qdisc is dropping packets.
    QdiscDrops,
    /// The interface itself is dropping packets.
    InterfaceDrops,
    /// A qdisc queue is deeper than the configured limits.
    BufferFull,
    /// Not produced by the code visible in this file.
    RateLimited,
    /// The interface reports errors above the configured ratio.
    HardwareErrors,
}

/// Prints a short human-readable label such as `Qdisc Drops`.
impl fmt::Display for BottleneckType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            BottleneckType::QdiscDrops => "Qdisc Drops",
            BottleneckType::InterfaceDrops => "Interface Drops",
            BottleneckType::BufferFull => "Buffer Full",
            BottleneckType::RateLimited => "Rate Limited",
            BottleneckType::HardwareErrors => "Hardware Errors",
        };
        f.write_str(label)
    }
}
383
/// Thresholds and filters that control what the diagnostics report.
#[derive(Debug, Clone)]
#[must_use = "builders do nothing unless used"]
pub struct DiagnosticsConfig {
    /// Dropped/total packet ratio above which packet loss is reported.
    pub packet_loss_threshold: f64,
    /// Error/total packet ratio above which link errors are reported.
    pub error_rate_threshold: f64,
    /// Qdisc drops/packets ratio above which qdisc drops are reported.
    pub qdisc_drop_threshold: f64,
    /// Qdisc backlog above which a high backlog is reported.
    pub backlog_threshold: u32,
    /// Qdisc queue length above which `find_bottleneck` reports a full buffer.
    pub qlen_threshold: u32,
    /// Skip loopback interfaces in `scan`.
    pub skip_loopback: bool,
    /// Skip interfaces that are not up in `scan`.
    pub skip_down: bool,
    /// Minimum traffic before loss/error ratios are evaluated.
    // NOTE(review): despite the name, the link checks compare this against the
    // total *packet* count - confirm which unit is intended.
    pub min_bytes_for_rate: u64,
}

impl Default for DiagnosticsConfig {
    /// Defaults: 1% packet loss, 0.1% errors, 1% qdisc drops, backlog 100 000,
    /// queue length 1000, loopback skipped, down interfaces included.
    fn default() -> Self {
        let packet_loss_threshold = 0.01;
        let error_rate_threshold = 0.001;
        let qdisc_drop_threshold = 0.01;

        Self {
            packet_loss_threshold,
            error_rate_threshold,
            qdisc_drop_threshold,
            backlog_threshold: 100_000,
            qlen_threshold: 1000,
            skip_loopback: true,
            skip_down: false,
            min_bytes_for_rate: 1000,
        }
    }
}
424
425pub struct Diagnostics {
433 conn: Connection<Route>,
434 config: DiagnosticsConfig,
435 prev_stats: Arc<Mutex<HashMap<u32, (Instant, LinkStats)>>>,
437 #[allow(dead_code, clippy::type_complexity)]
439 prev_tc_stats: Arc<Mutex<HashMap<(u32, u32), (Instant, u64, u64)>>>,
440}
441
442impl Diagnostics {
443 pub fn new(conn: Connection<Route>) -> Self {
445 Self {
446 conn,
447 config: DiagnosticsConfig::default(),
448 prev_stats: Arc::new(Mutex::new(HashMap::new())),
449 prev_tc_stats: Arc::new(Mutex::new(HashMap::new())),
450 }
451 }
452
453 pub fn with_config(conn: Connection<Route>, config: DiagnosticsConfig) -> Self {
455 Self {
456 conn,
457 config,
458 prev_stats: Arc::new(Mutex::new(HashMap::new())),
459 prev_tc_stats: Arc::new(Mutex::new(HashMap::new())),
460 }
461 }
462
463 pub fn config(&self) -> &DiagnosticsConfig {
465 &self.config
466 }
467
468 pub fn config_mut(&mut self) -> &mut DiagnosticsConfig {
470 &mut self.config
471 }
472
473 pub async fn scan(&self) -> Result<DiagnosticReport> {
475 let timestamp = Instant::now();
476 let mut all_issues = Vec::new();
477
478 let links = self.conn.get_links().await?;
480
481 let addresses = self.conn.get_addresses().await?;
483 let addr_by_ifindex: HashMap<u32, Vec<_>> = {
484 let mut map: HashMap<u32, Vec<_>> = HashMap::new();
485 for addr in addresses {
486 map.entry(addr.ifindex()).or_default().push(addr);
487 }
488 map
489 };
490
491 let qdiscs = self.conn.get_qdiscs().await?;
493 let qdiscs_by_ifindex: HashMap<u32, Vec<_>> = {
494 let mut map: HashMap<u32, Vec<_>> = HashMap::new();
495 for qdisc in qdiscs {
496 map.entry(qdisc.ifindex()).or_default().push(qdisc);
497 }
498 map
499 };
500
501 let all_routes = self.conn.get_routes().await.unwrap_or_default();
503 let ipv4_routes: Vec<_> = all_routes.iter().filter(|r| r.is_ipv4()).collect();
504 let ipv6_routes: Vec<_> = all_routes.iter().filter(|r| r.is_ipv6()).collect();
505
506 let routes = self.build_route_diag(&ipv4_routes, &ipv6_routes);
508
509 if !routes.has_default_ipv4 && !routes.has_default_ipv6 {
511 all_issues.push(Issue {
512 severity: Severity::Warning,
513 category: IssueCategory::NoDefaultRoute,
514 message: "No default route configured".to_string(),
515 details: Some("System may not have internet connectivity".to_string()),
516 interface: None,
517 timestamp,
518 });
519 }
520
521 let mut interfaces = Vec::new();
523 let mut prev_stats = self.prev_stats.lock().await;
524
525 for link in links {
526 if self.config.skip_loopback && link.is_loopback() {
528 continue;
529 }
530
531 if self.config.skip_down && !link.is_up() {
533 continue;
534 }
535
536 let ifindex = link.ifindex();
537 let name = link.name().unwrap_or("?").to_string();
538 let state = link.operstate().unwrap_or(OperState::Unknown);
539 let stats = link.stats().cloned().unwrap_or_default();
540
541 let rates = if let Some((prev_time, prev)) = prev_stats.get(&ifindex) {
543 let elapsed = prev_time.elapsed();
544 if elapsed.as_millis() > 0 {
545 let ms = elapsed.as_millis() as u64;
546 LinkRates {
547 rx_bps: (stats.rx_bytes().saturating_sub(prev.rx_bytes())) * 1000 / ms,
548 tx_bps: (stats.tx_bytes().saturating_sub(prev.tx_bytes())) * 1000 / ms,
549 rx_pps: (stats.rx_packets().saturating_sub(prev.rx_packets())) * 1000 / ms,
550 tx_pps: (stats.tx_packets().saturating_sub(prev.tx_packets())) * 1000 / ms,
551 sample_duration_ms: ms,
552 }
553 } else {
554 LinkRates::default()
555 }
556 } else {
557 LinkRates::default()
558 };
559
560 prev_stats.insert(ifindex, (Instant::now(), stats));
562
563 let mut issues = self.detect_link_issues(&link, &stats, &addr_by_ifindex, timestamp);
565
566 let tc = qdiscs_by_ifindex.get(&ifindex).and_then(|qs| {
568 qs.iter()
570 .find(|q| q.is_root())
571 .or_else(|| qs.first())
572 .map(|q| {
573 let tc_diag = TcDiag::from_tc_message(q);
574 issues.extend(self.detect_tc_issues(q, &name, timestamp));
576 tc_diag
577 })
578 });
579
580 all_issues.extend(issues.iter().cloned());
582
583 interfaces.push(InterfaceDiag {
584 name,
585 ifindex,
586 state,
587 flags: link.flags(),
588 mtu: link.mtu(),
589 stats,
590 rates,
591 tc,
592 issues,
593 });
594 }
595
596 Ok(DiagnosticReport {
597 timestamp,
598 interfaces,
599 routes,
600 issues: all_issues,
601 })
602 }
603
604 pub async fn scan_interface(&self, dev: &str) -> Result<InterfaceDiag> {
606 let timestamp = Instant::now();
607
608 let link = self.conn.get_link_by_name(dev).await?;
610 let link = link.ok_or_else(|| crate::netlink::error::Error::interface_not_found(dev))?;
611
612 let ifindex = link.ifindex();
613 let name = dev.to_string();
614 let state = link.operstate().unwrap_or(OperState::Unknown);
615 let stats = link.stats().cloned().unwrap_or_default();
616
617 let addresses = self.conn.get_addresses_by_name(dev).await?;
619 let addr_by_ifindex: HashMap<u32, Vec<_>> = {
620 let mut map: HashMap<u32, Vec<_>> = HashMap::new();
621 for addr in addresses {
622 map.entry(addr.ifindex()).or_default().push(addr);
623 }
624 map
625 };
626
627 let mut prev_stats = self.prev_stats.lock().await;
629 let rates = if let Some((prev_time, prev)) = prev_stats.get(&ifindex) {
630 let elapsed = prev_time.elapsed();
631 if elapsed.as_millis() > 0 {
632 let ms = elapsed.as_millis() as u64;
633 LinkRates {
634 rx_bps: (stats.rx_bytes().saturating_sub(prev.rx_bytes())) * 1000 / ms,
635 tx_bps: (stats.tx_bytes().saturating_sub(prev.tx_bytes())) * 1000 / ms,
636 rx_pps: (stats.rx_packets().saturating_sub(prev.rx_packets())) * 1000 / ms,
637 tx_pps: (stats.tx_packets().saturating_sub(prev.tx_packets())) * 1000 / ms,
638 sample_duration_ms: ms,
639 }
640 } else {
641 LinkRates::default()
642 }
643 } else {
644 LinkRates::default()
645 };
646
647 prev_stats.insert(ifindex, (Instant::now(), stats));
648
649 let mut issues = self.detect_link_issues(&link, &stats, &addr_by_ifindex, timestamp);
651
652 let qdiscs = self.conn.get_qdiscs_by_name(dev).await?;
654 let tc = qdiscs
655 .iter()
656 .find(|q| q.is_root())
657 .or(qdiscs.first())
658 .map(|q| {
659 let tc_diag = TcDiag::from_tc_message(q);
660 issues.extend(self.detect_tc_issues(q, &name, timestamp));
661 tc_diag
662 });
663
664 Ok(InterfaceDiag {
665 name,
666 ifindex,
667 state,
668 flags: link.flags(),
669 mtu: link.mtu(),
670 stats,
671 rates,
672 tc,
673 issues,
674 })
675 }
676
677 pub async fn check_connectivity(&self, dest: IpAddr) -> Result<ConnectivityReport> {
679 let timestamp = Instant::now();
680 let mut issues = Vec::new();
681
682 let all_routes = self.conn.get_routes().await?;
684 let routes: Vec<_> = match dest {
685 IpAddr::V4(_) => all_routes.iter().filter(|r| r.is_ipv4()).collect(),
686 IpAddr::V6(_) => all_routes.iter().filter(|r| r.is_ipv6()).collect(),
687 };
688
689 let matching_route = routes.iter().find(|r| {
691 if let Some(dst) = &r.destination {
692 if r.dst_len() == 0 {
694 return true;
695 }
696 match (dest, dst) {
698 (IpAddr::V4(d), IpAddr::V4(p)) => {
699 let prefix_len = r.dst_len();
700 let mask = if prefix_len >= 32 {
701 u32::MAX
702 } else {
703 u32::MAX << (32 - prefix_len)
704 };
705 (u32::from(d) & mask) == (u32::from(*p) & mask)
706 }
707 (IpAddr::V6(d), IpAddr::V6(p)) => {
708 let d_bytes = d.octets();
709 let p_bytes = p.octets();
710 let prefix_len = r.dst_len();
711 let full_bytes = (prefix_len / 8) as usize;
712 let remaining_bits = prefix_len % 8;
713
714 if d_bytes[..full_bytes] != p_bytes[..full_bytes] {
715 return false;
716 }
717
718 if remaining_bits > 0 && full_bytes < 16 {
719 let mask = 0xFF << (8 - remaining_bits);
720 (d_bytes[full_bytes] & mask) == (p_bytes[full_bytes] & mask)
721 } else {
722 true
723 }
724 }
725 _ => false,
726 }
727 } else {
728 r.dst_len() == 0
730 }
731 });
732
733 let (route, gateway, output_interface, oif) = if let Some(r) = matching_route {
734 let gateway = r.gateway;
735 let oif = r.oif;
736 let output_interface = if let Some(idx) = oif {
737 self.conn
738 .get_link_by_index(idx)
739 .await?
740 .and_then(|l| l.name().map(|s| s.to_string()))
741 } else {
742 None
743 };
744
745 let route_info = RouteInfo {
746 destination: r
747 .destination
748 .map(|d| d.to_string())
749 .unwrap_or_else(|| "default".to_string()),
750 prefix_len: r.dst_len(),
751 gateway,
752 oif,
753 metric: r.priority(),
754 };
755
756 (Some(route_info), gateway, output_interface, oif)
757 } else {
758 issues.push(Issue {
759 severity: Severity::Error,
760 category: IssueCategory::NoRoute,
761 message: format!("No route to {}", dest),
762 details: None,
763 interface: None,
764 timestamp,
765 });
766 (None, None, None, None)
767 };
768
769 let gateway_reachable = if let Some(gw) = gateway {
771 let neighbors = self.conn.get_neighbors().await.unwrap_or_default();
773 neighbors.iter().any(|n| {
774 n.destination == Some(gw)
775 && n.state() != NeighborState::Incomplete
776 && n.state() != NeighborState::Failed
777 })
778 } else {
779 true };
781
782 if gateway.is_some() && !gateway_reachable {
783 issues.push(Issue {
784 severity: Severity::Warning,
785 category: IssueCategory::Unreachable,
786 message: format!("Gateway {:?} may be unreachable", gateway),
787 details: Some("Not found in neighbor cache or in failed state".to_string()),
788 interface: output_interface.clone(),
789 timestamp,
790 });
791 }
792
793 if let Some(idx) = oif
795 && let Some(link) = self.conn.get_link_by_index(idx).await?
796 && !link.is_up()
797 {
798 issues.push(Issue {
799 severity: Severity::Error,
800 category: IssueCategory::LinkDown,
801 message: format!("Output interface {} is down", link.name().unwrap_or("?")),
802 details: None,
803 interface: link.name().map(|s| s.to_string()),
804 timestamp,
805 });
806 }
807
808 Ok(ConnectivityReport {
809 destination: dest,
810 route,
811 output_interface,
812 gateway,
813 gateway_reachable,
814 issues,
815 })
816 }
817
818 pub async fn find_bottleneck(&self) -> Result<Option<Bottleneck>> {
820 let mut bottlenecks = Vec::new();
821
822 let links = self.conn.get_links().await?;
824
825 for link in &links {
826 if link.is_loopback() {
827 continue;
828 }
829
830 let name = link.name().unwrap_or("?");
831
832 if let Some(stats) = link.stats() {
833 let total_packets = stats.total_packets();
834 let total_dropped = stats.total_dropped();
835 let total_errors = stats.total_errors();
836
837 if total_packets > 0 {
838 let drop_rate = total_dropped as f64 / total_packets as f64;
839
840 if drop_rate > self.config.packet_loss_threshold {
841 bottlenecks.push(Bottleneck {
842 location: format!("{} interface", name),
843 bottleneck_type: BottleneckType::InterfaceDrops,
844 current_rate: 0,
845 drop_rate,
846 total_drops: total_dropped,
847 recommendation: format!(
848 "Check {} for hardware issues or increase buffer sizes",
849 name
850 ),
851 });
852 }
853
854 if total_errors > 0 {
855 let error_rate = total_errors as f64 / total_packets as f64;
856 if error_rate > self.config.error_rate_threshold {
857 bottlenecks.push(Bottleneck {
858 location: format!("{} interface", name),
859 bottleneck_type: BottleneckType::HardwareErrors,
860 current_rate: 0,
861 drop_rate: error_rate,
862 total_drops: total_errors,
863 recommendation: format!(
864 "Check cable, PHY settings, or NIC on {}",
865 name
866 ),
867 });
868 }
869 }
870 }
871 }
872 }
873
874 let qdiscs = self.conn.get_qdiscs().await?;
876 let names = self.conn.get_interface_names().await?;
877
878 for qdisc in &qdiscs {
879 if !qdisc.is_root() {
880 continue;
881 }
882
883 let name = names
884 .get(&qdisc.ifindex())
885 .map(|s| s.as_str())
886 .unwrap_or("?");
887
888 let drops = qdisc.drops() as u64;
889 let packets = qdisc.packets();
890
891 if packets > 0 {
892 let drop_rate = drops as f64 / packets as f64;
893
894 if drop_rate > self.config.qdisc_drop_threshold {
895 bottlenecks.push(Bottleneck {
896 location: format!(
897 "{} egress qdisc ({})",
898 name,
899 qdisc.kind().unwrap_or("?")
900 ),
901 bottleneck_type: BottleneckType::QdiscDrops,
902 current_rate: qdisc.bps() as u64,
903 drop_rate,
904 total_drops: drops,
905 recommendation: format!(
906 "Increase qdisc limit or rate on {}, or switch to a different qdisc",
907 name
908 ),
909 });
910 }
911 }
912
913 let backlog = qdisc.backlog();
915 let qlen = qdisc.qlen();
916
917 if backlog > self.config.backlog_threshold || qlen > self.config.qlen_threshold {
918 bottlenecks.push(Bottleneck {
919 location: format!("{} egress qdisc ({})", name, qdisc.kind().unwrap_or("?")),
920 bottleneck_type: BottleneckType::BufferFull,
921 current_rate: qdisc.bps() as u64,
922 drop_rate: 0.0,
923 total_drops: drops,
924 recommendation: format!(
925 "High queue depth on {} - consider reducing buffering or increasing rate",
926 name
927 ),
928 });
929 }
930 }
931
932 bottlenecks.sort_by(|a, b| {
934 b.drop_rate
935 .partial_cmp(&a.drop_rate)
936 .unwrap_or(std::cmp::Ordering::Equal)
937 });
938 Ok(bottlenecks.into_iter().next())
939 }
940
941 pub async fn watch(&self) -> Result<IssueStream> {
945 let mut conn = Connection::<Route>::new()?;
946 conn.subscribe_all()?;
947 Ok(IssueStream {
948 events: conn.into_events(),
949 config: self.config.clone(),
950 })
951 }
952
953 fn build_route_diag(
958 &self,
959 ipv4_routes: &[&RouteMessage],
960 ipv6_routes: &[&RouteMessage],
961 ) -> RouteDiag {
962 let mut diag = RouteDiag {
963 ipv4_route_count: ipv4_routes.len(),
964 ipv6_route_count: ipv6_routes.len(),
965 ..Default::default()
966 };
967
968 for route in ipv4_routes {
970 if route.dst_len() == 0 {
971 diag.has_default_ipv4 = true;
972 diag.default_gateway_v4 = route.gateway;
973 break;
974 }
975 }
976
977 for route in ipv6_routes {
979 if route.dst_len() == 0 {
980 diag.has_default_ipv6 = true;
981 diag.default_gateway_v6 = route.gateway;
982 break;
983 }
984 }
985
986 diag
987 }
988
989 fn detect_link_issues(
990 &self,
991 link: &LinkMessage,
992 stats: &LinkStats,
993 addr_by_ifindex: &HashMap<u32, Vec<AddressMessage>>,
994 timestamp: Instant,
995 ) -> Vec<Issue> {
996 let mut issues = Vec::new();
997 let name = link.name().unwrap_or("?").to_string();
998 let ifindex = link.ifindex();
999
1000 if !link.is_up() {
1002 issues.push(Issue {
1003 severity: Severity::Warning,
1004 category: IssueCategory::LinkDown,
1005 message: format!("Interface {} is down", name),
1006 details: None,
1007 interface: Some(name.clone()),
1008 timestamp,
1009 });
1010 }
1011
1012 if link.is_up() && !link.has_carrier() {
1014 issues.push(Issue {
1015 severity: Severity::Error,
1016 category: IssueCategory::NoCarrier,
1017 message: format!("No carrier on {}", name),
1018 details: Some("Check cable connection".to_string()),
1019 interface: Some(name.clone()),
1020 timestamp,
1021 });
1022 }
1023
1024 let total_packets = stats.total_packets();
1026 let total_dropped = stats.total_dropped();
1027
1028 if total_packets > self.config.min_bytes_for_rate && total_dropped > 0 {
1029 let drop_rate = total_dropped as f64 / total_packets as f64;
1030 if drop_rate > self.config.packet_loss_threshold {
1031 issues.push(Issue {
1032 severity: Severity::Warning,
1033 category: IssueCategory::HighPacketLoss,
1034 message: format!("{:.2}% packet loss on {}", drop_rate * 100.0, name),
1035 details: Some(format!(
1036 "{} dropped out of {} packets",
1037 total_dropped, total_packets
1038 )),
1039 interface: Some(name.clone()),
1040 timestamp,
1041 });
1042 }
1043 }
1044
1045 let total_errors = stats.total_errors();
1047 if total_packets > self.config.min_bytes_for_rate && total_errors > 0 {
1048 let error_rate = total_errors as f64 / total_packets as f64;
1049 if error_rate > self.config.error_rate_threshold {
1050 issues.push(Issue {
1051 severity: Severity::Warning,
1052 category: IssueCategory::LinkErrors,
1053 message: format!(
1054 "{} errors on {} ({:.3}%)",
1055 total_errors,
1056 name,
1057 error_rate * 100.0
1058 ),
1059 details: Some(format!(
1060 "RX errors: {}, TX errors: {}",
1061 stats.rx_errors(),
1062 stats.tx_errors()
1063 )),
1064 interface: Some(name.clone()),
1065 timestamp,
1066 });
1067 }
1068 }
1069
1070 if link.is_up() && !link.is_loopback() {
1072 let has_addrs = addr_by_ifindex
1073 .get(&ifindex)
1074 .map(|addrs| !addrs.is_empty())
1075 .unwrap_or(false);
1076 if !has_addrs {
1077 issues.push(Issue {
1078 severity: Severity::Info,
1079 category: IssueCategory::NoAddress,
1080 message: format!("No IP addresses configured on {}", name),
1081 details: None,
1082 interface: Some(name.clone()),
1083 timestamp,
1084 });
1085 }
1086 }
1087
1088 issues
1089 }
1090
1091 fn detect_tc_issues(&self, tc: &TcMessage, iface: &str, timestamp: Instant) -> Vec<Issue> {
1092 let mut issues = Vec::new();
1093
1094 let drops = tc.drops() as u64;
1095 let packets = tc.packets();
1096
1097 if packets > 0 {
1098 let drop_rate = drops as f64 / packets as f64;
1099 if drop_rate > self.config.qdisc_drop_threshold {
1100 issues.push(Issue {
1101 severity: Severity::Warning,
1102 category: IssueCategory::QdiscDrops,
1103 message: format!(
1104 "Qdisc {} dropping {:.2}% of packets on {}",
1105 tc.kind().unwrap_or("?"),
1106 drop_rate * 100.0,
1107 iface
1108 ),
1109 details: Some(format!("{} drops out of {} packets", drops, packets)),
1110 interface: Some(iface.to_string()),
1111 timestamp,
1112 });
1113 }
1114 }
1115
1116 if tc.backlog() > self.config.backlog_threshold {
1118 issues.push(Issue {
1119 severity: Severity::Warning,
1120 category: IssueCategory::BufferOverflow,
1121 message: format!("High backlog ({} bytes) on {} qdisc", tc.backlog(), iface),
1122 details: Some(format!("Queue length: {} packets", tc.qlen())),
1123 interface: Some(iface.to_string()),
1124 timestamp,
1125 });
1126 }
1127
1128 issues
1129 }
1130}
1131
1132pub struct IssueStream {
1138 events: OwnedEventStream<Route>,
1139 #[allow(dead_code)]
1141 config: DiagnosticsConfig,
1142}
1143
1144impl Stream for IssueStream {
1145 type Item = Result<Issue>;
1146
1147 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1148 loop {
1149 let events = Pin::new(&mut self.events);
1150
1151 match events.poll_next(cx) {
1152 Poll::Ready(Some(Ok(event))) => {
1153 if let Some(issue) = self.event_to_issue(&event) {
1154 return Poll::Ready(Some(Ok(issue)));
1155 }
1156 }
1158 Poll::Ready(Some(Err(e))) => {
1159 return Poll::Ready(Some(Err(e)));
1160 }
1161 Poll::Ready(None) => {
1162 return Poll::Ready(None);
1163 }
1164 Poll::Pending => {
1165 return Poll::Pending;
1166 }
1167 }
1168 }
1169 }
1170}
1171
1172impl IssueStream {
1173 fn event_to_issue(&self, event: &NetworkEvent) -> Option<Issue> {
1174 let timestamp = Instant::now();
1175
1176 match event {
1177 NetworkEvent::DelLink(link) => {
1178 let name = link.name().unwrap_or("?").to_string();
1179 Some(Issue {
1180 severity: Severity::Warning,
1181 category: IssueCategory::LinkDown,
1182 message: format!("Interface {} removed", name),
1183 details: None,
1184 interface: Some(name),
1185 timestamp,
1186 })
1187 }
1188 NetworkEvent::NewLink(link) => {
1189 let name = link.name().unwrap_or("?").to_string();
1190
1191 if link.is_up() && !link.has_carrier() {
1193 return Some(Issue {
1194 severity: Severity::Error,
1195 category: IssueCategory::NoCarrier,
1196 message: format!("No carrier on {}", name),
1197 details: Some("Check cable connection".to_string()),
1198 interface: Some(name),
1199 timestamp,
1200 });
1201 }
1202
1203 if let Some(state) = link.operstate()
1205 && (state == OperState::Down || state == OperState::LowerLayerDown)
1206 {
1207 return Some(Issue {
1208 severity: Severity::Warning,
1209 category: IssueCategory::LinkDown,
1210 message: format!("Interface {} is {:?}", name, state),
1211 details: None,
1212 interface: Some(name),
1213 timestamp,
1214 });
1215 }
1216
1217 None
1218 }
1219 NetworkEvent::DelAddress(addr) => {
1220 let name = crate::util::ifname::index_to_name(addr.ifindex())
1221 .unwrap_or_else(|_| format!("if{}", addr.ifindex()));
1222 Some(Issue {
1223 severity: Severity::Info,
1224 category: IssueCategory::NoAddress,
1225 message: format!("Address {:?} removed from {}", addr.address(), name),
1226 details: None,
1227 interface: Some(name),
1228 timestamp,
1229 })
1230 }
1231 NetworkEvent::DelRoute(route) => {
1232 if route.dst_len() == 0 {
1234 return Some(Issue {
1235 severity: Severity::Warning,
1236 category: IssueCategory::NoDefaultRoute,
1237 message: "Default route removed".to_string(),
1238 details: None,
1239 interface: None,
1240 timestamp,
1241 });
1242 }
1243 None
1244 }
1245 _ => None,
1246 }
1247 }
1248}
1249
#[cfg(test)]
mod tests {
    use super::*;

    /// Severities must sort from least to most severe.
    #[test]
    fn test_severity_ordering() {
        let ascending = [
            Severity::Info,
            Severity::Warning,
            Severity::Error,
            Severity::Critical,
        ];
        assert!(ascending.windows(2).all(|pair| pair[0] < pair[1]));
    }

    /// `Display` for an issue includes interface, message and details.
    #[test]
    fn test_issue_display() {
        let issue = Issue {
            severity: Severity::Warning,
            category: IssueCategory::HighPacketLoss,
            message: "5% packet loss".to_string(),
            details: Some("Check cable".to_string()),
            interface: Some("eth0".to_string()),
            timestamp: Instant::now(),
        };

        let rendered = issue.to_string();
        for expected in ["eth0", "5% packet loss", "Check cable"] {
            assert!(rendered.contains(expected));
        }
    }

    /// Totals are the sum of the receive and transmit rates.
    #[test]
    fn test_link_rates() {
        let rates = LinkRates {
            rx_bps: 1000,
            tx_bps: 2000,
            rx_pps: 10,
            tx_pps: 20,
            sample_duration_ms: 1000,
        };

        assert_eq!(rates.total_bps(), 3000);
        assert_eq!(rates.total_pps(), 30);
    }

    /// Spot-checks the default configuration.
    #[test]
    fn test_config_defaults() {
        let config = DiagnosticsConfig::default();
        assert_eq!(config.packet_loss_threshold, 0.01);
        assert!(config.skip_loopback);
    }
}