exponential_decay_histogram/
lib.rs

//! A histogram which exponentially weights in favor of recent values.
//!
//! Histograms compute statistics about the distribution of values in a data
//! set. This histogram exponentially favors recent values over older ones,
//! making it suitable for use cases such as monitoring the state of
//! long-running processes.
//!
//! The histogram does not store all values simultaneously, but rather a
//! randomized subset. This allows us to put bounds on overall memory use
//! regardless of the rate of events.
//!
//! Each sample in the histogram is composed of an `i64` value that contributes
//! to the histogram's statistics and an exemplar, which is an arbitrary value
//! that is tracked by the histogram as long as the sample is retained. By
//! default, the exemplar type is simply `()`, but consumers that want to use
//! exemplars can pick a different type and pass it into the histogram with the
//! [`ExponentialDecayHistogram::update_ex`] method.
//!
//! This implementation is based on the `ExponentiallyDecayingReservoir` class
//! in the Java [Metrics][1] library, which is itself based on the forward decay
//! model described in [Cormode et al. 2009][2].
//!
//! [1]: http://metrics.dropwizard.io/3.2.2/
//! [2]: http://dimacs.rutgers.edu/~graham/pubs/papers/fwddecay.pdf
//!
//! # Examples
//!
//! ```
//! use exponential_decay_histogram::ExponentialDecayHistogram;
//!
//! # fn do_work() -> i64 { 0 }
//! let mut histogram = ExponentialDecayHistogram::new();
//!
//! // Do some work for a while and fill the histogram with some information.
//! // Even though we're putting 10000 values into the histogram, it will only
//! // retain a subset of them.
//! for _ in 0..10000 {
//!     let size = do_work();
//!     histogram.update(size);
//! }
//!
//! // Take a snapshot to inspect the current state of the histogram.
//! let snapshot = histogram.snapshot();
//! println!("count: {}", snapshot.count());
//! println!("min: {}", snapshot.min());
//! println!("max: {}", snapshot.max());
//! println!("mean: {}", snapshot.mean());
//! println!("standard deviation: {}", snapshot.stddev());
//! println!("median: {}", snapshot.value(0.5));
//! println!("99th percentile: {}", snapshot.value(0.99));
//! ```
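//!
//! Samples can also carry exemplars via
//! [`ExponentialDecayHistogram::update_ex`]. A minimal sketch using `String`
//! exemplars (the values and labels below are purely illustrative):
//!
//! ```
//! use exponential_decay_histogram::ExponentialDecayHistogram;
//!
//! let mut histogram: ExponentialDecayHistogram<String> =
//!     ExponentialDecayHistogram::builder().build();
//!
//! histogram.update_ex(250, "request-1".to_string());
//! histogram.update_ex(420, "request-2".to_string());
//!
//! let snapshot = histogram.snapshot();
//! for (value, exemplar) in snapshot.exemplars() {
//!     println!("{}: {}", value, exemplar);
//! }
//! ```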
#![warn(missing_docs)]
use ordered_float::NotNan;
use rand::distr::Open01;
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};
use std::collections::BTreeMap;
use std::iter;
use std::slice;
use std::time::{Duration, Instant};

const RESCALE_THRESHOLD: Duration = Duration::from_secs(60 * 60);

#[derive(Debug)]
struct WeightedSample<T> {
    value: i64,
    exemplar: T,
    weight: f64,
}

/// A histogram which exponentially weights in favor of recent values.
///
/// See the crate level documentation for more details.
#[derive(Debug)]
pub struct ExponentialDecayHistogram<T = ()> {
    values: BTreeMap<NotNan<f64>, WeightedSample<T>>,
    alpha: f64,
    size: usize,
    count: u64,
    start_time: Instant,
    next_scale_time: Instant,
    rng: SmallRng,
}

impl Default for ExponentialDecayHistogram {
    fn default() -> ExponentialDecayHistogram {
        ExponentialDecayHistogram::new()
    }
}

impl ExponentialDecayHistogram {
    /// Returns a new histogram with a default configuration.
    pub fn new() -> Self {
        Self::builder().build()
    }

    /// Returns a new builder to create a histogram with a custom configuration.
    pub fn builder() -> Builder {
        Builder {
            now: Instant::now(),
            size: 1028,
            alpha: 0.015,
        }
    }

    /// Returns a new histogram configured with the specified size and alpha.
    ///
    /// `size` specifies the number of values stored in the histogram. A larger
    /// size will provide more accurate statistics, but with a higher memory
    /// overhead.
    ///
    /// `alpha` specifies the exponential decay factor. A larger factor biases
    /// the histogram towards newer values.
    ///
    /// # Panics
    ///
    /// Panics if `size` is zero.
    #[deprecated = "use `ExponentialDecayHistogram::builder()` instead"]
    pub fn with_size_and_alpha(size: usize, alpha: f64) -> Self {
        Self::builder().size(size).alpha(alpha).build()
    }

    /// Inserts a value into the histogram at the current time.
    pub fn update(&mut self, value: i64) {
        self.update_at(Instant::now(), value);
    }

    /// Inserts a value into the histogram at the specified time.
    ///
    /// # Panics
    ///
    /// May panic if values are inserted at non-monotonically increasing times.
    pub fn update_at(&mut self, time: Instant, value: i64) {
        self.update_at_ex(time, value, ());
    }
}

impl<T> ExponentialDecayHistogram<T>
where
    T: Clone,
{
    /// Like [`Self::update`], except that it additionally takes an exemplar.
    pub fn update_ex(&mut self, value: i64, exemplar: T) {
        self.update_at_ex(Instant::now(), value, exemplar);
    }

    /// Like [`Self::update_at`], except that it additionally takes an exemplar.
    pub fn update_at_ex(&mut self, time: Instant, value: i64, exemplar: T) {
        self.rescale_if_needed(time);
        self.count += 1;

        let item_weight = self.weight(time);
        let sample = WeightedSample {
            value,
            exemplar,
            weight: item_weight,
        };
        // Open01 since we don't want to divide by 0
        let priority = item_weight / self.rng.sample::<f64, _>(&Open01);
        let priority = NotNan::new(priority).unwrap();

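        // Priority sampling: each sample is keyed by weight / uniform(0, 1) and
        // only the `size` samples with the largest priorities are kept, so more
        // recent (more heavily weighted) samples are more likely to be retained.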
        if self.values.len() < self.size {
            self.values.insert(priority, sample);
        } else {
            let first = *self.values.keys().next().unwrap();
            if first < priority && self.values.insert(priority, sample).is_none() {
                self.values.remove(&first).unwrap();
            }
        }
    }

    /// Takes a snapshot of the current state of the histogram.
    pub fn snapshot(&self) -> Snapshot<T> {
        let mut entries = self
            .values
            .values()
            .map(|s| SnapshotEntry {
                value: s.value,
                exemplar: s.exemplar.clone(),
                norm_weight: s.weight,
                quantile: NotNan::new(0.).unwrap(),
            })
            .collect::<Vec<_>>();

        entries.sort_by_key(|e| e.value);

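        // Normalize the retained sample weights so that they sum to 1.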
        let sum_weight = entries.iter().map(|e| e.norm_weight).sum::<f64>();
        for entry in &mut entries {
            entry.norm_weight /= sum_weight;
        }

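        // Each entry's quantile is the cumulative normalized weight of the
        // entries that sort before it.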
        entries.iter_mut().fold(NotNan::new(0.).unwrap(), |acc, e| {
            e.quantile = acc;
            acc + NotNan::new(e.norm_weight).unwrap()
        });

        Snapshot {
            entries,
            count: self.count,
        }
    }

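    // Computes the forward decay weight of a sample arriving at `time`:
    // exp(alpha * seconds elapsed since the current landmark `start_time`).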
    fn weight(&self, time: Instant) -> f64 {
        (self.alpha * (time - self.start_time).as_secs() as f64).exp()
    }

    fn rescale_if_needed(&mut self, now: Instant) {
        if now >= self.next_scale_time {
            self.rescale(now);
        }
    }

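    // Re-anchors the landmark time so that sample weights, which grow
    // exponentially with the time elapsed since `start_time`, do not grow
    // without bound. Existing priorities and weights are scaled down by a
    // common factor, which preserves their relative ordering.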
    fn rescale(&mut self, now: Instant) {
        self.next_scale_time = now + RESCALE_THRESHOLD;
        let old_start_time = self.start_time;
        self.start_time = now;
        let scaling_factor =
            NotNan::new((-self.alpha * (now - old_start_time).as_secs() as f64).exp()).unwrap();

        self.values = self
            .values
            .iter()
            .map(|(&k, v)| {
                (
                    k * scaling_factor,
                    WeightedSample {
                        value: v.value,
                        exemplar: v.exemplar.clone(),
                        weight: v.weight * *scaling_factor,
                    },
                )
            })
            .collect();
    }
}

/// A builder type for [`ExponentialDecayHistogram`] objects.
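///
/// # Examples
///
/// A minimal sketch (the size and alpha below are illustrative values, not
/// recommendations):
///
/// ```
/// use exponential_decay_histogram::ExponentialDecayHistogram;
///
/// let mut histogram = ExponentialDecayHistogram::builder()
///     .size(512)
///     .alpha(0.03)
///     .build();
/// histogram.update(1);
/// ```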
pub struct Builder {
    now: Instant,
    size: usize,
    alpha: f64,
}

impl Builder {
    /// Sets the construction time of the histogram.
    ///
    /// Defaults to the system time when the [`Builder`] was constructed.
    pub fn at(&mut self, now: Instant) -> &mut Self {
        self.now = now;
        self
    }

    /// Sets the size of the histogram.
    ///
    /// A larger size will provide a more accurate histogram, but with a higher memory overhead.
    ///
    /// Defaults to 1028, which offers a 99.9% confidence level with a 5% margin of error.
    ///
    /// # Panics
    ///
    /// Panics if `size` is 0.
    pub fn size(&mut self, size: usize) -> &mut Self {
        assert!(size > 0);

        self.size = size;
        self
    }

    /// Sets the decay rate of the histogram.
    ///
    /// A larger value biases the histogram towards newer values.
    ///
    /// Defaults to 0.015, which heavily biases towards the last 5 minutes of values.
    pub fn alpha(&mut self, alpha: f64) -> &mut Self {
        self.alpha = alpha;
        self
    }

    /// Creates a new [`ExponentialDecayHistogram`].
    pub fn build<T>(&self) -> ExponentialDecayHistogram<T> {
        ExponentialDecayHistogram {
            values: BTreeMap::new(),
            alpha: self.alpha,
            size: self.size,
            count: 0,
            start_time: self.now,
            // we store this explicitly because it's ~10% faster than doing the math on demand
            next_scale_time: self.now + RESCALE_THRESHOLD,
            // using a SmallRng is ~10% faster than using thread_rng()
            rng: SmallRng::from_rng(&mut rand::rng()),
        }
    }
}

struct SnapshotEntry<T> {
    value: i64,
    exemplar: T,
    norm_weight: f64,
    quantile: NotNan<f64>,
}

/// A snapshot of the state of an `ExponentialDecayHistogram` at some point in time.
pub struct Snapshot<T = ()> {
    entries: Vec<SnapshotEntry<T>>,
    count: u64,
}

impl<T> Snapshot<T> {
    /// Returns the value at a specified quantile in the snapshot, or 0 if it is
    /// empty.
    ///
    /// For example, `snapshot.value(0.5)` returns the median value of the
    /// snapshot.
    ///
    /// # Panics
    ///
    /// Panics if `quantile` is not between 0 and 1 (inclusive).
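    ///
    /// # Examples
    ///
    /// A rough sketch; with a single repeated value every quantile maps to it:
    ///
    /// ```
    /// use exponential_decay_histogram::ExponentialDecayHistogram;
    ///
    /// let mut histogram = ExponentialDecayHistogram::new();
    /// for _ in 0..100 {
    ///     histogram.update(42);
    /// }
    ///
    /// let snapshot = histogram.snapshot();
    /// assert_eq!(snapshot.value(0.5), 42);
    /// assert_eq!(snapshot.value(0.99), 42);
    /// ```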
    pub fn value(&self, quantile: f64) -> i64 {
        assert!((0. ..=1.).contains(&quantile));

        if self.entries.is_empty() {
            return 0;
        }

        let quantile = NotNan::new(quantile).unwrap();
        let idx = match self.entries.binary_search_by(|e| e.quantile.cmp(&quantile)) {
            Ok(idx) => idx,
            Err(idx) if idx >= self.entries.len() => self.entries.len() - 1,
            Err(idx) => idx,
        };

        self.entries[idx].value
    }

    /// Returns the largest value in the snapshot, or 0 if it is empty.
    pub fn max(&self) -> i64 {
        self.entries.last().map_or(0, |e| e.value)
    }

    /// Returns the smallest value in the snapshot, or 0 if it is empty.
    pub fn min(&self) -> i64 {
        self.entries.first().map_or(0, |e| e.value)
    }

    /// Returns the weighted mean of the values in the snapshot, or 0 if it is
    /// empty.
    pub fn mean(&self) -> f64 {
        self.entries
            .iter()
            .map(|e| e.value as f64 * e.norm_weight)
            .sum::<f64>()
    }

    /// Returns the weighted standard deviation of the values in the snapshot,
    /// or 0 if it is empty.
    pub fn stddev(&self) -> f64 {
        if self.entries.len() <= 1 {
            return 0.;
        }

        let mean = self.mean();
        let variance = self
            .entries
            .iter()
            .map(|e| {
                let diff = e.value as f64 - mean;
                e.norm_weight * diff * diff
            })
            .sum::<f64>();

        variance.sqrt()
    }

    /// Returns the number of values which have been written to the histogram at
    /// the time of the snapshot.
    pub fn count(&self) -> u64 {
        self.count
    }

    /// Returns an iterator over the distinct values in the snapshot along with their weights.
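    ///
    /// # Examples
    ///
    /// A small sketch (values are illustrative): repeated values are merged
    /// into a single entry whose weight is the sum of the individual sample
    /// weights.
    ///
    /// ```
    /// use exponential_decay_histogram::ExponentialDecayHistogram;
    ///
    /// let mut histogram = ExponentialDecayHistogram::new();
    /// histogram.update(1);
    /// histogram.update(1);
    /// histogram.update(1);
    /// histogram.update(10);
    ///
    /// for (value, weight) in histogram.snapshot().values() {
    ///     println!("{} has weight {}", value, weight);
    /// }
    /// ```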
    pub fn values<'a>(&'a self) -> Values<'a, T> {
        Values {
            it: self.entries.iter().peekable(),
        }
    }

    /// Returns an iterator over the values in the snapshot along with their exemplars.
    pub fn exemplars(&self) -> Exemplars<'_, T> {
        Exemplars {
            it: self.entries.iter(),
        }
    }
}

/// An iterator over the distinct values in a snapshot along with their weights.
pub struct Values<'a, T = ()> {
    it: iter::Peekable<slice::Iter<'a, SnapshotEntry<T>>>,
}

impl<'a, T> Iterator for Values<'a, T> {
    type Item = (i64, f64);

    fn next(&mut self) -> Option<(i64, f64)> {
        let (value, mut weight) = match self.it.next() {
            Some(v) => (v.value, v.norm_weight),
            None => return None,
        };

        loop {
            match self.it.peek() {
                Some(v) if v.value == value => weight += v.norm_weight,
                _ => break,
            }
            self.it.next();
        }

        Some((value, weight))
    }
}

/// An iterator over the values in the snapshot along with their exemplars.
pub struct Exemplars<'a, T> {
    it: slice::Iter<'a, SnapshotEntry<T>>,
}

impl<'a, T> Iterator for Exemplars<'a, T> {
    type Item = (i64, &'a T);

    fn next(&mut self) -> Option<(i64, &'a T)> {
        self.it.next().map(|e| (e.value, &e.exemplar))
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use std::ops::Range;

    #[test]
    fn a_histogram_of_100_out_of_1000_elements() {
        let mut histogram = ExponentialDecayHistogram::builder()
            .size(100)
            .alpha(0.99)
            .build();
        for i in 0..1000 {
            histogram.update(i);
        }

        assert_eq!(histogram.values.len(), 100);

        let snapshot = histogram.snapshot();

        assert_eq!(snapshot.entries.len(), 100);

        assert_all_values_between(snapshot, 0..1000);
    }

    #[test]
    fn a_histogram_of_100_out_of_10_elements() {
        let mut histogram = ExponentialDecayHistogram::builder()
            .size(100)
            .alpha(0.99)
            .build();
        for i in 0..10 {
            histogram.update(i);
        }

        let snapshot = histogram.snapshot();

        assert_eq!(snapshot.entries.len(), 10);

        assert_all_values_between(snapshot, 0..10);
    }

    #[test]
    fn a_heavily_biased_histogram_of_1000_out_of_100_elements() {
        let mut histogram = ExponentialDecayHistogram::builder()
            .size(1000)
            .alpha(0.01)
            .build();
        for i in 0..100 {
            histogram.update(i);
        }

        assert_eq!(histogram.values.len(), 100);

        let snapshot = histogram.snapshot();

        assert_eq!(snapshot.entries.len(), 100);

        assert_all_values_between(snapshot, 0..100);
    }

    #[test]
    fn long_periods_of_inactivity_should_not_corrupt_sampling_state() {
        let mut now = Instant::now();
        let mut histogram = ExponentialDecayHistogram::builder()
            .at(now)
            .size(10)
            .alpha(0.015)
            .build();

        // add 1000 values at a rate of 10 values/second
        let delta = Duration::from_millis(100);
        for i in 0..1000 {
            now += delta;
            histogram.update_at(now, 1000 + i);
        }

        let snapshot = histogram.snapshot();
        assert_eq!(snapshot.entries.len(), 10);
        assert_all_values_between(snapshot, 1000..2000);

        // wait for 15 hours and add another value.
        // this should trigger a rescale. Note that the number of samples will
        // be reduced because of the very small scaling factor that will make
        // all existing priorities equal to zero after rescale.
        now += Duration::from_secs(15 * 60 * 60);
        histogram.update_at(now, 2000);

        let snapshot = histogram.snapshot();
        assert_eq!(snapshot.entries.len(), 2);
        assert_all_values_between(snapshot, 1000..3000);

        // add 1000 values at a rate of 10 values/second
        for i in 0..1000 {
            now += delta;
            histogram.update_at(now, 3000 + i);
        }
        let snapshot = histogram.snapshot();
        assert_eq!(snapshot.entries.len(), 10);
        assert_all_values_between(snapshot, 3000..4000);
    }

    #[test]
    fn spot_lift() {
        let mut now = Instant::now();
        let mut histogram = ExponentialDecayHistogram::builder()
            .at(now)
            .size(1000)
            .alpha(0.015)
            .build();

        let values_per_minute = 10;
        let values_interval = Duration::from_secs(60) / values_per_minute;
        // mode 1: steady regime for 120 minutes
        for _ in 0..120 * values_per_minute {
            histogram.update_at(now, 177);
            now += values_interval;
        }

        // switching to mode 2: 10 minutes with the same rate, but larger value
        for _ in 0..10 * values_per_minute {
            histogram.update_at(now, 9999);
            now += values_interval;
        }

        // expect the quantiles to reflect mode 2 after 10 minutes
        assert_eq!(histogram.snapshot().value(0.5), 9999);
    }

    #[test]
    fn spot_fall() {
        let mut now = Instant::now();
        let mut histogram = ExponentialDecayHistogram::builder()
            .at(now)
            .size(1000)
            .alpha(0.015)
            .build();

        let values_per_minute = 10;
        let values_interval = Duration::from_secs(60) / values_per_minute;
        // mode 1: steady regime for 120 minutes
        for _ in 0..120 * values_per_minute {
            histogram.update_at(now, 9998);
            now += values_interval;
        }

        // switching to mode 2: 10 minutes with the same rate, but smaller value
        for _ in 0..10 * values_per_minute {
            histogram.update_at(now, 178);
            now += values_interval;
        }

        // expect the quantiles to reflect mode 2 after 10 minutes
        assert_eq!(histogram.snapshot().value(0.5), 178);
    }

    #[test]
    fn quantiles_should_be_based_on_weights() {
        let mut now = Instant::now();
        let mut histogram = ExponentialDecayHistogram::builder()
            .at(now)
            .size(1000)
            .alpha(0.015)
            .build();

        for _ in 0..40 {
            histogram.update_at(now, 177);
        }

        now += Duration::from_secs(120);

        for _ in 0..10 {
            histogram.update_at(now, 9999);
        }

        let snapshot = histogram.snapshot();
        assert_eq!(snapshot.entries.len(), 50);

        // the first 40 items (177) each have weight 1
        // the next 10 items (9999) each have weight ~6
        // so by weight it's roughly a 40 vs 60 split, not 40 vs 10
        assert_eq!(snapshot.value(0.5), 9999);
        assert_eq!(snapshot.value(0.75), 9999);
    }

    fn assert_all_values_between(snapshot: Snapshot, range: Range<i64>) {
        for entry in &snapshot.entries {
            assert!(
                entry.value >= range.start && entry.value < range.end,
                "snapshot value {} was not in {:?}",
                entry.value,
                range
            );
        }
    }

    #[test]
    fn values() {
        let mut histogram = ExponentialDecayHistogram::new();
        let now = histogram.start_time;

        histogram.update_at(now, 1);
        histogram.update_at(now, 1);
        histogram.update_at(now, 1);
        histogram.update_at(now, 10);

        let values = histogram.snapshot().values().collect::<Vec<_>>();
        assert_eq!(values, vec![(1, 0.75), (10, 0.25)]);
    }
}