use std::ops::Range;
/// Errors reported while building bucket layouts or operating on distributions.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// boxed as `dyn Error` and propagated with `?` into generic error types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DistributionError {
    /// A layout was requested with no buckets (or with fewer than two bounds).
    EmptyLayout,
    /// A bucket range has a non-finite bound, or its start is not below its end.
    InvalidBucketRange,
    /// A bucket range starts before the previous range ends.
    NonIncreasingBounds,
    /// Two distributions do not have the same sequence of bucket ranges.
    IncompatibleLayout,
    /// The requested percentile is outside `0.0..=100.0` (this includes NaN).
    InvalidPercentile,
    /// A value falls in no bucket, or a bucket index is past the last bucket.
    BucketOutOfBounds,
    /// Two cumulative points have different starts, or the newer point's
    /// timestamp is not strictly greater than the older one's.
    ResetOrOutOfOrderPoint,
}

impl std::fmt::Display for DistributionError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::EmptyLayout => "bucket layout has no buckets",
            Self::InvalidBucketRange => "bucket range is not finite or is empty",
            Self::NonIncreasingBounds => "bucket ranges overlap or are out of order",
            Self::IncompatibleLayout => "distributions have different bucket layouts",
            Self::InvalidPercentile => "percentile must be within 0 and 100",
            Self::BucketOutOfBounds => "value or index is outside every bucket",
            Self::ResetOrOutOfOrderPoint => "cumulative point was reset or is out of order",
        };
        f.write_str(message)
    }
}

impl std::error::Error for DistributionError {}
/// One histogram bucket: a value interval together with its observation count.
#[derive(Clone, Debug, PartialEq)]
pub struct Bucket {
    /// Value interval `start..end` covered by this bucket.
    pub range: Range<f64>,
    /// Number of observations attributed to `range`.
    pub count: u64,
}
/// An ordered list of bucket ranges that distributions can be created from.
///
/// The constructors validate the ranges, but the field is public, so a value
/// built with a struct literal carries no such guarantee.
#[derive(Clone, Debug, PartialEq)]
pub struct BucketLayout {
    /// Bucket ranges in the order they are searched.
    pub ranges: Vec<Range<f64>>,
}
impl BucketLayout {
    /// Builds a layout from explicit bucket ranges, validating them in order.
    ///
    /// Gaps between consecutive ranges are accepted; overlaps are rejected.
    ///
    /// # Errors
    ///
    /// * [`DistributionError::EmptyLayout`] if `ranges` is empty.
    /// * [`DistributionError::InvalidBucketRange`] if a bound is not finite or
    ///   a range's start is not strictly below its end.
    /// * [`DistributionError::NonIncreasingBounds`] if a range starts before
    ///   the previous range ends.
    pub fn new(ranges: Vec<Range<f64>>) -> Result<Self, DistributionError> {
        if ranges.is_empty() {
            return Err(DistributionError::EmptyLayout);
        }
        let mut previous_end: Option<f64> = None;
        for range in &ranges {
            let bounds_are_finite = range.start.is_finite() && range.end.is_finite();
            if !bounds_are_finite || range.start >= range.end {
                return Err(DistributionError::InvalidBucketRange);
            }
            if previous_end.is_some_and(|end| range.start < end) {
                return Err(DistributionError::NonIncreasingBounds);
            }
            previous_end = Some(range.end);
        }
        Ok(Self { ranges })
    }

    /// Builds a contiguous layout whose buckets are the consecutive pairs of
    /// `bounds` (`bounds[0]..bounds[1]`, `bounds[1]..bounds[2]`, ...).
    ///
    /// # Errors
    ///
    /// Returns [`DistributionError::EmptyLayout`] when fewer than two bounds
    /// are given, and otherwise whatever [`BucketLayout::new`] reports for the
    /// resulting ranges.
    pub fn from_bounds(bounds: &[f64]) -> Result<Self, DistributionError> {
        if bounds.len() < 2 {
            return Err(DistributionError::EmptyLayout);
        }
        let ranges = bounds.windows(2).map(|pair| pair[0]..pair[1]).collect();
        Self::new(ranges)
    }

    /// Builds `buckets` contiguous buckets of `width` each, beginning at `start`.
    ///
    /// # Errors
    ///
    /// Returns [`DistributionError::EmptyLayout`] when `buckets` is zero, and
    /// otherwise whatever [`BucketLayout::new`] reports (for example a
    /// non-positive or non-finite `width`).
    pub fn fixed_width(start: f64, width: f64, buckets: usize) -> Result<Self, DistributionError> {
        if buckets == 0 {
            return Err(DistributionError::EmptyLayout);
        }
        // Every boundary comes from the same expression, so bucket `i` ends on
        // exactly the value bucket `i + 1` starts on. Computing the end as
        // `lower + width` can round differently from the next bucket's start,
        // which leaves a one-ulp gap or overlap between neighbours.
        let boundary = |index: usize| start + width * index as f64;
        let ranges = (0..buckets)
            .map(|index| boundary(index)..boundary(index + 1))
            .collect();
        Self::new(ranges)
    }

    /// Creates a distribution with one zero-count bucket per range of this layout.
    pub fn empty_distribution<T: Clone>(&self) -> Distribution<T> {
        let buckets = self
            .ranges
            .iter()
            .map(|range| Bucket {
                range: range.clone(),
                count: 0,
            })
            .collect();
        Distribution::new(buckets)
    }

    /// Returns the index of the bucket that holds `value`.
    ///
    /// A bucket holds `start <= value < end`; in addition, a value equal to
    /// the end of the last range is assigned to the last bucket. Returns
    /// `None` when no bucket matches, which includes NaN and a layout with no
    /// ranges.
    pub fn bucket_for(&self, value: f64) -> Option<usize> {
        // `ranges` is a public field, so the layout may be empty. Resolve the
        // last index with a checked subtraction instead of `len() - 1`.
        let last_index = self.ranges.len().checked_sub(1)?;
        self.ranges
            .iter()
            .position(|range| range.contains(&value))
            .or_else(|| (value == self.ranges[last_index].end).then_some(last_index))
    }
}
/// A sample observation paired with a caller-defined payload.
#[derive(Clone, Debug, PartialEq)]
pub struct Exemplar<T> {
    /// The observed value.
    pub value: f64,
    /// Caller-defined data stored alongside the value.
    pub payload: T,
}
/// A bucketed distribution of counts with at most one exemplar per bucket.
///
/// `T` is the exemplar payload type and defaults to `()`.
#[derive(Clone, Debug, PartialEq)]
pub struct Distribution<T = ()> {
    /// Buckets in layout order.
    buckets: Vec<Bucket>,
    /// Exemplar slots, indexed by bucket position.
    exemplars: Vec<Option<Exemplar<T>>>,
}
impl<T: Clone> Distribution<T> {
    /// Wraps `buckets` in a distribution with one empty exemplar slot per bucket.
    ///
    /// The buckets are taken as given; no validation of their ranges is done here.
    pub fn new(buckets: Vec<Bucket>) -> Self {
        let exemplars = vec![None; buckets.len()];
        Self { buckets, exemplars }
    }

    /// Creates a zero-count distribution for `layout`.
    pub fn from_layout(layout: &BucketLayout) -> Self {
        layout.empty_distribution()
    }

    /// Returns the buckets in layout order.
    pub fn buckets(&self) -> &[Bucket] {
        &self.buckets
    }

    /// Returns the sum of all bucket counts, saturating at `u64::MAX`.
    pub fn total_count(&self) -> u64 {
        self.buckets
            .iter()
            .fold(0_u64, |total, bucket| total.saturating_add(bucket.count))
    }

    /// Adds `count` observations of `value` to the bucket that holds it.
    ///
    /// A bucket holds `start <= value < end`; a value equal to the end of the
    /// last bucket is counted in the last bucket. The bucket count saturates
    /// at `u64::MAX` instead of overflowing.
    ///
    /// # Errors
    ///
    /// Returns [`DistributionError::BucketOutOfBounds`] when no bucket holds
    /// `value`. That includes NaN and a distribution with no buckets.
    pub fn record(&mut self, value: f64, count: u64) -> Result<(), DistributionError> {
        // `new` accepts an empty bucket list, so `len() - 1` is not safe here;
        // with no buckets nothing can hold the value.
        let Some(last_index) = self.buckets.len().checked_sub(1) else {
            return Err(DistributionError::BucketOutOfBounds);
        };
        let index = self
            .buckets
            .iter()
            .position(|bucket| bucket.range.contains(&value))
            .or_else(|| (value == self.buckets[last_index].range.end).then_some(last_index))
            .ok_or(DistributionError::BucketOutOfBounds)?;
        let slot = &mut self.buckets[index].count;
        *slot = slot.saturating_add(count);
        Ok(())
    }

    /// Stores `exemplar` in the slot of bucket `bucket`, replacing any previous one.
    ///
    /// # Errors
    ///
    /// Returns [`DistributionError::BucketOutOfBounds`] when `bucket` is not a
    /// valid bucket index.
    pub fn set_exemplar(
        &mut self,
        bucket: usize,
        exemplar: Exemplar<T>,
    ) -> Result<(), DistributionError> {
        let Some(slot) = self.exemplars.get_mut(bucket) else {
            return Err(DistributionError::BucketOutOfBounds);
        };
        *slot = Some(exemplar);
        Ok(())
    }

    /// Returns the exemplar stored for bucket `bucket`, if the index is valid
    /// and a value has been set.
    pub fn exemplar(&self, bucket: usize) -> Option<&Exemplar<T>> {
        self.exemplars.get(bucket).and_then(Option::as_ref)
    }

    /// Adds the counts of `other` into `self`, bucket by bucket.
    ///
    /// Counts saturate at `u64::MAX`. An exemplar already present in `self`
    /// is kept; an empty slot is filled with a clone of `other`'s exemplar.
    ///
    /// # Errors
    ///
    /// Returns [`DistributionError::IncompatibleLayout`] when the two
    /// distributions do not have identical bucket ranges; `self` is left
    /// unchanged in that case.
    pub fn merge(&mut self, other: &Self) -> Result<(), DistributionError> {
        self.ensure_compatible(other)?;
        for (left, right) in self.buckets.iter_mut().zip(&other.buckets) {
            left.count = left.count.saturating_add(right.count);
        }
        for (left, right) in self.exemplars.iter_mut().zip(&other.exemplars) {
            if left.is_none() {
                *left = right.clone();
            }
        }
        Ok(())
    }

    /// Same as [`Distribution::merge`].
    pub fn try_merge(&mut self, other: &Self) -> Result<(), DistributionError> {
        self.merge(other)
    }

    /// Estimates the value at `percentile` (in `0.0..=100.0`).
    ///
    /// The target rank is `ceil(percentile / 100 * total)`, with a minimum of
    /// one. The estimate is taken inside the first bucket whose cumulative
    /// count reaches that rank, at the fraction
    /// `(rank - count_before_bucket - 1) / bucket_count` of the bucket width
    /// above its start. Returns `Ok(None)` when the distribution holds no
    /// observations.
    ///
    /// # Errors
    ///
    /// Returns [`DistributionError::InvalidPercentile`] when `percentile` is
    /// outside `0.0..=100.0` or is NaN.
    pub fn percentile(&self, percentile: f64) -> Result<Option<f64>, DistributionError> {
        // `contains` is false for NaN, and every value inside the range is
        // finite, so no separate finiteness check is required.
        if !(0.0..=100.0).contains(&percentile) {
            return Err(DistributionError::InvalidPercentile);
        }
        let total = self.total_count();
        if total == 0 {
            return Ok(None);
        }
        let rank = ((percentile / 100.0) * total as f64).ceil().max(1.0) as u64;
        let mut seen_before = 0_u64;
        for bucket in &self.buckets {
            let seen_after = seen_before.saturating_add(bucket.count);
            if seen_after >= rank {
                // `seen_before < rank` holds on every iteration (rank >= 1 and
                // earlier buckets did not reach it), so this bucket's count is
                // at least one and the division below is well defined.
                let offset = rank.saturating_sub(seen_before).saturating_sub(1) as f64;
                let fraction = offset / bucket.count as f64;
                let width = bucket.range.end - bucket.range.start;
                return Ok(Some(bucket.range.start + width * fraction));
            }
            seen_before = seen_after;
        }
        // Reached only when rounding pushed the rank past the total count.
        Ok(self.buckets.last().map(|bucket| bucket.range.end))
    }

    /// Returns a distribution holding, per bucket, how much `self` grew over
    /// `previous`.
    ///
    /// A bucket whose count decreased yields zero (saturating subtraction).
    /// The result is built with [`Distribution::new`], so it carries no
    /// exemplars.
    ///
    /// # Errors
    ///
    /// Returns [`DistributionError::IncompatibleLayout`] when the two
    /// distributions do not have identical bucket ranges.
    pub fn delta(&self, previous: &Self) -> Result<Self, DistributionError> {
        self.ensure_compatible(previous)?;
        let buckets = self
            .buckets
            .iter()
            .zip(&previous.buckets)
            .map(|(current, prior)| Bucket {
                range: current.range.clone(),
                count: current.count.saturating_sub(prior.count),
            })
            .collect();
        Ok(Self::new(buckets))
    }

    /// Checks that `other` has the same number of buckets with equal ranges.
    fn ensure_compatible(&self, other: &Self) -> Result<(), DistributionError> {
        let same_layout = self.buckets.len() == other.buckets.len()
            && self
                .buckets
                .iter()
                .zip(&other.buckets)
                .all(|(left, right)| left.range == right.range);
        if same_layout {
            Ok(())
        } else {
            Err(DistributionError::IncompatibleLayout)
        }
    }
}
/// A cumulative measurement: a value tagged with a start marker and a timestamp.
#[derive(Clone, Debug, PartialEq)]
pub struct CumulativePoint<T> {
    /// Start marker of the cumulative series this point belongs to.
    pub start: u64,
    /// Timestamp of this point.
    pub timestamp: u64,
    /// The cumulative value carried by this point.
    pub value: T,
}
/// A value attributed to the window between `start` and `end`.
#[derive(Clone, Debug, PartialEq)]
pub struct DeltaWindow<T> {
    /// Beginning of the window.
    pub start: u64,
    /// End of the window.
    pub end: u64,
    /// The value that applies to the window.
    pub value: T,
}
impl<T: Clone> CumulativePoint<Distribution<T>> {
    /// Computes the change between `previous` and this point.
    ///
    /// The returned window runs from `previous.timestamp` to `self.timestamp`
    /// and carries `self.value.delta(&previous.value)`.
    ///
    /// # Errors
    ///
    /// * [`DistributionError::ResetOrOutOfOrderPoint`] when the two points
    ///   have different `start` values, or this point's timestamp is not
    ///   strictly greater than the previous one's.
    /// * Any error returned by the distribution `delta` call.
    pub fn delta_since(
        &self,
        previous: &Self,
    ) -> Result<DeltaWindow<Distribution<T>>, DistributionError> {
        let same_series = self.start == previous.start;
        let moved_forward = self.timestamp > previous.timestamp;
        if !(same_series && moved_forward) {
            return Err(DistributionError::ResetOrOutOfOrderPoint);
        }
        let value = self.value.delta(&previous.value)?;
        Ok(DeltaWindow {
            start: previous.timestamp,
            end: self.timestamp,
            value,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a distribution over `0..10` and `10..20` holding the given counts.
    fn two_bucket_distribution(low_count: u64, high_count: u64) -> Distribution<()> {
        Distribution::new(vec![
            Bucket {
                range: 0.0..10.0,
                count: low_count,
            },
            Bucket {
                range: 10.0..20.0,
                count: high_count,
            },
        ])
    }

    #[test]
    fn distribution_merges_and_estimates_percentile() {
        let mut merged = two_bucket_distribution(10, 0);
        let addition = two_bucket_distribution(0, 10);

        merged.merge(&addition).unwrap();

        assert_eq!(merged.total_count(), 20);
        // Rank 18 of 20 is the 8th of 10 observations in the upper bucket.
        assert_eq!(merged.percentile(90.0), Ok(Some(17.0)));
    }

    #[test]
    fn cumulative_points_produce_delta_windows() {
        let layout = BucketLayout::fixed_width(0.0, 10.0, 1).unwrap();
        let mut earlier_value = layout.empty_distribution::<()>();
        earlier_value.buckets[0].count = 2;
        let mut later_value = layout.empty_distribution::<()>();
        later_value.buckets[0].count = 5;

        let earlier = CumulativePoint {
            start: 10,
            timestamp: 20,
            value: earlier_value,
        };
        let later = CumulativePoint {
            start: 10,
            timestamp: 30,
            value: later_value,
        };
        let window = later.delta_since(&earlier).unwrap();

        assert_eq!(window.start, 20);
        assert_eq!(window.value.total_count(), 3);
    }
}