#![doc(html_root_url = "https://docs.rs/exponential-decay-histogram/0.1")]
#![warn(missing_docs)]
extern crate ordered_float;
extern crate rand;
use ordered_float::NotNan;
use rand::distributions::Open01;
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};
use std::collections::BTreeMap;
use std::iter;
use std::slice;
use std::time::{Duration, Instant};
/// Reservoir capacity used by `ExponentialDecayHistogram::new`.
const DEFAULT_SIZE: usize = 1_028;
/// Decay factor (`alpha`) used by `ExponentialDecayHistogram::new`.
const DEFAULT_ALPHA: f64 = 0.015;
/// Interval (one hour) after which sample weights are rebased onto a fresh start time.
const RESCALE_THRESHOLD: Duration = Duration::from_secs(3_600);
/// A recorded value held in the histogram's reservoir together with its decay weight.
struct WeightedSample {
    // Weight assigned when the value was recorded; multiplied down on every rescale.
    weight: f64,
    // The recorded value itself.
    value: i64,
}
/// A histogram that keeps a bounded random sample of recorded values, weighted so that (for a
/// positive decay factor) recent values count exponentially more than older ones.
///
/// Samples live in a fixed-size reservoir. Each recorded value gets a priority derived from its
/// weight and a random draw; once the reservoir is full, a new value only enters by evicting the
/// current lowest-priority sample, and only if its own priority is higher.
pub struct ExponentialDecayHistogram {
    // Reservoir of samples keyed by priority; the smallest key is the eviction candidate.
    values: BTreeMap<NotNan<f64>, WeightedSample>,
    // Decay factor: a value recorded `t` whole seconds after `start_time` weighs `exp(alpha * t)`.
    alpha: f64,
    // Maximum number of samples retained in `values`.
    size: usize,
    // Total number of values ever recorded (not reduced by eviction or rescaling).
    count: u64,
    // Reference time that sample weights are measured from; moved forward on each rescale.
    start_time: Instant,
    // Time at or after which the next update triggers a rescale.
    next_scale_time: Instant,
    // Source of the random draws used to compute sample priorities.
    rng: SmallRng,
}
impl Default for ExponentialDecayHistogram {
fn default() -> ExponentialDecayHistogram {
ExponentialDecayHistogram::new()
}
}
impl ExponentialDecayHistogram {
    /// Returns a new histogram with the default configuration: a reservoir of 1028 samples and
    /// a decay factor (`alpha`) of 0.015.
    pub fn new() -> ExponentialDecayHistogram {
        ExponentialDecayHistogram::with_size_and_alpha(DEFAULT_SIZE, DEFAULT_ALPHA)
    }

    /// Returns a new histogram that retains at most `size` samples, weighted with the decay
    /// factor `alpha`.
    ///
    /// A value recorded `t` whole seconds after the histogram's reference time is weighted by
    /// `exp(alpha * t)`. For positive `alpha`, larger values therefore bias the histogram more
    /// heavily towards recent values.
    ///
    /// # Panics
    ///
    /// Panics if `size` is 0, or if the random number generator cannot be seeded.
    pub fn with_size_and_alpha(size: usize, alpha: f64) -> ExponentialDecayHistogram {
        assert!(size > 0);

        let now = Instant::now();
        ExponentialDecayHistogram {
            values: BTreeMap::new(),
            alpha,
            size,
            count: 0,
            start_time: now,
            next_scale_time: now + RESCALE_THRESHOLD,
            rng: SmallRng::from_rng(rand::thread_rng()).expect("error seeding RNG"),
        }
    }

    /// Records a value, timestamped with the current time.
    pub fn update(&mut self, value: i64) {
        self.update_at(Instant::now(), value);
    }

    /// Records a value as if it had been observed at `time`.
    ///
    /// A `time` earlier than the histogram's current reference time is given a correspondingly
    /// smaller weight. It is not treated as if it were recorded at the reference time.
    pub fn update_at(&mut self, time: Instant, value: i64) {
        self.rescale_if_needed(time);
        self.count += 1;

        let item_weight = self.weight(time);
        let sample = WeightedSample {
            value,
            weight: item_weight,
        };
        // `Open01` draws from the open interval (0, 1), so the divisor is never zero. Dividing
        // the weight by a uniform draw gives heavier samples higher priorities on average.
        let priority = item_weight / self.rng.sample::<f64, _>(&Open01);
        let priority = NotNan::from(priority);

        if self.values.len() < self.size {
            self.values.insert(priority, sample);
        } else {
            // The reservoir is full (and non-empty, since `size > 0`). The new sample only gets
            // in if it outranks the lowest-priority sample, which it then evicts. If `priority`
            // is already a key, `insert` replaces that sample in place and nothing is evicted.
            let first = *self.values.keys().next().unwrap();
            if first < priority && self.values.insert(priority, sample).is_none() {
                self.values.remove(&first).unwrap();
            }
        }
    }

    /// Takes a snapshot of the histogram's current state.
    ///
    /// The snapshot holds the sampled values in ascending order. Each value carries its weight
    /// normalized so that all weights sum to 1. The snapshot also records the total number of
    /// values ever recorded.
    pub fn snapshot(&self) -> Snapshot {
        let mut entries = self
            .values
            .values()
            .map(|s| SnapshotEntry {
                value: s.value,
                norm_weight: s.weight,
                quantile: NotNan::from(0.),
            })
            .collect::<Vec<_>>();
        entries.sort_by_key(|e| e.value);

        let sum_weight = entries.iter().map(|e| e.norm_weight).sum::<f64>();
        let uniform_weight = 1. / entries.len() as f64;
        for entry in &mut entries {
            // If every weight has underflowed to zero, dividing by the sum would yield NaN,
            // which `NotNan` rejects below; fall back to weighting all samples equally.
            entry.norm_weight = if sum_weight > 0. {
                entry.norm_weight / sum_weight
            } else {
                uniform_weight
            };
        }

        // Each entry's quantile is the cumulative normalized weight of the entries before it,
        // so the first entry starts at 0.
        entries.iter_mut().fold(NotNan::from(0.), |acc, e| {
            e.quantile = acc;
            acc + e.norm_weight
        });

        Snapshot {
            entries,
            count: self.count,
        }
    }

    /// Returns the weight of a value recorded at `time`: `exp(alpha * t)`, where `t` is the
    /// signed number of whole seconds from `start_time` to `time`.
    fn weight(&self, time: Instant) -> f64 {
        // `Instant` subtraction cannot produce a negative duration. It panics on older Rust
        // versions and saturates to zero on newer ones. Timestamps before `start_time` are
        // therefore measured in the other direction and negated.
        let elapsed_secs = if time >= self.start_time {
            (time - self.start_time).as_secs() as f64
        } else {
            -((self.start_time - time).as_secs() as f64)
        };
        (self.alpha * elapsed_secs).exp()
    }

    /// Rescales the reservoir if `now` has reached the scheduled rescale time.
    fn rescale_if_needed(&mut self, now: Instant) {
        if now >= self.next_scale_time {
            self.rescale(now);
        }
    }

    /// Moves the reference time forward to `now` and multiplies every stored priority and weight
    /// by the same factor, `exp(-alpha * elapsed)`.
    ///
    /// Presumably this exists to keep `exp(alpha * t)` in `weight` from growing without bound.
    fn rescale(&mut self, now: Instant) {
        self.next_scale_time = now + RESCALE_THRESHOLD;
        let old_start_time = self.start_time;
        self.start_time = now;
        let scaling_factor = (-self.alpha * (now - old_start_time).as_secs() as f64).exp();
        // After a long idle period `scaling_factor` can underflow to 0. Every priority then
        // becomes 0.0, and collecting into the `BTreeMap` keeps only one of them, leaving a
        // single zero-weight sample behind.
        self.values = self
            .values
            .iter()
            .map(|(&k, v)| {
                (
                    k * scaling_factor,
                    WeightedSample {
                        value: v.value,
                        weight: v.weight * scaling_factor,
                    },
                )
            })
            .collect();
    }
}
/// One sampled value inside a `Snapshot`.
struct SnapshotEntry {
    // Cumulative normalized weight of all entries sorted before this one.
    quantile: NotNan<f64>,
    // The sampled value.
    value: i64,
    // This entry's share of the snapshot's total weight.
    norm_weight: f64,
}
/// A point-in-time view of the values sampled by an `ExponentialDecayHistogram`.
pub struct Snapshot {
    // Sampled values sorted ascending, each with its normalized weight and starting quantile.
    entries: Vec<SnapshotEntry>,
    // Total number of values the histogram had recorded when the snapshot was taken.
    count: u64,
}
impl Snapshot {
    /// Returns the value at `quantile` of the weighted sample distribution.
    ///
    /// Returns 0 if the snapshot is empty.
    ///
    /// # Panics
    ///
    /// Panics if `quantile` is not within `0.0..=1.0`.
    pub fn value(&self, quantile: f64) -> i64 {
        assert!(quantile >= 0. && quantile <= 1.);

        if self.entries.is_empty() {
            return 0;
        }

        let quantile = NotNan::from(quantile);
        // Entry `i` covers the range `[entries[i].quantile, entries[i + 1].quantile)`, where each
        // start is the cumulative weight of the entries before it. On an exact hit the search
        // returns that entry.
        //
        // On a miss, the search returns the insertion point, which is one past the entry whose
        // range contains `quantile`, so step back by one. `entries[0].quantile` is 0, so a valid
        // `quantile` never yields an insertion point of 0. An insertion point of `len` maps to
        // the last entry.
        let idx = match self.entries.binary_search_by(|e| e.quantile.cmp(&quantile)) {
            Ok(idx) => idx,
            Err(idx) => idx.saturating_sub(1),
        };

        self.entries[idx].value
    }

    /// Returns the largest sampled value, or 0 if the snapshot is empty.
    pub fn max(&self) -> i64 {
        self.entries.last().map_or(0, |e| e.value)
    }

    /// Returns the smallest sampled value, or 0 if the snapshot is empty.
    pub fn min(&self) -> i64 {
        self.entries.first().map_or(0, |e| e.value)
    }

    /// Returns the weighted mean of the sampled values, or 0 if the snapshot is empty.
    pub fn mean(&self) -> f64 {
        self.entries
            .iter()
            .map(|e| e.value as f64 * e.norm_weight)
            .sum::<f64>()
    }

    /// Returns the weighted standard deviation of the sampled values.
    ///
    /// Returns 0 if the snapshot holds fewer than two entries.
    pub fn stddev(&self) -> f64 {
        if self.entries.len() <= 1 {
            return 0.;
        }

        // Two passes: compute the weighted mean first, then the weighted squared deviations.
        let mean = self.mean();
        let variance = self
            .entries
            .iter()
            .map(|e| {
                let diff = e.value as f64 - mean;
                e.norm_weight * diff * diff
            })
            .sum::<f64>();
        variance.sqrt()
    }

    /// Returns the total number of values the histogram had recorded when the snapshot was
    /// taken, including values no longer in the sample.
    pub fn count(&self) -> u64 {
        self.count
    }

    /// Returns an iterator over the distinct sampled values in ascending order. Each value is
    /// paired with the sum of its normalized weights.
    pub fn values<'a>(&'a self) -> Values<'a> {
        Values {
            it: self.entries.iter().peekable(),
        }
    }
}
/// An iterator over the distinct values in a `Snapshot` and their normalized weights.
///
/// Returned by `Snapshot::values`.
pub struct Values<'a> {
    // Snapshot entries in ascending value order; peekable so runs of equal values can be merged.
    it: iter::Peekable<slice::Iter<'a, SnapshotEntry>>,
}
impl<'a> Iterator for Values<'a> {
    type Item = (i64, f64);

    /// Yields the next distinct value together with the summed normalized weight of every
    /// consecutive entry holding that value.
    fn next(&mut self) -> Option<(i64, f64)> {
        let head = self.it.next()?;
        let value = head.value;
        let mut weight = head.norm_weight;

        // Entries are sorted by value, so any duplicates of `value` immediately follow `head`.
        while let Some(&entry) = self.it.peek() {
            if entry.value != value {
                break;
            }
            weight += entry.norm_weight;
            self.it.next();
        }

        Some((value, weight))
    }
}
#[cfg(test)]
mod test {
    use std::ops::Range;
    use super::*;

    // Recording far more values than the reservoir holds leaves exactly `size` samples, all
    // drawn from the recorded range.
    #[test]
    fn a_histogram_of_100_out_of_1000_elements() {
        let mut histogram = ExponentialDecayHistogram::with_size_and_alpha(100, 0.99);
        for i in 0..1000 {
            histogram.update(i);
        }
        assert_eq!(histogram.values.len(), 100);
        let snapshot = histogram.snapshot();
        assert_eq!(snapshot.entries.len(), 100);
        assert_all_values_between(snapshot, 0..1000);
    }

    // Recording fewer values than the reservoir holds keeps every one of them.
    #[test]
    fn a_histogram_of_100_out_of_10_elements() {
        let mut histogram = ExponentialDecayHistogram::with_size_and_alpha(100, 0.99);
        for i in 0..10 {
            histogram.update(i);
        }
        let snapshot = histogram.snapshot();
        assert_eq!(snapshot.entries.len(), 10);
        assert_all_values_between(snapshot, 0..10);
    }

    // A large reservoir (1000) with a small alpha (0.01), fed only 100 values, retains all of
    // them.
    #[test]
    fn a_heavily_biased_histogram_of_100_out_of_1000_elements() {
        let mut histogram = ExponentialDecayHistogram::with_size_and_alpha(1000, 0.01);
        for i in 0..100 {
            histogram.update(i);
        }
        assert_eq!(histogram.values.len(), 100);
        let snapshot = histogram.snapshot();
        assert_eq!(snapshot.entries.len(), 100);
        assert_all_values_between(snapshot, 0..100);
    }

    // A 15 hour gap between updates triggers a rescale whose scaling factor underflows to zero.
    // Every stored priority then collapses onto the same key, so only one old sample survives
    // alongside the new value (hence 2 entries). Later updates must still refill the reservoir
    // with fresh values.
    #[test]
    fn long_periods_of_inactivity_should_not_corrupt_sampling_state() {
        let mut histogram = ExponentialDecayHistogram::with_size_and_alpha(10, 0.015);
        let mut now = histogram.start_time;
        let delta = Duration::from_millis(100);
        // 1000 values at 10 per second.
        for i in 0..1000 {
            now += delta;
            histogram.update_at(now, 1000 + i);
        }
        let snapshot = histogram.snapshot();
        assert_eq!(snapshot.entries.len(), 10);
        assert_all_values_between(snapshot, 1000..2000);
        // Jump ahead 15 hours; this update forces a rescale.
        now += Duration::from_secs(15 * 60 * 60);
        histogram.update_at(now, 2000);
        let snapshot = histogram.snapshot();
        assert_eq!(snapshot.entries.len(), 2);
        assert_all_values_between(snapshot, 1000..3000);
        // Another 1000 values at 10 per second should entirely replace the old samples.
        for i in 0..1000 {
            now += delta;
            histogram.update_at(now, 3000 + i);
        }
        let snapshot = histogram.snapshot();
        assert_eq!(snapshot.entries.len(), 10);
        assert_all_values_between(snapshot, 3000..4000);
    }

    // After two hours of a steady value (10 per minute), a ten-minute burst of a higher value
    // should take over the median, because recent samples carry exponentially more weight.
    #[test]
    fn spot_lift() {
        let mut histogram = ExponentialDecayHistogram::with_size_and_alpha(1000, 0.015);
        let mut now = histogram.start_time;
        let values_per_minute = 10;
        let values_interval = Duration::from_secs(60) / values_per_minute;
        for _ in 0..120 * values_per_minute {
            histogram.update_at(now, 177);
            now += values_interval;
        }
        for _ in 0..10 * values_per_minute {
            histogram.update_at(now, 9999);
            now += values_interval;
        }
        assert_eq!(histogram.snapshot().value(0.5), 9999);
    }

    // Mirror image of `spot_lift`: a recent drop should likewise take over the median.
    #[test]
    fn spot_fall() {
        let mut histogram = ExponentialDecayHistogram::with_size_and_alpha(1000, 0.015);
        let mut now = histogram.start_time;
        let values_per_minute = 10;
        let values_interval = Duration::from_secs(60) / values_per_minute;
        for _ in 0..120 * values_per_minute {
            histogram.update_at(now, 9998);
            now += values_interval;
        }
        for _ in 0..10 * values_per_minute {
            histogram.update_at(now, 178);
            now += values_interval;
        }
        assert_eq!(histogram.snapshot().value(0.5), 178);
    }

    // 40 values at t = 0 (weight 1 each) vs. 10 values at t = 120s (weight exp(0.015 * 120),
    // about 6.05 each). The later values hold about 60% of the total weight, so they own the
    // median and the 75th percentile despite being outnumbered.
    #[test]
    fn quantiles_should_be_based_on_weights() {
        let mut histogram = ExponentialDecayHistogram::with_size_and_alpha(1000, 0.015);
        let mut now = histogram.start_time;
        for _ in 0..40 {
            histogram.update_at(now, 177);
        }
        now += Duration::from_secs(120);
        for _ in 0..10 {
            histogram.update_at(now, 9999);
        }
        let snapshot = histogram.snapshot();
        assert_eq!(snapshot.entries.len(), 50);
        assert_eq!(snapshot.value(0.5), 9999);
        assert_eq!(snapshot.value(0.75), 9999);
    }

    // Asserts that every sampled value in `snapshot` lies within `range`.
    fn assert_all_values_between(snapshot: Snapshot, range: Range<i64>) {
        for entry in &snapshot.entries {
            assert!(
                entry.value >= range.start && entry.value < range.end,
                "snapshot value {} was not in {:?}",
                entry.value,
                range
            );
        }
    }

    // `Snapshot::values` merges duplicate values and sums their normalized weights. All four
    // samples share one timestamp, so each carries a quarter of the weight.
    #[test]
    fn values() {
        let mut histogram = ExponentialDecayHistogram::new();
        let now = histogram.start_time;
        histogram.update_at(now, 1);
        histogram.update_at(now, 1);
        histogram.update_at(now, 1);
        histogram.update_at(now, 10);
        let values = histogram.snapshot().values().collect::<Vec<_>>();
        assert_eq!(values, vec![(1, 0.75), (10, 0.25)]);
    }
}