Skip to main content

scepter/
distribution.rs

1use std::error::Error;
2use std::fmt;
3use std::ops::Range;
4
/// Failure modes reported by bucket layouts, distributions, and cumulative points.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DistributionError {
    /// The layout would contain zero buckets.
    EmptyLayout,
    /// A single bucket has a non-finite bound, or its start is not below its end.
    InvalidBucketRange,
    /// A bucket starts before the preceding bucket ends.
    NonIncreasingBounds,
    /// Two distributions do not share the same bucket ranges.
    IncompatibleLayout,
    /// The requested percentile is not a finite value within `0..=100`.
    InvalidPercentile,
    /// A sample value or a bucket index falls outside the layout.
    BucketOutOfBounds,
    /// Cumulative points have different starts or are out of timestamp order.
    ResetOrOutOfOrderPoint,
}
23
24impl fmt::Display for DistributionError {
25    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
26        let message = match self {
27            Self::EmptyLayout => "bucket layout must contain at least one bucket",
28            Self::InvalidBucketRange => "bucket range must be finite and increasing",
29            Self::NonIncreasingBounds => "bucket bounds must be sorted and non-overlapping",
30            Self::IncompatibleLayout => "distribution bucket layouts are incompatible",
31            Self::InvalidPercentile => "percentile must be finite and between 0 and 100",
32            Self::BucketOutOfBounds => "value or bucket index is outside the bucket layout",
33            Self::ResetOrOutOfOrderPoint => {
34                "cumulative points have different starts or are out of timestamp order"
35            }
36        };
37        formatter.write_str(message)
38    }
39}
40
41impl Error for DistributionError {}
42
/// A single histogram bucket: a value range and the samples counted in it.
#[derive(Debug, Clone, PartialEq)]
pub struct Bucket {
    /// Half-open `start..end` interval covered by the bucket. Lookups also
    /// accept a value equal to the end of the final bucket.
    pub range: Range<f64>,
    /// How many samples have been counted in this bucket.
    pub count: u64,
}
51
/// The ordered set of bucket ranges shared by distributions of one metric.
#[derive(Debug, Clone, PartialEq)]
pub struct BucketLayout {
    /// Bucket ranges, in ascending order.
    pub ranges: Vec<Range<f64>>,
}
58
59impl BucketLayout {
60    /// Creates a layout from explicit ranges.
61    pub fn new(ranges: Vec<Range<f64>>) -> Result<Self, DistributionError> {
62        if ranges.is_empty() {
63            return Err(DistributionError::EmptyLayout);
64        }
65
66        let mut previous_end = None;
67        for range in &ranges {
68            if !range.start.is_finite() || !range.end.is_finite() || range.start >= range.end {
69                return Err(DistributionError::InvalidBucketRange);
70            }
71
72            if previous_end.is_some_and(|end| range.start < end) {
73                return Err(DistributionError::NonIncreasingBounds);
74            }
75            previous_end = Some(range.end);
76        }
77
78        Ok(Self { ranges })
79    }
80
81    /// Creates a layout from monotonically increasing bucket bounds.
82    pub fn from_bounds(bounds: &[f64]) -> Result<Self, DistributionError> {
83        if bounds.len() < 2 {
84            return Err(DistributionError::EmptyLayout);
85        }
86
87        let ranges = bounds
88            .windows(2)
89            .map(|window| window[0]..window[1])
90            .collect();
91        Self::new(ranges)
92    }
93
94    /// Creates `buckets` fixed-width buckets starting at `start`.
95    pub fn fixed_width(start: f64, width: f64, buckets: usize) -> Result<Self, DistributionError> {
96        if buckets == 0 {
97            return Err(DistributionError::EmptyLayout);
98        }
99
100        let ranges = (0..buckets)
101            .map(|index| {
102                let lower = start + width * index as f64;
103                lower..(lower + width)
104            })
105            .collect();
106        Self::new(ranges)
107    }
108
109    /// Creates an empty distribution using this layout.
110    pub fn empty_distribution<T: Clone>(&self) -> Distribution<T> {
111        Distribution::new(
112            self.ranges
113                .iter()
114                .cloned()
115                .map(|range| Bucket { range, count: 0 })
116                .collect(),
117        )
118    }
119
120    /// Returns the bucket index that would contain `value`.
121    pub fn bucket_for(&self, value: f64) -> Option<usize> {
122        self.ranges
123            .iter()
124            .position(|range| range.start <= value && value < range.end)
125            .or_else(|| {
126                self.ranges
127                    .last()
128                    .is_some_and(|range| value == range.end)
129                    .then_some(self.ranges.len() - 1)
130            })
131    }
132}
133
/// A representative sample attached to one bucket of a distribution.
#[derive(Debug, Clone, PartialEq)]
pub struct Exemplar<T> {
    /// The numeric sample value this exemplar stands for.
    pub value: f64,
    /// Caller-supplied payload, for example a trace ID.
    pub payload: T,
}
142
143/// Histogram-like value type for metric distributions.
144#[derive(Debug, Clone, PartialEq)]
145pub struct Distribution<T = ()> {
146    buckets: Vec<Bucket>,
147    exemplars: Vec<Option<Exemplar<T>>>,
148}
149
150impl<T: Clone> Distribution<T> {
151    /// Creates a distribution from explicit buckets.
152    pub fn new(buckets: Vec<Bucket>) -> Self {
153        let exemplars = vec![None; buckets.len()];
154        Self { buckets, exemplars }
155    }
156
157    /// Creates an empty distribution from a layout.
158    pub fn from_layout(layout: &BucketLayout) -> Self {
159        layout.empty_distribution()
160    }
161
162    /// Returns buckets in layout order.
163    pub fn buckets(&self) -> &[Bucket] {
164        &self.buckets
165    }
166
167    /// Returns the total sample count across buckets.
168    pub fn total_count(&self) -> u64 {
169        self.buckets.iter().map(|bucket| bucket.count).sum()
170    }
171
172    /// Records `count` samples for `value`.
173    pub fn record(&mut self, value: f64, count: u64) -> Result<(), DistributionError> {
174        let Some(bucket) = self
175            .buckets
176            .iter()
177            .position(|bucket| bucket.range.start <= value && value < bucket.range.end)
178            .or_else(|| {
179                self.buckets
180                    .last()
181                    .is_some_and(|bucket| value == bucket.range.end)
182                    .then_some(self.buckets.len() - 1)
183            })
184        else {
185            return Err(DistributionError::BucketOutOfBounds);
186        };
187
188        self.buckets[bucket].count += count;
189        Ok(())
190    }
191
192    /// Sets an exemplar for a bucket.
193    pub fn set_exemplar(
194        &mut self,
195        bucket: usize,
196        exemplar: Exemplar<T>,
197    ) -> Result<(), DistributionError> {
198        let Some(slot) = self.exemplars.get_mut(bucket) else {
199            return Err(DistributionError::BucketOutOfBounds);
200        };
201        *slot = Some(exemplar);
202        Ok(())
203    }
204
205    /// Returns the exemplar for a bucket, if present.
206    pub fn exemplar(&self, bucket: usize) -> Option<&Exemplar<T>> {
207        self.exemplars.get(bucket).and_then(Option::as_ref)
208    }
209
210    /// Merges `other` into this distribution.
211    pub fn merge(&mut self, other: &Self) -> Result<(), DistributionError> {
212        self.ensure_compatible(other)?;
213
214        for (left, right) in self.buckets.iter_mut().zip(&other.buckets) {
215            left.count += right.count;
216        }
217
218        for (left, right) in self.exemplars.iter_mut().zip(&other.exemplars) {
219            if left.is_none() {
220                *left = right.clone();
221            }
222        }
223
224        Ok(())
225    }
226
227    /// Backwards-compatible alias for `merge`.
228    pub fn try_merge(&mut self, other: &Self) -> Result<(), DistributionError> {
229        self.merge(other)
230    }
231
232    /// Estimates a percentile using linear interpolation inside the selected bucket.
233    pub fn percentile(&self, percentile: f64) -> Result<Option<f64>, DistributionError> {
234        if !(0.0..=100.0).contains(&percentile) || !percentile.is_finite() {
235            return Err(DistributionError::InvalidPercentile);
236        }
237
238        let total = self.total_count();
239        if total == 0 {
240            return Ok(None);
241        }
242
243        let rank = ((percentile / 100.0) * total as f64).ceil().max(1.0) as u64;
244        let mut seen_before = 0;
245        for bucket in &self.buckets {
246            let seen_after = seen_before + bucket.count;
247            if seen_after >= rank {
248                if bucket.count == 0 {
249                    return Ok(Some(bucket.range.start));
250                }
251
252                let offset = rank.saturating_sub(seen_before).saturating_sub(1) as f64;
253                let fraction = offset / bucket.count as f64;
254                let value = bucket.range.start + (bucket.range.end - bucket.range.start) * fraction;
255                return Ok(Some(value));
256            }
257            seen_before = seen_after;
258        }
259
260        Ok(self.buckets.last().map(|bucket| bucket.range.end))
261    }
262
263    /// Computes a saturating bucket-wise delta from `previous` to `self`.
264    pub fn delta(&self, previous: &Self) -> Result<Self, DistributionError> {
265        self.ensure_compatible(previous)?;
266
267        let buckets = self
268            .buckets
269            .iter()
270            .zip(&previous.buckets)
271            .map(|(current, prior)| Bucket {
272                range: current.range.clone(),
273                count: current.count.saturating_sub(prior.count),
274            })
275            .collect();
276
277        Ok(Self::new(buckets))
278    }
279
280    fn ensure_compatible(&self, other: &Self) -> Result<(), DistributionError> {
281        if self.buckets.len() != other.buckets.len() {
282            return Err(DistributionError::IncompatibleLayout);
283        }
284
285        if self
286            .buckets
287            .iter()
288            .zip(&other.buckets)
289            .any(|(left, right)| left.range != right.range)
290        {
291            return Err(DistributionError::IncompatibleLayout);
292        }
293
294        Ok(())
295    }
296}
297
/// A single point of a cumulative time series.
#[derive(Debug, Clone, PartialEq)]
pub struct CumulativePoint<T> {
    /// Timestamp at which the cumulative window began.
    pub start: u64,
    /// Timestamp of this point.
    pub timestamp: u64,
    /// Value accumulated at this point.
    pub value: T,
}
308
/// The change in a value across a time window.
#[derive(Debug, Clone, PartialEq)]
pub struct DeltaWindow<T> {
    /// Timestamp where the window starts (inclusive).
    pub start: u64,
    /// Timestamp where the window ends (exclusive).
    pub end: u64,
    /// Change in value over the window.
    pub value: T,
}
319
320impl<T: Clone> CumulativePoint<Distribution<T>> {
321    /// Computes a delta window from a previous cumulative point.
322    pub fn delta_since(
323        &self,
324        previous: &Self,
325    ) -> Result<DeltaWindow<Distribution<T>>, DistributionError> {
326        if self.start != previous.start || self.timestamp <= previous.timestamp {
327            return Err(DistributionError::ResetOrOutOfOrderPoint);
328        }
329
330        Ok(DeltaWindow {
331            start: previous.timestamp,
332            end: self.timestamp,
333            value: self.value.delta(&previous.value)?,
334        })
335    }
336}
337
#[cfg(test)]
mod tests {
    use super::*;

    /// Merging two distributions adds their counts, and the 90th percentile
    /// is interpolated inside the second bucket.
    #[test]
    fn distribution_merges_and_estimates_percentile() {
        let bucket = |start: f64, end: f64, count: u64| Bucket {
            range: start..end,
            count,
        };

        let mut left = Distribution::<()>::new(vec![
            bucket(0.0, 10.0, 10),
            bucket(10.0, 20.0, 0),
        ]);
        let right = Distribution::<()>::new(vec![
            bucket(0.0, 10.0, 0),
            bucket(10.0, 20.0, 10),
        ]);

        left.merge(&right).unwrap();

        assert_eq!(left.total_count(), 20);
        assert_eq!(left.percentile(90.0), Ok(Some(17.0)));
    }

    /// Two cumulative points with the same start yield a window spanning
    /// their timestamps and holding the count difference.
    #[test]
    fn cumulative_points_produce_delta_windows() {
        let layout = BucketLayout::fixed_width(0.0, 10.0, 1).unwrap();

        let mut previous = layout.empty_distribution::<()>();
        previous.buckets[0].count = 2;
        let earlier = CumulativePoint {
            start: 10,
            timestamp: 20,
            value: previous,
        };

        let mut current = layout.empty_distribution::<()>();
        current.buckets[0].count = 5;
        let later = CumulativePoint {
            start: 10,
            timestamp: 30,
            value: current,
        };

        let window = later.delta_since(&earlier).unwrap();

        assert_eq!(window.start, 20);
        assert_eq!(window.value.total_count(), 3);
    }
}