//! scepter 0.1.1
//!
//! Composable primitives for planet-scale time-series routing, indexing, and aggregation.
//!
//! Documentation
use std::ops::Range;

/// Errors produced while building bucket layouts or operating on distributions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DistributionError {
    /// A layout was requested with no buckets (or with fewer than two bounds).
    EmptyLayout,
    /// A bucket range had a non-finite bound or a start that was not below its end.
    InvalidBucketRange,
    /// Bucket bounds were not increasing (ranges overlap or are out of order).
    NonIncreasingBounds,
    /// Two distributions did not share the same bucket ranges.
    IncompatibleLayout,
    /// A percentile outside `0.0..=100.0` (or NaN) was requested.
    InvalidPercentile,
    /// A value or bucket index fell outside the distribution's buckets.
    BucketOutOfBounds,
    /// A cumulative point had a different `start` than its predecessor, or was not newer.
    ResetOrOutOfOrderPoint,
}

impl std::fmt::Display for DistributionError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::EmptyLayout => "bucket layout must contain at least one bucket",
            Self::InvalidBucketRange => "bucket range must have finite bounds with start < end",
            Self::NonIncreasingBounds => "bucket bounds must be increasing and non-overlapping",
            Self::IncompatibleLayout => "distributions have incompatible bucket layouts",
            Self::InvalidPercentile => "percentile must be within 0..=100",
            Self::BucketOutOfBounds => "value or bucket index is outside the distribution's buckets",
            Self::ResetOrOutOfOrderPoint => {
                "cumulative point was reset or is not newer than the previous point"
            }
        };
        f.write_str(message)
    }
}

impl std::error::Error for DistributionError {}

/// One histogram bucket: the value range it covers and the number of samples in it.
///
/// Lookups in this module match `range.start <= v < range.end`, except that the final
/// bucket of a layout also accepts `v == range.end`.
#[derive(Clone, Debug, PartialEq)]
pub struct Bucket {
    /// The value range covered by this bucket.
    pub range: Range<f64>,
    /// How many samples have been recorded in this bucket.
    pub count: u64,
}

/// The ordered bucket ranges from which distributions are built.
///
/// The constructors validate ordering and finiteness. Because `ranges` is public, a value
/// built with a struct literal bypasses those checks.
#[derive(Clone, Debug, PartialEq)]
pub struct BucketLayout {
    /// Bucket ranges, expected in ascending, non-overlapping order.
    pub ranges: Vec<Range<f64>>,
}

impl BucketLayout {
    /// Builds a layout from explicit bucket ranges.
    ///
    /// Every range must have finite bounds with `start < end`. Each range must start at or
    /// after the end of the range before it: gaps are allowed, overlaps are not.
    ///
    /// # Errors
    /// - [`DistributionError::EmptyLayout`] if `ranges` is empty.
    /// - [`DistributionError::InvalidBucketRange`] for a non-finite, empty, or inverted range.
    /// - [`DistributionError::NonIncreasingBounds`] if a range starts before the previous one ends.
    pub fn new(ranges: Vec<Range<f64>>) -> Result<Self, DistributionError> {
        if ranges.is_empty() {
            return Err(DistributionError::EmptyLayout);
        }

        let mut previous_end: Option<f64> = None;
        for range in &ranges {
            if !range.start.is_finite() || !range.end.is_finite() || range.start >= range.end {
                return Err(DistributionError::InvalidBucketRange);
            }

            if previous_end.is_some_and(|end| range.start < end) {
                return Err(DistributionError::NonIncreasingBounds);
            }
            previous_end = Some(range.end);
        }

        Ok(Self { ranges })
    }

    /// Builds contiguous buckets from a list of boundaries.
    ///
    /// For example, `[a, b, c]` yields `a..b` and `b..c`.
    ///
    /// # Errors
    /// - [`DistributionError::EmptyLayout`] if fewer than two bounds are given.
    /// - [`DistributionError::InvalidBucketRange`] if any bound is not finite.
    /// - [`DistributionError::NonIncreasingBounds`] if the bounds are not strictly increasing.
    pub fn from_bounds(bounds: &[f64]) -> Result<Self, DistributionError> {
        if bounds.len() < 2 {
            return Err(DistributionError::EmptyLayout);
        }
        if bounds.iter().any(|bound| !bound.is_finite()) {
            return Err(DistributionError::InvalidBucketRange);
        }
        // Adjacent windows always abut. The only way they can be invalid is a repeated or
        // decreasing bound, which is an ordering problem rather than a bad single range.
        if bounds.windows(2).any(|pair| pair[0] >= pair[1]) {
            return Err(DistributionError::NonIncreasingBounds);
        }

        let ranges = bounds.windows(2).map(|pair| pair[0]..pair[1]).collect();
        Self::new(ranges)
    }

    /// Builds `buckets` contiguous buckets of equal `width`, starting at `start`.
    ///
    /// # Errors
    /// - [`DistributionError::EmptyLayout`] if `buckets` is zero.
    /// - [`DistributionError::InvalidBucketRange`] if `width` is not positive or any
    ///   boundary is not finite.
    pub fn fixed_width(start: f64, width: f64, buckets: usize) -> Result<Self, DistributionError> {
        if buckets == 0 {
            return Err(DistributionError::EmptyLayout);
        }

        // Compute every boundary with the same expression and reuse it as both the end of
        // bucket `i - 1` and the start of bucket `i`. Computing the end as `lower + width`
        // can round differently from `start + width * (i + 1)`, which leaves gaps or
        // overlaps that `new` would then reject.
        let boundary = |index: usize| start + width * index as f64;
        let ranges = (0..buckets)
            .map(|index| boundary(index)..boundary(index + 1))
            .collect();
        Self::new(ranges)
    }

    /// Creates a distribution with one zero-count bucket per range of this layout.
    pub fn empty_distribution<T: Clone>(&self) -> Distribution<T> {
        Distribution::new(
            self.ranges
                .iter()
                .cloned()
                .map(|range| Bucket { range, count: 0 })
                .collect(),
        )
    }

    /// Returns the index of the bucket containing `value`, if any.
    ///
    /// Ranges are matched as `start <= value < end`. In addition, a value equal to the last
    /// range's `end` maps to the last bucket. NaN, values in gaps, values outside the
    /// layout, and an empty layout all yield `None`.
    pub fn bucket_for(&self, value: f64) -> Option<usize> {
        self.ranges
            .iter()
            .position(|range| range.start <= value && value < range.end)
            .or_else(|| {
                // `checked_sub` avoids underflowing `len() - 1` when `ranges` is empty.
                // The argument to `then_some` is evaluated eagerly.
                let last = self.ranges.len().checked_sub(1)?;
                (value == self.ranges[last].end).then_some(last)
            })
    }
}

/// A sample value paired with caller-supplied context, stored per bucket of a distribution.
#[derive(Clone, Debug, PartialEq)]
pub struct Exemplar<T> {
    /// The sampled value.
    pub value: f64,
    /// Arbitrary data the caller associates with this sample.
    pub payload: T,
}

/// A bucketed histogram that holds at most one exemplar per bucket.
///
/// `T` is the exemplar payload type; it defaults to `()`.
#[derive(Clone, Debug, PartialEq)]
pub struct Distribution<T = ()> {
    /// Buckets in their stored order.
    buckets: Vec<Bucket>,
    /// One slot per bucket, at the same index; `None` until an exemplar is set.
    exemplars: Vec<Option<Exemplar<T>>>,
}

impl<T: Clone> Distribution<T> {
    /// Creates a distribution over `buckets` with no exemplars attached.
    ///
    /// The buckets are used as given; this constructor does no ordering or overlap
    /// validation.
    pub fn new(buckets: Vec<Bucket>) -> Self {
        let exemplars = vec![None; buckets.len()];
        Self { buckets, exemplars }
    }

    /// Creates an all-zero distribution using the ranges of `layout`.
    pub fn from_layout(layout: &BucketLayout) -> Self {
        layout.empty_distribution()
    }

    /// Returns the buckets in their stored order.
    pub fn buckets(&self) -> &[Bucket] {
        &self.buckets
    }

    /// Returns the sum of all bucket counts, saturating at `u64::MAX` instead of overflowing.
    pub fn total_count(&self) -> u64 {
        self.buckets
            .iter()
            .fold(0u64, |total, bucket| total.saturating_add(bucket.count))
    }

    /// Adds `count` samples of `value` to the bucket that contains it.
    ///
    /// Buckets are matched as `start <= value < end`. A value equal to the last bucket's
    /// `end` is counted in the last bucket. Counts saturate at `u64::MAX`.
    ///
    /// # Errors
    /// [`DistributionError::BucketOutOfBounds`] if no bucket contains `value`. This covers
    /// NaN, values in gaps between buckets, and any value when there are no buckets.
    pub fn record(&mut self, value: f64, count: u64) -> Result<(), DistributionError> {
        let index = self
            .bucket_index(value)
            .ok_or(DistributionError::BucketOutOfBounds)?;
        let bucket = &mut self.buckets[index];
        bucket.count = bucket.count.saturating_add(count);
        Ok(())
    }

    /// Stores `exemplar` for bucket index `bucket`, replacing any previous exemplar there.
    ///
    /// # Errors
    /// [`DistributionError::BucketOutOfBounds`] if `bucket` is not a valid bucket index.
    pub fn set_exemplar(
        &mut self,
        bucket: usize,
        exemplar: Exemplar<T>,
    ) -> Result<(), DistributionError> {
        let Some(slot) = self.exemplars.get_mut(bucket) else {
            return Err(DistributionError::BucketOutOfBounds);
        };
        *slot = Some(exemplar);
        Ok(())
    }

    /// Returns the exemplar stored for bucket index `bucket`, if any.
    pub fn exemplar(&self, bucket: usize) -> Option<&Exemplar<T>> {
        self.exemplars.get(bucket).and_then(Option::as_ref)
    }

    /// Adds `other`'s bucket counts into `self`. Counts saturate at `u64::MAX`.
    ///
    /// For each bucket, an exemplar from `other` is copied only if `self` has none.
    ///
    /// # Errors
    /// [`DistributionError::IncompatibleLayout`] if the bucket ranges differ. In that case
    /// `self` is left unchanged.
    pub fn merge(&mut self, other: &Self) -> Result<(), DistributionError> {
        self.ensure_compatible(other)?;

        for (left, right) in self.buckets.iter_mut().zip(&other.buckets) {
            left.count = left.count.saturating_add(right.count);
        }

        for (left, right) in self.exemplars.iter_mut().zip(&other.exemplars) {
            if left.is_none() {
                *left = right.clone();
            }
        }

        Ok(())
    }

    /// Same as [`Distribution::merge`].
    pub fn try_merge(&mut self, other: &Self) -> Result<(), DistributionError> {
        self.merge(other)
    }

    /// Estimates the `percentile`-th percentile (0 to 100).
    ///
    /// The method finds the bucket holding the target rank and interpolates linearly from
    /// that bucket's `start`. Returns `Ok(None)` when the distribution holds no samples.
    ///
    /// # Errors
    /// [`DistributionError::InvalidPercentile`] if `percentile` is outside `0.0..=100.0`
    /// or is NaN.
    pub fn percentile(&self, percentile: f64) -> Result<Option<f64>, DistributionError> {
        // `RangeInclusive::contains` is false for NaN, so this also rejects NaN.
        if !(0.0..=100.0).contains(&percentile) {
            return Err(DistributionError::InvalidPercentile);
        }

        let total = self.total_count();
        if total == 0 {
            return Ok(None);
        }

        // Multiply before dividing. `percentile / 100.0` is usually inexact (e.g. 0.07), so
        // an exact rank like 7 could become 7.000000000000001 and `ceil` would bump it to 8.
        let rank = ((percentile * total as f64) / 100.0).ceil().max(1.0) as u64;
        // Clamp in case float rounding pushed the rank past the real total.
        let rank = rank.min(total);

        let mut seen_before = 0u64;
        for bucket in &self.buckets {
            let seen_after = seen_before.saturating_add(bucket.count);
            if seen_after >= rank {
                // Invariant: seen_before < rank <= seen_after, so this bucket is non-empty
                // and the subtraction below cannot underflow.
                let offset = (rank - seen_before - 1) as f64;
                let width = bucket.range.end - bucket.range.start;
                return Ok(Some(
                    bucket.range.start + width * offset / bucket.count as f64,
                ));
            }
            seen_before = seen_after;
        }

        Ok(self.buckets.last().map(|bucket| bucket.range.end))
    }

    /// Returns the per-bucket difference `self - previous` as a new distribution.
    ///
    /// A bucket whose count decreased yields 0 rather than an error. Exemplars are not
    /// carried into the result.
    ///
    /// # Errors
    /// [`DistributionError::IncompatibleLayout`] if the bucket ranges differ.
    pub fn delta(&self, previous: &Self) -> Result<Self, DistributionError> {
        self.ensure_compatible(previous)?;

        let buckets = self
            .buckets
            .iter()
            .zip(&previous.buckets)
            .map(|(current, prior)| Bucket {
                range: current.range.clone(),
                count: current.count.saturating_sub(prior.count),
            })
            .collect();

        Ok(Self::new(buckets))
    }

    /// Returns the index of the bucket containing `value`.
    ///
    /// Buckets are matched as `start <= value < end`, and the last bucket's `end` is also
    /// accepted.
    fn bucket_index(&self, value: f64) -> Option<usize> {
        self.buckets
            .iter()
            .position(|bucket| bucket.range.start <= value && value < bucket.range.end)
            .or_else(|| {
                // `checked_sub` avoids underflowing `len() - 1` when there are no buckets.
                // The argument to `then_some` is evaluated eagerly.
                let last = self.buckets.len().checked_sub(1)?;
                (value == self.buckets[last].range.end).then_some(last)
            })
    }

    /// Checks that `other` has exactly the same bucket ranges, in the same order.
    fn ensure_compatible(&self, other: &Self) -> Result<(), DistributionError> {
        if self.buckets.len() != other.buckets.len() {
            return Err(DistributionError::IncompatibleLayout);
        }

        if self
            .buckets
            .iter()
            .zip(&other.buckets)
            .any(|(left, right)| left.range != right.range)
        {
            return Err(DistributionError::IncompatibleLayout);
        }

        Ok(())
    }
}

/// A cumulative observation reported at `timestamp`.
///
/// `start` identifies the accumulation period. `delta_since` rejects pairs of points whose
/// `start` values differ.
#[derive(Clone, Debug, PartialEq)]
pub struct CumulativePoint<T> {
    /// Identifier of the accumulation period this point belongs to.
    pub start: u64,
    /// When this observation was taken.
    pub timestamp: u64,
    /// The accumulated value.
    pub value: T,
}

/// A value accumulated over the window between `start` and `end`.
#[derive(Clone, Debug, PartialEq)]
pub struct DeltaWindow<T> {
    /// Beginning of the window.
    pub start: u64,
    /// End of the window.
    pub end: u64,
    /// The amount accumulated within the window.
    pub value: T,
}

impl<T: Clone> CumulativePoint<Distribution<T>> {
    /// Turns two cumulative points into the delta accumulated between them.
    ///
    /// The resulting window runs from `previous.timestamp` to `self.timestamp`.
    ///
    /// # Errors
    /// - [`DistributionError::ResetOrOutOfOrderPoint`] if the points' `start` values differ
    ///   or `self.timestamp` is not strictly greater than `previous.timestamp`.
    /// - [`DistributionError::IncompatibleLayout`] if the two distributions' bucket ranges
    ///   differ.
    pub fn delta_since(
        &self,
        previous: &Self,
    ) -> Result<DeltaWindow<Distribution<T>>, DistributionError> {
        let same_period = self.start == previous.start;
        let moves_forward = previous.timestamp < self.timestamp;
        if !(same_period && moves_forward) {
            return Err(DistributionError::ResetOrOutOfOrderPoint);
        }

        let accumulated = self.value.delta(&previous.value)?;
        Ok(DeltaWindow {
            start: previous.timestamp,
            end: self.timestamp,
            value: accumulated,
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Distribution<()>` from `(range, count)` pairs.
    fn distribution_of(buckets: &[(std::ops::Range<f64>, u64)]) -> Distribution<()> {
        Distribution::new(
            buckets
                .iter()
                .map(|(range, count)| Bucket {
                    range: range.clone(),
                    count: *count,
                })
                .collect(),
        )
    }

    #[test]
    fn distribution_merges_and_estimates_percentile() {
        let mut merged = distribution_of(&[(0.0..10.0, 10), (10.0..20.0, 0)]);
        let other = distribution_of(&[(0.0..10.0, 0), (10.0..20.0, 10)]);

        merged.merge(&other).unwrap();

        assert_eq!(merged.total_count(), 20);
        assert_eq!(merged.percentile(90.0), Ok(Some(17.0)));
    }

    #[test]
    fn cumulative_points_produce_delta_windows() {
        let layout = BucketLayout::fixed_width(0.0, 10.0, 1).unwrap();
        let mut earlier = layout.empty_distribution::<()>();
        let mut later = layout.empty_distribution::<()>();
        earlier.buckets[0].count = 2;
        later.buckets[0].count = 5;

        let earlier_point = CumulativePoint {
            start: 10,
            timestamp: 20,
            value: earlier,
        };
        let later_point = CumulativePoint {
            start: 10,
            timestamp: 30,
            value: later,
        };
        let window = later_point.delta_since(&earlier_point).unwrap();

        assert_eq!(window.start, 20);
        assert_eq!(window.value.total_count(), 3);
    }
}