use crate::Clock;
use parking_lot::Mutex;
use std::convert::TryFrom;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
const INTERVAL_SECS: u64 = 5;
const SECONDS_PER_MINUTE: f64 = 60.;
struct State {
count: i64,
rate_1m: Ewma,
rate_5m: Ewma,
rate_15m: Ewma,
}
pub struct Meter {
uncounted: AtomicI64,
last_tick: AtomicU64,
start_time: Instant,
clock: Arc<dyn Clock>,
state: Mutex<State>,
}
impl Default for Meter {
fn default() -> Meter {
Meter::new()
}
}
impl Meter {
pub fn new() -> Meter {
Meter::new_with(crate::SYSTEM_CLOCK.clone())
}
pub fn new_with(clock: Arc<dyn Clock>) -> Meter {
Meter {
uncounted: AtomicI64::new(0),
last_tick: AtomicU64::new(0),
start_time: clock.now(),
clock,
state: Mutex::new(State {
count: 0,
rate_1m: Ewma::new(1.),
rate_5m: Ewma::new(5.),
rate_15m: Ewma::new(15.),
}),
}
}
pub fn mark(&self, n: i64) {
self.tick_if_necessary();
self.uncounted.fetch_add(n, Ordering::SeqCst);
}
pub fn count(&self) -> i64 {
self.state.lock().count + self.uncounted.load(Ordering::SeqCst)
}
pub fn one_minute_rate(&self) -> f64 {
self.tick_if_necessary();
self.state.lock().rate_1m.get()
}
pub fn five_minute_rate(&self) -> f64 {
self.tick_if_necessary();
self.state.lock().rate_5m.get()
}
pub fn fifteen_minute_rate(&self) -> f64 {
self.tick_if_necessary();
self.state.lock().rate_15m.get()
}
pub fn mean_rate(&self) -> f64 {
let count = self.count() as f64;
if count == 0. {
0.
} else {
let time = (self.clock.now() - self.start_time).as_secs_f64();
count / time
}
}
fn tick_if_necessary(&self) {
let time = self.clock.now();
let old_tick = self.last_tick.load(Ordering::SeqCst);
let new_tick = (time - self.start_time).as_secs();
let age = new_tick - old_tick;
if age < INTERVAL_SECS {
return;
}
let new_interval_start_tick = new_tick - age % INTERVAL_SECS;
if self
.last_tick
.compare_exchange(
old_tick,
new_interval_start_tick,
Ordering::SeqCst,
Ordering::SeqCst,
)
.is_err()
{
return;
}
let required_ticks = age / INTERVAL_SECS;
let mut state = self.state.lock();
let uncounted = self.uncounted.swap(0, Ordering::SeqCst);
state.count += uncounted;
state.rate_1m.tick(uncounted);
state.rate_1m.decay(required_ticks - 1);
state.rate_5m.tick(uncounted);
state.rate_5m.decay(required_ticks - 1);
state.rate_15m.tick(uncounted);
state.rate_15m.decay(required_ticks - 1);
}
}
struct Ewma {
rate: f64,
alpha: f64,
initialized: bool,
}
impl Ewma {
fn new(minutes: f64) -> Ewma {
Ewma {
rate: 0.,
alpha: 1. - (-(INTERVAL_SECS as f64) / SECONDS_PER_MINUTE / minutes).exp(),
initialized: false,
}
}
fn tick(&mut self, count: i64) {
let instant_rate = count as f64 / INTERVAL_SECS as f64;
if self.initialized {
self.rate += self.alpha * (instant_rate - self.rate);
} else {
self.rate = instant_rate;
self.initialized = true;
}
}
fn decay(&mut self, ticks: u64) {
match i32::try_from(ticks) {
Ok(ticks) => self.rate *= (1. - self.alpha).powi(ticks),
Err(_) => self.rate = 0.,
}
}
fn get(&self) -> f64 {
self.rate
}
}
#[cfg(test)]
mod test {
use crate::clock::test::TestClock;
use crate::Meter;
use assert_approx_eq::assert_approx_eq;
use std::sync::Arc;
use std::time::Duration;
#[test]
#[allow(clippy::float_cmp)]
fn starts_out_with_no_rates_or_count() {
let clock = Arc::new(TestClock::new());
let meter = Meter::new_with(clock);
assert_eq!(meter.count(), 0);
assert_eq!(meter.one_minute_rate(), 0.);
assert_eq!(meter.five_minute_rate(), 0.);
assert_eq!(meter.fifteen_minute_rate(), 0.);
assert_eq!(meter.mean_rate(), 0.)
}
#[test]
fn marks_events_and_updates_rate_and_count() {
let clock = Arc::new(TestClock::new());
let meter = Meter::new_with(clock.clone());
meter.mark(1);
clock.advance(Duration::from_secs(10));
meter.mark(2);
assert_approx_eq!(meter.mean_rate(), 0.3, 0.001);
assert_approx_eq!(meter.one_minute_rate(), 0.1840, 0.001);
assert_approx_eq!(meter.five_minute_rate(), 0.1966, 0.001);
assert_approx_eq!(meter.fifteen_minute_rate(), 0.1988, 0.001);
}
}