exponential_decay_histogram/
lib.rs

//! A histogram which exponentially weights in favor of recent values.
//!
//! Histograms compute statistics about the distribution of values in a data
//! set. This histogram exponentially favors recent values over older ones,
//! making it suitable for use cases such as monitoring the state of
//! long-running processes.
//!
//! The histogram does not store all values simultaneously, but rather a
//! randomized subset. This allows us to put bounds on overall memory use
//! regardless of the rate of events.
//!
//! This implementation is based on the `ExponentiallyDecayingReservoir` class
//! in the Java [Metrics][1] library, which is itself based on the forward decay
//! model described in [Cormode et al. 2009][2].
//!
//! [1]: http://metrics.dropwizard.io/3.2.2/
//! [2]: http://dimacs.rutgers.edu/~graham/pubs/papers/fwddecay.pdf
//!
//! # Examples
//!
//! ```
//! use exponential_decay_histogram::ExponentialDecayHistogram;
//!
//! # fn do_work() -> i64 { 0 }
//! let mut histogram = ExponentialDecayHistogram::new();
//!
//! // Do some work for a while and fill the histogram with some information.
//! // Even though we're putting 10000 values into the histogram, it will only
//! // retain a subset of them.
//! for _ in 0..10000 {
//!     let size = do_work();
//!     histogram.update(size);
//! }
//!
//! // Take a snapshot to inspect the current state of the histogram.
//! let snapshot = histogram.snapshot();
//! println!("count: {}", snapshot.count());
//! println!("min: {}", snapshot.min());
//! println!("max: {}", snapshot.max());
//! println!("mean: {}", snapshot.mean());
//! println!("standard deviation: {}", snapshot.stddev());
//! println!("median: {}", snapshot.value(0.5));
//! println!("99th percentile: {}", snapshot.value(0.99));
//! ```
#![warn(missing_docs)]
use ordered_float::NotNan;
use rand::distr::Open01;
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};
use std::collections::BTreeMap;
use std::iter;
use std::slice;
use std::time::{Duration, Instant};

const RESCALE_THRESHOLD: Duration = Duration::from_secs(60 * 60);

#[derive(Debug)]
struct WeightedSample {
    value: i64,
    weight: f64,
}

/// A histogram which exponentially weights in favor of recent values.
///
/// See the crate level documentation for more details.
#[derive(Debug)]
pub struct ExponentialDecayHistogram {
    values: BTreeMap<NotNan<f64>, WeightedSample>,
    alpha: f64,
    size: usize,
    count: u64,
    start_time: Instant,
    next_scale_time: Instant,
    rng: SmallRng,
}

impl Default for ExponentialDecayHistogram {
    fn default() -> ExponentialDecayHistogram {
        ExponentialDecayHistogram::new()
    }
}

impl ExponentialDecayHistogram {
    /// Returns a new histogram with a default configuration.
    pub fn new() -> Self {
        Self::builder().build()
    }

    /// Returns a new builder to create a histogram with a custom configuration.
    pub fn builder() -> Builder {
        Builder {
            now: Instant::now(),
            size: 1028,
            alpha: 0.015,
        }
    }

    /// Returns a new histogram configured with the specified size and alpha.
    ///
    /// `size` specifies the number of values stored in the histogram. A larger
    /// size will provide more accurate statistics, but with a higher memory
    /// overhead.
    ///
    /// `alpha` specifies the exponential decay factor. A larger factor biases
    /// the histogram towards newer values.
    ///
    /// # Panics
    ///
    /// Panics if `size` is zero.
    #[deprecated = "use `ExponentialDecayHistogram::builder()` instead"]
    pub fn with_size_and_alpha(size: usize, alpha: f64) -> Self {
        Self::builder().size(size).alpha(alpha).build()
    }

    /// Inserts a value into the histogram at the current time.
    pub fn update(&mut self, value: i64) {
        self.update_at(Instant::now(), value);
    }

    /// Inserts a value into the histogram at the specified time.
    ///
    /// # Panics
    ///
    /// May panic if values are inserted at non-monotonically increasing times.
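    ///
    /// # Examples
    ///
    /// Recording values at explicit times (the values and time offsets here are
    /// arbitrary):
    ///
    /// ```
    /// use exponential_decay_histogram::ExponentialDecayHistogram;
    /// use std::time::{Duration, Instant};
    ///
    /// let mut histogram = ExponentialDecayHistogram::new();
    ///
    /// let now = Instant::now();
    /// histogram.update_at(now, 42);
    /// histogram.update_at(now + Duration::from_secs(1), 17);
    /// ```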
    pub fn update_at(&mut self, time: Instant, value: i64) {
        self.rescale_if_needed(time);
        self.count += 1;

        let item_weight = self.weight(time);
        let sample = WeightedSample {
            value,
            weight: item_weight,
        };
        // sample from Open01 rather than a closed range so we never divide by zero;
        // larger weights tend to produce larger priorities
        let priority = item_weight / self.rng.sample::<f64, _>(&Open01);
        let priority = NotNan::new(priority).unwrap();

        if self.values.len() < self.size {
            self.values.insert(priority, sample);
        } else {
            // the reservoir is full: keep the new sample only if it outranks the
            // lowest-priority sample currently stored, evicting that sample to
            // stay within `size`
            let first = *self.values.keys().next().unwrap();
            if first < priority && self.values.insert(priority, sample).is_none() {
                self.values.remove(&first).unwrap();
            }
        }
    }

    /// Takes a snapshot of the current state of the histogram.
    pub fn snapshot(&self) -> Snapshot {
        let mut entries = self
            .values
            .values()
            .map(|s| SnapshotEntry {
                value: s.value,
                norm_weight: s.weight,
                quantile: NotNan::new(0.).unwrap(),
            })
            .collect::<Vec<_>>();

        entries.sort_by_key(|e| e.value);

        // normalize the weights so that they sum to 1
        let sum_weight = entries.iter().map(|e| e.norm_weight).sum::<f64>();
        for entry in &mut entries {
            entry.norm_weight /= sum_weight;
        }

        // each entry's quantile is the cumulative normalized weight of the
        // entries that precede it in value order
        entries.iter_mut().fold(NotNan::new(0.).unwrap(), |acc, e| {
            e.quantile = acc;
            acc + NotNan::new(e.norm_weight).unwrap()
        });

        Snapshot {
            entries,
            count: self.count,
        }
    }

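    // The forward decay weight of a value observed at `time` relative to the
    // current landmark `start_time`:
    //
    //     w(t) = exp(alpha * (t - start_time))
    //
    // Newer values receive exponentially larger weights, which is what biases
    // the sampled reservoir towards recent data.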
    fn weight(&self, time: Instant) -> f64 {
        (self.alpha * (time - self.start_time).as_secs() as f64).exp()
    }

    fn rescale_if_needed(&mut self, now: Instant) {
        if now >= self.next_scale_time {
            self.rescale(now);
        }
    }

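    // Weights grow without bound as time advances, so the landmark is
    // periodically moved forward and every stored priority and weight is
    // scaled down by exp(-alpha * (now - old_start_time)). Scaling everything
    // by the same factor preserves the samples' relative weights while keeping
    // the absolute values from overflowing.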
    fn rescale(&mut self, now: Instant) {
        self.next_scale_time = now + RESCALE_THRESHOLD;
        let old_start_time = self.start_time;
        self.start_time = now;
        let scaling_factor =
            NotNan::new((-self.alpha * (now - old_start_time).as_secs() as f64).exp()).unwrap();

        self.values = self
            .values
            .iter()
            .map(|(&k, v)| {
                (
                    k * scaling_factor,
                    WeightedSample {
                        value: v.value,
                        weight: v.weight * *scaling_factor,
                    },
                )
            })
            .collect();
    }
}

/// A builder type for [`ExponentialDecayHistogram`] objects.
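///
/// # Examples
///
/// A histogram with a smaller reservoir and a stronger bias towards recent
/// values (the specific numbers are only illustrative):
///
/// ```
/// use exponential_decay_histogram::ExponentialDecayHistogram;
///
/// let mut histogram = ExponentialDecayHistogram::builder()
///     .size(256)
///     .alpha(0.05)
///     .build();
///
/// histogram.update(42);
/// ```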
pub struct Builder {
    now: Instant,
    size: usize,
    alpha: f64,
}

impl Builder {
    /// Sets the construction time of the histogram.
    ///
    /// Defaults to the system time when the [`Builder`] was constructed.
    pub fn at(&mut self, now: Instant) -> &mut Self {
        self.now = now;
        self
    }

    /// Sets the size of the histogram.
    ///
    /// A larger size will provide a more accurate histogram, but with a higher memory overhead.
    ///
    /// Defaults to 1028, which offers a 99.9% confidence level with a 5% margin of error.
    ///
    /// # Panics
    ///
    /// Panics if `size` is 0.
    pub fn size(&mut self, size: usize) -> &mut Self {
        assert!(size > 0);

        self.size = size;
        self
    }

    /// Sets the decay rate of the histogram.
    ///
    /// A larger value biases the histogram towards newer values.
    ///
    /// Defaults to 0.015, which heavily biases towards the last 5 minutes of values.
    pub fn alpha(&mut self, alpha: f64) -> &mut Self {
        self.alpha = alpha;
        self
    }

    /// Creates a new [`ExponentialDecayHistogram`].
    pub fn build(&self) -> ExponentialDecayHistogram {
        ExponentialDecayHistogram {
            values: BTreeMap::new(),
            alpha: self.alpha,
            size: self.size,
            count: 0,
            start_time: self.now,
            // we store this explicitly because it's ~10% faster than doing the math on demand
            next_scale_time: self.now + RESCALE_THRESHOLD,
            // a SmallRng seeded from the thread-local `rand::rng()` is ~10% faster than
            // using the thread-local generator directly
            rng: SmallRng::from_rng(&mut rand::rng()),
        }
    }
}

struct SnapshotEntry {
    value: i64,
    norm_weight: f64,
    quantile: NotNan<f64>,
}

/// A snapshot of the state of an `ExponentialDecayHistogram` at some point in time.
pub struct Snapshot {
    entries: Vec<SnapshotEntry>,
    count: u64,
}

impl Snapshot {
    /// Returns the value at a specified quantile in the snapshot, or 0 if it is
    /// empty.
    ///
    /// For example, `snapshot.value(0.5)` returns the median value of the
    /// snapshot.
    ///
    /// # Panics
    ///
    /// Panics if `quantile` is not between 0 and 1 (inclusive).
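    ///
    /// # Examples
    ///
    /// Looking up the median of some recorded values (the inputs here are
    /// arbitrary):
    ///
    /// ```
    /// use exponential_decay_histogram::ExponentialDecayHistogram;
    ///
    /// let mut histogram = ExponentialDecayHistogram::new();
    /// for i in 1..=100 {
    ///     histogram.update(i);
    /// }
    ///
    /// let median = histogram.snapshot().value(0.5);
    /// assert!((1..=100).contains(&median));
    /// ```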
    pub fn value(&self, quantile: f64) -> i64 {
        assert!(quantile >= 0. && quantile <= 1.);

        if self.entries.is_empty() {
            return 0;
        }

        let quantile = NotNan::new(quantile).unwrap();
        let idx = match self.entries.binary_search_by(|e| e.quantile.cmp(&quantile)) {
            Ok(idx) => idx,
            Err(idx) if idx >= self.entries.len() => self.entries.len() - 1,
            Err(idx) => idx,
        };

        self.entries[idx].value
    }

    /// Returns the largest value in the snapshot, or 0 if it is empty.
    pub fn max(&self) -> i64 {
        self.entries.last().map_or(0, |e| e.value)
    }

    /// Returns the smallest value in the snapshot, or 0 if it is empty.
    pub fn min(&self) -> i64 {
        self.entries.first().map_or(0, |e| e.value)
    }

    /// Returns the mean of the values in the snapshot, or 0 if it is empty.
    pub fn mean(&self) -> f64 {
        self.entries
            .iter()
            .map(|e| e.value as f64 * e.norm_weight)
            .sum::<f64>()
    }

    /// Returns the standard deviation of the values in the snapshot, or 0 if it
    /// is empty.
    pub fn stddev(&self) -> f64 {
        if self.entries.len() <= 1 {
            return 0.;
        }

        let mean = self.mean();
        let variance = self
            .entries
            .iter()
            .map(|e| {
                let diff = e.value as f64 - mean;
                e.norm_weight * diff * diff
            })
            .sum::<f64>();

        variance.sqrt()
    }

    /// Returns the number of values which have been written to the histogram at
    /// the time of the snapshot.
    pub fn count(&self) -> u64 {
        self.count
    }

    /// Returns an iterator over the distinct values in the snapshot along with their weights.
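    ///
    /// # Examples
    ///
    /// Printing the distribution of a few recorded values (the inputs here are
    /// arbitrary):
    ///
    /// ```
    /// use exponential_decay_histogram::ExponentialDecayHistogram;
    ///
    /// let mut histogram = ExponentialDecayHistogram::new();
    /// histogram.update(1);
    /// histogram.update(1);
    /// histogram.update(10);
    ///
    /// let snapshot = histogram.snapshot();
    /// for (value, weight) in snapshot.values() {
    ///     println!("{} makes up {} of the distribution", value, weight);
    /// }
    /// ```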
    pub fn values<'a>(&'a self) -> Values<'a> {
        Values {
            it: self.entries.iter().peekable(),
        }
    }
}

/// An iterator over the distinct values in a snapshot along with their weights.
pub struct Values<'a> {
    it: iter::Peekable<slice::Iter<'a, SnapshotEntry>>,
}

impl<'a> Iterator for Values<'a> {
    type Item = (i64, f64);

    fn next(&mut self) -> Option<(i64, f64)> {
        let (value, mut weight) = match self.it.next() {
            Some(v) => (v.value, v.norm_weight),
            None => return None,
        };

        // entries are sorted by value, so merge the run of entries sharing this
        // value and accumulate their weights
        loop {
            match self.it.peek() {
                Some(v) if v.value == value => weight += v.norm_weight,
                _ => break,
            }
            self.it.next();
        }

        Some((value, weight))
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use std::ops::Range;

    #[test]
    fn a_histogram_of_100_out_of_1000_elements() {
        let mut histogram = ExponentialDecayHistogram::builder()
            .size(100)
            .alpha(0.99)
            .build();
        for i in 0..1000 {
            histogram.update(i);
        }

        assert_eq!(histogram.values.len(), 100);

        let snapshot = histogram.snapshot();

        assert_eq!(snapshot.entries.len(), 100);

        assert_all_values_between(snapshot, 0..1000);
    }

    #[test]
    fn a_histogram_of_100_out_of_10_elements() {
        let mut histogram = ExponentialDecayHistogram::builder()
            .size(100)
            .alpha(0.99)
            .build();
        for i in 0..10 {
            histogram.update(i);
        }

        let snapshot = histogram.snapshot();

        assert_eq!(snapshot.entries.len(), 10);

        assert_all_values_between(snapshot, 0..10);
    }

    #[test]
    fn a_heavily_biased_histogram_of_100_out_of_1000_elements() {
        let mut histogram = ExponentialDecayHistogram::builder()
            .size(1000)
            .alpha(0.01)
            .build();
        for i in 0..100 {
            histogram.update(i);
        }

        assert_eq!(histogram.values.len(), 100);

        let snapshot = histogram.snapshot();

        assert_eq!(snapshot.entries.len(), 100);

        assert_all_values_between(snapshot, 0..100);
    }

    #[test]
    fn long_periods_of_inactivity_should_not_corrupt_sampling_state() {
        let mut now = Instant::now();
        let mut histogram = ExponentialDecayHistogram::builder()
            .at(now)
            .size(10)
            .alpha(0.015)
            .build();

        // add 1000 values at a rate of 10 values/second
        let delta = Duration::from_millis(100);
        for i in 0..1000 {
            now += delta;
            histogram.update_at(now, 1000 + i);
        }

        let snapshot = histogram.snapshot();
        assert_eq!(snapshot.entries.len(), 10);
        assert_all_values_between(snapshot, 1000..2000);

        // wait for 15 hours and add another value.
        // this triggers a rescale: the scaling factor is so small that it
        // underflows to zero, collapsing all of the existing priorities into a
        // single entry, so only two samples remain afterwards.
        now += Duration::from_secs(15 * 60 * 60);
        histogram.update_at(now, 2000);

        let snapshot = histogram.snapshot();
        assert_eq!(snapshot.entries.len(), 2);
        assert_all_values_between(snapshot, 1000..3000);

        // add 1000 values at a rate of 10 values/second
        for i in 0..1000 {
            now += delta;
            histogram.update_at(now, 3000 + i);
        }
        let snapshot = histogram.snapshot();
        assert_eq!(snapshot.entries.len(), 10);
        assert_all_values_between(snapshot, 3000..4000);
    }

    #[test]
    fn spot_lift() {
        let mut now = Instant::now();
        let mut histogram = ExponentialDecayHistogram::builder()
            .at(now)
            .size(1000)
            .alpha(0.015)
            .build();

        let values_per_minute = 10;
        let values_interval = Duration::from_secs(60) / values_per_minute;
        // mode 1: steady regime for 120 minutes
        for _ in 0..120 * values_per_minute {
            histogram.update_at(now, 177);
            now += values_interval;
        }

        // switching to mode 2: 10 minutes with the same rate, but larger value
        for _ in 0..10 * values_per_minute {
            histogram.update_at(now, 9999);
            now += values_interval;
        }

        // after 10 minutes the quantiles should reflect mode 2
        assert_eq!(histogram.snapshot().value(0.5), 9999);
    }

    #[test]
    fn spot_fall() {
        let mut now = Instant::now();
        let mut histogram = ExponentialDecayHistogram::builder()
            .at(now)
            .size(1000)
            .alpha(0.015)
            .build();

        let values_per_minute = 10;
        let values_interval = Duration::from_secs(60) / values_per_minute;
        // mode 1: steady regime for 120 minutes
        for _ in 0..120 * values_per_minute {
            histogram.update_at(now, 9998);
            now += values_interval;
        }

        // switching to mode 2: 10 minutes with the same rate, but smaller value
        for _ in 0..10 * values_per_minute {
            histogram.update_at(now, 178);
            now += values_interval;
        }

        // after 10 minutes the quantiles should reflect mode 2
        assert_eq!(histogram.snapshot().value(0.5), 178);
    }

    #[test]
    fn quantiles_should_be_based_on_weights() {
        let mut now = Instant::now();
        let mut histogram = ExponentialDecayHistogram::builder()
            .at(now)
            .size(1000)
            .alpha(0.015)
            .build();

        for _ in 0..40 {
            histogram.update_at(now, 177);
        }

        now += Duration::from_secs(120);

        for _ in 0..10 {
            histogram.update_at(now, 9999);
        }

        let snapshot = histogram.snapshot();
        assert_eq!(snapshot.entries.len(), 50);

        // the first 40 items (177) each have weight 1, while the 10 items (9999)
        // added 120 seconds later each have weight exp(0.015 * 120), roughly 6,
        // so by total weight the split is roughly 40/60, not 40/10
        assert_eq!(snapshot.value(0.5), 9999);
        assert_eq!(snapshot.value(0.75), 9999);
    }

    fn assert_all_values_between(snapshot: Snapshot, range: Range<i64>) {
        for entry in &snapshot.entries {
            assert!(
                entry.value >= range.start && entry.value < range.end,
                "snapshot value {} was not in {:?}",
                entry.value,
                range
            );
        }
    }

    #[test]
    fn values() {
        let mut histogram = ExponentialDecayHistogram::new();
        let now = histogram.start_time;

        histogram.update_at(now, 1);
        histogram.update_at(now, 1);
        histogram.update_at(now, 1);
        histogram.update_at(now, 10);

        let values = histogram.snapshot().values().collect::<Vec<_>>();
        assert_eq!(values, vec![(1, 0.75), (10, 0.25)]);
    }
}