use std::num::NonZeroU32;
use std::time::Duration;
use std::{collections::HashMap, sync::Arc};
use quanta::Instant;
use crate::common::Matcher;
use metrics_util::{Histogram, Quantile, Summary};
/// Number of rolling buckets a summary keeps when no explicit count was configured.
///
/// `NonZeroU32::new` is evaluated at compile time here. The `else` arm can never be taken because
/// `3` is non-zero; indexing an empty array in it would abort constant evaluation if it ever were,
/// which keeps this a plain `const` without relying on `panic!` in const contexts.
const DEFAULT_SUMMARY_BUCKET_COUNT: NonZeroU32 = if let Some(count) = NonZeroU32::new(3) {
    count
} else {
    [][0]
};

/// Width of each rolling bucket when no explicit bucket duration was configured.
const DEFAULT_SUMMARY_BUCKET_DURATION: Duration = Duration::from_secs(20);
/// A per-metric distribution: either a bucketed histogram or a time-windowed summary.
///
/// Which variant a metric gets is decided by `DistributionBuilder::get_distribution`.
#[derive(Clone)]
pub enum Distribution {
/// Histogram over fixed bucket bounds (built by `Distribution::new_histogram`).
Histogram(Histogram),
/// Summary variant:
/// - `RollingSummary`: time-bucketed samples,
/// - `Arc<Vec<Quantile>>`: the quantiles configured for this summary,
/// - `f64`: running sum of every recorded sample value (see `record_samples`).
Summary(RollingSummary, Arc<Vec<Quantile>>, f64),
}
impl Distribution {
    /// Creates a histogram distribution using `buckets` as the bucket bounds.
    ///
    /// # Panics
    ///
    /// Panics if `Histogram::new` rejects `buckets` (returns `None`). Callers are expected never
    /// to pass an empty bucket list.
    #[warn(clippy::missing_panics_doc)]
    pub fn new_histogram(buckets: &[f64]) -> Distribution {
        let hist = Histogram::new(buckets).expect("buckets should never be empty");
        Distribution::Histogram(hist)
    }

    /// Creates a summary distribution backed by a `RollingSummary` of `bucket_count` buckets,
    /// each `bucket_duration` wide, reporting the given `quantiles`. The running sum starts at 0.
    ///
    /// # Panics
    ///
    /// Panics if `bucket_duration` is zero (asserted by `RollingSummary::new`).
    pub fn new_summary(
        quantiles: Arc<Vec<Quantile>>,
        bucket_duration: Duration,
        bucket_count: NonZeroU32,
    ) -> Distribution {
        Distribution::Summary(RollingSummary::new(bucket_count, bucket_duration), quantiles, 0.0)
    }

    /// Records a batch of `(value, timestamp)` samples.
    ///
    /// Histograms ignore the timestamps. Summaries hand each sample to the rolling summary, which
    /// places it by timestamp, and add every value to the running sum, including values the rolling
    /// summary drops as too old.
    pub fn record_samples(&mut self, samples: &[(f64, Instant)]) {
        match self {
            Distribution::Histogram(hist) => {
                hist.record_many(samples.iter().map(|(sample, _ts)| sample));
            }
            Distribution::Summary(summary, _, sum) => {
                for &(sample, ts) in samples {
                    summary.add(sample, ts);
                    *sum += sample;
                }
            }
        }
    }
}
/// Decides, per metric name, whether to create a histogram or a summary and with which settings.
#[derive(Debug)]
pub struct DistributionBuilder {
/// Quantiles shared by every summary this builder creates.
quantiles: Arc<Vec<Quantile>>,
/// Global histogram buckets; when `Some`, every name without a matching override becomes a histogram.
buckets: Option<Vec<f64>>,
/// Summary bucket width; `None` falls back to `DEFAULT_SUMMARY_BUCKET_DURATION`.
bucket_duration: Option<Duration>,
/// Summary bucket count; `None` falls back to `DEFAULT_SUMMARY_BUCKET_COUNT`.
bucket_count: Option<NonZeroU32>,
/// Per-name histogram buckets, sorted by `Matcher` in `new`; the first entry whose matcher matches wins.
bucket_overrides: Option<Vec<(Matcher, Vec<f64>)>>,
}
impl DistributionBuilder {
    /// Creates a builder.
    ///
    /// * `quantiles` - quantiles reported by every summary distribution.
    /// * `bucket_duration` - summary bucket width (`None` uses `DEFAULT_SUMMARY_BUCKET_DURATION`).
    /// * `buckets` - global histogram buckets; `Some` makes unmatched names histograms.
    /// * `bucket_count` - summary bucket count (`None` uses `DEFAULT_SUMMARY_BUCKET_COUNT`).
    /// * `bucket_overrides` - per-name histogram buckets keyed by `Matcher`.
    pub fn new(
        quantiles: Vec<Quantile>,
        bucket_duration: Option<Duration>,
        buckets: Option<Vec<f64>>,
        bucket_count: Option<NonZeroU32>,
        bucket_overrides: Option<HashMap<Matcher, Vec<f64>>>,
    ) -> DistributionBuilder {
        DistributionBuilder {
            quantiles: Arc::new(quantiles),
            bucket_duration,
            buckets,
            bucket_count,
            bucket_overrides: bucket_overrides.map(|entries| {
                let mut matchers = entries.into_iter().collect::<Vec<_>>();
                // HashMap iteration order is unspecified; sorting makes "first match wins"
                // deterministic across runs.
                matchers.sort_by(|a, b| a.0.cmp(&b.0));
                matchers
            }),
        }
    }

    /// Returns the histogram buckets that apply to `name`, or `None` if it should be a summary.
    ///
    /// The first matching override (in sorted `Matcher` order) takes precedence over the global
    /// buckets. Both public lookups go through this helper so they can never disagree.
    fn histogram_buckets(&self, name: &str) -> Option<&[f64]> {
        self.bucket_overrides
            .iter()
            .flatten()
            .find(|(matcher, _)| matcher.matches(name))
            .map(|(_, buckets)| buckets.as_slice())
            .or(self.buckets.as_deref())
    }

    /// Creates a fresh distribution for the metric `name`.
    ///
    /// Returns a histogram when an override matches `name` or global buckets are configured.
    /// Otherwise it returns a summary using the configured (or default) bucket duration and count.
    ///
    /// # Panics
    ///
    /// Panics if the selected bucket list is rejected by `Distribution::new_histogram`, or if the
    /// configured summary bucket duration is zero (asserted by `RollingSummary::new`).
    pub fn get_distribution(&self, name: &str) -> Distribution {
        if let Some(buckets) = self.histogram_buckets(name) {
            return Distribution::new_histogram(buckets);
        }

        let bucket_duration = self.bucket_duration.unwrap_or(DEFAULT_SUMMARY_BUCKET_DURATION);
        let bucket_count = self.bucket_count.unwrap_or(DEFAULT_SUMMARY_BUCKET_COUNT);
        Distribution::new_summary(Arc::clone(&self.quantiles), bucket_duration, bucket_count)
    }

    /// Returns `"histogram"` or `"summary"`: the kind of distribution `get_distribution` creates
    /// for `name`.
    pub fn get_distribution_type(&self, name: &str) -> &'static str {
        if self.histogram_buckets(name).is_some() {
            "histogram"
        } else {
            "summary"
        }
    }
}
/// One time slice of a `RollingSummary`, covering timestamps in `[begin, begin + bucket_duration)`.
#[derive(Clone)]
struct Bucket {
/// Inclusive start of this slice.
begin: Instant,
/// Samples recorded into this slice.
summary: Summary,
}
/// A summary split into a bounded number of time buckets so that old samples age out of snapshots.
#[derive(Clone)]
pub struct RollingSummary {
/// Stored buckets, newest first (`add` inserts new heads at index 0).
buckets: Vec<Bucket>,
/// Maximum number of buckets kept in `buckets`.
max_buckets: usize,
/// Width of each bucket.
bucket_duration: Duration,
/// Total window length: `bucket_duration * max_buckets`.
max_bucket_duration: Duration,
/// Number of calls to `add`, including samples too old to be stored in any bucket.
count: usize,
}
impl Default for RollingSummary {
    /// Builds a rolling summary with `DEFAULT_SUMMARY_BUCKET_COUNT` buckets, each
    /// `DEFAULT_SUMMARY_BUCKET_DURATION` wide.
    fn default() -> Self {
        Self::new(DEFAULT_SUMMARY_BUCKET_COUNT, DEFAULT_SUMMARY_BUCKET_DURATION)
    }
}
impl RollingSummary {
pub fn new(buckets: std::num::NonZeroU32, bucket_duration: Duration) -> RollingSummary {
assert!(!bucket_duration.is_zero());
let max_bucket_duration = bucket_duration * buckets.get();
let max_buckets = buckets.get() as usize;
RollingSummary {
buckets: Vec::with_capacity(max_buckets),
max_buckets,
bucket_duration,
max_bucket_duration,
count: 0,
}
}
pub fn add(&mut self, value: f64, now: Instant) {
self.count += 1;
for bucket in &mut self.buckets {
let end = bucket.begin + self.bucket_duration;
if now > bucket.begin + self.bucket_duration {
break;
}
if now >= bucket.begin && now < end {
bucket.summary.add(value);
return;
}
}
if let Some(cutoff) = now.checked_sub(self.max_bucket_duration) {
self.buckets.retain(|b| b.begin > cutoff);
}
if self.buckets.is_empty() {
let mut summary = Summary::with_defaults();
summary.add(value);
self.buckets.push(Bucket { begin: now, summary });
return;
}
let reftime = self.buckets[0].begin;
let mut summary = Summary::with_defaults();
summary.add(value);
let mut begin;
if now > reftime {
begin = reftime + self.bucket_duration;
let mut end = begin + self.bucket_duration;
while now < begin || now >= end {
begin += self.bucket_duration;
end += self.bucket_duration;
}
self.buckets.truncate(self.max_buckets - 1);
self.buckets.insert(0, Bucket { begin, summary });
}
}
pub fn snapshot(&self, now: Instant) -> Summary {
let cutoff = now.checked_sub(self.max_bucket_duration);
let mut acc = Summary::with_defaults();
self.buckets
.iter()
.filter(|b| if let Some(cutoff) = cutoff { b.begin > cutoff } else { true })
.map(|b| &b.summary)
.fold(&mut acc, |acc, item| {
acc.merge(item).expect("merge can only fail if summary config inconsistent");
acc
});
acc
}
pub fn is_empty(&self) -> bool {
self.count() == 0
}
pub fn count(&self) -> usize {
self.count
}
#[cfg(test)]
fn buckets(&self) -> &Vec<Bucket> {
&self.buckets
}
}
#[cfg(test)]
mod tests {
    use super::*;

    use quanta::Clock;

    /// A default summary starts with no buckets and no samples.
    #[test]
    fn new_rolling_summary() {
        let summary = RollingSummary::default();

        assert_eq!(0, summary.buckets().len());
        assert_eq!(0, summary.count());
        assert!(summary.is_empty());
    }

    /// Snapshotting an empty summary yields an empty `Summary`.
    #[test]
    fn empty_snapshot() {
        let (clock, _mock) = Clock::mock();
        let summary = RollingSummary::default();
        let snapshot = summary.snapshot(clock.now());

        assert_eq!(0, snapshot.count());
        assert_eq!(f64::INFINITY, snapshot.min());
        assert_eq!(f64::NEG_INFINITY, snapshot.max());
        assert_eq!(None, snapshot.quantile(0.5));
    }

    /// Samples spread across several live buckets are merged into the snapshot.
    #[test]
    fn snapshot() {
        let (clock, mock) = Clock::mock();
        mock.increment(Duration::from_secs(3600));

        let mut summary = RollingSummary::default();
        summary.add(42.0, clock.now());
        mock.increment(Duration::from_secs(20));
        summary.add(42.0, clock.now());
        mock.increment(Duration::from_secs(20));
        summary.add(42.0, clock.now());

        let snapshot = summary.snapshot(clock.now());

        assert_eq!(42.0, snapshot.min());
        assert_eq!(42.0, snapshot.max());
        // The quantile is approximate, so allow a small tolerance.
        assert!(Some(41.9958) < snapshot.quantile(0.5));
        assert!(Some(42.0042) > snapshot.quantile(0.5));
    }

    /// The first sample creates the first bucket.
    #[test]
    fn add_first_value() {
        let (clock, mock) = Clock::mock();
        mock.increment(Duration::from_secs(3600));

        let mut summary = RollingSummary::default();
        summary.add(42.0, clock.now());

        assert_eq!(1, summary.buckets().len());
        assert_eq!(1, summary.count());
        assert!(!summary.is_empty());
    }

    /// A sample exactly one bucket width later lands in a new head bucket.
    #[test]
    fn add_new_head() {
        let (clock, mock) = Clock::mock();
        mock.increment(Duration::from_secs(3600));

        let mut summary = RollingSummary::default();
        summary.add(42.0, clock.now());
        mock.increment(Duration::from_secs(20));
        summary.add(42.0, clock.now());

        assert_eq!(2, summary.buckets().len());
    }

    /// The bucket count never exceeds the configured maximum (3 by default).
    #[test]
    fn truncate_old_buckets() {
        let (clock, mock) = Clock::mock();
        mock.increment(Duration::from_secs(3600));

        let mut summary = RollingSummary::default();
        summary.add(42.0, clock.now());

        for _ in 0..3 {
            mock.increment(Duration::from_secs(20));
            summary.add(42.0, clock.now());
        }

        assert_eq!(3, summary.buckets().len());
    }

    /// A sample older than the only bucket is counted but not stored.
    #[test]
    fn add_value_ts_before_first_bucket() {
        let (clock, mock) = Clock::mock();
        mock.increment(Duration::from_secs(4));

        let bucket_count = NonZeroU32::new(2).unwrap();
        let bucket_width = Duration::from_secs(5);

        let mut summary = RollingSummary::new(bucket_count, bucket_width);
        assert_eq!(0, summary.buckets().len());
        assert_eq!(0, summary.count());

        summary.add(42.0, clock.now());
        assert_eq!(1, summary.buckets().len());
        assert_eq!(1, summary.count());
        assert!(!summary.is_empty());

        mock.decrement(Duration::from_secs(1));
        summary.add(43.0, clock.now());
        assert_eq!(1, summary.buckets().len());
        assert_eq!(2, summary.count());
        assert!(!summary.is_empty());
    }

    /// A sample whose timestamp falls between two stored buckets is counted but not stored.
    #[test]
    fn add_value_in_gap_between_buckets_is_dropped() {
        let (clock, mock) = Clock::mock();
        mock.increment(Duration::from_secs(3600));

        let mut summary = RollingSummary::default();
        // Buckets at [3600, 3620) and [3640, 3660); nothing covers [3620, 3640).
        summary.add(42.0, clock.now());
        mock.increment(Duration::from_secs(40));
        summary.add(42.0, clock.now());
        assert_eq!(2, summary.buckets().len());

        // t = 3625 lies in the gap.
        mock.decrement(Duration::from_secs(15));
        summary.add(43.0, clock.now());

        assert_eq!(2, summary.buckets().len());
        assert_eq!(3, summary.count());
        assert_eq!(2, summary.snapshot(clock.now()).count());
    }

    /// Buckets at or before `now - window` are excluded from snapshots even before being pruned.
    #[test]
    fn snapshot_excludes_expired_buckets() {
        let (clock, mock) = Clock::mock();
        mock.increment(Duration::from_secs(3600));

        let mut summary = RollingSummary::default();
        summary.add(42.0, clock.now());

        // Default window is 3 * 20s; at exactly 60s later the bucket start equals the cutoff.
        mock.increment(Duration::from_secs(60));

        assert_eq!(0, summary.snapshot(clock.now()).count());
        assert_eq!(1, summary.buckets().len());
        assert_eq!(1, summary.count());
    }
}