use crate::estimator::Estimator;
use std::hash::Hash;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};
/// The raw inputs handed to the closure passed to `Rate::rate_with`, from
/// which a caller can derive its own rate figure.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
pub struct RateComponents {
/// Count read for the key from the previous slot; `Rate::rate_with` reports
/// 0 here once two or more intervals have passed since the last reset.
pub prev_samples: isize,
/// Count read for the key from the current slot; `Rate::rate_with` reports
/// 0 here once one or more intervals have passed since the last reset.
pub curr_samples: isize,
/// The interval the `Rate` was constructed with.
pub interval: Duration,
/// Milliseconds elapsed since the last reset, taken modulo the interval
/// length and divided by the interval length.
pub current_interval_fraction: f64,
}
/// A per-key event rate tracker built from two `Estimator` slots ("red" and
/// "blue"). One slot is treated as current and the other as previous; the
/// roles are swapped whenever a full interval has elapsed since the last reset.
pub struct Rate {
// The two estimator slots; which one is "current" is decided by `red_or_blue`.
red_slot: Estimator,
blue_slot: Estimator,
// `red_or_blue`: when true the red slot is current and the blue slot is previous.
// `start`: instant of construction; all timestamps are milliseconds since this point.
red_or_blue: AtomicBool, start: Instant,
// `reset_interval_ms`: the interval in whole milliseconds.
// `last_reset_time`: milliseconds since `start` at which the slots were last swapped.
// `interval`: the interval as given by the caller, passed on in `RateComponents`.
reset_interval_ms: u64, last_reset_time: AtomicU64, interval: Duration,
}
// Default estimator configuration used by `Rate::new`: these are forwarded as
// the `hashes` and `slots` arguments of `Estimator::new` for both slots.
const HASHES: usize = 4;
const SLOTS: usize = 1024;
impl Rate {
    /// Create a `Rate` that tracks events over `interval`, using the default
    /// estimator configuration (`HASHES` hashes and `SLOTS` slots).
    pub fn new(interval: std::time::Duration) -> Self {
        Rate::new_with_estimator_config(interval, HASHES, SLOTS)
    }

    /// Create a `Rate` that tracks events over `interval`, with both slots
    /// built by `Estimator::new(hashes, slots)`.
    ///
    /// The interval is tracked in whole milliseconds. A sub-millisecond
    /// interval truncates to zero, in which case every query reports no
    /// samples.
    #[inline]
    pub fn new_with_estimator_config(
        interval: std::time::Duration,
        hashes: usize,
        slots: usize,
    ) -> Self {
        // `as_millis` returns a `u128`; saturate rather than silently truncate.
        let reset_interval_ms = interval.as_millis().min(u128::from(u64::MAX)) as u64;
        Rate {
            red_slot: Estimator::new(hashes, slots),
            blue_slot: Estimator::new(hashes, slots),
            red_or_blue: AtomicBool::new(true),
            start: Instant::now(),
            reset_interval_ms,
            last_reset_time: AtomicU64::new(0),
            interval,
        }
    }

    /// The slot that is current for the given flag value: red when `true`,
    /// blue when `false`.
    fn current(&self, red_or_blue: bool) -> &Estimator {
        if red_or_blue {
            &self.red_slot
        } else {
            &self.blue_slot
        }
    }

    /// The slot that is previous for the given flag value: blue when `true`,
    /// red when `false`.
    fn previous(&self, red_or_blue: bool) -> &Estimator {
        if red_or_blue {
            &self.blue_slot
        } else {
            &self.red_slot
        }
    }

    /// Load the flag that selects which slot is current.
    fn red_or_blue(&self) -> bool {
        self.red_or_blue.load(Ordering::SeqCst)
    }

    /// Twice the interval in milliseconds: once this much time has passed
    /// since the last reset, neither slot holds usable data.
    fn stale_after_ms(&self) -> u64 {
        // Saturate so that very large intervals cannot overflow.
        self.reset_interval_ms.saturating_mul(2)
    }

    /// Return the per-second rate for `key` based on the previous interval.
    ///
    /// Returns 0 when two or more intervals have passed since the last reset.
    pub fn rate<T: Hash>(&self, key: &T) -> f64 {
        let past_ms = self.maybe_reset();
        if past_ms >= self.stale_after_ms() {
            // Also covers a zero-length interval, so the division below never
            // sees a zero divisor.
            return 0f64;
        }
        // Scale the count over `reset_interval_ms` to a per-second figure.
        self.previous(self.red_or_blue()).get(key) as f64 / self.reset_interval_ms as f64 * 1000.0
    }

    /// Add `events` to the count for `key` in the current slot and return
    /// whatever the estimator's `incr` reports for that key.
    pub fn observe<T: Hash>(&self, key: &T, events: isize) -> isize {
        self.maybe_reset();
        self.current(self.red_or_blue()).incr(key, events)
    }

    /// Swap and clear the slots if at least one interval has passed since the
    /// last reset.
    ///
    /// Returns the number of milliseconds between the last reset and the
    /// moment this call sampled the clock, as observed before any reset made
    /// by this call.
    ///
    /// # Panics
    ///
    /// Panics if another thread wins the reset race with a timestamp more
    /// than one second older than the one sampled here.
    fn maybe_reset(&self) -> u64 {
        let now = Instant::now().duration_since(self.start).as_millis() as u64;
        let last_reset = self.last_reset_time.load(Ordering::SeqCst);
        // Another thread may have published a reset timestamp newer than our
        // `now` between the clock read and the load above; treat that as
        // "no time has passed" instead of underflowing.
        let past_ms = now.saturating_sub(last_reset);

        if past_ms < self.reset_interval_ms {
            return past_ms;
        }

        let red_or_blue = self.red_or_blue();
        match self.last_reset_time.compare_exchange(
            last_reset,
            now,
            Ordering::SeqCst,
            Ordering::Acquire,
        ) {
            Ok(_) => {
                // Clear the previous slot first, then flip the flag so that it
                // becomes the (empty) current slot.
                self.previous(red_or_blue).reset();
                self.red_or_blue.store(!red_or_blue, Ordering::SeqCst);
                // After two or more intervals the slot that was current before
                // the flip is stale too; it is the previous slot now.
                if past_ms >= self.stale_after_ms() {
                    self.current(red_or_blue).reset();
                }
            }
            Err(new) => {
                // Another thread performed the reset. Sanity-check its
                // timestamp; `saturating_sub` keeps the check valid during
                // the first second after construction.
                assert!(new >= now.saturating_sub(1000));
            }
        }

        past_ms
    }

    /// Compute a custom rate for `key` by passing the raw sample counts to
    /// `rate_calc_fn` and returning its result.
    ///
    /// The closure receives a `RateComponents` holding:
    /// * `prev_samples` / `curr_samples`: both 0 when two or more intervals
    ///   have passed since the last reset; the previous slot's count and 0
    ///   when one or more intervals have passed; otherwise the counts of the
    ///   previous and current slots.
    /// * `interval`: the interval this `Rate` was constructed with.
    /// * `current_interval_fraction`: elapsed milliseconds modulo the interval
    ///   length, divided by the interval length (0 for a zero-length interval).
    pub fn rate_with<F, T, K>(&self, key: &K, mut rate_calc_fn: F) -> T
    where
        F: FnMut(RateComponents) -> T,
        K: Hash,
    {
        let past_ms = self.maybe_reset();

        let (prev_samples, curr_samples) = if past_ms >= self.stale_after_ms() {
            (0, 0)
        } else if past_ms >= self.reset_interval_ms {
            (self.previous(self.red_or_blue()).get(key), 0)
        } else {
            // Load the flag once so both reads agree on which slot is which.
            let red_or_blue = self.red_or_blue();
            (
                self.previous(red_or_blue).get(key),
                self.current(red_or_blue).get(key),
            )
        };

        // A sub-millisecond interval truncates to 0 ms; avoid the integer
        // modulo-by-zero panic in that case.
        let current_interval_fraction = if self.reset_interval_ms == 0 {
            0.0
        } else {
            (past_ms % self.reset_interval_ms) as f64 / self.reset_interval_ms as f64
        };

        rate_calc_fn(RateComponents {
            interval: self.interval,
            prev_samples,
            curr_samples,
            current_interval_fraction,
        })
    }
}
// Timing-based tests for `Rate`. They rely on real sleeps relative to the
// one-second interval, so the order of statements matters.
#[cfg(test)]
mod tests {
use float_cmp::assert_approx_eq;
use super::*;
use std::thread::sleep;
use std::time::Duration;
// Exercises `observe` and `rate` across three consecutive one-second intervals.
#[test]
fn test_observe_rate() {
let r = Rate::new(Duration::from_secs(1));
let key = 1;
// First interval: counts accumulate in the current slot.
let observed = r.observe(&key, 3);
assert_eq!(observed, 3);
let observed = r.observe(&key, 2);
assert_eq!(observed, 5);
// `rate` reads the previous slot, which has not received anything yet.
assert_eq!(r.rate(&key), 0f64);
sleep(Duration::from_secs(1));
// Second interval: the count starts over and the first interval's 5 events
// are now reported as the rate.
let observed = r.observe(&key, 4);
assert_eq!(observed, 4);
assert_eq!(r.rate(&key), 5f64);
sleep(Duration::from_secs(1));
// Third interval: the second interval's 4 events are reported.
assert_eq!(r.rate(&key), 4f64);
sleep(Duration::from_secs(1));
// Nothing was observed during the third interval.
assert_eq!(r.rate(&key), 0f64); }
// Approximate float comparison with a tolerance of 0.15, used where the
// expected value depends on how much wall-clock time has elapsed.
fn assert_eq_ish(left: f64, right: f64) {
assert_approx_eq!(f64, left, right, epsilon = 0.15)
}
// Exercises `rate_with` with a closure that weights the previous count by 0.1
// and the current count by 0.9.
#[test]
fn test_observe_rate_custom_90_10() {
let r = Rate::new(Duration::from_secs(1));
let key = 1;
let rate_90_10_fn = |rate_info: RateComponents| {
let prev = rate_info.prev_samples as f64;
let curr = rate_info.curr_samples as f64;
(prev * 0.1 + curr * 0.9) / rate_info.interval.as_secs_f64()
};
let observed = r.observe(&key, 3);
assert_eq!(observed, 3);
let observed = r.observe(&key, 2);
assert_eq!(observed, 5);
// Only the current slot has data: 5 events weighted by 0.9.
assert_eq!(r.rate_with(&key, rate_90_10_fn), 5. * 0.9);
sleep(Duration::from_secs(1));
let observed = r.observe(&key, 4);
assert_eq!(observed, 4);
// Previous slot holds 5, current slot holds 4.
assert_eq!(r.rate_with(&key, rate_90_10_fn), 5. * 0.1 + 4. * 0.9);
sleep(Duration::from_secs(1));
// Previous slot holds 4, current slot is empty.
assert_eq!(r.rate_with(&key, rate_90_10_fn), 4. * 0.1);
sleep(Duration::from_secs(1));
assert_eq!(r.rate_with(&key, rate_90_10_fn), 0f64);
}
// Exercises `rate_with` with a closure that blends the previous and current
// counts according to `current_interval_fraction`.
#[test]
fn test_observe_rate_custom_proportional() {
let r = Rate::new(Duration::from_secs(1));
let key = 1;
let rate_prop_fn = |rate_info: RateComponents| {
let prev = rate_info.prev_samples as f64;
let curr = rate_info.curr_samples as f64;
let interval_secs = rate_info.interval.as_secs_f64();
let interval_fraction = rate_info.current_interval_fraction;
let weighted_count = prev * (1. - interval_fraction) + curr * interval_fraction;
weighted_count / interval_secs
};
let observed = r.observe(&key, 3);
assert_eq!(observed, 3);
let observed = r.observe(&key, 2);
assert_eq!(observed, 5);
// Right at the start of the interval the fraction is about 0, so the
// current count contributes almost nothing.
assert_eq_ish(r.rate_with(&key, rate_prop_fn), 0.);
sleep(Duration::from_secs_f64(0.5));
// Halfway through the first interval.
assert_eq_ish(r.rate_with(&key, rate_prop_fn), 5. * 0.5);
sleep(Duration::from_secs_f64(0.5));
// Start of the second interval: the 5 events are now in the previous slot.
let observed = r.observe(&key, 4);
assert_eq!(observed, 4);
assert_eq_ish(r.rate_with(&key, rate_prop_fn), 5.);
sleep(Duration::from_secs_f64(0.75));
// Three quarters of the way through the second interval.
assert_eq_ish(r.rate_with(&key, rate_prop_fn), 5. * 0.25 + 4. * 0.75);
sleep(Duration::from_secs_f64(0.25));
// Start of the third interval: the 4 events are now in the previous slot.
assert_eq_ish(r.rate_with(&key, rate_prop_fn), 4.);
sleep(Duration::from_secs(1));
assert_eq!(r.rate_with(&key, rate_prop_fn), 0f64);
}
}