Skip to main content

metrique_aggregation/
histogram.rs

1#![deny(clippy::arithmetic_side_effects)]
2
3//! Histogram types for aggregating multiple observations into distributions.
4//!
5//! When emitting high-frequency metrics, you often want to aggregate multiple observations
6//! into a single metric entry rather than emitting each one individually. This module provides
7//! histogram types that collect observations and emit them as distributions.
8//!
9//! # When to use histograms
10//!
11//! Use histograms when you have many observations of the same metric within a single unit of work:
12//!
13//! - A distributed query that fans out to multiple backend services
14//! - Processing a batch of items where you want to track per-item latency
15//! - Any operation that generates multiple measurements you want to aggregate
16//!
17//! For most applications, [sampling](https://github.com/awslabs/metrique/blob/main/docs/sampling.md)
18//! is a better approach than aggregation. Consider histograms when you need precise distributions
19//! for high-frequency events.
20//!
21//! # Example
22//!
23//! ```
24//! use metrique::unit_of_work::metrics;
25//! use metrique_aggregation::histogram::Histogram;
26//! use metrique_writer::unit::Millisecond;
27//! use std::time::Duration;
28//!
29//! #[metrics(rename_all = "PascalCase")]
30//! struct QueryMetrics {
31//!     query_id: String,
32//!
33//!     #[metrics(unit = Millisecond)]
34//!     backend_latency: Histogram<Duration>,
35//! }
36//!
37//! fn execute_query(query_id: String) {
38//!     let mut metrics = QueryMetrics {
39//!         query_id,
40//!         backend_latency: Histogram::default(),
41//!     };
42//!
43//!     // Record multiple observations
44//!     metrics.backend_latency.add_value(Duration::from_millis(45));
45//!     metrics.backend_latency.add_value(Duration::from_millis(67));
46//!     metrics.backend_latency.add_value(Duration::from_millis(52));
47//!
48//!     // When metrics drops, emits a single entry with the distribution
49//! }
50//! ```
51//!
52//! # Choosing an aggregation strategy
53//!
54//! By default, histograms use [`ExponentialAggregationStrategy`]. To use a different strategy,
55//! specify it as the second type parameter:
56//!
57//! ```
58//! use metrique_aggregation::histogram::{Histogram, SortAndMerge};
59//! use std::time::Duration;
60//!
61//! let histogram: Histogram<Duration, SortAndMerge> = Histogram::new(SortAndMerge::new());
62//! ```
63//!
64//! ## ExponentialAggregationStrategy (default)
65//!
66//! Uses exponential bucketing with ~6.25% error. This is the best choice for most use cases:
67//!
68//! - Provides consistent relative precision across wide value ranges
//! - Memory efficient with fixed bucket count (976 buckets)
70//! - Fast recording and draining operations
71//!
72//! Use this when you need good precision across values that span multiple orders of magnitude
73//! (e.g., latencies from microseconds to seconds).
74//!
75//! ## AtomicExponentialAggregationStrategy
76//!
77//! Thread-safe version of exponential bucketing. Use with [`crate::histogram::SharedHistogram`] when you need
78//! to record values from multiple threads concurrently:
79//!
80//! ```
81//! use metrique_aggregation::histogram::{SharedHistogram, AtomicExponentialAggregationStrategy};
82//! use std::time::Duration;
83//!
84//! let histogram: SharedHistogram<Duration, AtomicExponentialAggregationStrategy> =
85//!     SharedHistogram::new(AtomicExponentialAggregationStrategy::new());
86//! ```
87//!
88//! ## SortAndMerge
89//!
90//! Stores all observations exactly and sorts them on emission:
91//!
92//! - Perfect precision - no bucketing error
93//! - Memory usage grows with observation count
94//! - Slower drain operation due to sorting
95//!
96//! Use this when you need exact values and have a bounded number of observations (typically < 1000).
97
98use histogram::Config;
99use metrique_core::CloseValue;
100use metrique_writer::{Distribution, MetricFlags, MetricValue, Observation, Value, ValueWriter};
101use ordered_float::OrderedFloat;
102use smallvec::SmallVec;
103use std::{borrow::Borrow, marker::PhantomData};
104
105use crate::traits::AggregateValue;
106
107/// Strategy for aggregating observations in a histogram.
108///
109/// Implementations determine how values are stored and converted to observations
110/// when the histogram is closed.
111pub trait AggregationStrategy {
112    /// Record a single observation.
113    fn record(&mut self, value: f64) {
114        self.record_many(value, 1);
115    }
116
117    /// Record multiple observations of the same value.
118    fn record_many(&mut self, value: f64, count: u64);
119
120    /// Drain all observations and return them as a vector.
121    ///
122    /// This resets the strategy's internal state.
123    fn drain(&mut self) -> Vec<Observation>;
124}
125
126/// Thread-safe strategy for aggregating observations in a histogram.
127///
128/// Like [`AggregationStrategy`] but allows recording values through a shared reference.
129pub trait SharedAggregationStrategy {
130    /// Record a single observation.
131    fn record(&self, value: f64) {
132        self.record_many(value, 1);
133    }
134
135    /// Record multiple observations of the same value through a shared reference.
136    fn record_many(&self, value: f64, count: u64);
137
138    /// Drain all observations and return them as a vector.
139    ///
140    /// This resets the strategy's internal state.
141    fn drain(&self) -> Vec<Observation>;
142}
143
144/// A histogram that collects multiple observations and emits them as a distribution.
145///
146/// Use this when you have many observations of the same metric within a single unit of work.
147/// The histogram aggregates values in memory and emits them as a single metric entry.
148///
149/// If you want to preserve all values instead of bucketing them, use `Histogram<T, SortAndMerge>` as
150/// the strategy.
151///
152/// Requires `&mut self` to add values. For thread-safe access, use [`SharedHistogram`].
153pub struct Histogram<T, S = ExponentialAggregationStrategy> {
154    strategy: S,
155    _value: PhantomData<T>,
156}
157
158impl<T, S: AggregationStrategy> Histogram<T, S> {
159    /// Create a new histogram with the given aggregation strategy.
160    pub fn new(strategy: S) -> Self {
161        Self {
162            strategy,
163            _value: PhantomData,
164        }
165    }
166
167    /// Add a value to the histogram.
168    ///
169    /// The value is converted to observations using the metric value's implementation,
170    /// then recorded in the aggregation strategy.
171    pub fn add_value(&mut self, value: impl Borrow<T>)
172    where
173        T: MetricValue,
174    {
175        let value = value.borrow();
176        struct Capturer<'a, S>(&'a mut S);
177        impl<'b, S: AggregationStrategy> ValueWriter for Capturer<'b, S> {
178            fn string(self, _value: &str) {}
179            fn metric<'a>(
180                self,
181                distribution: impl IntoIterator<Item = Observation>,
182                _unit: metrique_writer::Unit,
183                _dimensions: impl IntoIterator<Item = (&'a str, &'a str)>,
184                _flags: MetricFlags<'_>,
185            ) {
186                for obs in distribution {
187                    match obs {
188                        Observation::Unsigned(v) => self.0.record(v as f64),
189                        Observation::Floating(v) => self.0.record(v),
190                        Observation::Repeated { total, occurrences } => {
191                            if occurrences > 0 {
192                                let avg = total / occurrences as f64;
193                                self.0.record_many(avg, occurrences);
194                            }
195                        }
196                        _ => {}
197                    }
198                }
199            }
200            fn error(self, _error: metrique_writer::ValidationError) {}
201        }
202
203        let capturer = Capturer(&mut self.strategy);
204        value.write(capturer);
205    }
206}
207
208impl<T, S: Default + AggregationStrategy> Default for Histogram<T, S> {
209    fn default() -> Self {
210        Self::new(S::default())
211    }
212}
213
214impl<T: MetricValue, S: AggregationStrategy> CloseValue for Histogram<T, S> {
215    type Closed = HistogramClosed<T>;
216
217    fn close(mut self) -> Self::Closed {
218        HistogramClosed {
219            observations: self.strategy.drain(),
220            _value: PhantomData,
221        }
222    }
223}
224
225/// Thread-safe histogram that collects multiple observations and emits them as a distribution.
226///
227/// Like [`Histogram`] but allows adding values through a shared reference, making it
228/// suitable for concurrent access patterns.
229pub struct SharedHistogram<T, S = AtomicExponentialAggregationStrategy> {
230    strategy: S,
231    _value: PhantomData<T>,
232}
233
234impl<T, S: Default> Default for SharedHistogram<T, S> {
235    fn default() -> Self {
236        Self {
237            strategy: Default::default(),
238            _value: Default::default(),
239        }
240    }
241}
242
243impl<T, S: SharedAggregationStrategy> SharedHistogram<T, S> {
244    /// Create a new atomic histogram with the given aggregation strategy.
245    pub fn new(strategy: S) -> Self {
246        Self {
247            strategy,
248            _value: PhantomData,
249        }
250    }
251
252    /// Add a value to the histogram through a shared reference.
253    ///
254    /// The value is converted to observations using the metric value's implementation,
255    /// then recorded in the aggregation strategy.
256    pub fn add_value(&self, value: T)
257    where
258        T: MetricValue,
259    {
260        struct Capturer<'a, S>(&'a S);
261        impl<'b, S: SharedAggregationStrategy> ValueWriter for Capturer<'b, S> {
262            fn string(self, _value: &str) {}
263            fn metric<'a>(
264                self,
265                distribution: impl IntoIterator<Item = Observation>,
266                _unit: metrique_writer::Unit,
267                _dimensions: impl IntoIterator<Item = (&'a str, &'a str)>,
268                _flags: MetricFlags<'_>,
269            ) {
270                for obs in distribution {
271                    match obs {
272                        Observation::Unsigned(v) => self.0.record(v as f64),
273                        Observation::Floating(v) => self.0.record(v),
274                        Observation::Repeated { total, occurrences } => {
275                            if occurrences > 0 {
276                                let avg = total / occurrences as f64;
277                                self.0.record_many(avg, occurrences);
278                            }
279                        }
280                        _ => {}
281                    }
282                }
283            }
284            fn error(self, _error: metrique_writer::ValidationError) {}
285        }
286
287        let capturer = Capturer(&self.strategy);
288        value.write(capturer);
289    }
290}
291
292impl<T: MetricValue, S: SharedAggregationStrategy> CloseValue for SharedHistogram<T, S> {
293    type Closed = HistogramClosed<T>;
294
295    fn close(self) -> Self::Closed {
296        HistogramClosed {
297            observations: self.strategy.drain(),
298            _value: PhantomData,
299        }
300    }
301}
302
303/// Closed histogram value containing aggregated observations.
304///
305/// This is the result of closing a histogram and is emitted as a metric distribution.
306pub struct HistogramClosed<T> {
307    observations: Vec<Observation>,
308    _value: PhantomData<T>,
309}
310
311impl<T> Value for HistogramClosed<T>
312where
313    T: MetricValue,
314{
315    fn write(&self, writer: impl ValueWriter) {
316        use metrique_writer::unit::UnitTag;
317        writer.metric(
318            self.observations.iter().copied(),
319            T::Unit::UNIT,
320            [],
321            MetricFlags::upcast(&Distribution),
322        )
323    }
324}
325
326impl<T> MetricValue for HistogramClosed<T>
327where
328    T: MetricValue,
329{
330    type Unit = T::Unit;
331}
332
/// Scaling factor for converting floating point values to integers for histogram bucketing.
/// 2^10 = 1024, providing 3 decimal places of precision.
const SCALING_FACTOR: f64 = 1024.0;
334
335fn scale_up(v: impl Into<f64>) -> f64 {
336    v.into() * SCALING_FACTOR
337}
338
339fn scale_down(v: impl Into<f64>) -> f64 {
340    v.into() / SCALING_FACTOR
341}
342
343/// Exponential bucketing strategy using the histogram crate.
344///
345/// This uses 976 buckets and supports values from 0 to u64::MAX. Values greater than u64::MAX are truncated to u64::MAX.
346/// Scaling factor for converting floating point values to integers for histogram bucketing.
347/// 2^10 = 1024, providing 3 decimal places of precision.
348///
349/// Uses exponential bucketing with configurable precision. Default configuration
350/// uses 4-bit mantissa precision (16 buckets per order of magnitude, ~6.25% error).
351pub struct ExponentialAggregationStrategy {
352    inner: histogram::Histogram,
353}
354
355impl ExponentialAggregationStrategy {
356    /// Create a new exponential aggregation strategy with default configuration.
357    pub fn new() -> Self {
358        let config = default_histogram_config();
359        Self {
360            inner: histogram::Histogram::with_config(&config),
361        }
362    }
363}
364
365impl Default for ExponentialAggregationStrategy {
366    fn default() -> Self {
367        Self::new()
368    }
369}
370
371fn default_histogram_config() -> Config {
372    Config::new(4, 64).expect("known good")
373}
374
375impl AggregationStrategy for ExponentialAggregationStrategy {
376    fn record_many(&mut self, value: f64, count: u64) {
377        // the inner histogram drops data above u64::MAX in our default configuration
378        let value = scale_up(value);
379        self.inner
380            .add(value.min(u64::MAX as f64) as u64, count)
381            .ok();
382    }
383
384    fn drain(&mut self) -> Vec<Observation> {
385        let snapshot = std::mem::replace(
386            &mut self.inner,
387            histogram::Histogram::with_config(&default_histogram_config()),
388        );
389        snapshot
390            .iter()
391            .filter(|bucket| bucket.count() > 0)
392            .map(|bucket| {
393                let range = bucket.range();
394                let midpoint = range.start().midpoint(*range.end());
395                let midpoint = scale_down(midpoint as f64);
396                Observation::Repeated {
397                    total: midpoint * bucket.count() as f64,
398                    occurrences: bucket.count(),
399                }
400            })
401            .collect()
402    }
403}
404
405/// Strategy that stores all observations and sorts them on emission.
406///
407/// This preserves all observations exactly but uses more memory than bucketing strategies.
408/// This uses a `SmallVec` (default size 32, memory usage of 256 bytes) to avoid allocations for small numbers of observations.
409///
410/// The const generic `N` controls the inline capacity before heap allocation.
411#[derive(Default)]
412pub struct SortAndMerge<const N: usize = 32> {
413    values: SmallVec<[f64; N]>,
414}
415
416impl<const N: usize> SortAndMerge<N> {
417    /// Create a new sort-and-merge strategy.
418    pub fn new() -> Self {
419        Self {
420            values: SmallVec::new(),
421        }
422    }
423}
424
425impl<const N: usize> AggregationStrategy for SortAndMerge<N> {
426    fn record_many(&mut self, value: f64, count: u64) {
427        self.values
428            .extend(std::iter::repeat_n(value, count as usize));
429    }
430
431    fn drain(&mut self) -> Vec<Observation> {
432        self.values.sort_by_key(|v| OrderedFloat(*v));
433        let mut observations = Vec::new();
434        let mut iter = self.values.iter().copied().filter(|v| !v.is_nan());
435
436        if let Some(first) = iter.next() {
437            let mut current_value = first;
438            let mut current_count: u64 = 1;
439
440            for value in iter {
441                if value == current_value {
442                    current_count = current_count.saturating_add(1);
443                } else {
444                    observations.push(Observation::Repeated {
445                        total: current_value * current_count as f64,
446                        occurrences: current_count,
447                    });
448                    current_value = value;
449                    current_count = 1;
450                }
451            }
452
453            observations.push(Observation::Repeated {
454                total: current_value * current_count as f64,
455                occurrences: current_count,
456            });
457        }
458
459        self.values.clear();
460        observations
461    }
462}
463
464/// Thread-safe exponential bucketing strategy using atomic counters.
465///
466/// This uses 976 buckets and supports values from 0 to u64::MAX. Values greater than u64::MAX are truncated to u64::MAX.
467///
468/// Like [`ExponentialAggregationStrategy`] but uses atomic operations to allow concurrent
469/// recording from multiple threads.
470pub struct AtomicExponentialAggregationStrategy {
471    inner: histogram::AtomicHistogram,
472}
473
474impl AtomicExponentialAggregationStrategy {
475    /// Create a new atomic exponential aggregation strategy with default configuration.
476    pub fn new() -> Self {
477        Self {
478            inner: histogram::AtomicHistogram::with_config(&default_histogram_config()),
479        }
480    }
481}
482
483impl Default for AtomicExponentialAggregationStrategy {
484    fn default() -> Self {
485        Self::new()
486    }
487}
488
489impl SharedAggregationStrategy for AtomicExponentialAggregationStrategy {
490    fn record_many(&self, value: f64, count: u64) {
491        let value = scale_up(value);
492        self.inner
493            .add(value.min(u64::MAX as f64) as u64, count)
494            .ok();
495    }
496
497    fn drain(&self) -> Vec<Observation> {
498        self.inner
499            .drain()
500            .iter()
501            .filter(|bucket| bucket.count() > 0)
502            .map(|bucket| {
503                let range = bucket.range();
504                let midpoint = range.start().midpoint(*range.end());
505                let midpoint = scale_down(midpoint as f64);
506                Observation::Repeated {
507                    total: midpoint * bucket.count() as f64,
508                    occurrences: bucket.count(),
509                }
510            })
511            .collect()
512    }
513}
514
515/// AggregateValue implementation for Histogram
516impl<T, S> AggregateValue<T> for Histogram<T, S>
517where
518    T: MetricValue,
519    S: AggregationStrategy + Default,
520{
521    type Aggregated = Histogram<T, S>;
522
523    fn insert(accum: &mut Self::Aggregated, value: T) {
524        accum.add_value(value);
525    }
526}
527
528/// AggregateValue implementation for merging closed histograms into a histogram.
529///
530/// This enables aggregating structs that already contain `Histogram` fields —
531/// when the source is closed, each `Histogram<T>` becomes a `HistogramClosed<T>`,
532/// and this impl replays those observations into the accumulator histogram.
533impl<T, S> AggregateValue<HistogramClosed<T>> for Histogram<T, S>
534where
535    T: MetricValue,
536    S: AggregationStrategy + Default,
537{
538    type Aggregated = Histogram<T, S>;
539
540    fn insert(accum: &mut Self::Aggregated, value: HistogramClosed<T>) {
541        for obs in value.observations {
542            match obs {
543                Observation::Repeated { total, occurrences } => {
544                    if occurrences > 0 {
545                        accum
546                            .strategy
547                            .record_many(total / occurrences as f64, occurrences);
548                    }
549                }
550                Observation::Unsigned(v) => accum.strategy.record(v as f64),
551                Observation::Floating(v) => accum.strategy.record(v),
552                _ => {}
553            }
554        }
555    }
556}
557
#[cfg(test)]
mod tests {
    use assert2::check;
    use metrique_writer::Observation;

    use crate::histogram::{
        AggregationStrategy, AtomicExponentialAggregationStrategy, ExponentialAggregationStrategy,
        SharedAggregationStrategy, default_histogram_config, scale_down, scale_up,
    };

    /// What either exponential strategy should emit after recording `f64::MAX` once: the
    /// value is truncated to `u64::MAX`, so a single observation comes back from the
    /// highest bucket.
    fn saturated_bucket() -> Vec<Observation> {
        vec![Observation::Repeated {
            total: 1.7732923532771328e16,
            occurrences: 1,
        }]
    }

    #[test]
    fn test_histogram_max_values() {
        let mut strategy = ExponentialAggregationStrategy::new();
        strategy.record(f64::MAX);
        let drained = strategy.drain();
        check!(drained == saturated_bucket());
    }

    #[test]
    fn test_atomic_histogram_max_values() {
        let strategy = AtomicExponentialAggregationStrategy::new();
        strategy.record(f64::MAX);
        let drained = strategy.drain();
        check!(drained == saturated_bucket());
    }

    #[test]
    fn num_buckets() {
        let buckets = default_histogram_config().total_buckets();
        check!(buckets == 976);
    }

    #[test]
    fn test_scaling() {
        let original = 0.001;
        let round_tripped = scale_down(scale_up(original));
        check!(round_tripped == original);
    }
}