// dsfb_database/metrics_exporter.rs
use crate::grammar::{Episode, MotifClass};
use crate::streaming::StreamingIngestor;
use std::fmt::Write;

40#[derive(Debug, Clone, Default)]
45pub struct MetricsSnapshot {
46 pub per_motif_count: [u64; MotifClass::ALL.len()],
48 pub per_motif_last_peak: [f64; MotifClass::ALL.len()],
50 pub per_motif_last_trust_sum: [f64; MotifClass::ALL.len()],
52 pub streaming_staged: u64,
54 pub streaming_flushed: u64,
56 pub streaming_dropped_out_of_window: u64,
59}
60
61impl MetricsSnapshot {
62 pub fn from_episodes(episodes: &[Episode]) -> Self {
66 let mut snap = MetricsSnapshot::default();
67 for v in snap.per_motif_last_peak.iter_mut() {
68 *v = f64::NAN;
69 }
70 for v in snap.per_motif_last_trust_sum.iter_mut() {
71 *v = f64::NAN;
72 }
73 for e in episodes {
74 let idx = motif_index(e.motif);
75 snap.per_motif_count[idx] += 1;
76 snap.per_motif_last_peak[idx] = e.peak;
77 snap.per_motif_last_trust_sum[idx] = e.trust_sum;
78 }
79 snap
80 }
81
82 pub fn with_streaming(mut self, ing: &StreamingIngestor) -> Self {
86 self.streaming_staged = ing.staged() as u64;
87 self.streaming_flushed = ing.flushed() as u64;
88 self.streaming_dropped_out_of_window = ing.dropped_out_of_window();
89 self
90 }
91}
92
93fn motif_index(m: MotifClass) -> usize {
94 MotifClass::ALL
99 .iter()
100 .position(|x| *x == m)
101 .expect("MotifClass::ALL covers every motif")
102}
103
104pub fn render_openmetrics(snap: &MetricsSnapshot) -> String {
108 let mut out = String::with_capacity(2048);
109 writeln!(
111 out,
112 "# HELP dsfb_episodes_total Total motif episodes emitted since process start."
113 )
114 .unwrap();
115 writeln!(out, "# TYPE dsfb_episodes_total counter").unwrap();
116 for (i, motif) in MotifClass::ALL.iter().enumerate() {
117 writeln!(
118 out,
119 "dsfb_episodes_total{{motif=\"{}\"}} {}",
120 motif.name(),
121 snap.per_motif_count[i]
122 )
123 .unwrap();
124 }
125 writeln!(
127 out,
128 "# HELP dsfb_episode_peak_last Peak |residual| of the most recently closed episode per motif."
129 )
130 .unwrap();
131 writeln!(out, "# TYPE dsfb_episode_peak_last gauge").unwrap();
132 for (i, motif) in MotifClass::ALL.iter().enumerate() {
133 writeln!(
134 out,
135 "dsfb_episode_peak_last{{motif=\"{}\"}} {}",
136 motif.name(),
137 fmt_f64(snap.per_motif_last_peak[i])
138 )
139 .unwrap();
140 }
141 writeln!(
143 out,
144 "# HELP dsfb_episode_trust_sum_last Trust-sum observed at the most recent episode boundary."
145 )
146 .unwrap();
147 writeln!(out, "# TYPE dsfb_episode_trust_sum_last gauge").unwrap();
148 for (i, motif) in MotifClass::ALL.iter().enumerate() {
149 writeln!(
150 out,
151 "dsfb_episode_trust_sum_last{{motif=\"{}\"}} {}",
152 motif.name(),
153 fmt_f64(snap.per_motif_last_trust_sum[i])
154 )
155 .unwrap();
156 }
157 writeln!(
159 out,
160 "# HELP dsfb_streaming_residuals_staged Residuals currently held in the streaming reorder buffer."
161 )
162 .unwrap();
163 writeln!(out, "# TYPE dsfb_streaming_residuals_staged gauge").unwrap();
164 writeln!(
165 out,
166 "dsfb_streaming_residuals_staged {}",
167 snap.streaming_staged
168 )
169 .unwrap();
170 writeln!(
171 out,
172 "# HELP dsfb_streaming_residuals_flushed_total Residuals moved from the reorder buffer to the canonical stream."
173 )
174 .unwrap();
175 writeln!(out, "# TYPE dsfb_streaming_residuals_flushed_total counter").unwrap();
176 writeln!(
177 out,
178 "dsfb_streaming_residuals_flushed_total {}",
179 snap.streaming_flushed
180 )
181 .unwrap();
182 writeln!(
183 out,
184 "# HELP dsfb_streaming_residuals_dropped_out_of_window_total Residuals dropped because they arrived outside the reorder window."
185 )
186 .unwrap();
187 writeln!(
188 out,
189 "# TYPE dsfb_streaming_residuals_dropped_out_of_window_total counter"
190 )
191 .unwrap();
192 writeln!(
193 out,
194 "dsfb_streaming_residuals_dropped_out_of_window_total {}",
195 snap.streaming_dropped_out_of_window
196 )
197 .unwrap();
198 out.push_str("# EOF\n");
199 out
200}
201
/// Formats a sample value for the exposition text.
///
/// Finite values are printed with exactly six digits after the decimal point.
/// Non-finite values use fixed tokens: `NaN`, `+Inf` and `-Inf`.
fn fmt_f64(x: f64) -> String {
    if x.is_finite() {
        return format!("{:.6}", x);
    }

    // Only NaN and the two infinities reach this point.
    let token = if x.is_nan() {
        "NaN"
    } else if x.is_sign_positive() {
        "+Inf"
    } else {
        "-Inf"
    };
    token.to_string()
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::grammar::Episode;

    /// Builds an episode spanning `0.0..1.0` with no channel, so each test only
    /// spells out the motif, peak and trust-sum it cares about.
    fn episode(motif: MotifClass, peak: f64, trust: f64) -> Episode {
        Episode {
            motif,
            channel: None,
            t_start: 0.0,
            t_end: 1.0,
            peak,
            ema_at_boundary: 0.0,
            trust_sum: trust,
        }
    }

    /// With no episodes every count is zero, every peak is NaN, and the
    /// rendered text still carries the samples and the `# EOF` terminator.
    #[test]
    fn empty_snapshot_has_zero_counts_and_nan_peaks() {
        let snapshot = MetricsSnapshot::from_episodes(&[]);
        assert!(snapshot.per_motif_count.iter().all(|&count| count == 0));
        assert!(snapshot.per_motif_last_peak.iter().all(|peak| peak.is_nan()));

        let rendered = render_openmetrics(&snapshot);
        for expected in [
            "dsfb_episodes_total{motif=\"plan_regression_onset\"} 0",
            "dsfb_episode_peak_last{motif=\"plan_regression_onset\"} NaN",
        ] {
            assert!(rendered.contains(expected), "missing line: {}", expected);
        }
        assert!(rendered.ends_with("# EOF\n"));
    }

    /// Counts accumulate per motif and the peak gauge keeps the value of the
    /// last episode of that motif in input order.
    #[test]
    fn counts_and_last_peak_track_episode_order() {
        let episodes = [
            episode(MotifClass::PlanRegressionOnset, 0.5, 1.0),
            episode(MotifClass::PlanRegressionOnset, 0.8, 1.0),
            episode(MotifClass::CacheCollapse, 0.3, 1.0),
        ];
        let snapshot = MetricsSnapshot::from_episodes(&episodes);

        let plan = motif_index(MotifClass::PlanRegressionOnset);
        let cache = motif_index(MotifClass::CacheCollapse);

        assert_eq!(snapshot.per_motif_count[plan], 2);
        assert_eq!(snapshot.per_motif_count[cache], 1);
        assert!((snapshot.per_motif_last_peak[plan] - 0.8).abs() < 1e-12);
        assert!((snapshot.per_motif_last_peak[cache] - 0.3).abs() < 1e-12);
    }

    /// Rendering the same snapshot twice yields identical text.
    #[test]
    fn openmetrics_output_is_deterministic() {
        let episodes = [
            episode(MotifClass::ContentionRamp, 1.2, 1.0),
            episode(MotifClass::WorkloadPhaseTransition, 0.4, 1.0),
        ];
        let snapshot = MetricsSnapshot::from_episodes(&episodes);

        let first = render_openmetrics(&snapshot);
        let second = render_openmetrics(&snapshot);
        assert_eq!(first, second);
    }

    /// Ingestor readings folded in by `with_streaming` show up in the output.
    #[test]
    fn streaming_fold_propagates_counters() {
        use crate::residual::{ResidualClass, ResidualSample};

        let mut ingestor = StreamingIngestor::with_window("t", 1.0);
        ingestor.push(ResidualSample::new(0.0, ResidualClass::PlanRegression, 0.1));

        let snapshot = MetricsSnapshot::from_episodes(&[]).with_streaming(&ingestor);
        let rendered = render_openmetrics(&snapshot);

        assert!(rendered.contains("dsfb_streaming_residuals_staged 1"));
        assert!(rendered.contains("dsfb_streaming_residuals_dropped_out_of_window_total 0"));
    }
}