exponential_histogram/
exponential_histogram.rs

1use std::{
2    cmp::min,
3    collections::VecDeque,
4    f64::consts::{E, LN_2, LOG2_E},
5};
6
/// An auto-scaling histogram approximation implementation following the opentelemetry
/// exponential histogram algorithm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExponentialHistogram {
    // The scale currently in effect. Starts at desired_scale and only drops (to coarser
    // buckets, via zoom_out) when the observed range exceeds max_bucket_count buckets.
    actual_scale: u8,
    // The scale requested at construction; restored by reset().
    desired_scale: u8,
    // Hard cap on the number of buckets kept per sign (positive/negative).
    max_bucket_count: u16,
    // Bucket index (at the current scale) represented by element 0 of the deques below.
    bucket_start_offset: u32,
    // Observation counts for values with a positive sign bit.
    positive_buckets: VecDeque<usize>,
    // Observation counts for values with a negative sign bit, bucketed by absolute value.
    negative_buckets: VecDeque<usize>,
}
18
19impl std::fmt::Display for ExponentialHistogram {
20    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21        f.debug_map()
22            .entries(
23                self.value_counts()
24                    // let the bucket format be coarse for readability
25                    .map(|(bucket, count)| (format!("{:.2}", bucket), count)),
26            )
27            .finish()
28    }
29}
30
31impl Default for ExponentialHistogram {
32    fn default() -> Self {
33        Self::new(8)
34    }
35}
36
37impl ExponentialHistogram {
38    /// Desired scale will drop as necessary to match the static max buckets configuration.
39    /// This will happen dynamically in response to observed range. If your distribution
40    /// range falls within 160 contiguous buckets somewhere the desired scale's range, then
41    /// your output scale will match your desired scale. If your observed range exceeds 160
42    /// buckets then scale will be reduced to reflect the data's width.
43    pub fn new(desired_scale: u8) -> Self {
44        Self::new_with_max_buckets(desired_scale, 160)
45    }
46
47    /// Desired scale will drop as necessary to match the static max buckets configuration.
48    /// This will happen dynamically in response to observed range. If your distribution
49    /// range falls within max_buckets contiguous buckets somewhere the desired scale's range,
50    /// then your output scale will match your desired scale. If your observed range exceeds
51    /// max_buckets then scale will be reduced to reflect the data's width.
52    pub fn new_with_max_buckets(desired_scale: u8, max_buckets: u16) -> Self {
53        let desired_scale = desired_scale.clamp(0, 8);
54        Self {
55            actual_scale: desired_scale,
56            desired_scale,
57            max_bucket_count: max_buckets,
58            bucket_start_offset: 0,
59            positive_buckets: Default::default(),
60            negative_buckets: Default::default(),
61        }
62    }
63
64    /// Reset this histogram to an empty state
65    pub fn reset(&mut self) {
66        self.actual_scale = self.desired_scale;
67        self.bucket_start_offset = 0;
68        self.positive_buckets.clear();
69        self.negative_buckets.clear();
70    }
71
72    /// Observe a value, increasing its bucket's count by 1
73    pub fn accumulate<T: Into<f64>>(&mut self, value: T) {
74        self.accumulate_count(value, 1)
75    }
76
77    /// True when there aren't any measurements in this histogram
78    pub fn is_empty(&self) -> bool {
79        self.positive_buckets.is_empty() && self.negative_buckets.is_empty()
80    }
81
82    /// How many observations have been made?
83    pub fn count(&self) -> usize {
84        let mut count = 0;
85        for i in &self.positive_buckets {
86            count += *i;
87        }
88        for i in &self.negative_buckets {
89            count += *i;
90        }
91        count
92    }
93
94    /// This is an approximation, just using the positive buckets for the sum.
95    pub fn sum(&self) -> f64 {
96        self.positive_buckets
97            .iter()
98            .enumerate()
99            .map(|(index, count)| {
100                lower_boundary(self.actual_scale, self.bucket_start_offset as usize, index)
101                    * *count as f64
102            })
103            .sum()
104    }
105
106    /// This is an approximation, just using the positive buckets for the min.
107    pub fn min(&self) -> f64 {
108        self.positive_buckets
109            .iter()
110            .enumerate()
111            .filter(|(_, count)| 0 < **count)
112            .map(|(index, _count)| {
113                lower_boundary(self.actual_scale, self.bucket_start_offset as usize, index)
114            })
115            .next()
116            .unwrap_or_default()
117    }
118
119    /// This is an approximation, just using the positive buckets for the max.
120    pub fn max(&self) -> f64 {
121        self.positive_buckets
122            .iter()
123            .enumerate()
124            .rev()
125            .filter(|(_, count)| 0 < **count)
126            .map(|(index, _count)| {
127                lower_boundary(self.actual_scale, self.bucket_start_offset as usize, index)
128            })
129            .next()
130            .unwrap_or_default()
131    }
132
133    /// What is the current scale (as defined by opentelemetry exponential histogram)?
134    pub fn scale(&self) -> u8 {
135        self.actual_scale
136    }
137
138    /// What is the current bucket start offset (as defined by opentelemetry exponential histogram)?
139    pub fn bucket_start_offset(&self) -> usize {
140        self.bucket_start_offset as usize
141    }
142
143    /// Remove and return (positive, negative) bucket counts per the opentelemetry histogram concept.
144    ///
145    /// Remember that index 0 is actually the bucket_start_offset()'th bucket (as defined by opentelemetry exponential histogram).
146    pub fn take_counts(self) -> (VecDeque<usize>, VecDeque<usize>) {
147        (self.positive_buckets, self.negative_buckets)
148    }
149
150    /// Are there any negative observations?
151    pub fn has_negatives(&self) -> bool {
152        !self.negative_buckets.is_empty()
153    }
154
155    /// Iterate pairs of bucket->count. The bucket thresholds are defined by the opentelemetry exponential
156    /// histogram format. You do not need to do any extra math, this walks the actual mapping of bucket-to-count.
157    pub fn value_counts(&self) -> impl Iterator<Item = (f64, usize)> + '_ {
158        self.negative_buckets
159            .iter()
160            .enumerate()
161            .map(|(index, count)| {
162                (
163                    lower_boundary(self.actual_scale, self.bucket_start_offset as usize, index),
164                    *count,
165                )
166            })
167            .chain(
168                self.positive_buckets
169                    .iter()
170                    .enumerate()
171                    .map(|(index, count)| {
172                        (
173                            lower_boundary(
174                                self.actual_scale,
175                                self.bucket_start_offset as usize,
176                                index,
177                            ),
178                            *count,
179                        )
180                    }),
181            )
182    }
183
184    fn accumulate_count<T: Into<f64>>(&mut self, value: T, count: usize) {
185        let value = value.into();
186
187        // This may be before or after the current range, and that range might need to be expanded.
188        let scale_index = map_value_to_scale_index(self.actual_scale, value);
189
190        // Initialize the histogram to center on the first data point. That should probabilistically
191        // reduce the amount of shifting we do over time, for normal distributions.
192        if self.is_empty() {
193            self.bucket_start_offset =
194                (scale_index as u32).saturating_sub(self.max_bucket_count as u32 / 2);
195        }
196        let mut local_index = scale_index as i64 - self.bucket_start_offset as i64;
197
198        while local_index < 0 && self.rotate_range_down_one_index() {
199            local_index += 1
200        }
201        while self.max_bucket_count as i64 <= local_index && self.rotate_range_up_one_index() {
202            local_index -= 1
203        }
204        if local_index < 0 || self.max_bucket_count as i64 <= local_index {
205            if self.zoom_out() {
206                self.accumulate(value);
207                return;
208            }
209            // if we didn't zoom out then we're at the end of the range.
210            local_index = self.max_bucket_count as i64 - 1;
211        }
212
213        let index = min(self.max_bucket_count as usize - 1, local_index as usize);
214        let buckets = self.get_mut_buckets_for_value(value);
215        buckets.extend((0..=local_index.saturating_sub(buckets.len() as i64)).map(|_| 0));
216
217        buckets[index] += count;
218    }
219
220    fn zoom_out(&mut self) -> bool {
221        if self.actual_scale == 0 {
222            return false;
223        }
224        let old_scale: i32 = self.actual_scale.into();
225        let old_bucket_start_offset = self.bucket_start_offset as usize;
226        let old_positives = std::mem::take(&mut self.positive_buckets);
227        let old_negatives = std::mem::take(&mut self.negative_buckets);
228
229        self.actual_scale -= 1;
230        self.bucket_start_offset = 0;
231
232        // now just reingest
233        for (old_index, count) in old_positives.into_iter().enumerate() {
234            if 0 < count {
235                let value = lower_boundary(old_scale, old_bucket_start_offset, old_index);
236                self.accumulate_count(value, count)
237            }
238        }
239        for (old_index, count) in old_negatives.into_iter().enumerate() {
240            if 0 < count {
241                let value = -lower_boundary(old_scale, old_bucket_start_offset, old_index);
242                self.accumulate_count(value, count)
243            }
244        }
245
246        true
247    }
248
249    fn rotate_range_down_one_index(&mut self) -> bool {
250        if self.positive_buckets.len() < self.max_bucket_count as usize
251            && self.negative_buckets.len() < self.max_bucket_count as usize
252        {
253            if !self.positive_buckets.is_empty() {
254                self.positive_buckets.push_front(0);
255            }
256            if !self.negative_buckets.is_empty() {
257                self.negative_buckets.push_front(0);
258            }
259            self.bucket_start_offset -= 1;
260            true
261        } else {
262            false
263        }
264    }
265
266    fn rotate_range_up_one_index(&mut self) -> bool {
267        if self.positive_buckets.front().copied().unwrap_or_default() == 0
268            && self.negative_buckets.front().copied().unwrap_or_default() == 0
269        {
270            self.positive_buckets.pop_front();
271            self.negative_buckets.pop_front();
272            self.bucket_start_offset += 1;
273            true
274        } else {
275            false
276        }
277    }
278
279    fn get_mut_buckets_for_value(&mut self, value: f64) -> &mut VecDeque<usize> {
280        let buckets = if value.is_sign_positive() {
281            &mut self.positive_buckets
282        } else {
283            &mut self.negative_buckets
284        };
285        if buckets.is_empty() {
286            // I could reserve these ahead of time, but it seems likely that many uses will have exclusively
287            // positive or exclusively negative numbers - so this saves memory in those cases.
288            buckets.reserve_exact(self.max_bucket_count as usize);
289        }
290        buckets
291    }
292}
293
/// treats negative numbers as positive - you gotta accumulate into a negative array
fn map_value_to_scale_index(scale: impl Into<i32>, raw_value: impl Into<f64>) -> usize {
    let magnitude = raw_value.into().abs();
    // log2(magnitude), computed as ln(magnitude) * log2(e).
    let base_two_exponent = magnitude.log(E) * LOG2_E;
    // Multiplying by 2^scale subdivides each power-of-two octave into 2^scale buckets.
    // A negative result (magnitude < 1, or -inf for 0) saturates to index 0 via the cast.
    (base_two_exponent * 2_f64.powi(scale.into())).floor() as usize
}
300
/// obviously only supports positive indices. If you want a negative boundary, flip the sign on the return value.
/// per the wonkadoo instructions found at: https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram
///   > The positive and negative ranges of the histogram are expressed separately. Negative values are mapped by
///   > their absolute value into the negative range using the same scale as the positive range. Note that in the
///   > negative range, therefore, histogram buckets use lower-inclusive boundaries.
fn lower_boundary(scale: impl Into<i32>, offset: usize, index: usize) -> f64 {
    // The boundary is 2^((offset + index) / 2^scale), evaluated as
    // e^((offset + index) * ln(2) * 2^-scale).
    let absolute_index = (offset + index) as f64;
    let inverse_scale_factor = LN_2 * 2_f64.powi(-scale.into());
    (absolute_index * inverse_scale_factor).exp()
}
310
#[cfg(test)]
mod test {
    //! Tests covering bucket-index mapping, boundary math, dynamic rescaling
    //! (including recursive zoom-out), and fuzzing for index panics.

    use std::time::{Duration, Instant};

    use crate::exponential_histogram::{lower_boundary, map_value_to_scale_index};

    use super::ExponentialHistogram;

    #[test]
    fn check_range() {
        // At scale 6, 160 contiguous buckets span roughly a 5.65x range.
        assert_eq!(1275, map_value_to_scale_index(6, 1_000_000));
        assert_eq!(1275 + 160, map_value_to_scale_index(6, 5_650_000));

        // Each scale decrement doubles the range the same bucket count can cover.
        assert_eq!(637, map_value_to_scale_index(5, 1_000_000));
        assert_eq!(637 + 160, map_value_to_scale_index(5, 32_000_000));
    }

    #[test]
    fn indices_scale_zero_positive_numbers() {
        // Scale 0: buckets are whole powers of two.
        let e = ExponentialHistogram::new(0);

        assert_eq!(0, map_value_to_scale_index(e.scale(), 0_f64));
        assert_value_lowerboundary(&e, 0, 1);
        assert_value_lowerboundary(&e, 1, 1);
        assert_value_lowerboundary(&e, 2, 2);
        assert_value_lowerboundary(&e, 3, 2);
        assert_value_lowerboundary(&e, 4, 4);
        assert_value_lowerboundary(&e, 7, 4);
        assert_value_lowerboundary(&e, 8, 4);
        assert_value_lowerboundary(&e, 8.1, 8);
    }

    #[test]
    fn indices_scale_zero_negative_numbers() {
        // Negative values map by absolute value, so boundaries mirror the positive test.
        let e = ExponentialHistogram::new(0);

        assert_eq!(0, map_value_to_scale_index(e.scale(), 0_f64));
        assert_value_lowerboundary(&e, -0, 1);
        assert_value_lowerboundary(&e, -1, 1);
        assert_value_lowerboundary(&e, -2, 2);
        assert_value_lowerboundary(&e, -3, 2);
        assert_value_lowerboundary(&e, -4, 4);
        assert_value_lowerboundary(&e, -7, 4);
        assert_value_lowerboundary(&e, -8, 4);
        assert_value_lowerboundary(&e, -8.1, 8);
    }

    #[test]
    fn indices_scale_one_positive_numbers() {
        // Scale 1: two buckets per power-of-two octave (boundaries at 2^(n/2)).
        let e = ExponentialHistogram::new(1);

        assert_eq!(0, map_value_to_scale_index(e.scale(), 0_f64));
        assert_value_lowerboundary(&e, 0, 1);
        assert_value_lowerboundary(&e, 1, 1);
        assert_value_lowerboundary(&e, 2, 2);
        assert_value_lowerboundary(&e, 3, 2.828);
        assert_value_lowerboundary(&e, 4, 4);
        assert_value_lowerboundary(&e, 7, 5.657);
        assert_value_lowerboundary(&e, 8, 5.657);
        assert_value_lowerboundary(&e, 8.1, 8);
    }

    #[test]
    fn indices_scale_two_positive_numbers() {
        // Scale 2: four buckets per octave.
        let e = ExponentialHistogram::new(2);

        assert_eq!(0, map_value_to_scale_index(e.scale(), 0_f64));
        assert_value_lowerboundary(&e, 0, 1);
        assert_value_lowerboundary(&e, 1, 1);
        assert_value_lowerboundary(&e, 2, 2);
        assert_value_lowerboundary(&e, 3, 2.828);
        assert_value_lowerboundary(&e, 4, 4);
        assert_value_lowerboundary(&e, 7, 6.727);
        assert_value_lowerboundary(&e, 8, 6.727);
        assert_value_lowerboundary(&e, 8.1, 8);
    }

    #[test]
    fn indices_scale_three_positive_numbers() {
        // Scale 3: eight buckets per octave.
        let e = ExponentialHistogram::new(3);

        assert_eq!(0, map_value_to_scale_index(e.scale(), 0_f64));
        assert_value_lowerboundary(&e, 0, 1);
        assert_value_lowerboundary(&e, 1, 1);
        assert_value_lowerboundary(&e, 2, 2);
        assert_value_lowerboundary(&e, 3, 2.828);
        assert_value_lowerboundary(&e, 4, 4);
        assert_value_lowerboundary(&e, 7, 6.727);
        assert_value_lowerboundary(&e, 8, 7.337);
        assert_value_lowerboundary(&e, 8.1, 8);
    }

    #[test]
    fn indices_scale_four_positive_numbers() {
        // Scale 4: sixteen buckets per octave.
        let e = ExponentialHistogram::new(4);

        assert_eq!(0, map_value_to_scale_index(e.scale(), 0_f64));
        assert_value_lowerboundary(&e, 0, 1);
        assert_value_lowerboundary(&e, 1, 1);
        assert_value_lowerboundary(&e, 2, 2);
        assert_value_lowerboundary(&e, 3, 2.954);
        assert_value_lowerboundary(&e, 4, 4);
        assert_value_lowerboundary(&e, 5, 4.967);
        assert_value_lowerboundary(&e, 6, 5.907);
        assert_value_lowerboundary(&e, 7, 6.727);
        assert_value_lowerboundary(&e, 8, 7.661);
        assert_value_lowerboundary(&e, 8.1, 8);
    }

    // End-to-end walk through window rotation, single-step scale reduction, and
    // multi-step (recursive) scale reduction, asserting internal state throughout.
    #[test]
    fn indices_scale_downgrade_positive_numbers() {
        //
        // -------- Start out with a fine-grained histogram --------
        //
        let mut e = ExponentialHistogram::new(8);

        e.accumulate(24_000_000);
        assert_eq!(
            6196, e.bucket_start_offset,
            "histogram initializes with the first observation in the middle of the range"
        );
        assert_eq_epsilon(23984931.775, e.min(), "min and max should be equal");
        assert_eq_epsilon(23984931.775, e.max(), "min and max should be equal");

        assert_eq!(
            8,
            e.scale(),
            "initial value should not change scale since it falls in the numeric range"
        );
        assert_eq!(
            81,
            e.positive_buckets.len(),
            "initial value should be in the middle"
        );
        assert_eq!(
            1, e.positive_buckets[80],
            "initial value should go in index 80 because that is halfway to 160"
        );
        assert_eq!(
            6196, e.bucket_start_offset,
            "bucket start offset should index into scale 8"
        );

        // assert some bucket boundaries for convenience
        assert_value_lowerboundary(&e, 24_000_000, 23984931.775);
        assert_value_lowerboundary(&e, 24_040_000, 23984931.775);
        assert_value_lowerboundary(&e, 24_050_000, 24049961.522);

        assert_eq_epsilon(
            19313750.368,
            lower_boundary(8, 0, 6196),
            "lower boundary of histogram",
        );
        assert_eq_epsilon(
            29785874.896,
            lower_boundary(8, 0, 6196 + 160),
            "upper boundary of histogram",
        );

        // Accumulate some data in a bucket's range
        for i in 0..40_000 {
            e.accumulate(24_000_000 + i);
        }
        assert_eq!(
            40001, e.positive_buckets[80],
            "initial value should go in index 80 because that is halfway to 160"
        );

        e.accumulate(24_050_000);
        assert_eq!(
            8,
            e.scale(),
            "a value in the next higher bucket should not change the scale"
        );
        assert_eq!(
            82,
            e.positive_buckets.len(),
            "bucket count should be able to increase densely when a new bucket value is observed"
        );
        assert_eq!(1, e.positive_buckets[81], "index 81 has a new count");
        assert_eq!(
            6196, e.bucket_start_offset,
            "bucket start offset does not change when adding a bucket in the same range"
        );

        // Poke at growth boundary conditions
        e.accumulate(23_984_000);
        assert_eq!(
            8,
            e.scale(),
            "a value in the next lower bucket should not change the scale"
        );
        assert_eq!(82, e.positive_buckets.len(), "bucket count should not increase when a new bucket value is observed within the covered range");
        assert_eq!(1, e.positive_buckets[79], "index 79 has a new count");
        assert_eq!(
            6196, e.bucket_start_offset,
            "bucket start offset does not change when using a bucket in the same range"
        );

        e.accumulate(19_313_750);
        assert_eq!(8, e.scale(), "a value below the covered range should not change the scale yet because there is room above the observed range to shift");
        assert_eq!(83, e.positive_buckets.len(), "bucket count should not increase when a new bucket value is observed within the covered range");
        assert_eq!(1, e.positive_buckets[0], "index 0 has a new count");
        assert_eq!(
            6195, e.bucket_start_offset,
            "bucket start offset changes because we rotated down 1 position"
        );
        assert_eq_epsilon(
            29705335.561,
            lower_boundary(8, 0, 6195 + 160),
            "new upper boundary of histogram",
        );

        //
        // -------- Expand histogram range with a big number --------
        //
        e.accumulate(29_705_336);
        assert_eq!(
            3177,
            map_value_to_scale_index(7, 29_705_336_f64),
            "this value pushes the length of scale 7 also"
        );
        assert_eq!(7, e.scale(), "a value above the covered range should now change the scale because the lower end is populated while the upper end is beyond the range this scale can cover in 160 buckets");
        assert_eq!(
            160,
            e.positive_buckets.len(),
            "bucket count should be sensible after rescale"
        );
        assert_eq!(
            1,
            e.positive_buckets[e.positive_buckets.len() - 1],
            "last index has a new count"
        );
        assert_eq!(
            3018, e.bucket_start_offset,
            "bucket start offset changes because we scaled and rotated"
        );

        //
        // -------- Skip several zoom scale steps in a single accumulate --------
        //
        let recursive_scale_start_count = e.count();
        assert_eq!(
            2199023255551.996,
            lower_boundary(2, 0, 164),
            "this value gets us down into scale 2"
        );
        assert_eq_epsilon(
            4.000,
            lower_boundary(2, 0, 8),
            "this value gets us down into scale 2",
        );
        assert_eq_epsilon(
            4.757,
            lower_boundary(2, 0, 9),
            "this value gets us down into scale 2",
        );
        // pin the bucket's low value, at scale 2's index 8. It's not in scale 2 yet though!
        e.accumulate(4.25);
        // now blow the range wide, way past scale 7, resulting in a recursive scale down from 7 to precision 2.
        e.accumulate(2_199_023_255_552_f64);
        assert_eq!(2, e.scale(), "this value range should force scale range 2");
        assert_eq!(8, e.bucket_start_offset, "bucket start offset should match the first element, since we rotated and grew out to the larger value");
        assert_eq!(
            1,
            e.positive_buckets[8 - 8],
            "this is the 4.0 bucket, and 4.25 should go in it."
        );
        assert_eq!(
            1,
            e.positive_buckets[164 - 8],
            "this is the bucket for the big numer."
        );
        assert_eq!(recursive_scale_start_count + 2, e.count(), "2 more reports were made. The histogram maintains every count across rescaling, even recursive rescaling");
    }

    /// Look for random index crashes
    #[test]
    fn fuzz() {
        let start = Instant::now();
        while start.elapsed() < Duration::from_millis(50) {
            let mut e = ExponentialHistogram::new(8);
            let start = Instant::now();
            while start.elapsed() < Duration::from_millis(1) {
                e.accumulate(1_000_000_000_000_000_f64 * rand::random::<f64>());
            }
        }
    }

    /// Look for random index crashes
    #[test]
    fn fuzz_negative() {
        let start = Instant::now();
        while start.elapsed() < Duration::from_millis(50) {
            let mut e = ExponentialHistogram::new(8);
            let start = Instant::now();
            while start.elapsed() < Duration::from_millis(1) {
                e.accumulate(-1_000_000_000_000_000_f64 * rand::random::<f64>());
            }
        }
    }

    /// Assert that `value` maps (at the histogram's current scale) to a bucket whose
    /// lower boundary is `expected_lower_boundary`, and that trading one unit of offset
    /// for one unit of index produces the same boundary.
    #[track_caller]
    fn assert_value_lowerboundary(
        e: &ExponentialHistogram,
        value: impl Into<f64>,
        expected_lower_boundary: impl Into<f64>,
    ) {
        let observed_index = map_value_to_scale_index(e.scale(), value.into());
        let observed_boundary = lower_boundary(e.scale(), 0, observed_index);
        assert_eq_epsilon(
            expected_lower_boundary.into(),
            observed_boundary,
            "boundary matches",
        );
        if 0 < observed_index {
            let observed_offset_boundary = lower_boundary(e.scale(), 1, observed_index - 1);
            assert_eq_epsilon(
                observed_boundary,
                observed_offset_boundary,
                "offset must result in the same boundary",
            );
        }
    }

    /// Approximate float equality within a fixed epsilon of 1/128.
    #[track_caller]
    fn assert_eq_epsilon(j: f64, k: f64, message: &str) {
        const EPSILON: f64 = 1.0 / 128.0;
        let difference = (j - k).abs();
        assert!(
            difference < EPSILON,
            "{message}: {j} != {k} with epsilon {EPSILON}."
        );
    }
}