use chrono::Utc;
use eyre::{eyre, Result};
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
path::Path,
time::{Duration, Instant},
};
use crate::{
mar::{MarEntryBuilder, Metadata},
metrics::{
metric_reading::KeyedMetricReading,
timeseries::{Counter, Gauge, Histogram, TimeSeries, TimeWeightedAverage},
MetricReading, MetricStringKey, MetricValue, SessionName,
},
util::{system::get_system_clock, wildcard_pattern::WildcardPattern},
};
use super::{
core_metrics::{
METRIC_CPU_USAGE_PCT, METRIC_CPU_USAGE_PROCESS_PCT_PREFIX,
METRIC_CPU_USAGE_PROCESS_PCT_SUFFIX, METRIC_MEMORY_PCT, METRIC_MEMORY_PROCESS_PCT_PREFIX,
METRIC_MEMORY_PROCESS_PCT_SUFFIX,
},
internal_metrics::{INTERNAL_METRIC_MAR_CLEANER_DURATION, INTERNAL_METRIC_MAR_ENTRY_COUNT},
system_metrics::{
METRIC_INTERFACE_BYTES_PER_SECOND_RX_SUFFIX, METRIC_INTERFACE_BYTES_PER_SECOND_TX_SUFFIX,
NETWORK_INTERFACE_METRIC_NAMESPACE, THERMAL_METRIC_NAMESPACE,
},
timeseries::{Bool, ReportTag, RssiAverage},
};
/// Selects which metric keys a `MetricReport` records.
pub enum CapturedMetrics {
    /// Record every metric.
    All,
    /// Record only metrics whose key is in the set (exact key or wildcard-pattern match,
    /// see `MetricsSet::contains`).
    Metrics(MetricsSet),
}
/// A set of metric keys, matched either exactly or through wildcard patterns.
#[derive(Clone)]
pub struct MetricsSet {
    /// Keys matched exactly.
    pub metric_keys: HashSet<MetricStringKey>,
    /// Patterns tested with `WildcardPattern::matches` against the key's string form.
    pub wildcard_metric_keys: Vec<WildcardPattern>,
}
fn histo_min_max_keys() -> MetricsSet {
MetricsSet {
metric_keys: HashSet::from_iter([
MetricStringKey::from(METRIC_CPU_USAGE_PCT),
MetricStringKey::from(METRIC_MEMORY_PCT),
MetricStringKey::from(INTERNAL_METRIC_MAR_CLEANER_DURATION),
MetricStringKey::from(INTERNAL_METRIC_MAR_ENTRY_COUNT),
]),
wildcard_metric_keys: vec![
WildcardPattern::new(
METRIC_CPU_USAGE_PROCESS_PCT_PREFIX,
METRIC_CPU_USAGE_PROCESS_PCT_SUFFIX,
),
WildcardPattern::new(
METRIC_MEMORY_PROCESS_PCT_PREFIX,
METRIC_MEMORY_PROCESS_PCT_SUFFIX,
),
WildcardPattern::new(
NETWORK_INTERFACE_METRIC_NAMESPACE,
METRIC_INTERFACE_BYTES_PER_SECOND_RX_SUFFIX,
),
WildcardPattern::new(
NETWORK_INTERFACE_METRIC_NAMESPACE,
METRIC_INTERFACE_BYTES_PER_SECOND_TX_SUFFIX,
),
WildcardPattern::new(THERMAL_METRIC_NAMESPACE, ""),
],
}
}
impl MetricsSet {
    /// Returns a set that matches no metric key.
    pub fn empty() -> Self {
        Self::from_metric_keys(&[])
    }

    /// Builds a set that matches exactly the given keys and has no wildcard patterns.
    pub fn from_metric_keys(keys: &[MetricStringKey]) -> Self {
        Self {
            metric_keys: HashSet::from_iter(keys.iter().cloned()),
            wildcard_metric_keys: Vec::new(),
        }
    }

    /// Adds every exact key and every wildcard pattern of `other` to `self`.
    fn extend(&mut self, other: MetricsSet) {
        let MetricsSet {
            metric_keys,
            wildcard_metric_keys,
        } = other;
        self.metric_keys.extend(metric_keys);
        self.wildcard_metric_keys.extend(wildcard_metric_keys);
    }

    /// Returns `true` if `metric_string_key` is one of the exact keys or matches any of
    /// the wildcard patterns. Exact keys are checked first.
    pub fn contains(&self, metric_string_key: &MetricStringKey) -> bool {
        if self.metric_keys.contains(metric_string_key) {
            return true;
        }
        let key_str = metric_string_key.as_str();
        self.wildcard_metric_keys
            .iter()
            .any(|pattern| pattern.matches(key_str))
    }
}
/// Name returned by `MetricReportType::as_str` for `MetricReportType::Heartbeat`.
/// Matches that variant's serde rename.
pub const HEARTBEAT_REPORT_TYPE: &str = "heartbeat";
/// Name returned by `MetricReportType::as_str` for `MetricReportType::DailyHeartbeat`.
/// Matches that variant's serde rename.
pub const DAILY_HEARTBEAT_REPORT_TYPE: &str = "daily-heartbeat";
/// Identifies the kind of a metric report.
///
/// The serde names of the heartbeat variants are meant to stay in sync with
/// `HEARTBEAT_REPORT_TYPE` and `DAILY_HEARTBEAT_REPORT_TYPE`, which `as_str` returns for
/// those variants.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
pub enum MetricReportType {
    /// Heartbeat report.
    #[serde(rename = "heartbeat")]
    Heartbeat,
    /// Report for a named session; `as_str` returns the session's name.
    #[serde(rename = "session")]
    Session(SessionName),
    /// Daily heartbeat report.
    #[serde(rename = "daily-heartbeat")]
    DailyHeartbeat,
}
impl MetricReportType {
    /// Returns the report's name.
    ///
    /// The two heartbeat variants map to their fixed constant names. A session report is
    /// named after its session.
    pub fn as_str(&self) -> &str {
        match self {
            Self::Session(name) => name.as_str(),
            Self::DailyHeartbeat => DAILY_HEARTBEAT_REPORT_TYPE,
            Self::Heartbeat => HEARTBEAT_REPORT_TYPE,
        }
    }
}
/// Accumulates metric readings over a reporting interval and turns them into a
/// metric-report MAR entry.
pub struct MetricReport {
    /// Aggregation state per metric key; drained every time a snapshot is taken.
    metrics: HashMap<MetricStringKey, Box<dyn TimeSeries + Send>>,
    /// Start of the current interval (monotonic clock); reset on every snapshot.
    start: Instant,
    /// `Clock::Boottime` reading used as the start of the boot-time interval, or `None`
    /// if the clock could not be read.
    boottime_start: Option<Duration>,
    /// Which keys `add_metric` records; readings for other keys are ignored.
    captured_metrics: CapturedMetrics,
    /// Kind of report produced.
    report_type: MetricReportType,
    /// Keys whose histograms are emitted with extra `_min`/`_max` entries.
    histo_min_max_metrics: MetricsSet,
}
/// Metrics and interval lengths captured by a single snapshot of a `MetricReport`.
struct MetricReportSnapshot {
    /// Time elapsed on the monotonic clock over the snapshotted interval.
    duration: Duration,
    /// Elapsed `Clock::Boottime` time, or `None` if it could not be determined.
    boottime_duration: Option<Duration>,
    /// Final values by metric key, including any `_min`/`_max` histogram entries.
    metrics: HashMap<MetricStringKey, MetricValue>,
}
impl MetricReport {
    /// Creates an empty report.
    ///
    /// * `report_type` - the kind of report (heartbeat, daily heartbeat or named session).
    /// * `captured_metrics` - which keys [`MetricReport::add_metric`] records; readings for
    ///   any other key are ignored.
    /// * `extra_histo_min_max` - keys whose histograms also get `_min`/`_max` entries, on
    ///   top of the built-in set returned by `histo_min_max_keys`.
    ///
    /// The reporting interval starts now. The boot-time start is read from
    /// `Clock::Boottime` and is recorded as unavailable (`None`) if that read fails.
    pub fn new(
        report_type: MetricReportType,
        captured_metrics: CapturedMetrics,
        extra_histo_min_max: MetricsSet,
    ) -> Self {
        let mut histo_min_max_metrics = histo_min_max_keys();
        histo_min_max_metrics.extend(extra_histo_min_max);
        Self {
            metrics: HashMap::new(),
            start: Instant::now(),
            boottime_start: get_system_clock(crate::util::system::Clock::Boottime).ok(),
            captured_metrics,
            report_type,
            histo_min_max_metrics,
        }
    }

    /// Creates a heartbeat report that captures every metric and uses only the built-in
    /// min/max set.
    pub fn new_heartbeat() -> Self {
        MetricReport::new(
            MetricReportType::Heartbeat,
            CapturedMetrics::All,
            MetricsSet::empty(),
        )
    }

    /// Creates a daily-heartbeat report that captures every metric and uses only the
    /// built-in min/max set.
    pub fn new_daily_heartbeat() -> Self {
        MetricReport::new(
            MetricReportType::DailyHeartbeat,
            CapturedMetrics::All,
            MetricsSet::empty(),
        )
    }

    /// Returns whether readings for `metric_key` are recorded by this report.
    fn is_captured(&self, metric_key: &MetricStringKey) -> bool {
        match &self.captured_metrics {
            CapturedMetrics::All => true,
            CapturedMetrics::Metrics(metric_keys) => metric_keys.contains(metric_key),
        }
    }

    /// Aggregates one reading into the report.
    ///
    /// Readings for keys that are not captured are ignored, and `Ok(())` is returned.
    /// Suppose the timeseries already stored for the key rejects the reading. Then that
    /// timeseries is replaced by a new one built from this reading, and a warning is logged.
    ///
    /// # Errors
    ///
    /// Returns an error if a timeseries cannot be created from the reading.
    pub fn add_metric(&mut self, m: KeyedMetricReading) -> Result<()> {
        if !self.is_captured(&m.name) {
            return Ok(());
        }
        match self.metrics.entry(m.name) {
            std::collections::hash_map::Entry::Occupied(mut o) => {
                let state = o.get_mut();
                if let Err(e) = state.aggregate(&m.value) {
                    // The existing timeseries is incompatible with this reading: start
                    // over from the new reading instead of rejecting it.
                    *state = Self::select_aggregate_for(&m.value)?;
                    log::warn!(
                        "New value for metric {} is incompatible ({}). Resetting timeseries.",
                        o.key(),
                        e
                    );
                }
            }
            std::collections::hash_map::Entry::Vacant(v) => {
                v.insert(Self::select_aggregate_for(&m.value)?);
            }
        }
        Ok(())
    }

    /// Adds 1 to the counter named `name`; see [`MetricReport::add_to_counter`].
    pub fn increment_counter(&mut self, name: &str) -> Result<()> {
        self.add_to_counter(name, 1.0)
    }

    /// Adds `value` to the counter named `name`, timestamped with the current UTC time.
    ///
    /// # Errors
    ///
    /// Returns an error if `name` is not a valid metric key, or if the reading cannot be
    /// aggregated (see [`MetricReport::add_metric`]).
    pub fn add_to_counter(&mut self, name: &str, value: f64) -> Result<()> {
        let metric_name = name
            .parse::<MetricStringKey>()
            .map_err(|e| eyre!("Invalid metric name: {} - {}", name, e))?;
        self.add_metric(KeyedMetricReading::new(
            metric_name,
            MetricReading::Counter {
                value,
                timestamp: Utc::now(),
            },
        ))
    }

    /// Drains the accumulated metrics, starts a new interval and returns the final values.
    pub fn take_metrics(&mut self) -> HashMap<MetricStringKey, MetricValue> {
        self.take_metric_report_snapshot().metrics
    }

    /// Drains the accumulated metrics and restarts both interval clocks.
    ///
    /// Histograms are reduced to their average under the metric's own key. For keys in
    /// `histo_min_max_metrics`, the minimum and maximum are also emitted under `<key>_min`
    /// and `<key>_max`. Every other timeseries contributes its current value.
    fn take_metric_report_snapshot(&mut self) -> MetricReportSnapshot {
        let duration = std::mem::replace(&mut self.start, Instant::now()).elapsed();

        // Restart the boot-time interval together with `start`. That way
        // `boottime_duration` covers the same interval as `duration`, rather than
        // everything since the report was created.
        let boottime_now = get_system_clock(crate::util::system::Clock::Boottime).ok();
        let boottime_start = std::mem::replace(&mut self.boottime_start, boottime_now);
        let boottime_duration = match (boottime_start, boottime_now) {
            // `checked_sub` yields `None` instead of panicking if the clock ever reads
            // earlier than the recorded start.
            (Some(start), Some(end)) => end.checked_sub(start),
            _ => None,
        };

        let metrics = std::mem::take(&mut self.metrics)
            .into_iter()
            .flat_map(|(name, state)| match state.value() {
                MetricValue::Histogram(histo) if self.histo_min_max_metrics.contains(&name) => {
                    vec![
                        (name.with_suffix("_max"), histo.max()),
                        (name.with_suffix("_min"), histo.min()),
                        (name, histo.avg()),
                    ]
                }
                MetricValue::Histogram(histo) => vec![(name, histo.avg())],
                // Reuse the value already computed instead of calling `value()` again.
                other => vec![(name, other)],
            })
            .collect();

        MetricReportSnapshot {
            duration,
            boottime_duration,
            metrics,
        }
    }

    /// Logs the snapshot at trace level. It logs the metric values as pretty-printed JSON
    /// sorted by key, followed by the interval durations.
    fn log_metrics_snapshot(&self, snapshot: &MetricReportSnapshot) {
        log::trace!(
            "🔍 MEMFAULTD_DEBUG: Serializing {} metrics for report type '{}':",
            snapshot.metrics.len(),
            self.report_type.as_str()
        );
        // A BTreeMap gives a stable, key-sorted order in the log output.
        let json_metrics: std::collections::BTreeMap<String, String> = snapshot
            .metrics
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect();
        if let Ok(json_str) = serde_json::to_string_pretty(&json_metrics) {
            log::trace!("{}", json_str);
        } else {
            log::trace!("❌ Failed to serialize metrics to JSON");
        }
        log::trace!("📊 Duration: {:.2}s", snapshot.duration.as_secs_f64());
        if let Some(boottime_duration) = snapshot.boottime_duration {
            log::trace!(
                "⏱️ Boottime duration: {:.2}s",
                boottime_duration.as_secs_f64()
            );
        }
    }

    /// Takes a snapshot and wraps it in a MAR entry builder carrying metric-report
    /// metadata. The builder is created in `mar_staging_area`.
    ///
    /// Returns `Ok(None)` if no metrics were recorded. In either case, the accumulated
    /// metrics are drained and a new interval starts. When trace logging is enabled, the
    /// snapshot is logged first.
    ///
    /// NOTE(review): the snapshot is taken before `MarEntryBuilder::new` runs, so its
    /// metrics are discarded if that call fails.
    ///
    /// # Errors
    ///
    /// Propagates any error from `MarEntryBuilder::new`.
    pub fn prepare_metric_report(
        &mut self,
        mar_staging_area: &Path,
    ) -> Result<Option<MarEntryBuilder<Metadata>>> {
        let snapshot = self.take_metric_report_snapshot();
        if snapshot.metrics.is_empty() {
            return Ok(None);
        }
        if log::log_enabled!(log::Level::Trace) {
            self.log_metrics_snapshot(&snapshot);
        }
        Ok(Some(MarEntryBuilder::new(mar_staging_area)?.set_metadata(
            Metadata::new_metric_report(
                snapshot.metrics,
                snapshot.duration,
                snapshot.boottime_duration,
                self.report_type.clone(),
            ),
        )))
    }

    /// Creates the timeseries aggregator that matches the reading's variant, constructed
    /// from `event`.
    ///
    /// # Errors
    ///
    /// Propagates the error from the chosen timeseries constructor.
    fn select_aggregate_for(event: &MetricReading) -> Result<Box<dyn TimeSeries + Send>> {
        match event {
            MetricReading::Histogram { .. } => Ok(Box::new(Histogram::new(event)?)),
            MetricReading::Counter { .. } => Ok(Box::new(Counter::new(event)?)),
            MetricReading::Gauge { .. } => Ok(Box::new(Gauge::new(event)?)),
            MetricReading::Rssi { .. } => Ok(Box::new(RssiAverage::new(event)?)),
            MetricReading::TimeWeightedAverage { .. } => {
                Ok(Box::new(TimeWeightedAverage::new(event)?))
            }
            MetricReading::ReportTag { .. } => Ok(Box::new(ReportTag::new(event)?)),
            MetricReading::Bool { .. } => Ok(Box::new(Bool::new(event)?)),
        }
    }

    /// Returns this report's type.
    pub fn report_type(&self) -> &MetricReportType {
        &self.report_type
    }
}
// Unit tests for `MetricReport` aggregation and `MetricsSet`. The `rstest` cases compare
// aggregated output against stored `insta` JSON snapshots named by each case's
// `test_name`, so those names must not change without updating the snapshots.
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use std::collections::BTreeMap;

    use super::*;
    use crate::{
        metrics::core_metrics::CoreMetricKeys,
        test_utils::{in_bools, in_counters, in_histograms},
    };
    use std::str::FromStr;

    use insta::assert_json_snapshot;
    use rstest::rstest;

    // Heartbeat reports capture every metric. The cases use keys that presumably do
    // (e.g. `memory_pct`, interface rx bytes) and do not (e.g. interface packets) fall in
    // the built-in min/max set from `histo_min_max_keys`.
    #[rstest]
    #[case(in_histograms(vec![("foo", 1.0), ("bar", 2.0), ("baz", 3.0)]), "heartbeat_report_1")]
    #[case(in_histograms(vec![("cpu_usage_memfaultd_pct", 1.0), ("cpu_usage_memfaultd_pct", 2.0), ("cpu_usage_memfaultd_pct", 3.0)]), "heartbeat_report_2")]
    #[case(in_histograms(vec![("foo", 1.0), ("foo", 1.0)]), "heartbeat_report_3")]
    #[case(in_histograms(vec![("memory_pct", 1.0), ("memory_pct", 2.0)]), "heartbeat_report_4")]
    #[case(in_histograms(vec![("memory_systemd_pct", 1.0), ("memory_systemd_pct", 2.0), ("memory_systemd_pct", 2.0)]), "heartbeat_report_5")]
    #[case(in_histograms(vec![("interface/eth0/bytes_per_second/rx", 1.0), ("interface/eth0/bytes_per_second/rx", 2.0), ("interface/eth0/bytes_per_second/rx", 2.0)]), "heartbeat_report_6")]
    #[case(in_histograms(vec![("interface/eth0/packets_per_second/rx", 1.0), ("interface/eth0/packets_per_second/rx", 2.0), ("interface/eth0/packets_per_second/rx", 2.0)]), "heartbeat_report_7")]
    fn test_aggregate_metrics(
        #[case] metrics: impl Iterator<Item = KeyedMetricReading>,
        #[case] test_name: &str,
    ) {
        let mut metric_report = MetricReport::new_heartbeat();

        for m in metrics {
            metric_report.add_metric(m).unwrap();
        }

        // Sort so the snapshot is independent of HashMap iteration order.
        let sorted_metrics: BTreeMap<_, _> = metric_report.take_metrics().into_iter().collect();
        assert_json_snapshot!(test_name, sorted_metrics);
    }

    // Session reports capture only `foo`, `baz` and the session core metrics; readings
    // for any other key (e.g. `bar`) are expected to be dropped.
    #[rstest]
    #[case(in_histograms(vec![("foo", 1.0), ("bar", 2.0), ("baz", 3.0)]), "session_report_1")]
    #[case(in_histograms(vec![("foo", 1.0), ("foo", 2.0), ("foo", 3.0)]), "session_report_2")]
    #[case(in_histograms(vec![("foo", 1.0), ("foo", 1.0)]), "session_report_3")]
    #[case(in_histograms(vec![("foo", 1.0), ("foo", 2.0)]), "session_report_4")]
    #[case(in_histograms(vec![("foo", 1.0), ("foo", 2.0), ("baz", 2.0), ("bar", 3.0)]), "session_report_5")]
    #[case(in_counters(vec![("operational_crashes", 2.0), ("operational_crashes_memfaultd", 3.0), ("operational_crashes_memfaultd", 2.0), ("crashes_systemd", 3.0)]), "session_report_6")]
    fn test_aggregate_metrics_session(
        #[case] metrics: impl Iterator<Item = KeyedMetricReading>,
        #[case] test_name: &str,
    ) {
        let session_core_metrics = CoreMetricKeys::get_session_core_metrics();
        let mut metric_keys = vec![
            MetricStringKey::from_str("foo").unwrap(),
            MetricStringKey::from_str("baz").unwrap(),
        ];
        metric_keys.extend(session_core_metrics.string_keys);

        let mut metric_report = MetricReport::new(
            MetricReportType::Session(SessionName::from_str("foo_only").unwrap()),
            CapturedMetrics::Metrics(MetricsSet {
                metric_keys: HashSet::from_iter(metric_keys),
                wildcard_metric_keys: session_core_metrics.wildcard_pattern_keys,
            }),
            MetricsSet::empty(),
        );

        for m in metrics {
            metric_report.add_metric(m).unwrap();
        }

        let sorted_metrics: BTreeMap<_, _> = metric_report.take_metrics().into_iter().collect();
        assert_json_snapshot!(test_name, sorted_metrics);
    }

    // A later boolean reading for the same key is expected to replace the earlier one
    // (per the case name).
    #[rstest]
    #[case(in_bools(vec![("foo", true), ("bar", true), ("foo", false)]), "overwrite_previous_reading")]
    fn test_boolean_metrics(
        #[case] metrics: impl Iterator<Item = KeyedMetricReading>,
        #[case] test_name: &str,
    ) {
        let mut metric_report = MetricReport::new_heartbeat();

        for m in metrics {
            metric_report.add_metric(m).unwrap();
        }

        let sorted_metrics: BTreeMap<_, _> = metric_report.take_metrics().into_iter().collect();
        assert_json_snapshot!(test_name, sorted_metrics);
    }

    // Preparing a report drains it. The result of `prepare_metric_report` is ignored:
    // the snapshot (and drain) happens before the MAR builder is created, so the
    // assertion holds either way.
    #[rstest]
    fn test_empty_after_write() {
        let mut metric_report = MetricReport::new_heartbeat();
        for m in in_histograms(vec![("foo", 1.0), ("bar", 2.0), ("baz", 3.0)]) {
            metric_report.add_metric(m).unwrap();
        }

        let tempdir = TempDir::new().unwrap();
        let _ = metric_report.prepare_metric_report(tempdir.path());
        assert_eq!(metric_report.take_metrics().len(), 0);
    }

    // Keys passed as `extra_histo_min_max` get min/max entries; the second case checks
    // they are merged with (not replacing) the built-in set.
    #[rstest]
    #[case(
        in_histograms(vec![("custom_gauge", 1.0), ("custom_gauge", 2.0), ("custom_gauge", 3.0)]),
        "extra_min_max_exact_key"
    )]
    #[case(
        in_histograms(vec![("memory_pct", 1.0), ("memory_pct", 3.0), ("custom_gauge", 5.0), ("custom_gauge", 7.0), ("foo", 4.0)]),
        "extra_min_max_merged_with_hardcoded"
    )]
    fn test_extra_histo_min_max(
        #[case] metrics: impl Iterator<Item = KeyedMetricReading>,
        #[case] test_name: &str,
    ) {
        let extra =
            MetricsSet::from_metric_keys(&[MetricStringKey::from_str("custom_gauge").unwrap()]);
        let mut metric_report =
            MetricReport::new(MetricReportType::Heartbeat, CapturedMetrics::All, extra);

        for m in metrics {
            metric_report.add_metric(m).unwrap();
        }

        let sorted_metrics: BTreeMap<_, _> = metric_report.take_metrics().into_iter().collect();
        assert_json_snapshot!(test_name, sorted_metrics);
    }

    // Test helper: parses a metric key, panicking if it is invalid.
    fn key(s: &str) -> MetricStringKey {
        MetricStringKey::from_str(s).unwrap()
    }

    // `MetricsSet::empty` has no exact keys and no patterns.
    #[test]
    fn empty_contains_nothing() {
        let set = MetricsSet::empty();
        assert!(!set.contains(&key("foo")));
        assert!(set.metric_keys.is_empty());
        assert!(set.wildcard_metric_keys.is_empty());
    }

    // Exact keys given to `from_metric_keys` match; other keys do not.
    #[test]
    fn from_metric_keys_contains_provided_keys() {
        let set = MetricsSet::from_metric_keys(&[key("custom_gauge"), key("my_metric")]);
        assert!(set.contains(&key("custom_gauge")));
        assert!(set.contains(&key("my_metric")));
        assert!(!set.contains(&key("other_metric")));
    }

    #[test]
    fn from_metric_keys_empty_slice_is_empty() {
        let set = MetricsSet::from_metric_keys(&[]);
        assert!(set.metric_keys.is_empty());
        assert!(set.wildcard_metric_keys.is_empty());
    }

    // `extend` unions the exact keys of both sets.
    #[test]
    fn extend_merges_keys_from_both_sets() {
        let mut a = MetricsSet::from_metric_keys(&[key("foo")]);
        let b = MetricsSet::from_metric_keys(&[key("bar")]);
        a.extend(b);
        assert!(a.contains(&key("foo")));
        assert!(a.contains(&key("bar")));
        assert!(!a.contains(&key("baz")));
    }

    #[test]
    fn extend_with_empty_is_noop() {
        let mut a = MetricsSet::from_metric_keys(&[key("foo")]);
        a.extend(MetricsSet::empty());
        assert!(a.contains(&key("foo")));
        assert_eq!(a.metric_keys.len(), 1);
    }
}