1use std::error::Error;
2use std::fmt;
3use std::ops::Range;
4
/// Errors reported by bucket layouts, distributions, and cumulative points.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DistributionError {
    /// A layout was requested that would contain no buckets at all.
    EmptyLayout,
    /// A bucket bound was NaN or infinite, or a bucket's start was not
    /// strictly below its end.
    InvalidBucketRange,
    /// A bucket started before the end of the bucket preceding it.
    NonIncreasingBounds,
    /// Two distributions differ in bucket count or in a bucket's range.
    IncompatibleLayout,
    /// A percentile was NaN or outside `0..=100`.
    InvalidPercentile,
    /// A value fell outside every bucket, or a bucket index was past the
    /// last bucket.
    BucketOutOfBounds,
    /// Two cumulative points had different starts, or the newer point's
    /// timestamp was not strictly later than the older one's.
    ResetOrOutOfOrderPoint,
}
23
24impl fmt::Display for DistributionError {
25 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
26 let message = match self {
27 Self::EmptyLayout => "bucket layout must contain at least one bucket",
28 Self::InvalidBucketRange => "bucket range must be finite and increasing",
29 Self::NonIncreasingBounds => "bucket bounds must be sorted and non-overlapping",
30 Self::IncompatibleLayout => "distribution bucket layouts are incompatible",
31 Self::InvalidPercentile => "percentile must be finite and between 0 and 100",
32 Self::BucketOutOfBounds => "value or bucket index is outside the bucket layout",
33 Self::ResetOrOutOfOrderPoint => {
34 "cumulative points have different starts or are out of timestamp order"
35 }
36 };
37 formatter.write_str(message)
38 }
39}
40
41impl Error for DistributionError {}
42
/// One histogram bucket: a value interval plus the number of observations
/// counted in it.
#[derive(Clone, Debug, PartialEq)]
pub struct Bucket {
    /// Interval of values covered by the bucket (start inclusive, end
    /// exclusive, as for any `Range`).
    pub range: Range<f64>,
    /// Number of observations accumulated in this bucket.
    pub count: u64,
}
51
/// The bucket boundaries of a distribution, without any counts.
#[derive(Clone, Debug, PartialEq)]
pub struct BucketLayout {
    /// Bucket intervals in layout order. The constructors of this type
    /// validate them, but the field is public, so a layout assembled by hand
    /// carries no such guarantee.
    pub ranges: Vec<Range<f64>>,
}
58
59impl BucketLayout {
60 pub fn new(ranges: Vec<Range<f64>>) -> Result<Self, DistributionError> {
62 if ranges.is_empty() {
63 return Err(DistributionError::EmptyLayout);
64 }
65
66 let mut previous_end = None;
67 for range in &ranges {
68 if !range.start.is_finite() || !range.end.is_finite() || range.start >= range.end {
69 return Err(DistributionError::InvalidBucketRange);
70 }
71
72 if previous_end.is_some_and(|end| range.start < end) {
73 return Err(DistributionError::NonIncreasingBounds);
74 }
75 previous_end = Some(range.end);
76 }
77
78 Ok(Self { ranges })
79 }
80
81 pub fn from_bounds(bounds: &[f64]) -> Result<Self, DistributionError> {
83 if bounds.len() < 2 {
84 return Err(DistributionError::EmptyLayout);
85 }
86
87 let ranges = bounds
88 .windows(2)
89 .map(|window| window[0]..window[1])
90 .collect();
91 Self::new(ranges)
92 }
93
94 pub fn fixed_width(start: f64, width: f64, buckets: usize) -> Result<Self, DistributionError> {
96 if buckets == 0 {
97 return Err(DistributionError::EmptyLayout);
98 }
99
100 let ranges = (0..buckets)
101 .map(|index| {
102 let lower = start + width * index as f64;
103 lower..(lower + width)
104 })
105 .collect();
106 Self::new(ranges)
107 }
108
109 pub fn empty_distribution<T: Clone>(&self) -> Distribution<T> {
111 Distribution::new(
112 self.ranges
113 .iter()
114 .cloned()
115 .map(|range| Bucket { range, count: 0 })
116 .collect(),
117 )
118 }
119
120 pub fn bucket_for(&self, value: f64) -> Option<usize> {
122 self.ranges
123 .iter()
124 .position(|range| range.start <= value && value < range.end)
125 .or_else(|| {
126 self.ranges
127 .last()
128 .is_some_and(|range| value == range.end)
129 .then_some(self.ranges.len() - 1)
130 })
131 }
132}
133
/// A single sample that can be attached to a distribution bucket, together
/// with caller-defined data.
#[derive(Clone, Debug, PartialEq)]
pub struct Exemplar<T> {
    /// Numeric value of the sample.
    pub value: f64,
    /// Caller-defined data carried alongside the value.
    pub payload: T,
}
142
143#[derive(Debug, Clone, PartialEq)]
145pub struct Distribution<T = ()> {
146 buckets: Vec<Bucket>,
147 exemplars: Vec<Option<Exemplar<T>>>,
148}
149
150impl<T: Clone> Distribution<T> {
151 pub fn new(buckets: Vec<Bucket>) -> Self {
153 let exemplars = vec![None; buckets.len()];
154 Self { buckets, exemplars }
155 }
156
157 pub fn from_layout(layout: &BucketLayout) -> Self {
159 layout.empty_distribution()
160 }
161
162 pub fn buckets(&self) -> &[Bucket] {
164 &self.buckets
165 }
166
167 pub fn total_count(&self) -> u64 {
169 self.buckets.iter().map(|bucket| bucket.count).sum()
170 }
171
172 pub fn record(&mut self, value: f64, count: u64) -> Result<(), DistributionError> {
174 let Some(bucket) = self
175 .buckets
176 .iter()
177 .position(|bucket| bucket.range.start <= value && value < bucket.range.end)
178 .or_else(|| {
179 self.buckets
180 .last()
181 .is_some_and(|bucket| value == bucket.range.end)
182 .then_some(self.buckets.len() - 1)
183 })
184 else {
185 return Err(DistributionError::BucketOutOfBounds);
186 };
187
188 self.buckets[bucket].count += count;
189 Ok(())
190 }
191
192 pub fn set_exemplar(
194 &mut self,
195 bucket: usize,
196 exemplar: Exemplar<T>,
197 ) -> Result<(), DistributionError> {
198 let Some(slot) = self.exemplars.get_mut(bucket) else {
199 return Err(DistributionError::BucketOutOfBounds);
200 };
201 *slot = Some(exemplar);
202 Ok(())
203 }
204
205 pub fn exemplar(&self, bucket: usize) -> Option<&Exemplar<T>> {
207 self.exemplars.get(bucket).and_then(Option::as_ref)
208 }
209
210 pub fn merge(&mut self, other: &Self) -> Result<(), DistributionError> {
212 self.ensure_compatible(other)?;
213
214 for (left, right) in self.buckets.iter_mut().zip(&other.buckets) {
215 left.count += right.count;
216 }
217
218 for (left, right) in self.exemplars.iter_mut().zip(&other.exemplars) {
219 if left.is_none() {
220 *left = right.clone();
221 }
222 }
223
224 Ok(())
225 }
226
227 pub fn try_merge(&mut self, other: &Self) -> Result<(), DistributionError> {
229 self.merge(other)
230 }
231
232 pub fn percentile(&self, percentile: f64) -> Result<Option<f64>, DistributionError> {
234 if !(0.0..=100.0).contains(&percentile) || !percentile.is_finite() {
235 return Err(DistributionError::InvalidPercentile);
236 }
237
238 let total = self.total_count();
239 if total == 0 {
240 return Ok(None);
241 }
242
243 let rank = ((percentile / 100.0) * total as f64).ceil().max(1.0) as u64;
244 let mut seen_before = 0;
245 for bucket in &self.buckets {
246 let seen_after = seen_before + bucket.count;
247 if seen_after >= rank {
248 if bucket.count == 0 {
249 return Ok(Some(bucket.range.start));
250 }
251
252 let offset = rank.saturating_sub(seen_before).saturating_sub(1) as f64;
253 let fraction = offset / bucket.count as f64;
254 let value = bucket.range.start + (bucket.range.end - bucket.range.start) * fraction;
255 return Ok(Some(value));
256 }
257 seen_before = seen_after;
258 }
259
260 Ok(self.buckets.last().map(|bucket| bucket.range.end))
261 }
262
263 pub fn delta(&self, previous: &Self) -> Result<Self, DistributionError> {
265 self.ensure_compatible(previous)?;
266
267 let buckets = self
268 .buckets
269 .iter()
270 .zip(&previous.buckets)
271 .map(|(current, prior)| Bucket {
272 range: current.range.clone(),
273 count: current.count.saturating_sub(prior.count),
274 })
275 .collect();
276
277 Ok(Self::new(buckets))
278 }
279
280 fn ensure_compatible(&self, other: &Self) -> Result<(), DistributionError> {
281 if self.buckets.len() != other.buckets.len() {
282 return Err(DistributionError::IncompatibleLayout);
283 }
284
285 if self
286 .buckets
287 .iter()
288 .zip(&other.buckets)
289 .any(|(left, right)| left.range != right.range)
290 {
291 return Err(DistributionError::IncompatibleLayout);
292 }
293
294 Ok(())
295 }
296}
297
/// One sample of a cumulative series.
#[derive(Clone, Debug, PartialEq)]
pub struct CumulativePoint<T> {
    /// Start marker of the cumulative series; two points are only compared
    /// when their starts are equal. NOTE(review): presumably a timestamp in
    /// the same unit as `timestamp` — confirm with callers.
    pub start: u64,
    /// Time at which this sample was taken.
    pub timestamp: u64,
    /// Value accumulated up to `timestamp`.
    pub value: T,
}
308
/// The change in a cumulative series between two samples.
#[derive(Clone, Debug, PartialEq)]
pub struct DeltaWindow<T> {
    /// Timestamp of the earlier sample.
    pub start: u64,
    /// Timestamp of the later sample.
    pub end: u64,
    /// Change accumulated between `start` and `end`.
    pub value: T,
}
319
320impl<T: Clone> CumulativePoint<Distribution<T>> {
321 pub fn delta_since(
323 &self,
324 previous: &Self,
325 ) -> Result<DeltaWindow<Distribution<T>>, DistributionError> {
326 if self.start != previous.start || self.timestamp <= previous.timestamp {
327 return Err(DistributionError::ResetOrOutOfOrderPoint);
328 }
329
330 Ok(DeltaWindow {
331 start: previous.timestamp,
332 end: self.timestamp,
333 value: self.value.delta(&previous.value)?,
334 })
335 }
336}
337
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for the bucket literals used in the tests below.
    fn bucket(range: std::ops::Range<f64>, count: u64) -> Bucket {
        Bucket { range, count }
    }

    /// Merging adds counts per bucket, and the percentile is interpolated
    /// from the merged counts.
    #[test]
    fn distribution_merges_and_estimates_percentile() {
        let mut merged =
            Distribution::<()>::new(vec![bucket(0.0..10.0, 10), bucket(10.0..20.0, 0)]);
        let addend = Distribution::<()>::new(vec![bucket(0.0..10.0, 0), bucket(10.0..20.0, 10)]);

        merged.merge(&addend).unwrap();

        assert_eq!(merged.total_count(), 20);
        assert_eq!(merged.percentile(90.0), Ok(Some(17.0)));
    }

    /// Two cumulative points with the same start yield a window that begins
    /// at the earlier timestamp and holds the count difference.
    #[test]
    fn cumulative_points_produce_delta_windows() {
        let layout = BucketLayout::fixed_width(0.0, 10.0, 1).unwrap();

        let mut earlier = layout.empty_distribution::<()>();
        earlier.buckets[0].count = 2;
        let mut later = layout.empty_distribution::<()>();
        later.buckets[0].count = 5;

        let older_point = CumulativePoint {
            start: 10,
            timestamp: 20,
            value: earlier,
        };
        let newer_point = CumulativePoint {
            start: 10,
            timestamp: 30,
            value: later,
        };

        let window = newer_point.delta_since(&older_point).unwrap();

        assert_eq!(window.start, 20);
        assert_eq!(window.value.total_count(), 3);
    }
}