exponential_decay_histogram/
lib.rs1#![warn(missing_docs)]
46use ordered_float::NotNan;
47use rand::distr::Open01;
48use rand::rngs::SmallRng;
49use rand::{Rng, SeedableRng};
50use std::collections::BTreeMap;
51use std::iter;
52use std::slice;
53use std::time::{Duration, Instant};
54
55const RESCALE_THRESHOLD: Duration = Duration::from_secs(60 * 60);
56
57#[derive(Debug)]
58struct WeightedSample {
59 value: i64,
60 weight: f64,
61}
62
63#[derive(Debug)]
67pub struct ExponentialDecayHistogram {
68 values: BTreeMap<NotNan<f64>, WeightedSample>,
69 alpha: f64,
70 size: usize,
71 count: u64,
72 start_time: Instant,
73 next_scale_time: Instant,
74 rng: SmallRng,
75}
76
77impl Default for ExponentialDecayHistogram {
78 fn default() -> ExponentialDecayHistogram {
79 ExponentialDecayHistogram::new()
80 }
81}
82
83impl ExponentialDecayHistogram {
84 pub fn new() -> Self {
86 Self::builder().build()
87 }
88
89 pub fn builder() -> Builder {
91 Builder {
92 now: Instant::now(),
93 size: 1028,
94 alpha: 0.015,
95 }
96 }
97
98 #[deprecated = "use `ExponentialDecayHistogram::builder()` instead"]
111 pub fn with_size_and_alpha(size: usize, alpha: f64) -> Self {
112 Self::builder().size(size).alpha(alpha).build()
113 }
114
115 pub fn update(&mut self, value: i64) {
117 self.update_at(Instant::now(), value);
118 }
119
120 pub fn update_at(&mut self, time: Instant, value: i64) {
126 self.rescale_if_needed(time);
127 self.count += 1;
128
129 let item_weight = self.weight(time);
130 let sample = WeightedSample {
131 value,
132 weight: item_weight,
133 };
134 let priority = item_weight / self.rng.sample::<f64, _>(&Open01);
136 let priority = NotNan::new(priority).unwrap();
137
138 if self.values.len() < self.size {
139 self.values.insert(priority, sample);
140 } else {
141 let first = *self.values.keys().next().unwrap();
142 if first < priority && self.values.insert(priority, sample).is_none() {
143 self.values.remove(&first).unwrap();
144 }
145 }
146 }
147
148 pub fn snapshot(&self) -> Snapshot {
150 let mut entries = self
151 .values
152 .values()
153 .map(|s| SnapshotEntry {
154 value: s.value,
155 norm_weight: s.weight,
156 quantile: NotNan::new(0.).unwrap(),
157 })
158 .collect::<Vec<_>>();
159
160 entries.sort_by_key(|e| e.value);
161
162 let sum_weight = entries.iter().map(|e| e.norm_weight).sum::<f64>();
163 for entry in &mut entries {
164 entry.norm_weight /= sum_weight;
165 }
166
167 entries.iter_mut().fold(NotNan::new(0.).unwrap(), |acc, e| {
168 e.quantile = acc;
169 acc + NotNan::new(e.norm_weight).unwrap()
170 });
171
172 Snapshot {
173 entries,
174 count: self.count,
175 }
176 }
177
178 fn weight(&self, time: Instant) -> f64 {
179 (self.alpha * (time - self.start_time).as_secs() as f64).exp()
180 }
181
182 fn rescale_if_needed(&mut self, now: Instant) {
183 if now >= self.next_scale_time {
184 self.rescale(now);
185 }
186 }
187
188 fn rescale(&mut self, now: Instant) {
189 self.next_scale_time = now + RESCALE_THRESHOLD;
190 let old_start_time = self.start_time;
191 self.start_time = now;
192 let scaling_factor =
193 NotNan::new((-self.alpha * (now - old_start_time).as_secs() as f64).exp()).unwrap();
194
195 self.values = self
196 .values
197 .iter()
198 .map(|(&k, v)| {
199 (
200 k * scaling_factor,
201 WeightedSample {
202 value: v.value,
203 weight: v.weight * *scaling_factor,
204 },
205 )
206 })
207 .collect();
208 }
209}
210
211pub struct Builder {
213 now: Instant,
214 size: usize,
215 alpha: f64,
216}
217
218impl Builder {
219 pub fn at(&mut self, now: Instant) -> &mut Self {
223 self.now = now;
224 self
225 }
226
227 pub fn size(&mut self, size: usize) -> &mut Self {
237 assert!(size > 0);
238
239 self.size = size;
240 self
241 }
242
243 pub fn alpha(&mut self, alpha: f64) -> &mut Self {
249 self.alpha = alpha;
250 self
251 }
252
253 pub fn build(&self) -> ExponentialDecayHistogram {
255 ExponentialDecayHistogram {
256 values: BTreeMap::new(),
257 alpha: self.alpha,
258 size: self.size,
259 count: 0,
260 start_time: self.now,
261 next_scale_time: self.now + RESCALE_THRESHOLD,
263 rng: SmallRng::from_rng(&mut rand::rng()),
265 }
266 }
267}
268
269struct SnapshotEntry {
270 value: i64,
271 norm_weight: f64,
272 quantile: NotNan<f64>,
273}
274
275pub struct Snapshot {
277 entries: Vec<SnapshotEntry>,
278 count: u64,
279}
280
281impl Snapshot {
282 pub fn value(&self, quantile: f64) -> i64 {
292 assert!(quantile >= 0. && quantile <= 1.);
293
294 if self.entries.is_empty() {
295 return 0;
296 }
297
298 let quantile = NotNan::new(quantile).unwrap();
299 let idx = match self.entries.binary_search_by(|e| e.quantile.cmp(&quantile)) {
300 Ok(idx) => idx,
301 Err(idx) if idx >= self.entries.len() => self.entries.len() - 1,
302 Err(idx) => idx,
303 };
304
305 self.entries[idx].value
306 }
307
308 pub fn max(&self) -> i64 {
310 self.entries.last().map_or(0, |e| e.value)
311 }
312
313 pub fn min(&self) -> i64 {
315 self.entries.first().map_or(0, |e| e.value)
316 }
317
318 pub fn mean(&self) -> f64 {
320 self.entries
321 .iter()
322 .map(|e| e.value as f64 * e.norm_weight)
323 .sum::<f64>()
324 }
325
326 pub fn stddev(&self) -> f64 {
329 if self.entries.len() <= 1 {
330 return 0.;
331 }
332
333 let mean = self.mean();
334 let variance = self
335 .entries
336 .iter()
337 .map(|e| {
338 let diff = e.value as f64 - mean;
339 e.norm_weight * diff * diff
340 })
341 .sum::<f64>();
342
343 variance.sqrt()
344 }
345
346 pub fn count(&self) -> u64 {
349 self.count
350 }
351
352 pub fn values<'a>(&'a self) -> Values<'a> {
354 Values {
355 it: self.entries.iter().peekable(),
356 }
357 }
358}
359
360pub struct Values<'a> {
362 it: iter::Peekable<slice::Iter<'a, SnapshotEntry>>,
363}
364
365impl<'a> Iterator for Values<'a> {
366 type Item = (i64, f64);
367
368 fn next(&mut self) -> Option<(i64, f64)> {
369 let (value, mut weight) = match self.it.next() {
370 Some(v) => (v.value, v.norm_weight),
371 None => return None,
372 };
373
374 loop {
375 match self.it.peek() {
376 Some(v) if v.value == value => weight += v.norm_weight,
377 _ => break,
378 }
379 self.it.next();
380 }
381
382 Some((value, weight))
383 }
384}
385
386#[cfg(test)]
387mod test {
388 use super::*;
389 use std::ops::Range;
390
391 #[test]
392 fn a_histogram_of_100_out_of_1000_elements() {
393 let mut histogram = ExponentialDecayHistogram::builder()
394 .size(100)
395 .alpha(0.99)
396 .build();
397 for i in 0..1000 {
398 histogram.update(i);
399 }
400
401 assert_eq!(histogram.values.len(), 100);
402
403 let snapshot = histogram.snapshot();
404
405 assert_eq!(snapshot.entries.len(), 100);
406
407 assert_all_values_between(snapshot, 0..1000);
408 }
409
410 #[test]
411 fn a_histogram_of_100_out_of_10_elements() {
412 let mut histogram = ExponentialDecayHistogram::builder()
413 .size(100)
414 .alpha(0.99)
415 .build();
416 for i in 0..10 {
417 histogram.update(i);
418 }
419
420 let snapshot = histogram.snapshot();
421
422 assert_eq!(snapshot.entries.len(), 10);
423
424 assert_all_values_between(snapshot, 0..10);
425 }
426
427 #[test]
428 fn a_heavily_biased_histogram_of_100_out_of_1000_elements() {
429 let mut histogram = ExponentialDecayHistogram::builder()
430 .size(1000)
431 .alpha(0.01)
432 .build();
433 for i in 0..100 {
434 histogram.update(i);
435 }
436
437 assert_eq!(histogram.values.len(), 100);
438
439 let snapshot = histogram.snapshot();
440
441 assert_eq!(snapshot.entries.len(), 100);
442
443 assert_all_values_between(snapshot, 0..100);
444 }
445
446 #[test]
447 fn long_periods_of_inactivity_should_not_corrupt_sampling_state() {
448 let mut now = Instant::now();
449 let mut histogram = ExponentialDecayHistogram::builder()
450 .at(now)
451 .size(10)
452 .alpha(0.015)
453 .build();
454
455 let delta = Duration::from_millis(100);
457 for i in 0..1000 {
458 now += delta;
459 histogram.update_at(now, 1000 + i);
460 }
461
462 let snapshot = histogram.snapshot();
463 assert_eq!(snapshot.entries.len(), 10);
464 assert_all_values_between(snapshot, 1000..2000);
465
466 now += Duration::from_secs(15 * 60 * 60);
471 histogram.update_at(now, 2000);
472
473 let snapshot = histogram.snapshot();
474 assert_eq!(snapshot.entries.len(), 2);
475 assert_all_values_between(snapshot, 1000..3000);
476
477 for i in 0..1000 {
479 now += delta;
480 histogram.update_at(now, 3000 + i);
481 }
482 let snapshot = histogram.snapshot();
483 assert_eq!(snapshot.entries.len(), 10);
484 assert_all_values_between(snapshot, 3000..4000);
485 }
486
487 #[test]
488 fn spot_lift() {
489 let mut now = Instant::now();
490 let mut histogram = ExponentialDecayHistogram::builder()
491 .at(now)
492 .size(1000)
493 .alpha(0.015)
494 .build();
495
496 let values_per_minute = 10;
497 let values_interval = Duration::from_secs(60) / values_per_minute;
498 for _ in 0..120 * values_per_minute {
500 histogram.update_at(now, 177);
501 now += values_interval;
502 }
503
504 for _ in 0..10 * values_per_minute {
506 histogram.update_at(now, 9999);
507 now += values_interval;
508 }
509
510 assert_eq!(histogram.snapshot().value(0.5), 9999);
512 }
513
514 #[test]
515 fn spot_fall() {
516 let mut now = Instant::now();
517 let mut histogram = ExponentialDecayHistogram::builder()
518 .at(now)
519 .size(1000)
520 .alpha(0.015)
521 .build();
522
523 let values_per_minute = 10;
524 let values_interval = Duration::from_secs(60) / values_per_minute;
525 for _ in 0..120 * values_per_minute {
527 histogram.update_at(now, 9998);
528 now += values_interval;
529 }
530
531 for _ in 0..10 * values_per_minute {
533 histogram.update_at(now, 178);
534 now += values_interval;
535 }
536
537 assert_eq!(histogram.snapshot().value(0.5), 178);
539 }
540
541 #[test]
542 fn quantiles_should_be_based_on_weights() {
543 let mut now = Instant::now();
544 let mut histogram = ExponentialDecayHistogram::builder()
545 .at(now)
546 .size(1000)
547 .alpha(0.015)
548 .build();
549
550 for _ in 0..40 {
551 histogram.update_at(now, 177);
552 }
553
554 now += Duration::from_secs(120);
555
556 for _ in 0..10 {
557 histogram.update_at(now, 9999);
558 }
559
560 let snapshot = histogram.snapshot();
561 assert_eq!(snapshot.entries.len(), 50);
562
563 assert_eq!(snapshot.value(0.5), 9999);
567 assert_eq!(snapshot.value(0.75), 9999);
568 }
569
570 fn assert_all_values_between(snapshot: Snapshot, range: Range<i64>) {
571 for entry in &snapshot.entries {
572 assert!(
573 entry.value >= range.start && entry.value < range.end,
574 "snapshot value {} was not in {:?}",
575 entry.value,
576 range
577 );
578 }
579 }
580
581 #[test]
582 fn values() {
583 let mut histogram = ExponentialDecayHistogram::new();
584 let now = histogram.start_time;
585
586 histogram.update_at(now, 1);
587 histogram.update_at(now, 1);
588 histogram.update_at(now, 1);
589 histogram.update_at(now, 10);
590
591 let values = histogram.snapshot().values().collect::<Vec<_>>();
592 assert_eq!(values, vec![(1, 0.75), (10, 0.25)]);
593 }
594}