use std::{collections::BTreeMap, fmt::Debug, time::Duration};
use conv::ConvUtil;
use crate::{limiter::Outcome, limits::Sample};
/// Folds a stream of [`Sample`]s into a single summary [`Sample`].
///
/// Implementations keep running state between calls; `reset` clears it.
pub trait Aggregator {
    /// Incorporates `sample` into the running aggregate and returns the
    /// current aggregated sample.
    fn sample(&mut self, sample: Sample) -> Sample;
    #[allow(missing_docs)]
    fn sample_size(&self) -> usize;
    #[allow(missing_docs)]
    fn reset(&mut self);
}
/// Aggregates samples by reporting their arithmetic mean.
#[derive(Debug)]
pub struct Average {
    // Sum of all observed latencies since the last reset.
    latency_sum: Duration,
    // Sum of all observed in-flight counts; u128 to avoid overflow on
    // long accumulation runs.
    in_flight_sum: u128,
    // Sticky outcome: once an overload is seen it is reported until reset.
    overload: Outcome,
    // Number of samples folded in since the last reset.
    samples: usize,
}
/// Aggregates samples by reporting the sample at a configured percentile
/// (nearest-rank over all recorded samples).
pub struct Percentile {
    // Target percentile, exclusive range (0, 1).
    percentile: f64,
    // Sticky outcome: once an overload is seen it is reported until reset.
    overload: Outcome,
    // Total number of samples recorded since the last reset.
    num_samples: usize,
    // Samples keyed (and therefore ordered) by latency; a Vec per key
    // keeps samples that share the same latency.
    samples: BTreeMap<Duration, Vec<Sample>>,
}
impl Aggregator for Average {
    /// Adds `sample` to the running sums and returns the mean sample
    /// observed since the last reset. A single overloaded sample makes
    /// the aggregate outcome overloaded.
    fn sample(&mut self, sample: Sample) -> Sample {
        self.samples += 1;
        self.latency_sum += sample.latency;
        self.in_flight_sum += sample.in_flight as u128;
        self.overload = self.overload.overloaded_or(sample.outcome);

        let count = self.samples;
        Sample {
            latency: self.latency_sum.div_f64(count as f64),
            in_flight: (self.in_flight_sum / count as u128) as usize,
            outcome: self.overload,
        }
    }

    /// Number of samples folded in since the last reset.
    fn sample_size(&self) -> usize {
        self.samples
    }

    /// Drops all accumulated state.
    fn reset(&mut self) {
        *self = Self::default();
    }
}
impl Default for Average {
fn default() -> Self {
Self {
latency_sum: Duration::ZERO,
in_flight_sum: 0,
overload: Outcome::Success,
samples: 0,
}
}
}
impl Percentile {
    /// Creates an aggregator that reports the sample at `percentile`.
    ///
    /// # Panics
    /// Panics unless `0.0 < percentile < 1.0`.
    pub fn new(percentile: f64) -> Self {
        assert!(
            percentile > 0. && percentile < 1.,
            "percentiles must be between 0 and 1 exclusive"
        );
        Self {
            percentile,
            ..Default::default()
        }
    }

    /// The recorded sample sitting at the configured percentile, or
    /// `None` when no samples have been recorded yet.
    fn percentile_sample(&self) -> Option<&Sample> {
        let index = self.percentile_index()?;
        // BTreeMap iteration is latency-ordered, so flattening the
        // per-latency buckets walks samples from fastest to slowest.
        self.samples.values().flatten().nth(index)
    }

    /// Zero-based index of the percentile sample using the nearest-rank
    /// method, or `None` when no samples have been recorded.
    fn percentile_index(&self) -> Option<usize> {
        if self.num_samples == 0 {
            return None;
        }
        let rank = (self.num_samples as f64 * self.percentile).ceil();
        let rank = rank
            .approx_as::<usize>()
            .expect("percentile should be < 1");
        Some(rank - 1)
    }
}
impl Aggregator for Percentile {
    /// Records `sample` and returns the sample currently at the
    /// configured percentile, with the sticky overload outcome applied.
    fn sample(&mut self, sample: Sample) -> Sample {
        self.num_samples += 1;
        self.overload = self.overload.overloaded_or(sample.outcome);
        self.samples.entry(sample.latency).or_default().push(sample);

        let at_percentile = self
            .percentile_sample()
            .expect("Sample should exist at expected index");
        Sample {
            latency: at_percentile.latency,
            in_flight: at_percentile.in_flight,
            outcome: self.overload,
        }
    }

    /// Total number of samples recorded since the last reset.
    fn sample_size(&self) -> usize {
        self.num_samples
    }

    /// Drops all recorded samples while keeping the configured percentile.
    fn reset(&mut self) {
        let percentile = self.percentile;
        *self = Self {
            percentile,
            ..Self::default()
        };
    }
}
impl Default for Percentile {
fn default() -> Self {
Self {
percentile: 0.5,
samples: BTreeMap::new(),
num_samples: 0,
overload: Outcome::Success,
}
}
}
// Hand-written Debug: the derive can't show the derived aggregate, and
// we want the current percentile sample visible in debug output.
impl Debug for Percentile {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("Percentile");
        dbg.field("percentile", &self.percentile);
        dbg.field("overload", &self.overload);
        dbg.field("samples", &self.samples);
        dbg.field("(aggregated sample)", &self.percentile_sample());
        dbg.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Convenience constructor for a test [`Sample`].
    fn sample(in_flight: usize, latency_ms: u64, outcome: Outcome) -> Sample {
        Sample {
            in_flight,
            latency: Duration::from_millis(latency_ms),
            outcome,
        }
    }

    // None of these tests await anything, so plain #[test] is used
    // instead of #[tokio::test] — no need to spin up an async runtime
    // per test.
    #[test]
    fn average() {
        let mut aggregator = Average::default();
        aggregator.sample(sample(1, 1, Outcome::Success));
        aggregator.sample(sample(5, 3, Outcome::Overload));
        let result = aggregator.sample(sample(3, 5, Outcome::Success));
        // Mean in_flight = (1 + 5 + 3) / 3, mean latency = (1 + 3 + 5) / 3 ms,
        // and a single overloaded sample makes the aggregate overloaded.
        assert_eq!(result, sample(3, 3, Outcome::Overload));
    }

    #[test]
    fn average_reset() {
        let mut aggregator = Average::default();
        aggregator.sample(sample(1, 1, Outcome::Success));
        aggregator.reset();
        let result = aggregator.sample(sample(3, 5, Outcome::Success));
        assert_eq!(
            result,
            sample(3, 5, Outcome::Success),
            "should be equal to new sample after reset"
        )
    }

    #[test]
    fn percentile_p01() {
        let mut aggregator = Percentile::new(0.01);
        aggregator.sample(sample(5, 3, Outcome::Overload));
        aggregator.sample(sample(1, 1, Outcome::Success));
        let result = aggregator.sample(sample(3, 5, Outcome::Success));
        // p01 of three samples is the fastest one; overload is sticky.
        assert_eq!(result, sample(1, 1, Outcome::Overload));
    }

    #[test]
    fn percentile_p99() {
        let mut aggregator = Percentile::new(0.99);
        aggregator.sample(sample(5, 3, Outcome::Overload));
        aggregator.sample(sample(1, 1, Outcome::Success));
        let result = aggregator.sample(sample(3, 5, Outcome::Success));
        // p99 of three samples is the slowest one; overload is sticky.
        assert_eq!(result, sample(3, 5, Outcome::Overload));
    }

    #[test]
    fn percentile_reset() {
        let mut aggregator = Percentile::new(0.99);
        aggregator.sample(sample(1, 1, Outcome::Success));
        aggregator.reset();
        let result = aggregator.sample(sample(3, 5, Outcome::Success));
        assert_eq!(
            result,
            sample(3, 5, Outcome::Success),
            "should be equal to new sample after reset"
        );
        assert_eq!(
            aggregator.percentile, 0.99,
            "percentile shouldn't change after reset"
        );
    }
}