Skip to main content

prometheus_extensions/
lib.rs

1//! Prometheus extensions for richer metric collection.
2//!
3//! This crate provides three utilities that complement the
4//! [`prometheus`](https://docs.rs/prometheus) crate:
5//!
6//! | Type | Purpose |
7//! |------|---------|
8//! | [`AggregateCounter`] | A `CounterVec` wrapper that automatically emits an extra **unlabeled total** alongside every per-label counter. |
9//! | [`ScientificEncoder`] | A lightweight Prometheus text-format encoder that renders counter values in scientific notation (`1.23E4`) with a trailing comma after the last label — matching the format Kafka JMX exporters produce. |
10//! | [`Sensor`] | A lock-free exponential moving average (EMA) gauge for tracking rates or throughput. |
11//!
12//! # Quick start
13//!
14//! ```rust
15//! use prometheus::Opts;
16//! use prometheus::core::Collector;
17//! use prometheus_extensions::{AggregateCounter, ScientificEncoder};
18//!
19//! // 1. Create an aggregate counter
20//! let counter = AggregateCounter::new(
21//!     Opts::new("http_requests_total", "Total HTTP requests"),
22//!     &["method"],
23//! ).unwrap();
24//!
25//! counter.with_label_values(&["GET"]).inc_by(100.0);
26//! counter.with_label_values(&["POST"]).inc_by(42.0);
27//!
28//! // Collecting yields 3 metrics: the unlabeled total (142) plus the two labeled ones.
29//! let families = counter.collect();
30//! assert_eq!(families[0].get_metric().len(), 3);
31//!
32//! // 2. Encode in Kafka-compatible scientific notation
33//! let encoder = ScientificEncoder::new();
34//! let mut buf = Vec::new();
35//! encoder.encode(&families, &mut buf).unwrap();
36//! let output = String::from_utf8(buf).unwrap();
37//! assert!(output.contains("http_requests_total 1.4200000000E2"));
38//! ```
39//!
40//! # Sensor (EMA)
41//!
42//! ```rust
43//! use prometheus_extensions::Sensor;
44//!
45//! let sensor = Sensor::new(0.05); // alpha = 0.05
46//! sensor.measure(100.0);
47//! sensor.measure(200.0);
48//! // EMA tracks the smoothed value
49//! let value = sensor.get();
50//! assert!(value > 0.0);
51//! ```
52
53#![deny(missing_docs)]
54#![deny(warnings)]
55
56use prometheus::core::{Atomic, AtomicF64, Collector, Desc};
57use prometheus::proto::MetricFamily;
58use prometheus::{Counter, CounterVec, Opts};
59
60use std::io::Write;
61
62// ---------------------------------------------------------------------------
63// AggregateCounter
64// ---------------------------------------------------------------------------
65
66/// A Prometheus [`CounterVec`] wrapper that automatically produces an extra
67/// **unlabeled aggregate** metric (the sum of all label combinations) in
68/// addition to the per-label counters.
69///
70/// This is useful when you want a single metric name to expose both a total
71/// and a per-dimension breakdown without maintaining a separate `Counter`.
72///
73/// # Example
74///
75/// ```rust
76/// use prometheus::Opts;
77/// use prometheus::core::Collector;
78/// use prometheus_extensions::AggregateCounter;
79///
80/// let counter = AggregateCounter::new(
81///     Opts::new("rpc_calls_total", "Total RPC calls"),
82///     &["service"],
83/// ).unwrap();
84///
85/// counter.with_label_values(&["auth"]).inc_by(10.0);
86/// counter.with_label_values(&["billing"]).inc_by(20.0);
87///
88/// let families = counter.collect();
89/// let metrics = families[0].get_metric();
90///
91/// // First metric is the aggregate (no labels, value = 30)
92/// assert_eq!(metrics[0].get_label().len(), 0);
93/// assert_eq!(metrics[0].get_counter().value(), 30.0);
94///
95/// // Remaining metrics are the per-label counters
96/// assert_eq!(metrics.len(), 3); // 1 aggregate + 2 labeled
97/// ```
98#[derive(Clone)]
99pub struct AggregateCounter {
100    labeled_counter: CounterVec,
101    aggregate_desc: Desc,
102}
103
104impl AggregateCounter {
105    /// Create a new `AggregateCounter`.
106    ///
107    /// `opts` defines the metric name and help string.
108    /// `label_names` are the label dimensions for the per-label counters.
109    pub fn new(opts: Opts, label_names: &[&str]) -> Result<Self, prometheus::Error> {
110        let labeled_counter = CounterVec::new(opts.clone(), label_names)?;
111
112        let aggregate_desc = Desc::new(
113            opts.name.clone(),
114            opts.help.clone(),
115            vec![],
116            opts.const_labels.clone(),
117        )?;
118
119        Ok(AggregateCounter {
120            labeled_counter,
121            aggregate_desc,
122        })
123    }
124
125    /// Return a [`Counter`] for the given label values, creating it if it
126    /// does not already exist.
127    pub fn with_label_values(&self, vals: &[&str]) -> Counter {
128        self.labeled_counter.with_label_values(vals)
129    }
130}
131
132impl Collector for AggregateCounter {
133    fn desc(&self) -> Vec<&Desc> {
134        let mut descs = vec![&self.aggregate_desc];
135        descs.extend(self.labeled_counter.desc());
136        descs
137    }
138
139    fn collect(&self) -> Vec<MetricFamily> {
140        let labeled_metrics = self.labeled_counter.collect();
141
142        if labeled_metrics.is_empty() {
143            return vec![];
144        }
145
146        let mut metric_family = labeled_metrics[0].clone();
147
148        // Sum all labeled counter values into a single aggregate.
149        let total: f64 = metric_family
150            .get_metric()
151            .iter()
152            .map(|m| m.get_counter().value())
153            .sum();
154
155        let mut aggregate_metric = prometheus::proto::Metric::new();
156        let mut counter = prometheus::proto::Counter::new();
157        counter.set_value(total);
158        aggregate_metric.set_counter(counter);
159
160        // Prepend the aggregate metric before the labeled ones.
161        let mut all_metrics = vec![aggregate_metric];
162        all_metrics.extend(metric_family.take_metric().into_iter());
163        metric_family.set_metric(all_metrics.into());
164
165        vec![metric_family]
166    }
167}
168
169// ---------------------------------------------------------------------------
170// ScientificEncoder
171// ---------------------------------------------------------------------------
172
/// A Prometheus text-format encoder that renders counter values in scientific
/// notation (`{:.10E}`) and appends a trailing comma after the last label.
///
/// This matches the output format produced by Kafka's JMX-to-Prometheus
/// exporters, making it useful for dashboards that expect that convention.
///
/// The encoder is a stateless unit type, so it is `Copy` and can be freely
/// shared or recreated.
///
/// # Example
///
/// ```rust
/// use prometheus::Opts;
/// use prometheus::core::Collector;
/// use prometheus_extensions::{AggregateCounter, ScientificEncoder};
///
/// let counter = AggregateCounter::new(
///     Opts::new("bytes_in", "Bytes received"),
///     &["topic"],
/// ).unwrap();
/// counter.with_label_values(&["events"]).inc_by(4.6384186519e10);
///
/// let families = counter.collect();
/// let encoder = ScientificEncoder::new();
/// let mut buf = Vec::new();
/// encoder.encode(&families, &mut buf).unwrap();
///
/// let output = String::from_utf8(buf).unwrap();
/// assert!(output.contains("bytes_in{topic=\"events\",}"));
/// assert!(output.contains("E10"));
/// ```
#[derive(Debug, Clone, Copy, Default)]
pub struct ScientificEncoder;
203
204impl ScientificEncoder {
205    /// Create a new encoder.
206    pub fn new() -> Self {
207        Self
208    }
209
210    /// Encode `metric_families` into the Prometheus text exposition format
211    /// with scientific-notation counter values.
212    pub fn encode<W: Write>(
213        &self,
214        metric_families: &[MetricFamily],
215        writer: &mut W,
216    ) -> Result<(), std::io::Error> {
217        for mf in metric_families {
218            writeln!(writer, "# HELP {} {}", mf.name(), mf.help())?;
219            writeln!(writer, "# TYPE {} counter", mf.name())?;
220
221            for metric in mf.get_metric() {
222                let value = metric.get_counter().value();
223
224                if metric.get_label().is_empty() {
225                    // Aggregate metric (no labels)
226                    writeln!(writer, "{} {:.10E}", mf.name(), value)?;
227                } else {
228                    // Labeled metric — format with trailing comma
229                    let mut label_str = String::new();
230                    for label in metric.get_label() {
231                        if !label_str.is_empty() {
232                            label_str.push(',');
233                        }
234                        label_str.push_str(&format!(
235                            "{}=\"{}\"",
236                            label.name(),
237                            label.value()
238                        ));
239                    }
240                    label_str.push(',');
241
242                    writeln!(writer, "{}{{{}}} {:.10E}", mf.name(), label_str, value)?;
243                }
244            }
245        }
246        Ok(())
247    }
248}
249
250// ---------------------------------------------------------------------------
251// Sensor (EMA)
252// ---------------------------------------------------------------------------
253
254/// A lock-free exponential moving average (EMA) gauge.
255///
256/// Useful for tracking smoothed rates or throughput where you want to dampen
257/// spikes. The smoothing factor `alpha` controls responsiveness:
258///
259/// - `alpha` close to 1.0 → responds quickly to new values (noisy)
260/// - `alpha` close to 0.0 → responds slowly (smooth)
261///
262/// A typical value for per-second throughput is `0.05`.
263///
264/// # Example
265///
266/// ```rust
267/// use prometheus_extensions::Sensor;
268///
269/// let sensor = Sensor::new(0.05);
270/// for _ in 0..100 {
271///     sensor.measure(1000.0);
272/// }
273/// // After many identical measurements, EMA converges to the input value.
274/// assert!((sensor.get() - 1000.0).abs() < 10.0);
275/// ```
276pub struct Sensor {
277    ema: AtomicF64,
278    alpha: f64,
279}
280
281impl Sensor {
282    /// Create a new sensor with the given smoothing factor `alpha`.
283    ///
284    /// # Panics
285    ///
286    /// Panics if `alpha` is not in the range `(0.0, 1.0]`.
287    pub fn new(alpha: f64) -> Self {
288        assert!(
289            alpha > 0.0 && alpha <= 1.0,
290            "alpha must be in (0.0, 1.0], got {alpha}"
291        );
292        Self {
293            ema: AtomicF64::new(0.0),
294            alpha,
295        }
296    }
297
298    /// Feed a new measurement into the EMA.
299    pub fn measure(&self, value: f64) {
300        let current = self.ema.get();
301        let new_value = self.alpha * value + (1.0 - self.alpha) * current;
302        self.ema.set(new_value);
303    }
304
305    /// Read the current smoothed value.
306    pub fn get(&self) -> f64 {
307        self.ema.get()
308    }
309}
310
311// ---------------------------------------------------------------------------
312// Tests
313// ---------------------------------------------------------------------------
314
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::core::Collector;

    /// Encode `families` with a fresh [`ScientificEncoder`] and return the
    /// produced text.
    fn render(families: &[MetricFamily]) -> String {
        let mut buf = Vec::new();
        ScientificEncoder::new().encode(families, &mut buf).unwrap();
        String::from_utf8(buf).unwrap()
    }

    /// The collected family holds the aggregate first, then one sample per
    /// label value.
    #[test]
    fn aggregate_counter_produces_both_metrics() {
        let counter = AggregateCounter::new(
            Opts::new("test_counter", "Test counter for aggregation"),
            &["label"],
        )
        .unwrap();

        for (label, amount) in [("value1", 10.0), ("value2", 20.0), ("value3", 30.0)] {
            counter.with_label_values(&[label]).inc_by(amount);
        }

        let metric_families = counter.collect();
        assert_eq!(metric_families.len(), 1);

        let family = &metric_families[0];
        assert_eq!(family.name(), "test_counter");

        let metrics = family.get_metric();
        assert_eq!(metrics.len(), 4); // 1 aggregate + 3 labeled

        // The aggregate leads and carries no labels.
        let aggregate = &metrics[0];
        assert_eq!(aggregate.get_label().len(), 0);
        assert_eq!(aggregate.get_counter().value(), 60.0);

        // Everything after it is a labeled sample.
        for metric in &metrics[1..4] {
            assert_eq!(metric.get_label().len(), 1);
            let label_value = metric.get_label()[0].value();
            let counter_value = metric.get_counter().value();
            match label_value {
                "value1" => assert_eq!(counter_value, 10.0),
                "value2" => assert_eq!(counter_value, 20.0),
                "value3" => assert_eq!(counter_value, 30.0),
                _ => panic!("Unexpected label value: {label_value}"),
            }
        }
    }

    /// With no observations, whatever is collected must sum to zero.
    #[test]
    fn aggregate_counter_empty_collect() {
        let counter =
            AggregateCounter::new(Opts::new("empty_counter", "No observations yet"), &["x"])
                .unwrap();
        let families = counter.collect();

        // Depending on the prometheus version, an unobserved CounterVec yields
        // either a family without samples or no family at all; both are fine.
        if let Some(family) = families.first() {
            let total: f64 = family
                .get_metric()
                .iter()
                .map(|m| m.get_counter().value())
                .sum();
            assert_eq!(total, 0.0);
        }
    }

    /// Header lines, the aggregate and both labeled samples are rendered in
    /// the Kafka-style scientific format.
    #[test]
    fn scientific_encoder_format() {
        let counter = AggregateCounter::new(
            Opts::new("test_kafka_metrics", "Test counter for Kafka format"),
            &["topic"],
        )
        .unwrap();

        let hits = counter.with_label_values(&["hits"]);
        hits.inc_by(4.6384186519e10);
        let logs = counter.with_label_values(&["logs"]);
        logs.inc_by(1.2345e9);

        let output = render(&counter.collect());

        assert!(output.contains("# HELP test_kafka_metrics Test counter for Kafka format"));
        assert!(output.contains("# TYPE test_kafka_metrics counter"));
        assert!(output.contains("test_kafka_metrics 4.7618686519E10"));
        assert!(output.contains("test_kafka_metrics{topic=\"hits\",} 4.6384186519E10"));
        assert!(output.contains("test_kafka_metrics{topic=\"logs\",} 1.2345000000E9"));
    }

    /// Repeating one measurement drives the EMA to that measurement.
    #[test]
    fn sensor_converges() {
        let sensor = Sensor::new(0.05);
        (0..1000).for_each(|_| sensor.measure(500.0));

        let smoothed = sensor.get();
        assert!(
            (smoothed - 500.0).abs() < 1.0,
            "EMA should converge to 500.0, got {}",
            smoothed
        );
    }

    /// A fresh sensor reports zero.
    #[test]
    fn sensor_starts_at_zero() {
        assert_eq!(Sensor::new(0.1).get(), 0.0);
    }

    /// `alpha == 0` is outside the accepted range.
    #[test]
    #[should_panic(expected = "alpha must be in (0.0, 1.0]")]
    fn sensor_rejects_zero_alpha() {
        let _ = Sensor::new(0.0);
    }

    /// Negative `alpha` is outside the accepted range.
    #[test]
    #[should_panic(expected = "alpha must be in (0.0, 1.0]")]
    fn sensor_rejects_negative_alpha() {
        let _ = Sensor::new(-0.5);
    }

    /// With `alpha == 1` the EMA equals the latest measurement.
    #[test]
    fn sensor_alpha_one_tracks_instantly() {
        let sensor = Sensor::new(1.0);
        for measurement in [42.0, 99.0] {
            sensor.measure(measurement);
            assert_eq!(sensor.get(), measurement);
        }
    }
}