use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::iter::zip;
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;
use chrono::Utc;
use log::debug;
use nom::bytes::complete::tag;
use nom::character::complete::{alphanumeric1, i64, multispace0, multispace1, u64};
use nom::{
combinator::opt,
multi::count,
sequence::{pair, preceded, terminated},
IResult,
};
use crate::{
metrics::{
system_metrics::SystemMetricFamilyCollector, KeyedMetricReading, MetricReading,
MetricStringKey,
},
util::time_measure::TimeMeasure,
};
use eyre::{eyre, ErrReport, Result};
/// Path of the file read by `get_network_interface_metrics`.
const PROC_NET_DEV_PATH: &str = "/proc/net/dev";
/// Path of the file read by `get_wireless_interface_metrics`; when it does not
/// exist, no wireless metrics are reported.
const PROC_NET_WIRELESS_PATH: &str = "/proc/net/wireless";
/// First segment of the rate metric keys built in this file; also returned as
/// the collector's family name.
pub const NETWORK_INTERFACE_METRIC_NAMESPACE: &str = "interface";
/// Key suffix of the received-bytes rate metric.
pub const METRIC_INTERFACE_BYTES_PER_SECOND_RX_SUFFIX: &str = "bytes_per_second/rx";
/// Key suffix of the transmitted-bytes rate metric.
pub const METRIC_INTERFACE_BYTES_PER_SECOND_TX_SUFFIX: &str = "bytes_per_second/tx";
/// Key suffixes of the rate metrics that are actually reported.
/// `calculate_network_metrics` computes a rate for every parsed column but
/// drops those whose key is not listed here.
const NETWORK_INTERFACE_METRIC_KEYS: &[&str; 8] = &[
METRIC_INTERFACE_BYTES_PER_SECOND_RX_SUFFIX,
"packets_per_second/rx",
"errors_per_second/rx",
"dropped_per_second/rx",
METRIC_INTERFACE_BYTES_PER_SECOND_TX_SUFFIX,
"packets_per_second/tx",
"errors_per_second/tx",
"dropped_per_second/tx",
];
/// Selects which network interfaces the collector reports metrics for.
pub enum NetworkInterfaceMetricsConfig {
/// Report every interface except those whose name starts with `lo`, `tun`,
/// `dummy`, `veth` or `usb`.
Auto,
/// Report only the interfaces whose exact name is in the set.
Interfaces(HashSet<String>),
}
/// Collects per-interface network metrics by diffing successive readings of
/// `/proc/net/dev`, plus a signal-level reading from `/proc/net/wireless`.
///
/// `T` supplies the clock used to timestamp readings.
pub struct NetworkInterfaceMetricCollector<T: TimeMeasure> {
/// Decides which interfaces are reported (see `interface_is_monitored`).
config: NetworkInterfaceMetricsConfig,
/// Most recent reading per interface name; each new reading is compared
/// against the entry stored here and then replaces it.
previous_readings_by_interface: HashMap<String, ProcNetDevReading<T>>,
}
/// One interface's counters from a `/proc/net/dev` line together with the time
/// at which they were read.
#[derive(Clone)]
pub struct ProcNetDevReading<T: TimeMeasure> {
/// Numeric columns of the line in file order; `parse_interface_stats`
/// produces exactly 16 of them (index 0 is used as received bytes, index 8 as
/// sent bytes).
stats: Vec<u64>,
/// Time at which the reading was taken.
reading_time: T,
}
impl<T> NetworkInterfaceMetricCollector<T>
where
T: TimeMeasure + Copy + Ord + std::ops::Add<Duration, Output = T> + Send + Sync + 'static,
{
/// Creates a collector for the given interface selection.
///
/// The collector starts without any stored reading, so the first reading of
/// each interface only establishes a baseline and yields no metrics.
pub fn new(config: NetworkInterfaceMetricsConfig) -> Self {
    let previous_readings_by_interface = HashMap::new();
    Self {
        config,
        previous_readings_by_interface,
    }
}
/// Returns whether metrics should be reported for `interface`.
///
/// With an explicit interface set the name must match one entry exactly. In
/// `Auto` mode every interface is reported unless its name starts with one of
/// the excluded prefixes.
fn interface_is_monitored(&self, interface: &str) -> bool {
    // Name prefixes that are skipped in `Auto` mode.
    const AUTO_EXCLUDED_PREFIXES: [&str; 5] = ["lo", "tun", "dummy", "veth", "usb"];

    match &self.config {
        NetworkInterfaceMetricsConfig::Interfaces(allowed) => allowed.contains(interface),
        NetworkInterfaceMetricsConfig::Auto => !AUTO_EXCLUDED_PREFIXES
            .iter()
            .any(|&prefix| interface.starts_with(prefix)),
    }
}
/// Reads `/proc/net/wireless` and reports the signal level of every monitored
/// interface as a histogram reading.
///
/// Returns an empty list when the file does not exist. Lines that do not parse
/// as an interface entry (for example column headers) are logged at debug
/// level and skipped.
///
/// # Errors
///
/// Fails if the file cannot be opened or read, if a parsed line lacks the
/// level value, or if the metric key cannot be built.
pub fn get_wireless_interface_metrics(&mut self) -> Result<Vec<KeyedMetricReading>> {
    let path = Path::new(PROC_NET_WIRELESS_PATH);
    if !path.exists() {
        return Ok(vec![]);
    }
    let reader = BufReader::new(File::open(path)?);
    let mut wireless_metric_readings = vec![];
    for line in reader.lines() {
        let line = line?;
        let (interface_id, net_stats) = match Self::parse_proc_net_wireless_line(line.trim()) {
            Ok(parsed) => parsed,
            Err(_) => {
                debug!("Couldn't parse /proc/net/wireless line");
                continue;
            }
        };
        if !self.interface_is_monitored(&interface_id) {
            continue;
        }
        // The parser yields two values per line; the second one is the level.
        // The error is built lazily so nothing is allocated on the success path.
        let level = net_stats
            .get(1)
            .ok_or_else(|| eyre!("Missing level for {}", interface_id))?;
        // NOTE(review): this key starts with "interfaces/" while every other key
        // in this file uses the "interface" namespace. It is left unchanged
        // because renaming an emitted metric is visible to consumers - confirm
        // which spelling is intended.
        let rssi_key = MetricStringKey::from_str(&format!("interfaces/{}/rssi", interface_id))
            .map_err(|e| eyre!("Couldn't build link metric key for {}: {}", interface_id, e))?;
        wireless_metric_readings.push(KeyedMetricReading::new_histogram(
            rssi_key,
            *level as f64,
        ));
    }
    Ok(wireless_metric_readings)
}
pub fn get_network_interface_metrics(&mut self) -> Result<Vec<KeyedMetricReading>> {
let mut no_parseable_lines = true;
let path = Path::new(PROC_NET_DEV_PATH);
let file = File::open(path)?;
let reader = BufReader::new(file);
let mut net_metric_readings = vec![];
for line in reader.lines() {
if let Ok((interface_id, net_stats)) = Self::parse_proc_net_dev_line(line?.trim()) {
no_parseable_lines = false;
if self.interface_is_monitored(&interface_id) {
if let Ok(mut readings) = self.calculate_network_metrics(
interface_id.to_string(),
ProcNetDevReading {
stats: net_stats,
reading_time: T::now(),
},
) {
net_metric_readings.append(&mut readings);
}
}
}
}
if no_parseable_lines {
Err(eyre!(
"No network metrics were collected from {} - is it a properly formatted /proc/net/dev file?",
PROC_NET_DEV_PATH
))
} else {
Ok(net_metric_readings)
}
}
fn parse_net_if(input: &str) -> IResult<&str, &str> {
terminated(preceded(multispace0, alphanumeric1), tag(":"))(input)
}
/// Parses the counter columns that follow the interface name: exactly 16
/// unsigned integers, each preceded by at least one whitespace character.
fn parse_interface_stats(input: &str) -> IResult<&str, Vec<u64>> {
    const COLUMN_COUNT: usize = 16;

    let column = preceded(multispace1, u64);
    count(column, COLUMN_COUNT)(input)
}
/// Parses one `/proc/net/dev` line into the interface name and its counters.
///
/// # Errors
///
/// Fails when the line does not start with `<name>:` followed by the expected
/// numeric columns.
fn parse_proc_net_dev_line(line: &str) -> Result<(String, Vec<u64>)> {
    match pair(Self::parse_net_if, Self::parse_interface_stats)(line) {
        Ok((_rest, (name, counters))) => Ok((name.to_string(), counters)),
        Err(e) => Err(eyre!("Failed to parse /proc/net/dev line: {}", e)),
    }
}
/// Parses the status column of a `/proc/net/wireless` line: at least one
/// whitespace character followed by an unsigned decimal integer.
fn parse_wireless_status(input: &str) -> IResult<&str, u64> {
    let mut status_column = preceded(multispace1, u64);
    status_column(input)
}
/// Parses the two signed values that follow the status column. Each value is
/// preceded by whitespace and may be followed by a single `.`, which is
/// discarded.
fn parse_wireless_stats(input: &str) -> IResult<&str, Vec<i64>> {
    let value = preceded(multispace1, i64);
    let value_with_optional_dot = terminated(value, opt(tag(".")));
    count(value_with_optional_dot, 2)(input)
}
/// Parses one `/proc/net/wireless` line into the interface name and the two
/// values that follow the status column. The status itself is parsed but
/// discarded.
///
/// # Errors
///
/// Fails when the line does not match `<name>: <status> <value> <value>`.
fn parse_proc_net_wireless_line(line: &str) -> Result<(String, Vec<i64>)> {
    let status_then_values = pair(Self::parse_wireless_status, Self::parse_wireless_stats);
    let mut line_parser = pair(Self::parse_net_if, status_then_values);
    let (_rest, (name, (_status, values))) = line_parser(line)
        .map_err(|e| eyre!("Failed to parse /proc/net/wireless line: {}", e))?;
    Ok((name.to_string(), values))
}
fn calculate_network_metrics(
&mut self,
interface: String,
current_reading: ProcNetDevReading<T>,
) -> Result<Vec<KeyedMetricReading>> {
if let Some(ProcNetDevReading {
stats: previous_net_stats,
reading_time: previous_reading_time,
}) = self
.previous_readings_by_interface
.insert(interface.clone(), current_reading.clone())
{
let curr_interface_bytes_rx = current_reading
.stats
.first()
.ok_or(eyre!("Current reading is missing bytes received value"))?;
let prev_interface_bytes_rx = previous_net_stats
.first()
.ok_or(eyre!("Previous reading is missing bytes received value"))?;
let interface_bytes_rx = curr_interface_bytes_rx.checked_sub(*prev_interface_bytes_rx);
let curr_interface_bytes_tx = current_reading
.stats
.get(8)
.ok_or(eyre!("Current reading is missing bytes sent value"))?;
let prev_interface_bytes_tx = previous_net_stats
.get(8)
.ok_or(eyre!("Previous reading is missing bytes sent value"))?;
let interface_bytes_tx = curr_interface_bytes_tx.checked_sub(*prev_interface_bytes_tx);
let interface_rx_key = MetricStringKey::from_str(
format!("interface/{}/total_bytes/rx", interface).as_str(),
)
.map_err(|e| eyre!("Couldn't construct metric key: {}", e))?;
let interface_tx_key = MetricStringKey::from_str(
format!("interface/{}/total_bytes/tx", interface).as_str(),
)
.map_err(|e| eyre!("Couldn't construct metric key: {}", e))?;
let mut interface_counter_readings = [
(interface_tx_key, interface_bytes_tx),
(interface_rx_key, interface_bytes_rx),
]
.into_iter()
.filter_map(|(key, value)| {
value.map(|v| KeyedMetricReading::new_counter(key, v as f64))
})
.collect::<Vec<KeyedMetricReading>>();
let current_period_rates =
current_reading
.stats
.iter()
.zip(previous_net_stats)
.map(|(current, previous)| {
if *current >= previous {
Some(
(*current - previous) as f64
/ current_reading
.reading_time
.since(&previous_reading_time)
.as_secs_f64(),
)
} else {
None
}
});
let net_keys_with_stats = zip(
[
"bytes_per_second/rx",
"packets_per_second/rx",
"errors_per_second/rx",
"dropped_per_second/rx",
"fifo/rx",
"frame/rx",
"compressed/rx",
"multicast/rx",
"bytes_per_second/tx",
"packets_per_second/tx",
"errors_per_second/tx",
"dropped_per_second/tx",
"fifo/tx",
"colls/tx",
"carrier/tx",
"compressed/tx",
],
current_period_rates,
)
.filter_map(|(key, value)| {
match (NETWORK_INTERFACE_METRIC_KEYS.contains(&key), value) {
(true, Some(value)) => Some((key, value)),
_ => None,
}
})
.collect::<Vec<(&str, f64)>>();
let timestamp = Utc::now();
let mut readings = net_keys_with_stats
.iter()
.map(|(key, value)| -> Result<KeyedMetricReading, ErrReport> {
Ok(KeyedMetricReading::new(
MetricStringKey::from_str(&format!(
"{}/{}/{}",
NETWORK_INTERFACE_METRIC_NAMESPACE, interface, key
))
.map_err(|e| eyre!(e))?,
MetricReading::Histogram {
value: *value,
timestamp,
},
))
})
.collect::<Result<Vec<KeyedMetricReading>>>()?;
readings.append(&mut interface_counter_readings);
Ok(readings)
} else {
Ok(vec![])
}
}
}
/// Exposes the collector as a system-metric family named after
/// `NETWORK_INTERFACE_METRIC_NAMESPACE`.
impl<T> SystemMetricFamilyCollector for NetworkInterfaceMetricCollector<T>
where
    T: TimeMeasure + Copy + Ord + std::ops::Add<Duration, Output = T> + Send + Sync + 'static,
{
    /// Name of this metric family.
    fn family_name(&self) -> &'static str {
        NETWORK_INTERFACE_METRIC_NAMESPACE
    }

    /// Returns the `/proc/net/dev` metrics followed by the wireless metrics.
    /// A failure of the first collection is returned before the wireless file
    /// is read.
    fn collect_metrics(&mut self) -> Result<Vec<KeyedMetricReading>> {
        let mut readings = self.get_network_interface_metrics()?;
        readings.extend(self.get_wireless_interface_metrics()?);
        Ok(readings)
    }
}
#[cfg(test)]
mod test {
use insta::{assert_json_snapshot, rounded_redaction, with_settings};
use rstest::rstest;
use super::*;
use crate::test_utils::TestInstant;
// Snapshot test: a well-formed /proc/net/dev line, with or without leading
// whitespace, parses into the interface name and its counters. The snapshot is
// named after the interface.
#[rstest]
#[case(" eth0: 2707 25 0 0 0 0 0 0 2707 25 0 0 0 0 0 0", "eth0")]
#[case("wlan1: 2707 25 0 0 0 0 0 0 2707 25 0 0 0 0 0 0", "wlan1")]
fn test_parse_netdev_line(#[case] proc_net_dev_line: &str, #[case] test_name: &str) {
assert_json_snapshot!(test_name,
NetworkInterfaceMetricCollector::<TestInstant>::parse_proc_net_dev_line(proc_net_dev_line).unwrap(),
{"[].value.**.timestamp" => "[timestamp]", "[].value.**.value" => rounded_redaction(5)})
}
// A /proc/net/wireless line parses into the interface name and the two values
// after the status column, whether or not those values carry a trailing dot.
#[rstest]
#[case("wlan0: 0000 56. -54. -256 0 0 0 0 38 0")]
#[case("wlan0: 0000 56 -54 -256 0 0 0 0 38 0")]
fn test_parse_netwireless_line(#[case] proc_net_dev_line: &str) {
    let parsed = NetworkInterfaceMetricCollector::<TestInstant>::parse_proc_net_wireless_line(
        proc_net_dev_line,
    );
    assert!(parsed.is_ok());

    let (interface, stats) = parsed.unwrap();
    assert_eq!(interface, "wlan0");
    assert_eq!(stats, [56, -54]);
}
// Lines without the colon after the interface name, or with too few counter
// columns, must be rejected.
#[rstest]
#[case("wlan0 2707 25 0 0 0 0 0 0 2707 25 0 0 0 0 0 0")]
#[case("wlan0: 2707 0 0 0 0 0 0 2707 25 0 0 0 0 0 0")]
fn test_fails_on_invalid_proc_net_dev_line(#[case] proc_net_dev_line: &str) {
    let parsed = NetworkInterfaceMetricCollector::<TestInstant>::parse_proc_net_dev_line(
        proc_net_dev_line,
    );
    assert!(parsed.is_err());
}
#[rstest]
#[case(
" eth0: 1000 25 0 0 0 0 0 0 2000 25 0 0 0 0 0 0",
" eth0: 2500 80 10 10 0 0 0 0 3000 50 0 0 0 0 0 0",
" eth0: 5000 100 15 15 0 0 0 0 5000 75 20 20 0 0 0 0",
"basic_delta"
)]
#[case(
" eth0: 4294967293 25 0 0 0 0 0 0 2000 25 0 0 0 0 0 0",
" eth0: 2498 80 10 10 0 0 0 0 3000 50 0 0 0 0 0 0",
" eth0: 5000 100 15 15 0 0 0 0 5000 75 20 20 0 0 0 0",
"with_overflow"
)]
fn test_net_if_metric_collector_calcs(
#[case] proc_net_dev_line_a: &str,
#[case] proc_net_dev_line_b: &str,
#[case] proc_net_dev_line_c: &str,
#[case] test_name: &str,
) {
let mut net_metric_collector = NetworkInterfaceMetricCollector::<TestInstant>::new(
NetworkInterfaceMetricsConfig::Interfaces(HashSet::from_iter(["eth0".to_string()])),
);
let (net_if, stats) =
NetworkInterfaceMetricCollector::<TestInstant>::parse_proc_net_dev_line(
proc_net_dev_line_a,
)
.unwrap();
let reading_a = ProcNetDevReading {
stats,
reading_time: TestInstant::now(),
};
let result_a = net_metric_collector.calculate_network_metrics(net_if, reading_a);
assert!(result_a.unwrap().is_empty());
TestInstant::sleep(Duration::from_secs(10));
let (net_if, stats) =
NetworkInterfaceMetricCollector::<TestInstant>::parse_proc_net_dev_line(
proc_net_dev_line_b,
)
.unwrap();
let reading_b = ProcNetDevReading {
stats,
reading_time: TestInstant::now(),
};
let result_b = net_metric_collector.calculate_network_metrics(net_if, reading_b);
assert!(result_b.is_ok());
with_settings!({sort_maps => true}, {
assert_json_snapshot!(format!("{}_{}", test_name, "a_b_metrics"),
result_b.unwrap(),
{"[].value.**.timestamp" => "[timestamp]", "[].value.**.value" => rounded_redaction(5)})
});
TestInstant::sleep(Duration::from_secs(30));
let (net_if, stats) =
NetworkInterfaceMetricCollector::<TestInstant>::parse_proc_net_dev_line(
proc_net_dev_line_c,
)
.unwrap();
let reading_c = ProcNetDevReading {
stats,
reading_time: TestInstant::now(),
};
let result_c = net_metric_collector.calculate_network_metrics(net_if, reading_c);
assert!(result_c.is_ok());
with_settings!({sort_maps => true}, {
assert_json_snapshot!(format!("{}_{}", test_name, "b_c_metrics"),
result_c.unwrap(),
{"[].value.**.timestamp" => "[timestamp]", "[].value.**.value" => rounded_redaction(5)})
});
}
#[rstest]
#[case(
" eth0: 1000 25 0 0 0 0 0 0 2000 25 0 0 0 0 0 0",
" eth1: 2500 80 10 10 0 0 0 0 3000 50 0 0 0 0 0 0",
" eth0: 5000 100 15 15 0 0 0 0 5000 75 20 20 0 0 0 0",
" eth1: 3700 100 10 10 0 0 0 0 3200 50 0 0 0 0 0 0",
true,
"different_interfaces"
)]
fn test_net_if_metric_collector_different_if(
#[case] proc_net_dev_line_a: &str,
#[case] proc_net_dev_line_b: &str,
#[case] proc_net_dev_line_c: &str,
#[case] proc_net_dev_line_d: &str,
#[case] use_auto_config: bool,
#[case] test_name: &str,
) {
let mut net_metric_collector =
NetworkInterfaceMetricCollector::<TestInstant>::new(if use_auto_config {
NetworkInterfaceMetricsConfig::Auto
} else {
NetworkInterfaceMetricsConfig::Interfaces(HashSet::from_iter(["eth1".to_string()]))
});
let (net_if, stats) =
NetworkInterfaceMetricCollector::<TestInstant>::parse_proc_net_dev_line(
proc_net_dev_line_a,
)
.unwrap();
let reading_a = ProcNetDevReading {
stats,
reading_time: TestInstant::now(),
};
let result_a = net_metric_collector.calculate_network_metrics(net_if, reading_a);
assert!(result_a.unwrap().is_empty());
TestInstant::sleep(Duration::from_secs(10));
let (net_if, stats) =
NetworkInterfaceMetricCollector::<TestInstant>::parse_proc_net_dev_line(
proc_net_dev_line_b,
)
.unwrap();
let reading_b = ProcNetDevReading {
stats,
reading_time: TestInstant::now(),
};
let result_b = net_metric_collector.calculate_network_metrics(net_if, reading_b);
assert!(result_b.unwrap().is_empty());
TestInstant::sleep(Duration::from_secs(30));
let (net_if, stats) =
NetworkInterfaceMetricCollector::<TestInstant>::parse_proc_net_dev_line(
proc_net_dev_line_c,
)
.unwrap();
let reading_c = ProcNetDevReading {
stats,
reading_time: TestInstant::now(),
};
let result_c = net_metric_collector.calculate_network_metrics(net_if, reading_c);
assert!(result_c.is_ok());
with_settings!({sort_maps => true}, {
assert_json_snapshot!(format!("{}_{}", test_name, "a_c_metrics"),
result_c.unwrap(),
{"[].value.**.timestamp" => "[timestamp]", "[].value.**.value" => rounded_redaction(5)})
});
TestInstant::sleep(Duration::from_secs(30));
let (net_if, stats) =
NetworkInterfaceMetricCollector::<TestInstant>::parse_proc_net_dev_line(
proc_net_dev_line_d,
)
.unwrap();
let reading_d = ProcNetDevReading {
stats,
reading_time: TestInstant::now(),
};
let result_d = net_metric_collector.calculate_network_metrics(net_if, reading_d);
assert!(result_d.is_ok());
with_settings!({sort_maps => true}, {
assert_json_snapshot!(format!("{}_{}", test_name, "b_d_metrics"),
result_d.unwrap(),
{"[].value.**.timestamp" => "[timestamp]", "[].value.**.value" => rounded_redaction(5)})
});
}
#[rstest]
#[case(vec!["eth0".to_string(), "wlan1".to_string()], "eth1", false)]
#[case(vec!["eth0".to_string(), "wlan1".to_string()], "eth0", true)]
#[case(vec!["eth0".to_string(), "wlan1".to_string()], "enp0s10", false)]
#[case(vec!["eth0".to_string(), "wlan1".to_string()], "wlan1", true)]
fn test_interface_is_monitored(
#[case] monitored_interfaces: Vec<String>,
#[case] interface: &str,
#[case] should_be_monitored: bool,
) {
let net_metric_collector = NetworkInterfaceMetricCollector::<TestInstant>::new(
NetworkInterfaceMetricsConfig::Interfaces(HashSet::from_iter(monitored_interfaces)),
);
assert_eq!(
net_metric_collector.interface_is_monitored(interface),
should_be_monitored
)
}
// In `Auto` mode every interface is monitored except those whose name starts
// with one of the excluded prefixes (tun, dummy, lo, veth, usb).
#[rstest]
#[case("eth1", true)]
#[case("eth0", true)]
#[case("enp0s10", true)]
#[case("wlan1", true)]
#[case("tun0", false)]
#[case("dummy1", false)]
#[case("lo1", false)]
#[case("vethcdd37e7", false)]
#[case("usb1047", false)]
fn test_interface_is_monitored_auto(
    #[case] interface: &str,
    #[case] should_be_monitored: bool,
) {
    let config = NetworkInterfaceMetricsConfig::Auto;
    let net_metric_collector = NetworkInterfaceMetricCollector::<TestInstant>::new(config);

    let is_monitored = net_metric_collector.interface_is_monitored(interface);
    assert_eq!(is_monitored, should_be_monitored);
}
}