// dsfb_database/metrics_exporter.rs

//! Phase-C4: Prometheus / OpenMetrics exposition of live engine state.
//!
//! This module emits the OpenMetrics 1.0 text format a Prometheus
//! scraper will happily consume. It is **intentionally** a pure
//! function over the in-memory episode list and (optionally) the
//! streaming ingestor's counters — no HTTP, no async runtime, no
//! hyper / axum / tokio dependency. The operator runbook (§Phase-C6
//! README) documents three wiring options that cost less than 100
//! lines each: a plain `std::net::TcpListener` loop, the `tiny_http`
//! crate, or a sidecar `textfile` collector read by `node_exporter`.
//!
//! Metric catalogue (the names follow the Prometheus
//! `NAMESPACE_SUBSYSTEM_UNIT` convention):
//!
//!   * `dsfb_episodes_total{motif="..."}` — counter, total episodes
//!     emitted per motif class since process start.
//!   * `dsfb_episode_peak_last{motif="..."}` — gauge, `|peak|` of the
//!     most recently closed episode per motif. `NaN` when no episode
//!     has closed yet.
//!   * `dsfb_episode_trust_sum_last{motif="..."}` — gauge, trust-sum
//!     observed at the most recent episode boundary. Should stay in
//!     `[0.99, 1.01]`; deviations indicate an observer bug.
//!   * `dsfb_streaming_residuals_staged` — gauge, current reorder
//!     buffer occupancy.
//!   * `dsfb_streaming_residuals_flushed_total` — counter, samples
//!     moved from the reorder buffer into the canonical stream.
//!   * `dsfb_streaming_residuals_dropped_out_of_window_total` —
//!     counter, samples dropped because they arrived more than one
//!     reorder-window behind the flushed frontier. A non-zero value
//!     is an alertable condition: the telemetry pipeline is
//!     delivering samples later than the configured window tolerates.
//!
//! The text format is deterministic given the same inputs — helpful
//! for unit-testing and for diffing scrapes across deploys.

36use crate::grammar::{Episode, MotifClass};
37use crate::streaming::StreamingIngestor;
38use std::fmt::Write;
39
40/// Snapshot of counters an exporter can publish directly. Constructed
41/// from an [`Episode`] slice (typically the output of one
42/// `MotifEngine::run` or the accumulated set of closed episodes in a
43/// long-running daemon).
44#[derive(Debug, Clone, Default)]
45pub struct MetricsSnapshot {
46    /// Per-motif total episode count.
47    pub per_motif_count: [u64; MotifClass::ALL.len()],
48    /// Per-motif last-episode peak `|residual|` (NaN when none).
49    pub per_motif_last_peak: [f64; MotifClass::ALL.len()],
50    /// Per-motif last-episode trust sum (NaN when none).
51    pub per_motif_last_trust_sum: [f64; MotifClass::ALL.len()],
52    /// Residuals currently held in a streaming ingestor's buffer.
53    pub streaming_staged: u64,
54    /// Residuals the streaming ingestor has flushed to its stream.
55    pub streaming_flushed: u64,
56    /// Residuals the streaming ingestor dropped because they fell
57    /// outside the reorder window.
58    pub streaming_dropped_out_of_window: u64,
59}
60
61impl MetricsSnapshot {
62    /// Build a snapshot from an [`Episode`] slice. Order matters only
63    /// in that the *last* episode per motif (by insertion order)
64    /// determines `last_peak` and `last_trust_sum`.
65    pub fn from_episodes(episodes: &[Episode]) -> Self {
66        let mut snap = MetricsSnapshot::default();
67        for v in snap.per_motif_last_peak.iter_mut() {
68            *v = f64::NAN;
69        }
70        for v in snap.per_motif_last_trust_sum.iter_mut() {
71            *v = f64::NAN;
72        }
73        for e in episodes {
74            let idx = motif_index(e.motif);
75            snap.per_motif_count[idx] += 1;
76            snap.per_motif_last_peak[idx] = e.peak;
77            snap.per_motif_last_trust_sum[idx] = e.trust_sum;
78        }
79        snap
80    }
81
82    /// Fold the streaming ingestor's counters into an existing
83    /// snapshot. The ingestor is borrowed immutably — callers can
84    /// continue to push samples into it after this call.
85    pub fn with_streaming(mut self, ing: &StreamingIngestor) -> Self {
86        self.streaming_staged = ing.staged() as u64;
87        self.streaming_flushed = ing.flushed() as u64;
88        self.streaming_dropped_out_of_window = ing.dropped_out_of_window();
89        self
90    }
91}
92
93fn motif_index(m: MotifClass) -> usize {
94    // `MotifClass::ALL` is the declared ordering used by the metrics
95    // and episode-emission paths. We locate the motif by linear scan
96    // — there are five elements, so the cost is negligible and the
97    // match stays deny-list-safe under future additions.
98    MotifClass::ALL
99        .iter()
100        .position(|x| *x == m)
101        .expect("MotifClass::ALL covers every motif")
102}
103
104/// Render a [`MetricsSnapshot`] as OpenMetrics 1.0 text. The output
105/// ends with `# EOF\n` as required by the OpenMetrics spec. The
106/// function is deterministic: same input → byte-identical output.
107pub fn render_openmetrics(snap: &MetricsSnapshot) -> String {
108    let mut out = String::with_capacity(2048);
109    // Counters
110    writeln!(
111        out,
112        "# HELP dsfb_episodes_total Total motif episodes emitted since process start."
113    )
114    .unwrap();
115    writeln!(out, "# TYPE dsfb_episodes_total counter").unwrap();
116    for (i, motif) in MotifClass::ALL.iter().enumerate() {
117        writeln!(
118            out,
119            "dsfb_episodes_total{{motif=\"{}\"}} {}",
120            motif.name(),
121            snap.per_motif_count[i]
122        )
123        .unwrap();
124    }
125    // Last-episode peak (gauge)
126    writeln!(
127        out,
128        "# HELP dsfb_episode_peak_last Peak |residual| of the most recently closed episode per motif."
129    )
130    .unwrap();
131    writeln!(out, "# TYPE dsfb_episode_peak_last gauge").unwrap();
132    for (i, motif) in MotifClass::ALL.iter().enumerate() {
133        writeln!(
134            out,
135            "dsfb_episode_peak_last{{motif=\"{}\"}} {}",
136            motif.name(),
137            fmt_f64(snap.per_motif_last_peak[i])
138        )
139        .unwrap();
140    }
141    // Last-episode trust sum (gauge)
142    writeln!(
143        out,
144        "# HELP dsfb_episode_trust_sum_last Trust-sum observed at the most recent episode boundary."
145    )
146    .unwrap();
147    writeln!(out, "# TYPE dsfb_episode_trust_sum_last gauge").unwrap();
148    for (i, motif) in MotifClass::ALL.iter().enumerate() {
149        writeln!(
150            out,
151            "dsfb_episode_trust_sum_last{{motif=\"{}\"}} {}",
152            motif.name(),
153            fmt_f64(snap.per_motif_last_trust_sum[i])
154        )
155        .unwrap();
156    }
157    // Streaming counters
158    writeln!(
159        out,
160        "# HELP dsfb_streaming_residuals_staged Residuals currently held in the streaming reorder buffer."
161    )
162    .unwrap();
163    writeln!(out, "# TYPE dsfb_streaming_residuals_staged gauge").unwrap();
164    writeln!(
165        out,
166        "dsfb_streaming_residuals_staged {}",
167        snap.streaming_staged
168    )
169    .unwrap();
170    writeln!(
171        out,
172        "# HELP dsfb_streaming_residuals_flushed_total Residuals moved from the reorder buffer to the canonical stream."
173    )
174    .unwrap();
175    writeln!(out, "# TYPE dsfb_streaming_residuals_flushed_total counter").unwrap();
176    writeln!(
177        out,
178        "dsfb_streaming_residuals_flushed_total {}",
179        snap.streaming_flushed
180    )
181    .unwrap();
182    writeln!(
183        out,
184        "# HELP dsfb_streaming_residuals_dropped_out_of_window_total Residuals dropped because they arrived outside the reorder window."
185    )
186    .unwrap();
187    writeln!(
188        out,
189        "# TYPE dsfb_streaming_residuals_dropped_out_of_window_total counter"
190    )
191    .unwrap();
192    writeln!(
193        out,
194        "dsfb_streaming_residuals_dropped_out_of_window_total {}",
195        snap.streaming_dropped_out_of_window
196    )
197    .unwrap();
198    out.push_str("# EOF\n");
199    out
200}
201
/// Render an `f64` as an OpenMetrics sample value.
///
/// * `NaN` is written as `NaN`.
/// * Positive and negative infinity are written as `+Inf` and `-Inf`.
/// * Every finite value is written with exactly six digits after the
///   decimal point (`{:.6}`).
fn fmt_f64(x: f64) -> String {
    if x.is_finite() {
        return format!("{:.6}", x);
    }
    // Only NaN and the two infinities reach this point.
    let token = if x.is_nan() {
        "NaN"
    } else if x.is_sign_positive() {
        "+Inf"
    } else {
        "-Inf"
    };
    token.to_string()
}
219
#[cfg(test)]
mod tests {
    use super::*;
    use crate::grammar::Episode;

    /// Build an episode with the given motif, peak and trust-sum; the
    /// remaining fields get fixed placeholder values because the
    /// exporter never reads them.
    fn ep(motif: MotifClass, peak: f64, trust: f64) -> Episode {
        Episode {
            motif,
            channel: None,
            t_start: 0.0,
            t_end: 1.0,
            peak,
            ema_at_boundary: 0.0,
            trust_sum: trust,
        }
    }

    #[test]
    fn empty_snapshot_has_zero_counts_and_nan_peaks() {
        let snap = MetricsSnapshot::from_episodes(&[]);
        for c in snap.per_motif_count {
            assert_eq!(c, 0);
        }
        for p in snap.per_motif_last_peak {
            assert!(p.is_nan());
        }
        // The trust-sum gauge shares the "NaN when none" contract.
        for t in snap.per_motif_last_trust_sum {
            assert!(t.is_nan());
        }
        let text = render_openmetrics(&snap);
        assert!(text.contains("dsfb_episodes_total{motif=\"plan_regression_onset\"} 0"));
        assert!(text.contains("dsfb_episode_peak_last{motif=\"plan_regression_onset\"} NaN"));
        assert!(text.contains("dsfb_episode_trust_sum_last{motif=\"plan_regression_onset\"} NaN"));
        assert!(text.ends_with("# EOF\n"));
    }

    #[test]
    fn counts_and_last_peak_track_episode_order() {
        let eps = vec![
            ep(MotifClass::PlanRegressionOnset, 0.5, 1.0),
            ep(MotifClass::PlanRegressionOnset, 0.8, 1.0),
            ep(MotifClass::CacheCollapse, 0.3, 1.0),
        ];
        let snap = MetricsSnapshot::from_episodes(&eps);
        let idx_plan = motif_index(MotifClass::PlanRegressionOnset);
        let idx_cache = motif_index(MotifClass::CacheCollapse);
        assert_eq!(snap.per_motif_count[idx_plan], 2);
        assert_eq!(snap.per_motif_count[idx_cache], 1);
        assert!((snap.per_motif_last_peak[idx_plan] - 0.8).abs() < 1e-12);
        assert!((snap.per_motif_last_peak[idx_cache] - 0.3).abs() < 1e-12);
    }

    #[test]
    fn last_trust_sum_tracks_episode_order() {
        let eps = vec![
            ep(MotifClass::PlanRegressionOnset, 0.5, 0.99),
            ep(MotifClass::PlanRegressionOnset, 0.8, 1.01),
        ];
        let snap = MetricsSnapshot::from_episodes(&eps);
        let idx_plan = motif_index(MotifClass::PlanRegressionOnset);
        assert!((snap.per_motif_last_trust_sum[idx_plan] - 1.01).abs() < 1e-12);
        let text = render_openmetrics(&snap);
        assert!(
            text.contains("dsfb_episode_trust_sum_last{motif=\"plan_regression_onset\"} 1.010000")
        );
    }

    #[test]
    fn openmetrics_output_is_deterministic() {
        let eps = vec![
            ep(MotifClass::ContentionRamp, 1.2, 1.0),
            ep(MotifClass::WorkloadPhaseTransition, 0.4, 1.0),
        ];
        let snap = MetricsSnapshot::from_episodes(&eps);
        let a = render_openmetrics(&snap);
        let b = render_openmetrics(&snap);
        assert_eq!(a, b);
    }

    #[test]
    fn streaming_fold_propagates_counters() {
        let mut ing = crate::streaming::StreamingIngestor::with_window("t", 1.0);
        ing.push(crate::residual::ResidualSample::new(
            0.0,
            crate::residual::ResidualClass::PlanRegression,
            0.1,
        ));
        let snap = MetricsSnapshot::from_episodes(&[]).with_streaming(&ing);
        let text = render_openmetrics(&snap);
        assert!(text.contains("dsfb_streaming_residuals_staged 1"));
        assert!(text.contains("dsfb_streaming_residuals_dropped_out_of_window_total 0"));
    }

    #[test]
    fn fmt_f64_uses_openmetrics_tokens_for_non_finite_values() {
        assert_eq!(fmt_f64(f64::NAN), "NaN");
        assert_eq!(fmt_f64(f64::INFINITY), "+Inf");
        assert_eq!(fmt_f64(f64::NEG_INFINITY), "-Inf");
        assert_eq!(fmt_f64(0.5), "0.500000");
    }
}