oximedia_analytics/
quantile.rs1use crate::error::AnalyticsError;
17
18#[derive(Debug, Clone)]
22pub struct Centroid {
23 pub mean: f64,
25 pub weight: f64,
27}
28
29impl Centroid {
30 fn new(mean: f64, weight: f64) -> Self {
31 Self { mean, weight }
32 }
33}
34
35#[derive(Debug, Clone)]
50pub struct TDigest {
51 delta: f64,
53 centroids: Vec<Centroid>,
55 total_weight: f64,
57 buffer: Vec<f64>,
59 buffer_capacity: usize,
61 pub min: f64,
63 pub max: f64,
64}
65
66impl TDigest {
67 pub fn new(delta: f64) -> Self {
72 let buffer_capacity = (delta as usize).max(64);
73 Self {
74 delta: delta.max(1.0),
75 centroids: Vec::new(),
76 total_weight: 0.0,
77 buffer: Vec::with_capacity(buffer_capacity),
78 buffer_capacity,
79 min: f64::MAX,
80 max: f64::MIN,
81 }
82 }
83
84 pub fn add(&mut self, value: f64) {
86 self.add_weighted(value, 1.0);
87 }
88
89 pub fn add_weighted(&mut self, value: f64, weight: f64) {
91 if weight <= 0.0 {
92 return;
93 }
94 if value < self.min {
95 self.min = value;
96 }
97 if value > self.max {
98 self.max = value;
99 }
100 self.buffer.push(value);
101 self.total_weight += weight;
102 if self.buffer.len() >= self.buffer_capacity {
103 self.flush();
104 }
105 }
106
107 pub fn add_all(&mut self, values: &[f64]) {
109 for &v in values {
110 self.add(v);
111 }
112 }
113
114 pub fn merge(&mut self, other: &TDigest) {
116 for &v in &other.buffer {
118 self.add(v);
119 }
120 for c in &other.centroids {
121 self.add_weighted(c.mean, c.weight);
123 }
124 }
129
130 pub fn quantile(&mut self, q: f64) -> Result<f64, AnalyticsError> {
134 if q < 0.0 || q > 1.0 {
135 return Err(AnalyticsError::ConfigError(format!(
136 "quantile q={q} must be in [0, 1]"
137 )));
138 }
139 self.flush();
140 if self.centroids.is_empty() {
141 return Err(AnalyticsError::InsufficientData(
142 "t-digest is empty".to_string(),
143 ));
144 }
145
146 let n = self.total_weight;
147 if n == 0.0 {
148 return Err(AnalyticsError::InsufficientData(
149 "t-digest total weight is zero".to_string(),
150 ));
151 }
152
153 if q <= 0.0 {
155 return Ok(self.min);
156 }
157 if q >= 1.0 {
158 return Ok(self.max);
159 }
160
161 let target = q * n;
162
163 let mut cumulative = 0.0f64;
165 for i in 0..self.centroids.len() {
166 let half_weight = self.centroids[i].weight / 2.0;
167 let lower = cumulative;
168 let upper = cumulative + self.centroids[i].weight;
169
170 if target <= lower + half_weight {
171 if i == 0 {
173 let t = (target - lower) / half_weight;
175 return Ok(self.min + t * (self.centroids[i].mean - self.min));
176 }
177 let prev_mid = cumulative - self.centroids[i - 1].weight / 2.0;
178 let curr_mid = lower + half_weight;
179 if curr_mid > prev_mid {
180 let t = (target - prev_mid) / (curr_mid - prev_mid);
181 return Ok(self.centroids[i - 1].mean
182 + t * (self.centroids[i].mean - self.centroids[i - 1].mean));
183 }
184 return Ok(self.centroids[i].mean);
185 }
186 cumulative = upper;
187 }
188
189 Ok(self.max)
190 }
191
192 pub fn centroid_count(&self) -> usize {
194 self.centroids.len()
195 }
196
197 pub fn total_weight(&self) -> f64 {
199 self.total_weight
200 }
201
202 fn flush(&mut self) {
206 if self.buffer.is_empty() {
207 return;
208 }
209
210 let mut new_points: Vec<Centroid> = self
212 .buffer
213 .drain(..)
214 .map(|v| Centroid::new(v, 1.0))
215 .collect();
216 new_points.sort_by(|a, b| {
217 a.mean
218 .partial_cmp(&b.mean)
219 .unwrap_or(std::cmp::Ordering::Equal)
220 });
221
222 let old = std::mem::take(&mut self.centroids);
224 let mut merged: Vec<Centroid> = Vec::with_capacity(old.len() + new_points.len());
225
226 let mut old_iter = old.into_iter().peekable();
227 let mut new_iter = new_points.into_iter().peekable();
228 loop {
229 match (old_iter.peek(), new_iter.peek()) {
230 (Some(o), Some(n)) => {
231 if o.mean <= n.mean {
232 merged.push(old_iter.next().unwrap_or_else(|| unreachable!()));
233 } else {
234 merged.push(new_iter.next().unwrap_or_else(|| unreachable!()));
235 }
236 }
237 (Some(_), None) => {
238 merged.extend(old_iter);
239 break;
240 }
241 (None, Some(_)) => {
242 merged.extend(new_iter);
243 break;
244 }
245 (None, None) => break,
246 }
247 }
248
249 self.centroids = compress(merged, self.total_weight, self.delta);
251 }
252}
253
254fn compress(sorted: Vec<Centroid>, total_weight: f64, delta: f64) -> Vec<Centroid> {
260 if sorted.is_empty() {
261 return sorted;
262 }
263 let max_centroids = (delta as usize * 2).max(16);
264 let mut result: Vec<Centroid> = Vec::with_capacity(max_centroids);
265 let mut cumulative_weight = 0.0f64;
266
267 for c in sorted {
268 if let Some(last) = result.last_mut() {
269 let q = cumulative_weight / total_weight;
270 let size_limit = 4.0 * q * (1.0 - q) * total_weight / delta;
272 let size_limit = size_limit.max(1.0);
273
274 if last.weight + c.weight <= size_limit {
275 let total = last.weight + c.weight;
277 last.mean = (last.mean * last.weight + c.mean * c.weight) / total;
278 last.weight = total;
279 cumulative_weight += c.weight;
280 continue;
281 }
282 }
283 cumulative_weight += c.weight;
284 result.push(c);
285 }
286
287 result
288}
289
290pub fn percentiles(values: &[f64], percentiles: &[f64]) -> Result<Vec<f64>, AnalyticsError> {
299 if values.is_empty() {
300 return Err(AnalyticsError::InsufficientData(
301 "cannot compute percentiles of empty dataset".to_string(),
302 ));
303 }
304 let mut digest = TDigest::new(100.0);
305 digest.add_all(values);
306 percentiles
307 .iter()
308 .map(|&p| {
309 if p < 0.0 || p > 100.0 {
310 Err(AnalyticsError::ConfigError(format!(
311 "percentile {p} out of range [0, 100]"
312 )))
313 } else {
314 digest.quantile(p / 100.0)
315 }
316 })
317 .collect()
318}
319
320#[cfg(test)]
323mod tests {
324 use super::*;
325
326 #[test]
329 fn tdigest_single_value() {
330 let mut d = TDigest::new(100.0);
331 d.add(42.0);
332 let q50 = d.quantile(0.5).expect("quantile should succeed");
333 assert!((q50 - 42.0).abs() < 1e-9, "q50={q50}");
334 }
335
336 #[test]
337 fn tdigest_empty_returns_error() {
338 let mut d = TDigest::new(100.0);
339 assert!(d.quantile(0.5).is_err());
340 }
341
342 #[test]
343 fn tdigest_invalid_quantile() {
344 let mut d = TDigest::new(100.0);
345 d.add(1.0);
346 assert!(d.quantile(-0.1).is_err());
347 assert!(d.quantile(1.1).is_err());
348 }
349
350 #[test]
351 fn tdigest_uniform_distribution_p50() {
352 let mut d = TDigest::new(100.0);
354 for i in 1..=1000 {
355 d.add(i as f64);
356 }
357 let p50 = d.quantile(0.5).expect("quantile should succeed");
358 assert!(
359 (p50 - 500.0).abs() < 50.0,
360 "P50 of uniform [1..1000] should be ~500, got {p50}"
361 );
362 }
363
364 #[test]
365 fn tdigest_uniform_distribution_p95() {
366 let mut d = TDigest::new(100.0);
367 for i in 1..=1000 {
368 d.add(i as f64);
369 }
370 let p95 = d.quantile(0.95).expect("quantile should succeed");
371 assert!(
373 (p95 - 950.0).abs() < 60.0,
374 "P95 of uniform [1..1000] should be ~950, got {p95}"
375 );
376 }
377
378 #[test]
379 fn tdigest_min_max_exact() {
380 let mut d = TDigest::new(100.0);
381 for i in 1..=500 {
382 d.add(i as f64);
383 }
384 assert_eq!(d.quantile(0.0).expect("quantile should succeed"), 1.0);
385 assert_eq!(d.quantile(1.0).expect("quantile should succeed"), 500.0);
386 }
387
388 #[test]
391 fn percentiles_basic() {
392 let values: Vec<f64> = (1..=100).map(|x| x as f64).collect();
393 let result = percentiles(&values, &[50.0, 95.0, 99.0]).expect("percentiles should succeed");
394 assert_eq!(result.len(), 3);
395 assert!((result[0] - 50.0).abs() < 10.0, "P50={}", result[0]);
397 assert!((result[1] - 95.0).abs() < 10.0, "P95={}", result[1]);
398 }
399
400 #[test]
401 fn percentiles_empty_returns_error() {
402 assert!(percentiles(&[], &[50.0]).is_err());
403 }
404
405 #[test]
406 fn percentiles_out_of_range_error() {
407 let values = vec![1.0, 2.0, 3.0];
408 assert!(percentiles(&values, &[105.0]).is_err());
409 }
410
411 #[test]
414 fn tdigest_large_dataset_p99() {
415 let mut d = TDigest::new(200.0);
417 for i in 1..=10_000 {
418 d.add(i as f64);
419 }
420 let p99 = d.quantile(0.99).expect("quantile should succeed");
421 let true_p99 = 9901.0;
422 let error_pct = ((p99 - true_p99) / true_p99).abs() * 100.0;
423 assert!(
424 error_pct < 5.0,
425 "P99 error={error_pct:.2}% (p99={p99:.1}, expected~{true_p99})"
426 );
427 }
428
429 #[test]
430 fn tdigest_centroid_count_bounded() {
431 let delta = 100.0;
432 let mut d = TDigest::new(delta);
433 for i in 1..=10_000 {
434 d.add(i as f64);
435 }
436 d.quantile(0.5).ok(); assert!(
439 d.centroid_count() < 500,
440 "too many centroids: {}",
441 d.centroid_count()
442 );
443 }
444
445 #[test]
448 fn tdigest_merge_produces_consistent_quantiles() {
449 let mut d1 = TDigest::new(100.0);
450 let mut d2 = TDigest::new(100.0);
451 for i in 1..=500 {
452 d1.add(i as f64);
453 }
454 for i in 501..=1000 {
455 d2.add(i as f64);
456 }
457 d1.merge(&d2);
458 let p50 = d1.quantile(0.5).expect("quantile should succeed");
459 assert!(
460 (p50 - 500.0).abs() < 80.0,
461 "merged P50 should be ~500, got {p50}"
462 );
463 }
464}