metrique_aggregation/histogram.rs
1#![deny(clippy::arithmetic_side_effects)]
2
3//! Histogram types for aggregating multiple observations into distributions.
4//!
5//! When emitting high-frequency metrics, you often want to aggregate multiple observations
6//! into a single metric entry rather than emitting each one individually. This module provides
7//! histogram types that collect observations and emit them as distributions.
8//!
9//! # When to use histograms
10//!
11//! Use histograms when you have many observations of the same metric within a single unit of work:
12//!
13//! - A distributed query that fans out to multiple backend services
14//! - Processing a batch of items where you want to track per-item latency
15//! - Any operation that generates multiple measurements you want to aggregate
16//!
17//! For most applications, [sampling](https://github.com/awslabs/metrique/blob/main/docs/sampling.md)
18//! is a better approach than aggregation. Consider histograms when you need precise distributions
19//! for high-frequency events.
20//!
21//! # Example
22//!
23//! ```
24//! use metrique::unit_of_work::metrics;
25//! use metrique_aggregation::histogram::Histogram;
26//! use metrique_writer::unit::Millisecond;
27//! use std::time::Duration;
28//!
29//! #[metrics(rename_all = "PascalCase")]
30//! struct QueryMetrics {
31//! query_id: String,
32//!
33//! #[metrics(unit = Millisecond)]
34//! backend_latency: Histogram<Duration>,
35//! }
36//!
37//! fn execute_query(query_id: String) {
38//! let mut metrics = QueryMetrics {
39//! query_id,
40//! backend_latency: Histogram::default(),
41//! };
42//!
43//! // Record multiple observations
44//! metrics.backend_latency.add_value(Duration::from_millis(45));
45//! metrics.backend_latency.add_value(Duration::from_millis(67));
46//! metrics.backend_latency.add_value(Duration::from_millis(52));
47//!
48//! // When metrics drops, emits a single entry with the distribution
49//! }
50//! ```
51//!
52//! # Choosing an aggregation strategy
53//!
54//! By default, histograms use [`ExponentialAggregationStrategy`]. To use a different strategy,
55//! specify it as the second type parameter:
56//!
57//! ```
58//! use metrique_aggregation::histogram::{Histogram, SortAndMerge};
59//! use std::time::Duration;
60//!
61//! let histogram: Histogram<Duration, SortAndMerge> = Histogram::new(SortAndMerge::new());
62//! ```
63//!
64//! ## ExponentialAggregationStrategy (default)
65//!
66//! Uses exponential bucketing with ~6.25% error. This is the best choice for most use cases:
67//!
68//! - Provides consistent relative precision across wide value ranges
//! - Memory efficient with fixed bucket count (976 buckets)
70//! - Fast recording and draining operations
71//!
72//! Use this when you need good precision across values that span multiple orders of magnitude
73//! (e.g., latencies from microseconds to seconds).
74//!
75//! ## AtomicExponentialAggregationStrategy
76//!
77//! Thread-safe version of exponential bucketing. Use with [`crate::histogram::SharedHistogram`] when you need
78//! to record values from multiple threads concurrently:
79//!
80//! ```
81//! use metrique_aggregation::histogram::{SharedHistogram, AtomicExponentialAggregationStrategy};
82//! use std::time::Duration;
83//!
84//! let histogram: SharedHistogram<Duration, AtomicExponentialAggregationStrategy> =
85//! SharedHistogram::new(AtomicExponentialAggregationStrategy::new());
86//! ```
87//!
88//! ## SortAndMerge
89//!
90//! Stores all observations exactly and sorts them on emission:
91//!
92//! - Perfect precision - no bucketing error
93//! - Memory usage grows with observation count
94//! - Slower drain operation due to sorting
95//!
96//! Use this when you need exact values and have a bounded number of observations (typically < 1000).
97
98use histogram::Config;
99use metrique_core::CloseValue;
100use metrique_writer::{Distribution, MetricFlags, MetricValue, Observation, Value, ValueWriter};
101use ordered_float::OrderedFloat;
102use smallvec::SmallVec;
103use std::{borrow::Borrow, marker::PhantomData};
104
105use crate::traits::AggregateValue;
106
107/// Strategy for aggregating observations in a histogram.
108///
109/// Implementations determine how values are stored and converted to observations
110/// when the histogram is closed.
111pub trait AggregationStrategy {
112 /// Record a single observation.
113 fn record(&mut self, value: f64) {
114 self.record_many(value, 1);
115 }
116
117 /// Record multiple observations of the same value.
118 fn record_many(&mut self, value: f64, count: u64);
119
120 /// Drain all observations and return them as a vector.
121 ///
122 /// This resets the strategy's internal state.
123 fn drain(&mut self) -> Vec<Observation>;
124}
125
126/// Thread-safe strategy for aggregating observations in a histogram.
127///
128/// Like [`AggregationStrategy`] but allows recording values through a shared reference.
129pub trait SharedAggregationStrategy {
130 /// Record a single observation.
131 fn record(&self, value: f64) {
132 self.record_many(value, 1);
133 }
134
135 /// Record multiple observations of the same value through a shared reference.
136 fn record_many(&self, value: f64, count: u64);
137
138 /// Drain all observations and return them as a vector.
139 ///
140 /// This resets the strategy's internal state.
141 fn drain(&self) -> Vec<Observation>;
142}
143
144/// A histogram that collects multiple observations and emits them as a distribution.
145///
146/// Use this when you have many observations of the same metric within a single unit of work.
147/// The histogram aggregates values in memory and emits them as a single metric entry.
148///
149/// If you want to preserve all values instead of bucketing them, use `Histogram<T, SortAndMerge>` as
150/// the strategy.
151///
152/// Requires `&mut self` to add values. For thread-safe access, use [`SharedHistogram`].
153pub struct Histogram<T, S = ExponentialAggregationStrategy> {
154 strategy: S,
155 _value: PhantomData<T>,
156}
157
158impl<T, S: AggregationStrategy> Histogram<T, S> {
159 /// Create a new histogram with the given aggregation strategy.
160 pub fn new(strategy: S) -> Self {
161 Self {
162 strategy,
163 _value: PhantomData,
164 }
165 }
166
167 /// Add a value to the histogram.
168 ///
169 /// The value is converted to observations using the metric value's implementation,
170 /// then recorded in the aggregation strategy.
171 pub fn add_value(&mut self, value: impl Borrow<T>)
172 where
173 T: MetricValue,
174 {
175 let value = value.borrow();
176 struct Capturer<'a, S>(&'a mut S);
177 impl<'b, S: AggregationStrategy> ValueWriter for Capturer<'b, S> {
178 fn string(self, _value: &str) {}
179 fn metric<'a>(
180 self,
181 distribution: impl IntoIterator<Item = Observation>,
182 _unit: metrique_writer::Unit,
183 _dimensions: impl IntoIterator<Item = (&'a str, &'a str)>,
184 _flags: MetricFlags<'_>,
185 ) {
186 for obs in distribution {
187 match obs {
188 Observation::Unsigned(v) => self.0.record(v as f64),
189 Observation::Floating(v) => self.0.record(v),
190 Observation::Repeated { total, occurrences } => {
191 if occurrences > 0 {
192 let avg = total / occurrences as f64;
193 self.0.record_many(avg, occurrences);
194 }
195 }
196 _ => {}
197 }
198 }
199 }
200 fn error(self, _error: metrique_writer::ValidationError) {}
201 }
202
203 let capturer = Capturer(&mut self.strategy);
204 value.write(capturer);
205 }
206}
207
208impl<T, S: Default + AggregationStrategy> Default for Histogram<T, S> {
209 fn default() -> Self {
210 Self::new(S::default())
211 }
212}
213
214impl<T: MetricValue, S: AggregationStrategy> CloseValue for Histogram<T, S> {
215 type Closed = HistogramClosed<T>;
216
217 fn close(mut self) -> Self::Closed {
218 HistogramClosed {
219 observations: self.strategy.drain(),
220 _value: PhantomData,
221 }
222 }
223}
224
225/// Thread-safe histogram that collects multiple observations and emits them as a distribution.
226///
227/// Like [`Histogram`] but allows adding values through a shared reference, making it
228/// suitable for concurrent access patterns.
229pub struct SharedHistogram<T, S = AtomicExponentialAggregationStrategy> {
230 strategy: S,
231 _value: PhantomData<T>,
232}
233
234impl<T, S: Default> Default for SharedHistogram<T, S> {
235 fn default() -> Self {
236 Self {
237 strategy: Default::default(),
238 _value: Default::default(),
239 }
240 }
241}
242
243impl<T, S: SharedAggregationStrategy> SharedHistogram<T, S> {
244 /// Create a new atomic histogram with the given aggregation strategy.
245 pub fn new(strategy: S) -> Self {
246 Self {
247 strategy,
248 _value: PhantomData,
249 }
250 }
251
252 /// Add a value to the histogram through a shared reference.
253 ///
254 /// The value is converted to observations using the metric value's implementation,
255 /// then recorded in the aggregation strategy.
256 pub fn add_value(&self, value: T)
257 where
258 T: MetricValue,
259 {
260 struct Capturer<'a, S>(&'a S);
261 impl<'b, S: SharedAggregationStrategy> ValueWriter for Capturer<'b, S> {
262 fn string(self, _value: &str) {}
263 fn metric<'a>(
264 self,
265 distribution: impl IntoIterator<Item = Observation>,
266 _unit: metrique_writer::Unit,
267 _dimensions: impl IntoIterator<Item = (&'a str, &'a str)>,
268 _flags: MetricFlags<'_>,
269 ) {
270 for obs in distribution {
271 match obs {
272 Observation::Unsigned(v) => self.0.record(v as f64),
273 Observation::Floating(v) => self.0.record(v),
274 Observation::Repeated { total, occurrences } => {
275 if occurrences > 0 {
276 let avg = total / occurrences as f64;
277 self.0.record_many(avg, occurrences);
278 }
279 }
280 _ => {}
281 }
282 }
283 }
284 fn error(self, _error: metrique_writer::ValidationError) {}
285 }
286
287 let capturer = Capturer(&self.strategy);
288 value.write(capturer);
289 }
290}
291
292impl<T: MetricValue, S: SharedAggregationStrategy> CloseValue for SharedHistogram<T, S> {
293 type Closed = HistogramClosed<T>;
294
295 fn close(self) -> Self::Closed {
296 HistogramClosed {
297 observations: self.strategy.drain(),
298 _value: PhantomData,
299 }
300 }
301}
302
303/// Closed histogram value containing aggregated observations.
304///
305/// This is the result of closing a histogram and is emitted as a metric distribution.
306pub struct HistogramClosed<T> {
307 observations: Vec<Observation>,
308 _value: PhantomData<T>,
309}
310
311impl<T> Value for HistogramClosed<T>
312where
313 T: MetricValue,
314{
315 fn write(&self, writer: impl ValueWriter) {
316 use metrique_writer::unit::UnitTag;
317 writer.metric(
318 self.observations.iter().copied(),
319 T::Unit::UNIT,
320 [],
321 MetricFlags::upcast(&Distribution),
322 )
323 }
324}
325
326impl<T> MetricValue for HistogramClosed<T>
327where
328 T: MetricValue,
329{
330 type Unit = T::Unit;
331}
332
/// Factor applied to values before they are bucketed by the integer-based histogram.
///
/// 2^10 = 1024, which keeps roughly three decimal places of precision for fractional
/// values. Because it is a power of two, scaling up and back down adds no rounding error
/// (barring overflow or underflow).
const SCALING_FACTOR: f64 = 1024.0;

/// Convert a value into the fixed-point domain used for bucketing.
fn scale_up(v: impl Into<f64>) -> f64 {
    let raw: f64 = v.into();
    raw * SCALING_FACTOR
}

/// Convert a fixed-point bucket value back into the caller's domain.
fn scale_down(v: impl Into<f64>) -> f64 {
    let fixed: f64 = v.into();
    fixed / SCALING_FACTOR
}
342
343/// Exponential bucketing strategy using the histogram crate.
344///
345/// This uses 976 buckets and supports values from 0 to u64::MAX. Values greater than u64::MAX are truncated to u64::MAX.
346/// Scaling factor for converting floating point values to integers for histogram bucketing.
347/// 2^10 = 1024, providing 3 decimal places of precision.
348///
349/// Uses exponential bucketing with configurable precision. Default configuration
350/// uses 4-bit mantissa precision (16 buckets per order of magnitude, ~6.25% error).
351pub struct ExponentialAggregationStrategy {
352 inner: histogram::Histogram,
353}
354
355impl ExponentialAggregationStrategy {
356 /// Create a new exponential aggregation strategy with default configuration.
357 pub fn new() -> Self {
358 let config = default_histogram_config();
359 Self {
360 inner: histogram::Histogram::with_config(&config),
361 }
362 }
363}
364
365impl Default for ExponentialAggregationStrategy {
366 fn default() -> Self {
367 Self::new()
368 }
369}
370
371fn default_histogram_config() -> Config {
372 Config::new(4, 64).expect("known good")
373}
374
375impl AggregationStrategy for ExponentialAggregationStrategy {
376 fn record_many(&mut self, value: f64, count: u64) {
377 // the inner histogram drops data above u64::MAX in our default configuration
378 let value = scale_up(value);
379 self.inner
380 .add(value.min(u64::MAX as f64) as u64, count)
381 .ok();
382 }
383
384 fn drain(&mut self) -> Vec<Observation> {
385 let snapshot = std::mem::replace(
386 &mut self.inner,
387 histogram::Histogram::with_config(&default_histogram_config()),
388 );
389 snapshot
390 .iter()
391 .filter(|bucket| bucket.count() > 0)
392 .map(|bucket| {
393 let range = bucket.range();
394 let midpoint = range.start().midpoint(*range.end());
395 let midpoint = scale_down(midpoint as f64);
396 Observation::Repeated {
397 total: midpoint * bucket.count() as f64,
398 occurrences: bucket.count(),
399 }
400 })
401 .collect()
402 }
403}
404
405/// Strategy that stores all observations and sorts them on emission.
406///
407/// This preserves all observations exactly but uses more memory than bucketing strategies.
408/// This uses a `SmallVec` (default size 32, memory usage of 256 bytes) to avoid allocations for small numbers of observations.
409///
410/// The const generic `N` controls the inline capacity before heap allocation.
411#[derive(Default)]
412pub struct SortAndMerge<const N: usize = 32> {
413 values: SmallVec<[f64; N]>,
414}
415
416impl<const N: usize> SortAndMerge<N> {
417 /// Create a new sort-and-merge strategy.
418 pub fn new() -> Self {
419 Self {
420 values: SmallVec::new(),
421 }
422 }
423}
424
425impl<const N: usize> AggregationStrategy for SortAndMerge<N> {
426 fn record_many(&mut self, value: f64, count: u64) {
427 self.values
428 .extend(std::iter::repeat_n(value, count as usize));
429 }
430
431 fn drain(&mut self) -> Vec<Observation> {
432 self.values.sort_by_key(|v| OrderedFloat(*v));
433 let mut observations = Vec::new();
434 let mut iter = self.values.iter().copied().filter(|v| !v.is_nan());
435
436 if let Some(first) = iter.next() {
437 let mut current_value = first;
438 let mut current_count: u64 = 1;
439
440 for value in iter {
441 if value == current_value {
442 current_count = current_count.saturating_add(1);
443 } else {
444 observations.push(Observation::Repeated {
445 total: current_value * current_count as f64,
446 occurrences: current_count,
447 });
448 current_value = value;
449 current_count = 1;
450 }
451 }
452
453 observations.push(Observation::Repeated {
454 total: current_value * current_count as f64,
455 occurrences: current_count,
456 });
457 }
458
459 self.values.clear();
460 observations
461 }
462}
463
464/// Thread-safe exponential bucketing strategy using atomic counters.
465///
466/// This uses 976 buckets and supports values from 0 to u64::MAX. Values greater than u64::MAX are truncated to u64::MAX.
467///
468/// Like [`ExponentialAggregationStrategy`] but uses atomic operations to allow concurrent
469/// recording from multiple threads.
470pub struct AtomicExponentialAggregationStrategy {
471 inner: histogram::AtomicHistogram,
472}
473
474impl AtomicExponentialAggregationStrategy {
475 /// Create a new atomic exponential aggregation strategy with default configuration.
476 pub fn new() -> Self {
477 Self {
478 inner: histogram::AtomicHistogram::with_config(&default_histogram_config()),
479 }
480 }
481}
482
483impl Default for AtomicExponentialAggregationStrategy {
484 fn default() -> Self {
485 Self::new()
486 }
487}
488
489impl SharedAggregationStrategy for AtomicExponentialAggregationStrategy {
490 fn record_many(&self, value: f64, count: u64) {
491 let value = scale_up(value);
492 self.inner
493 .add(value.min(u64::MAX as f64) as u64, count)
494 .ok();
495 }
496
497 fn drain(&self) -> Vec<Observation> {
498 self.inner
499 .drain()
500 .iter()
501 .filter(|bucket| bucket.count() > 0)
502 .map(|bucket| {
503 let range = bucket.range();
504 let midpoint = range.start().midpoint(*range.end());
505 let midpoint = scale_down(midpoint as f64);
506 Observation::Repeated {
507 total: midpoint * bucket.count() as f64,
508 occurrences: bucket.count(),
509 }
510 })
511 .collect()
512 }
513}
514
515/// AggregateValue implementation for Histogram
516impl<T, S> AggregateValue<T> for Histogram<T, S>
517where
518 T: MetricValue,
519 S: AggregationStrategy + Default,
520{
521 type Aggregated = Histogram<T, S>;
522
523 fn insert(accum: &mut Self::Aggregated, value: T) {
524 accum.add_value(value);
525 }
526}
527
528/// AggregateValue implementation for merging closed histograms into a histogram.
529///
530/// This enables aggregating structs that already contain `Histogram` fields —
531/// when the source is closed, each `Histogram<T>` becomes a `HistogramClosed<T>`,
532/// and this impl replays those observations into the accumulator histogram.
533impl<T, S> AggregateValue<HistogramClosed<T>> for Histogram<T, S>
534where
535 T: MetricValue,
536 S: AggregationStrategy + Default,
537{
538 type Aggregated = Histogram<T, S>;
539
540 fn insert(accum: &mut Self::Aggregated, value: HistogramClosed<T>) {
541 for obs in value.observations {
542 match obs {
543 Observation::Repeated { total, occurrences } => {
544 if occurrences > 0 {
545 accum
546 .strategy
547 .record_many(total / occurrences as f64, occurrences);
548 }
549 }
550 Observation::Unsigned(v) => accum.strategy.record(v as f64),
551 Observation::Floating(v) => accum.strategy.record(v),
552 _ => {}
553 }
554 }
555 }
556}
557
#[cfg(test)]
mod tests {
    use assert2::check;
    use metrique_writer::Observation;

    use crate::histogram::{
        AggregationStrategy, AtomicExponentialAggregationStrategy, ExponentialAggregationStrategy,
        SharedAggregationStrategy, SortAndMerge, default_histogram_config, scale_down, scale_up,
    };

    #[test]
    fn test_histogram_max_values() {
        let v = f64::MAX;
        let mut strat = ExponentialAggregationStrategy::new();
        strat.record(v);
        check!(
            strat.drain()
                == vec![Observation::Repeated {
                    // value is truncated to u64::MAX
                    total: 1.7732923532771328e16,
                    occurrences: 1,
                }]
        );
    }

    #[test]
    fn test_atomic_histogram_max_values() {
        let v = f64::MAX;
        let strat = AtomicExponentialAggregationStrategy::new();
        strat.record(v);
        check!(
            strat.drain()
                == vec![Observation::Repeated {
                    // value is truncated to u64::MAX
                    total: 1.7732923532771328e16,
                    occurrences: 1,
                }]
        );
    }

    #[test]
    fn num_buckets() {
        check!(default_histogram_config().total_buckets() == 976);
    }

    #[test]
    fn test_scaling() {
        let x = 0.001;
        check!(scale_down(scale_up(x)) == x);
    }

    /// SortAndMerge must merge equal values into one `Repeated` entry (ascending order),
    /// drop NaN, and reset its state after draining. An inline capacity of 4 with 5 values
    /// also exercises the spilled-to-heap path.
    #[test]
    fn sort_and_merge_merges_equal_values_and_drops_nan() {
        let mut strat = SortAndMerge::<4>::new();
        strat.record(3.0);
        strat.record(f64::NAN);
        strat.record(1.0);
        strat.record_many(3.0, 2);
        check!(
            strat.drain()
                == vec![
                    Observation::Repeated {
                        total: 1.0,
                        occurrences: 1,
                    },
                    Observation::Repeated {
                        total: 9.0,
                        occurrences: 3,
                    },
                ]
        );
        // draining resets the strategy
        check!(strat.drain().is_empty());
    }
}