use std::{
collections::HashMap,
fs::File,
io::{BufRead, BufReader},
iter::zip,
path::Path,
str::FromStr,
};
use chrono::Utc;
use log::debug;
use nom::{
bytes::complete::tag,
character::complete::{space1, u64},
multi::count,
sequence::preceded,
IResult,
};
use crate::{
metrics::{
core_metrics::METRIC_CPU_USAGE_PCT, system_metrics::SystemMetricFamilyCollector,
KeyedMetricReading, MetricReading, MetricStringKey,
},
util::math::counter_delta_with_overflow,
};
use eyre::{eyre, ErrReport, Result};
/// Path of the file that `CpuMetricCollector::get_cpu_metrics` opens and scans for CPU stats.
const PROC_STAT_PATH: &str = "/proc/stat";
/// Family name reported by the collector; also used as the leading segment of the
/// per-state metric keys it builds.
pub const CPU_METRIC_NAMESPACE: &str = "cpu";
/// Collects CPU metrics by comparing successive samples of the CPU counters
/// parsed from `/proc/stat`.
pub struct CpuMetricCollector {
    /// Counter values from the previous sample, or `None` until a first
    /// sample has been recorded. Each new sample replaces this value.
    last_reading: Option<Vec<u64>>,
}
impl CpuMetricCollector {
    /// Creates a collector that has not recorded any sample yet.
    pub fn new() -> Self {
        Self { last_reading: None }
    }

    /// Reads `/proc/stat` and returns the CPU readings derived from the first
    /// line that parses as an aggregate CPU stats line.
    ///
    /// The returned vector is empty when no delta can be computed yet (first
    /// call, or no ticks elapsed since the previous sample).
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read, if no line of
    /// the file parses as a CPU stats line, or if building the readings fails.
    pub fn get_cpu_metrics(&mut self) -> Result<Vec<KeyedMetricReading>> {
        let path = Path::new(PROC_STAT_PATH);
        let reader = BufReader::new(File::open(path)?);

        for line in reader.lines() {
            let line = line?;
            if let Ok(cpu_stats) = Self::parse_proc_stat_line_cpu(line.trim()) {
                // Only the first parseable line is used. Failures while
                // building the readings are propagated instead of being
                // silently turned into an empty result.
                return Ok(self
                    .delta_since_last_reading(cpu_stats)?
                    .unwrap_or_default());
            }
        }

        Err(eyre!(
            "No CPU metrics were collected from {} - is it a properly formatted /proc/stat file?",
            PROC_STAT_PATH
        ))
    }

    /// Parses the literal `cpu` followed by exactly seven whitespace-separated
    /// unsigned integers. Any input after the seventh value is left unparsed.
    ///
    /// Whitespace must directly follow `cpu`, so lines such as `cpu0 ...` are
    /// rejected.
    fn parse_cpu_stats(input: &str) -> IResult<&str, Vec<u64>> {
        preceded(tag("cpu"), count(preceded(space1, u64), 7))(input)
    }

    /// Parses one line of `/proc/stat` into its first seven counter values.
    ///
    /// # Errors
    ///
    /// Returns an error naming the offending line when it is not a CPU stats
    /// line in the format accepted by `parse_cpu_stats`.
    fn parse_proc_stat_line_cpu(line: &str) -> Result<Vec<u64>> {
        let (_, cpu_stats) = Self::parse_cpu_stats(line)
            .map_err(|_e| eyre!("Failed to parse CPU stats line: {}", line))?;
        Ok(cpu_stats)
    }

    /// Records `cpu_stats` as the latest sample and converts the difference
    /// to the previous sample into percentage readings.
    ///
    /// Returns `Ok(None)` when there is no previous sample to compare with, or
    /// when the summed delta over all CPU states is zero (percentages would be
    /// NaN). Otherwise returns one `{namespace}/cpu/percent/{state}` reading
    /// per CPU state plus one overall usage reading keyed by
    /// `METRIC_CPU_USAGE_PCT`, all sharing a single timestamp.
    ///
    /// # Errors
    ///
    /// Returns an error if a metric key cannot be constructed or if the delta
    /// contains no value for the `idle` state.
    fn delta_since_last_reading(
        &mut self,
        cpu_stats: Vec<u64>,
    ) -> Result<Option<Vec<KeyedMetricReading>>> {
        // Names of the parsed columns, in the order they appear on the line.
        const CPU_STATES: [&str; 7] = [
            "user", "nice", "system", "idle", "iowait", "irq", "softirq",
        ];

        // Compute the per-state deltas while only borrowing the previous
        // sample, then store the new sample. This avoids cloning `cpu_stats`
        // and guarantees the sample is recorded before any early return.
        let deltas: Option<Vec<u64>> = self.last_reading.as_ref().map(|last_stats| {
            cpu_stats
                .iter()
                .zip(last_stats)
                .map(|(current, previous)| counter_delta_with_overflow(*current, *previous))
                .collect()
        });
        self.last_reading = Some(cpu_stats);

        let deltas = match deltas {
            Some(deltas) => deltas,
            // First sample: nothing to compare against yet.
            None => return Ok(None),
        };

        let cpu_states_with_ticks = zip(CPU_STATES, deltas).collect::<HashMap<&str, u64>>();
        let sum: f64 = cpu_states_with_ticks.values().sum::<u64>() as f64;

        // Every percentage below divides by `sum`; bail out before producing
        // NaN readings when no ticks elapsed between the two samples.
        if sum <= 0.0 {
            debug!("Sum of time spent in all CPU states is <= 0 - this is probably incorrect.");
            return Ok(None);
        }

        let timestamp = Utc::now();
        let mut readings = cpu_states_with_ticks
            .iter()
            .map(|(state, ticks)| -> Result<KeyedMetricReading, ErrReport> {
                let key = MetricStringKey::from_str(&format!(
                    "{}/cpu/percent/{}",
                    CPU_METRIC_NAMESPACE, state
                ))
                .map_err(|e| eyre!(e))?;
                Ok(KeyedMetricReading::new(
                    key,
                    MetricReading::Histogram {
                        value: 100.0 * *ticks as f64 / sum,
                        timestamp,
                    },
                ))
            })
            .collect::<Result<Vec<KeyedMetricReading>>>()?;

        // Overall usage is the share of ticks spent in any state but idle.
        let idle_ticks = *cpu_states_with_ticks
            .get("idle")
            .ok_or_else(|| eyre!("CPU stats delta has no value for the idle state"))?;
        let cpu_usage_pct = ((sum - idle_ticks as f64) / sum) * 100.0;
        let cpu_usage_pct_key = MetricStringKey::from_str(METRIC_CPU_USAGE_PCT)
            .map_err(|e| eyre!("Failed to construct MetricStringKey for CPU usage: {}", e))?;
        readings.push(KeyedMetricReading::new(
            cpu_usage_pct_key,
            MetricReading::Histogram {
                value: cpu_usage_pct,
                timestamp,
            },
        ));

        Ok(Some(readings))
    }
}
/// Exposes the CPU collector through the generic metric-family collector trait.
impl SystemMetricFamilyCollector for CpuMetricCollector {
    /// Name of the metric family this collector provides.
    fn family_name(&self) -> &'static str {
        CPU_METRIC_NAMESPACE
    }

    /// Collects the CPU readings by delegating to `get_cpu_metrics`.
    fn collect_metrics(&mut self) -> Result<Vec<KeyedMetricReading>> {
        Self::get_cpu_metrics(self)
    }
}
#[cfg(test)]
mod test {
    use insta::{assert_json_snapshot, rounded_redaction, with_settings};
    use rstest::rstest;

    use super::*;

    /// A well-formed aggregate CPU line parses into its seven counter values.
    #[rstest]
    #[case("cpu 1000 5 0 0 2 0 0", "test_basic_line")]
    fn test_process_valid_proc_stat_line(#[case] proc_stat_line: &str, #[case] test_name: &str) {
        assert_json_snapshot!(test_name,
                              CpuMetricCollector::parse_proc_stat_line_cpu(proc_stat_line).unwrap(),
                              {"[].value.**.timestamp" => "[timestamp]", "[].value.**.value" => rounded_redaction(5)})
    }

    /// Lines with too few values, without the `cpu` prefix, or belonging to
    /// another /proc/stat section are rejected.
    #[rstest]
    #[case("cpu 1000 5 0 0 2")]
    #[case("1000 5 0 0 2 0 0 0 0 0")]
    #[case("processor0 1000 5 0 0 2 0 0 0 0 0")]
    #[case("softirq 403453672 10204651 21667771 199 12328940 529390 0 3519783 161759969 147995 193294974")]
    fn test_fails_on_invalid_proc_stat_line(#[case] proc_stat_line: &str) {
        assert!(CpuMetricCollector::parse_proc_stat_line_cpu(proc_stat_line).is_err())
    }

    /// Feeds three consecutive samples through the collector and snapshots the
    /// readings produced for the a->b and b->c deltas.
    #[rstest]
    #[case(
        "cpu 1000 5 0 0 2 0 0",
        "cpu 1500 20 4 1 2 0 0",
        "cpu 1550 200 40 3 3 0 0",
        "basic_delta"
    )]
    fn test_cpu_metric_collector_calcs(
        #[case] proc_stat_line_a: &str,
        #[case] proc_stat_line_b: &str,
        #[case] proc_stat_line_c: &str,
        #[case] test_name: &str,
    ) {
        let mut cpu_metric_collector = CpuMetricCollector::new();

        // The first sample has nothing to be compared with, so no readings
        // are expected. `matches!` only yields a bool; it must be asserted.
        let stats = CpuMetricCollector::parse_proc_stat_line_cpu(proc_stat_line_a).unwrap();
        let result_a = cpu_metric_collector.delta_since_last_reading(stats);
        assert!(matches!(result_a, Ok(None)));

        let stats = CpuMetricCollector::parse_proc_stat_line_cpu(proc_stat_line_b).unwrap();
        let mut result_b = cpu_metric_collector
            .delta_since_last_reading(stats)
            .unwrap()
            .unwrap();
        // Readings come out of a HashMap; sort for a stable snapshot.
        result_b.sort_by(|a, b| a.name.cmp(&b.name));
        with_settings!({sort_maps => true}, {
            assert_json_snapshot!(format!("{}_{}", test_name, "a_b_metrics"),
                                  result_b,
                                  {"[].value.**.timestamp" => "[timestamp]", "[].value.**.value" => rounded_redaction(5)})
        });

        let stats = CpuMetricCollector::parse_proc_stat_line_cpu(proc_stat_line_c).unwrap();
        let mut result_c = cpu_metric_collector
            .delta_since_last_reading(stats)
            .unwrap()
            .unwrap();
        result_c.sort_by(|a, b| a.name.cmp(&b.name));
        with_settings!({sort_maps => true}, {
            assert_json_snapshot!(format!("{}_{}", test_name, "b_c_metrics"),
                                  result_c,
                                  {"[].value.**.timestamp" => "[timestamp]", "[].value.**.value" => rounded_redaction(5)})
        });
    }
}