prometheus_extensions/lib.rs
1//! Prometheus extensions for richer metric collection.
2//!
3//! This crate provides three utilities that complement the
4//! [`prometheus`](https://docs.rs/prometheus) crate:
5//!
6//! | Type | Purpose |
7//! |------|---------|
8//! | [`AggregateCounter`] | A `CounterVec` wrapper that automatically emits an extra **unlabeled total** alongside every per-label counter. |
9//! | [`ScientificEncoder`] | A lightweight Prometheus text-format encoder that renders counter values in scientific notation with ten fractional digits (e.g. `1.2300000000E4`) and a trailing comma after the last label — matching the format Kafka JMX exporters produce. |
10//! | [`Sensor`] | A lock-free exponential moving average (EMA) gauge for tracking rates or throughput. |
11//!
12//! # Quick start
13//!
14//! ```rust
15//! use prometheus::Opts;
16//! use prometheus::core::Collector;
17//! use prometheus_extensions::{AggregateCounter, ScientificEncoder};
18//!
19//! // 1. Create an aggregate counter
20//! let counter = AggregateCounter::new(
21//! Opts::new("http_requests_total", "Total HTTP requests"),
22//! &["method"],
23//! ).unwrap();
24//!
25//! counter.with_label_values(&["GET"]).inc_by(100.0);
26//! counter.with_label_values(&["POST"]).inc_by(42.0);
27//!
28//! // Collecting yields 3 metrics: the unlabeled total (142) plus the two labeled ones.
29//! let families = counter.collect();
30//! assert_eq!(families[0].get_metric().len(), 3);
31//!
32//! // 2. Encode in Kafka-compatible scientific notation
33//! let encoder = ScientificEncoder::new();
34//! let mut buf = Vec::new();
35//! encoder.encode(&families, &mut buf).unwrap();
36//! let output = String::from_utf8(buf).unwrap();
37//! assert!(output.contains("http_requests_total 1.4200000000E2"));
38//! ```
39//!
40//! # Sensor (EMA)
41//!
42//! ```rust
43//! use prometheus_extensions::Sensor;
44//!
45//! let sensor = Sensor::new(0.05); // alpha = 0.05
46//! sensor.measure(100.0);
47//! sensor.measure(200.0);
48//! // EMA tracks the smoothed value
49//! let value = sensor.get();
50//! assert!(value > 0.0);
51//! ```
52
53#![deny(missing_docs)]
54#![deny(warnings)]
55
56use prometheus::core::{Atomic, AtomicF64, Collector, Desc};
57use prometheus::proto::MetricFamily;
58use prometheus::{Counter, CounterVec, Opts};
59
60use std::io::Write;
61
62// ---------------------------------------------------------------------------
63// AggregateCounter
64// ---------------------------------------------------------------------------
65
66/// A Prometheus [`CounterVec`] wrapper that automatically produces an extra
67/// **unlabeled aggregate** metric (the sum of all label combinations) in
68/// addition to the per-label counters.
69///
70/// This is useful when you want a single metric name to expose both a total
71/// and a per-dimension breakdown without maintaining a separate `Counter`.
72///
73/// # Example
74///
75/// ```rust
76/// use prometheus::Opts;
77/// use prometheus::core::Collector;
78/// use prometheus_extensions::AggregateCounter;
79///
80/// let counter = AggregateCounter::new(
81/// Opts::new("rpc_calls_total", "Total RPC calls"),
82/// &["service"],
83/// ).unwrap();
84///
85/// counter.with_label_values(&["auth"]).inc_by(10.0);
86/// counter.with_label_values(&["billing"]).inc_by(20.0);
87///
88/// let families = counter.collect();
89/// let metrics = families[0].get_metric();
90///
91/// // First metric is the aggregate (no labels, value = 30)
92/// assert_eq!(metrics[0].get_label().len(), 0);
93/// assert_eq!(metrics[0].get_counter().value(), 30.0);
94///
95/// // Remaining metrics are the per-label counters
96/// assert_eq!(metrics.len(), 3); // 1 aggregate + 2 labeled
97/// ```
98#[derive(Clone)]
99pub struct AggregateCounter {
100 labeled_counter: CounterVec,
101 aggregate_desc: Desc,
102}
103
104impl AggregateCounter {
105 /// Create a new `AggregateCounter`.
106 ///
107 /// `opts` defines the metric name and help string.
108 /// `label_names` are the label dimensions for the per-label counters.
109 pub fn new(opts: Opts, label_names: &[&str]) -> Result<Self, prometheus::Error> {
110 let labeled_counter = CounterVec::new(opts.clone(), label_names)?;
111
112 let aggregate_desc = Desc::new(
113 opts.name.clone(),
114 opts.help.clone(),
115 vec![],
116 opts.const_labels.clone(),
117 )?;
118
119 Ok(AggregateCounter {
120 labeled_counter,
121 aggregate_desc,
122 })
123 }
124
125 /// Return a [`Counter`] for the given label values, creating it if it
126 /// does not already exist.
127 pub fn with_label_values(&self, vals: &[&str]) -> Counter {
128 self.labeled_counter.with_label_values(vals)
129 }
130}
131
132impl Collector for AggregateCounter {
133 fn desc(&self) -> Vec<&Desc> {
134 let mut descs = vec![&self.aggregate_desc];
135 descs.extend(self.labeled_counter.desc());
136 descs
137 }
138
139 fn collect(&self) -> Vec<MetricFamily> {
140 let labeled_metrics = self.labeled_counter.collect();
141
142 if labeled_metrics.is_empty() {
143 return vec![];
144 }
145
146 let mut metric_family = labeled_metrics[0].clone();
147
148 // Sum all labeled counter values into a single aggregate.
149 let total: f64 = metric_family
150 .get_metric()
151 .iter()
152 .map(|m| m.get_counter().value())
153 .sum();
154
155 let mut aggregate_metric = prometheus::proto::Metric::new();
156 let mut counter = prometheus::proto::Counter::new();
157 counter.set_value(total);
158 aggregate_metric.set_counter(counter);
159
160 // Prepend the aggregate metric before the labeled ones.
161 let mut all_metrics = vec![aggregate_metric];
162 all_metrics.extend(metric_family.take_metric().into_iter());
163 metric_family.set_metric(all_metrics.into());
164
165 vec![metric_family]
166 }
167}
168
169// ---------------------------------------------------------------------------
170// ScientificEncoder
171// ---------------------------------------------------------------------------
172
/// A Prometheus text-format encoder that renders counter values in scientific
/// notation (`{:.10E}`) and appends a trailing comma after the last label.
///
/// This matches the output format produced by Kafka's JMX-to-Prometheus
/// exporters, making it useful for dashboards that expect that convention.
///
/// The encoder holds no state, so it is `Copy` and can be freely shared or
/// recreated.
///
/// # Example
///
/// ```rust
/// use prometheus::Opts;
/// use prometheus::core::Collector;
/// use prometheus_extensions::{AggregateCounter, ScientificEncoder};
///
/// let counter = AggregateCounter::new(
///     Opts::new("bytes_in", "Bytes received"),
///     &["topic"],
/// ).unwrap();
/// counter.with_label_values(&["events"]).inc_by(4.6384186519e10);
///
/// let families = counter.collect();
/// let encoder = ScientificEncoder::new();
/// let mut buf = Vec::new();
/// encoder.encode(&families, &mut buf).unwrap();
///
/// let output = String::from_utf8(buf).unwrap();
/// assert!(output.contains("bytes_in{topic=\"events\",}"));
/// assert!(output.contains("E10"));
/// ```
#[derive(Debug, Clone, Copy, Default)]
pub struct ScientificEncoder;
203
204impl ScientificEncoder {
205 /// Create a new encoder.
206 pub fn new() -> Self {
207 Self
208 }
209
210 /// Encode `metric_families` into the Prometheus text exposition format
211 /// with scientific-notation counter values.
212 pub fn encode<W: Write>(
213 &self,
214 metric_families: &[MetricFamily],
215 writer: &mut W,
216 ) -> Result<(), std::io::Error> {
217 for mf in metric_families {
218 writeln!(writer, "# HELP {} {}", mf.name(), mf.help())?;
219 writeln!(writer, "# TYPE {} counter", mf.name())?;
220
221 for metric in mf.get_metric() {
222 let value = metric.get_counter().value();
223
224 if metric.get_label().is_empty() {
225 // Aggregate metric (no labels)
226 writeln!(writer, "{} {:.10E}", mf.name(), value)?;
227 } else {
228 // Labeled metric — format with trailing comma
229 let mut label_str = String::new();
230 for label in metric.get_label() {
231 if !label_str.is_empty() {
232 label_str.push(',');
233 }
234 label_str.push_str(&format!(
235 "{}=\"{}\"",
236 label.name(),
237 label.value()
238 ));
239 }
240 label_str.push(',');
241
242 writeln!(writer, "{}{{{}}} {:.10E}", mf.name(), label_str, value)?;
243 }
244 }
245 }
246 Ok(())
247 }
248}
249
250// ---------------------------------------------------------------------------
251// Sensor (EMA)
252// ---------------------------------------------------------------------------
253
254/// A lock-free exponential moving average (EMA) gauge.
255///
256/// Useful for tracking smoothed rates or throughput where you want to dampen
257/// spikes. The smoothing factor `alpha` controls responsiveness:
258///
259/// - `alpha` close to 1.0 → responds quickly to new values (noisy)
260/// - `alpha` close to 0.0 → responds slowly (smooth)
261///
262/// A typical value for per-second throughput is `0.05`.
263///
264/// # Example
265///
266/// ```rust
267/// use prometheus_extensions::Sensor;
268///
269/// let sensor = Sensor::new(0.05);
270/// for _ in 0..100 {
271/// sensor.measure(1000.0);
272/// }
273/// // After many identical measurements, EMA converges to the input value.
274/// assert!((sensor.get() - 1000.0).abs() < 10.0);
275/// ```
276pub struct Sensor {
277 ema: AtomicF64,
278 alpha: f64,
279}
280
281impl Sensor {
282 /// Create a new sensor with the given smoothing factor `alpha`.
283 ///
284 /// # Panics
285 ///
286 /// Panics if `alpha` is not in the range `(0.0, 1.0]`.
287 pub fn new(alpha: f64) -> Self {
288 assert!(
289 alpha > 0.0 && alpha <= 1.0,
290 "alpha must be in (0.0, 1.0], got {alpha}"
291 );
292 Self {
293 ema: AtomicF64::new(0.0),
294 alpha,
295 }
296 }
297
298 /// Feed a new measurement into the EMA.
299 pub fn measure(&self, value: f64) {
300 let current = self.ema.get();
301 let new_value = self.alpha * value + (1.0 - self.alpha) * current;
302 self.ema.set(new_value);
303 }
304
305 /// Read the current smoothed value.
306 pub fn get(&self) -> f64 {
307 self.ema.get()
308 }
309}
310
311// ---------------------------------------------------------------------------
312// Tests
313// ---------------------------------------------------------------------------
314
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::core::Collector;

    /// The collected family holds the unlabeled total first, then one metric
    /// per label value.
    #[test]
    fn aggregate_counter_produces_both_metrics() {
        let counter = AggregateCounter::new(
            Opts::new("test_counter", "Test counter for aggregation"),
            &["label"],
        )
        .unwrap();

        for (label, amount) in [("value1", 10.0), ("value2", 20.0), ("value3", 30.0)] {
            counter.with_label_values(&[label]).inc_by(amount);
        }

        let metric_families = counter.collect();
        assert_eq!(metric_families.len(), 1);

        let mf = &metric_families[0];
        assert_eq!(mf.name(), "test_counter");

        let metrics = mf.get_metric();
        assert_eq!(metrics.len(), 4); // 1 aggregate + 3 labeled

        // The aggregate leads and carries no labels.
        let aggregate = &metrics[0];
        assert_eq!(aggregate.get_label().len(), 0);
        assert_eq!(aggregate.get_counter().value(), 60.0);

        // Everything after it is a labeled counter.
        for metric in metrics.iter().skip(1) {
            assert_eq!(metric.get_label().len(), 1);
            let label_value = metric.get_label()[0].value();
            let expected = match label_value {
                "value1" => 10.0,
                "value2" => 20.0,
                "value3" => 30.0,
                _ => panic!("Unexpected label value: {label_value}"),
            };
            assert_eq!(metric.get_counter().value(), expected);
        }
    }

    /// With no observations, whatever is collected must sum to zero.
    #[test]
    fn aggregate_counter_empty_collect() {
        let counter =
            AggregateCounter::new(Opts::new("empty_counter", "No observations yet"), &["x"])
                .unwrap();

        // Depending on the prometheus version an unobserved CounterVec yields
        // either an empty vec or a family without labeled metrics — both are
        // acceptable here.
        if let Some(family) = counter.collect().first() {
            let mut total = 0.0;
            for metric in family.get_metric() {
                total += metric.get_counter().value();
            }
            assert_eq!(total, 0.0);
        }
    }

    /// The encoder emits HELP/TYPE headers, the unlabeled total and the
    /// labeled samples in Kafka-style scientific notation.
    #[test]
    fn scientific_encoder_format() {
        let counter = AggregateCounter::new(
            Opts::new("test_kafka_metrics", "Test counter for Kafka format"),
            &["topic"],
        )
        .unwrap();

        counter.with_label_values(&["hits"]).inc_by(4.6384186519e10);
        counter.with_label_values(&["logs"]).inc_by(1.2345e9);

        let mut buf = Vec::new();
        ScientificEncoder::new()
            .encode(&counter.collect(), &mut buf)
            .unwrap();
        let output = String::from_utf8(buf).unwrap();

        assert!(output.contains("# HELP test_kafka_metrics Test counter for Kafka format"));
        assert!(output.contains("# TYPE test_kafka_metrics counter"));
        assert!(output.contains("test_kafka_metrics 4.7618686519E10"));
        assert!(output.contains("test_kafka_metrics{topic=\"hits\",} 4.6384186519E10"));
        assert!(output.contains("test_kafka_metrics{topic=\"logs\",} 1.2345000000E9"));
    }

    /// Feeding a constant long enough drives the EMA to that constant.
    #[test]
    fn sensor_converges() {
        let sensor = Sensor::new(0.05);
        (0..1000).for_each(|_| sensor.measure(500.0));

        assert!(
            (sensor.get() - 500.0).abs() < 1.0,
            "EMA should converge to 500.0, got {}",
            sensor.get()
        );
    }

    /// A fresh sensor reports zero.
    #[test]
    fn sensor_starts_at_zero() {
        assert_eq!(Sensor::new(0.1).get(), 0.0);
    }

    /// `alpha == 0.0` lies outside `(0.0, 1.0]`.
    #[test]
    #[should_panic(expected = "alpha must be in (0.0, 1.0]")]
    fn sensor_rejects_zero_alpha() {
        Sensor::new(0.0);
    }

    /// Negative `alpha` lies outside `(0.0, 1.0]`.
    #[test]
    #[should_panic(expected = "alpha must be in (0.0, 1.0]")]
    fn sensor_rejects_negative_alpha() {
        Sensor::new(-0.5);
    }

    /// With `alpha == 1.0` the EMA equals the latest measurement.
    #[test]
    fn sensor_alpha_one_tracks_instantly() {
        let sensor = Sensor::new(1.0);
        for sample in [42.0, 99.0] {
            sensor.measure(sample);
            assert_eq!(sensor.get(), sample);
        }
    }
}
435}