//! A primitive for tracking a distribution of values over time, giving
//! exponentially greater weight to recent values via forward-decaying
//! priority reservoir sampling.
#![warn(missing_docs)]
use ordered_float::NotNan;
use rand::distr::Open01;
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};
use std::collections::BTreeMap;
use std::iter;
use std::mem;
use std::slice;
use std::time::{Duration, Instant};

64#[derive(Debug)]
65struct WeightedSample<T> {
66 value: i64,
67 exemplar: T,
68 weight: f64,
69}
70
71#[derive(Debug)]
75pub struct ExponentialDecayHistogram<T = ()> {
76 values: BTreeMap<NotNan<f64>, WeightedSample<T>>,
77 alpha: f64,
78 size: usize,
79 count: u64,
80 start_time: Instant,
81 next_scale_time: Instant,
82 rng: SmallRng,
83}
84
85impl Default for ExponentialDecayHistogram {
86 fn default() -> ExponentialDecayHistogram {
87 ExponentialDecayHistogram::new()
88 }
89}
90
91impl ExponentialDecayHistogram {
92 pub fn new() -> Self {
94 Self::builder().build()
95 }
96
97 pub fn builder() -> Builder {
99 Builder {
100 now: Instant::now(),
101 size: 1028,
102 alpha: 0.015,
103 }
104 }
105
106 #[deprecated = "use `ExponentialDecayHistogram::builder()` instead"]
119 pub fn with_size_and_alpha(size: usize, alpha: f64) -> Self {
120 Self::builder().size(size).alpha(alpha).build()
121 }
122
123 pub fn update(&mut self, value: i64) {
125 self.update_at(Instant::now(), value);
126 }
127
128 pub fn update_at(&mut self, time: Instant, value: i64) {
134 self.update_at_ex(time, value, ());
135 }
136}
137
138impl<T> ExponentialDecayHistogram<T>
139where
140 T: Clone,
141{
142 pub fn update_ex(&mut self, value: i64, exemplar: T) {
144 self.update_at_ex(Instant::now(), value, exemplar);
145 }
146
147 pub fn update_at_ex(&mut self, time: Instant, value: i64, exemplar: T) {
149 self.rescale_if_needed(time);
150 self.count += 1;
151
152 let item_weight = self.weight(time);
153 let sample = WeightedSample {
154 value,
155 exemplar,
156 weight: item_weight,
157 };
158 let priority = item_weight / self.rng.sample::<f64, _>(&Open01);
160 let priority = NotNan::new(priority).unwrap();
161
162 if self.values.len() < self.size {
163 self.values.insert(priority, sample);
164 } else {
165 let first = *self.values.keys().next().unwrap();
166 if first < priority && self.values.insert(priority, sample).is_none() {
167 self.values.remove(&first).unwrap();
168 }
169 }
170 }
171
172 pub fn snapshot(&self) -> Snapshot<T> {
174 let mut entries = self
175 .values
176 .values()
177 .map(|s| SnapshotEntry {
178 value: s.value,
179 exemplar: s.exemplar.clone(),
180 norm_weight: s.weight,
181 quantile: NotNan::new(0.).unwrap(),
182 })
183 .collect::<Vec<_>>();
184
185 entries.sort_by_key(|e| e.value);
186
187 let sum_weight = entries.iter().map(|e| e.norm_weight).sum::<f64>();
188 for entry in &mut entries {
189 entry.norm_weight /= sum_weight;
190 }
191
192 entries.iter_mut().fold(NotNan::new(0.).unwrap(), |acc, e| {
193 e.quantile = acc;
194 acc + NotNan::new(e.norm_weight).unwrap()
195 });
196
197 Snapshot {
198 entries,
199 count: self.count,
200 }
201 }
202
203 fn weight(&self, time: Instant) -> f64 {
204 (self.alpha * (time - self.start_time).as_secs() as f64).exp()
205 }
206
207 fn rescale_if_needed(&mut self, now: Instant) {
208 if now >= self.next_scale_time {
209 self.rescale(now);
210 }
211 }
212
213 fn rescale(&mut self, now: Instant) {
214 self.next_scale_time = now + RESCALE_THRESHOLD;
215 let old_start_time = self.start_time;
216 self.start_time = now;
217 let scaling_factor =
218 NotNan::new((-self.alpha * (now - old_start_time).as_secs() as f64).exp()).unwrap();
219
220 self.values = self
221 .values
222 .iter()
223 .map(|(&k, v)| {
224 (
225 k * scaling_factor,
226 WeightedSample {
227 value: v.value,
228 exemplar: v.exemplar.clone(),
229 weight: v.weight * *scaling_factor,
230 },
231 )
232 })
233 .collect();
234 }
235}
236
237pub struct Builder {
239 now: Instant,
240 size: usize,
241 alpha: f64,
242}
243
244impl Builder {
245 pub fn at(&mut self, now: Instant) -> &mut Self {
249 self.now = now;
250 self
251 }
252
253 pub fn size(&mut self, size: usize) -> &mut Self {
263 assert!(size > 0);
264
265 self.size = size;
266 self
267 }
268
269 pub fn alpha(&mut self, alpha: f64) -> &mut Self {
275 self.alpha = alpha;
276 self
277 }
278
279 pub fn build<T>(&self) -> ExponentialDecayHistogram<T> {
281 ExponentialDecayHistogram {
282 values: BTreeMap::new(),
283 alpha: self.alpha,
284 size: self.size,
285 count: 0,
286 start_time: self.now,
287 next_scale_time: self.now + RESCALE_THRESHOLD,
289 rng: SmallRng::from_rng(&mut rand::rng()),
291 }
292 }
293}
294
295struct SnapshotEntry<T> {
296 value: i64,
297 exemplar: T,
298 norm_weight: f64,
299 quantile: NotNan<f64>,
300}
301
302pub struct Snapshot<T = ()> {
304 entries: Vec<SnapshotEntry<T>>,
305 count: u64,
306}
307
308impl<T> Snapshot<T> {
309 pub fn value(&self, quantile: f64) -> i64 {
319 assert!((0. ..=1.).contains(&quantile));
320
321 if self.entries.is_empty() {
322 return 0;
323 }
324
325 let quantile = NotNan::new(quantile).unwrap();
326 let idx = match self.entries.binary_search_by(|e| e.quantile.cmp(&quantile)) {
327 Ok(idx) => idx,
328 Err(idx) if idx >= self.entries.len() => self.entries.len() - 1,
329 Err(idx) => idx,
330 };
331
332 self.entries[idx].value
333 }
334
335 pub fn max(&self) -> i64 {
337 self.entries.last().map_or(0, |e| e.value)
338 }
339
340 pub fn min(&self) -> i64 {
342 self.entries.first().map_or(0, |e| e.value)
343 }
344
345 pub fn mean(&self) -> f64 {
347 self.entries
348 .iter()
349 .map(|e| e.value as f64 * e.norm_weight)
350 .sum::<f64>()
351 }
352
353 pub fn stddev(&self) -> f64 {
356 if self.entries.len() <= 1 {
357 return 0.;
358 }
359
360 let mean = self.mean();
361 let variance = self
362 .entries
363 .iter()
364 .map(|e| {
365 let diff = e.value as f64 - mean;
366 e.norm_weight * diff * diff
367 })
368 .sum::<f64>();
369
370 variance.sqrt()
371 }
372
373 pub fn count(&self) -> u64 {
376 self.count
377 }
378
379 pub fn values<'a>(&'a self) -> Values<'a, T> {
381 Values {
382 it: self.entries.iter().peekable(),
383 }
384 }
385
386 pub fn exemplars(&self) -> Exemplars<'_, T> {
388 Exemplars {
389 it: self.entries.iter(),
390 }
391 }
392}
393
394pub struct Values<'a, T = ()> {
396 it: iter::Peekable<slice::Iter<'a, SnapshotEntry<T>>>,
397}
398
399impl<'a> Iterator for Values<'a> {
400 type Item = (i64, f64);
401
402 fn next(&mut self) -> Option<(i64, f64)> {
403 let (value, mut weight) = match self.it.next() {
404 Some(v) => (v.value, v.norm_weight),
405 None => return None,
406 };
407
408 loop {
409 match self.it.peek() {
410 Some(v) if v.value == value => weight += v.norm_weight,
411 _ => break,
412 }
413 self.it.next();
414 }
415
416 Some((value, weight))
417 }
418}
419
420pub struct Exemplars<'a, T> {
422 it: slice::Iter<'a, SnapshotEntry<T>>,
423}
424
425impl<'a, T> Iterator for Exemplars<'a, T> {
426 type Item = (i64, &'a T);
427
428 fn next(&mut self) -> Option<(i64, &'a T)> {
429 self.it.next().map(|e| (e.value, &e.exemplar))
430 }
431}
432
433#[cfg(test)]
434mod test {
435 use super::*;
436 use std::ops::Range;
437
438 #[test]
439 fn a_histogram_of_100_out_of_1000_elements() {
440 let mut histogram = ExponentialDecayHistogram::builder()
441 .size(100)
442 .alpha(0.99)
443 .build();
444 for i in 0..1000 {
445 histogram.update(i);
446 }
447
448 assert_eq!(histogram.values.len(), 100);
449
450 let snapshot = histogram.snapshot();
451
452 assert_eq!(snapshot.entries.len(), 100);
453
454 assert_all_values_between(snapshot, 0..1000);
455 }
456
457 #[test]
458 fn a_histogram_of_100_out_of_10_elements() {
459 let mut histogram = ExponentialDecayHistogram::builder()
460 .size(100)
461 .alpha(0.99)
462 .build();
463 for i in 0..10 {
464 histogram.update(i);
465 }
466
467 let snapshot = histogram.snapshot();
468
469 assert_eq!(snapshot.entries.len(), 10);
470
471 assert_all_values_between(snapshot, 0..10);
472 }
473
474 #[test]
475 fn a_heavily_biased_histogram_of_100_out_of_1000_elements() {
476 let mut histogram = ExponentialDecayHistogram::builder()
477 .size(1000)
478 .alpha(0.01)
479 .build();
480 for i in 0..100 {
481 histogram.update(i);
482 }
483
484 assert_eq!(histogram.values.len(), 100);
485
486 let snapshot = histogram.snapshot();
487
488 assert_eq!(snapshot.entries.len(), 100);
489
490 assert_all_values_between(snapshot, 0..100);
491 }
492
493 #[test]
494 fn long_periods_of_inactivity_should_not_corrupt_sampling_state() {
495 let mut now = Instant::now();
496 let mut histogram = ExponentialDecayHistogram::builder()
497 .at(now)
498 .size(10)
499 .alpha(0.015)
500 .build();
501
502 let delta = Duration::from_millis(100);
504 for i in 0..1000 {
505 now += delta;
506 histogram.update_at(now, 1000 + i);
507 }
508
509 let snapshot = histogram.snapshot();
510 assert_eq!(snapshot.entries.len(), 10);
511 assert_all_values_between(snapshot, 1000..2000);
512
513 now += Duration::from_secs(15 * 60 * 60);
518 histogram.update_at(now, 2000);
519
520 let snapshot = histogram.snapshot();
521 assert_eq!(snapshot.entries.len(), 2);
522 assert_all_values_between(snapshot, 1000..3000);
523
524 for i in 0..1000 {
526 now += delta;
527 histogram.update_at(now, 3000 + i);
528 }
529 let snapshot = histogram.snapshot();
530 assert_eq!(snapshot.entries.len(), 10);
531 assert_all_values_between(snapshot, 3000..4000);
532 }
533
534 #[test]
535 fn spot_lift() {
536 let mut now = Instant::now();
537 let mut histogram = ExponentialDecayHistogram::builder()
538 .at(now)
539 .size(1000)
540 .alpha(0.015)
541 .build();
542
543 let values_per_minute = 10;
544 let values_interval = Duration::from_secs(60) / values_per_minute;
545 for _ in 0..120 * values_per_minute {
547 histogram.update_at(now, 177);
548 now += values_interval;
549 }
550
551 for _ in 0..10 * values_per_minute {
553 histogram.update_at(now, 9999);
554 now += values_interval;
555 }
556
557 assert_eq!(histogram.snapshot().value(0.5), 9999);
559 }
560
561 #[test]
562 fn spot_fall() {
563 let mut now = Instant::now();
564 let mut histogram = ExponentialDecayHistogram::builder()
565 .at(now)
566 .size(1000)
567 .alpha(0.015)
568 .build();
569
570 let values_per_minute = 10;
571 let values_interval = Duration::from_secs(60) / values_per_minute;
572 for _ in 0..120 * values_per_minute {
574 histogram.update_at(now, 9998);
575 now += values_interval;
576 }
577
578 for _ in 0..10 * values_per_minute {
580 histogram.update_at(now, 178);
581 now += values_interval;
582 }
583
584 assert_eq!(histogram.snapshot().value(0.5), 178);
586 }
587
588 #[test]
589 fn quantiles_should_be_based_on_weights() {
590 let mut now = Instant::now();
591 let mut histogram = ExponentialDecayHistogram::builder()
592 .at(now)
593 .size(1000)
594 .alpha(0.015)
595 .build();
596
597 for _ in 0..40 {
598 histogram.update_at(now, 177);
599 }
600
601 now += Duration::from_secs(120);
602
603 for _ in 0..10 {
604 histogram.update_at(now, 9999);
605 }
606
607 let snapshot = histogram.snapshot();
608 assert_eq!(snapshot.entries.len(), 50);
609
610 assert_eq!(snapshot.value(0.5), 9999);
614 assert_eq!(snapshot.value(0.75), 9999);
615 }
616
617 fn assert_all_values_between(snapshot: Snapshot, range: Range<i64>) {
618 for entry in &snapshot.entries {
619 assert!(
620 entry.value >= range.start && entry.value < range.end,
621 "snapshot value {} was not in {:?}",
622 entry.value,
623 range
624 );
625 }
626 }
627
628 #[test]
629 fn values() {
630 let mut histogram = ExponentialDecayHistogram::new();
631 let now = histogram.start_time;
632
633 histogram.update_at(now, 1);
634 histogram.update_at(now, 1);
635 histogram.update_at(now, 1);
636 histogram.update_at(now, 10);
637
638 let values = histogram.snapshot().values().collect::<Vec<_>>();
639 assert_eq!(values, vec![(1, 0.75), (10, 0.25)]);
640 }
641}