use crate::grammar::{Episode, MotifClass};
use crate::streaming::StreamingIngestor;
use std::fmt::Write;
#[derive(Debug, Clone, Default)]
pub struct MetricsSnapshot {
pub per_motif_count: [u64; MotifClass::ALL.len()],
pub per_motif_last_peak: [f64; MotifClass::ALL.len()],
pub per_motif_last_trust_sum: [f64; MotifClass::ALL.len()],
pub streaming_staged: u64,
pub streaming_flushed: u64,
pub streaming_dropped_out_of_window: u64,
}
impl MetricsSnapshot {
pub fn from_episodes(episodes: &[Episode]) -> Self {
let mut snap = MetricsSnapshot::default();
for v in snap.per_motif_last_peak.iter_mut() {
*v = f64::NAN;
}
for v in snap.per_motif_last_trust_sum.iter_mut() {
*v = f64::NAN;
}
for e in episodes {
let idx = motif_index(e.motif);
snap.per_motif_count[idx] += 1;
snap.per_motif_last_peak[idx] = e.peak;
snap.per_motif_last_trust_sum[idx] = e.trust_sum;
}
snap
}
pub fn with_streaming(mut self, ing: &StreamingIngestor) -> Self {
self.streaming_staged = ing.staged() as u64;
self.streaming_flushed = ing.flushed() as u64;
self.streaming_dropped_out_of_window = ing.dropped_out_of_window();
self
}
}
fn motif_index(m: MotifClass) -> usize {
MotifClass::ALL
.iter()
.position(|x| *x == m)
.expect("MotifClass::ALL covers every motif")
}
pub fn render_openmetrics(snap: &MetricsSnapshot) -> String {
let mut out = String::with_capacity(2048);
writeln!(
out,
"# HELP dsfb_episodes_total Total motif episodes emitted since process start."
)
.unwrap();
writeln!(out, "# TYPE dsfb_episodes_total counter").unwrap();
for (i, motif) in MotifClass::ALL.iter().enumerate() {
writeln!(
out,
"dsfb_episodes_total{{motif=\"{}\"}} {}",
motif.name(),
snap.per_motif_count[i]
)
.unwrap();
}
writeln!(
out,
"# HELP dsfb_episode_peak_last Peak |residual| of the most recently closed episode per motif."
)
.unwrap();
writeln!(out, "# TYPE dsfb_episode_peak_last gauge").unwrap();
for (i, motif) in MotifClass::ALL.iter().enumerate() {
writeln!(
out,
"dsfb_episode_peak_last{{motif=\"{}\"}} {}",
motif.name(),
fmt_f64(snap.per_motif_last_peak[i])
)
.unwrap();
}
writeln!(
out,
"# HELP dsfb_episode_trust_sum_last Trust-sum observed at the most recent episode boundary."
)
.unwrap();
writeln!(out, "# TYPE dsfb_episode_trust_sum_last gauge").unwrap();
for (i, motif) in MotifClass::ALL.iter().enumerate() {
writeln!(
out,
"dsfb_episode_trust_sum_last{{motif=\"{}\"}} {}",
motif.name(),
fmt_f64(snap.per_motif_last_trust_sum[i])
)
.unwrap();
}
writeln!(
out,
"# HELP dsfb_streaming_residuals_staged Residuals currently held in the streaming reorder buffer."
)
.unwrap();
writeln!(out, "# TYPE dsfb_streaming_residuals_staged gauge").unwrap();
writeln!(
out,
"dsfb_streaming_residuals_staged {}",
snap.streaming_staged
)
.unwrap();
writeln!(
out,
"# HELP dsfb_streaming_residuals_flushed_total Residuals moved from the reorder buffer to the canonical stream."
)
.unwrap();
writeln!(out, "# TYPE dsfb_streaming_residuals_flushed_total counter").unwrap();
writeln!(
out,
"dsfb_streaming_residuals_flushed_total {}",
snap.streaming_flushed
)
.unwrap();
writeln!(
out,
"# HELP dsfb_streaming_residuals_dropped_out_of_window_total Residuals dropped because they arrived outside the reorder window."
)
.unwrap();
writeln!(
out,
"# TYPE dsfb_streaming_residuals_dropped_out_of_window_total counter"
)
.unwrap();
writeln!(
out,
"dsfb_streaming_residuals_dropped_out_of_window_total {}",
snap.streaming_dropped_out_of_window
)
.unwrap();
out.push_str("# EOF\n");
out
}
fn fmt_f64(x: f64) -> String {
if x.is_nan() {
"NaN".to_string()
} else if x.is_infinite() {
if x > 0.0 {
"+Inf".to_string()
} else {
"-Inf".to_string()
}
} else {
format!("{:.6}", x)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::grammar::Episode;
fn ep(motif: MotifClass, peak: f64, trust: f64) -> Episode {
Episode {
motif,
channel: None,
t_start: 0.0,
t_end: 1.0,
peak,
ema_at_boundary: 0.0,
trust_sum: trust,
}
}
#[test]
fn empty_snapshot_has_zero_counts_and_nan_peaks() {
let snap = MetricsSnapshot::from_episodes(&[]);
for c in snap.per_motif_count {
assert_eq!(c, 0);
}
for p in snap.per_motif_last_peak {
assert!(p.is_nan());
}
let text = render_openmetrics(&snap);
assert!(text.contains("dsfb_episodes_total{motif=\"plan_regression_onset\"} 0"));
assert!(text.contains("dsfb_episode_peak_last{motif=\"plan_regression_onset\"} NaN"));
assert!(text.ends_with("# EOF\n"));
}
#[test]
fn counts_and_last_peak_track_episode_order() {
let eps = vec![
ep(MotifClass::PlanRegressionOnset, 0.5, 1.0),
ep(MotifClass::PlanRegressionOnset, 0.8, 1.0),
ep(MotifClass::CacheCollapse, 0.3, 1.0),
];
let snap = MetricsSnapshot::from_episodes(&eps);
let idx_plan = motif_index(MotifClass::PlanRegressionOnset);
let idx_cache = motif_index(MotifClass::CacheCollapse);
assert_eq!(snap.per_motif_count[idx_plan], 2);
assert_eq!(snap.per_motif_count[idx_cache], 1);
assert!((snap.per_motif_last_peak[idx_plan] - 0.8).abs() < 1e-12);
assert!((snap.per_motif_last_peak[idx_cache] - 0.3).abs() < 1e-12);
}
#[test]
fn openmetrics_output_is_deterministic() {
let eps = vec![
ep(MotifClass::ContentionRamp, 1.2, 1.0),
ep(MotifClass::WorkloadPhaseTransition, 0.4, 1.0),
];
let snap = MetricsSnapshot::from_episodes(&eps);
let a = render_openmetrics(&snap);
let b = render_openmetrics(&snap);
assert_eq!(a, b);
}
#[test]
fn streaming_fold_propagates_counters() {
let mut ing = crate::streaming::StreamingIngestor::with_window("t", 1.0);
ing.push(crate::residual::ResidualSample::new(
0.0,
crate::residual::ResidualClass::PlanRegression,
0.1,
));
let snap = MetricsSnapshot::from_episodes(&[]).with_streaming(&ing);
let text = render_openmetrics(&snap);
assert!(text.contains("dsfb_streaming_residuals_staged 1"));
assert!(text.contains("dsfb_streaming_residuals_dropped_out_of_window_total 0"));
}
}