1use ordered_float::OrderedFloat;
32use std::cmp::Ordering;
33
34#[cfg(feature = "use_serde")]
35use serde::{Deserialize, Serialize};
36
/// A centroid in a t-digest: a single weighted point summarizing a cluster
/// of nearby samples.
///
/// Fields are wrapped in `OrderedFloat` so the struct can derive `Eq` and
/// implement `Ord` despite holding floating-point values.
#[derive(Debug, PartialEq, Eq, Clone)]
#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))]
pub struct Centroid {
    // Mean of the samples merged into this centroid.
    mean: OrderedFloat<f64>,
    // Total weight (sample count) merged into this centroid.
    weight: OrderedFloat<f64>,
}
44
45impl PartialOrd for Centroid {
46 fn partial_cmp(&self, other: &Centroid) -> Option<Ordering> {
47 Some(self.cmp(other))
48 }
49}
50
51impl Ord for Centroid {
52 fn cmp(&self, other: &Centroid) -> Ordering {
53 self.mean.cmp(&other.mean)
54 }
55}
56
57impl Centroid {
58 pub fn new(mean: f64, weight: f64) -> Self {
59 Centroid {
60 mean: OrderedFloat::from(mean),
61 weight: OrderedFloat::from(weight),
62 }
63 }
64
65 #[inline]
66 pub fn mean(&self) -> f64 {
67 self.mean.into_inner()
68 }
69
70 #[inline]
71 pub fn weight(&self) -> f64 {
72 self.weight.into_inner()
73 }
74
75 pub fn add(&mut self, sum: f64, weight: f64) -> f64 {
76 let weight_: f64 = self.weight.into_inner();
77 let mean_: f64 = self.mean.into_inner();
78
79 let new_sum: f64 = sum + weight_ * mean_;
80 let new_weight: f64 = weight_ + weight;
81 self.weight = OrderedFloat::from(new_weight);
82 self.mean = OrderedFloat::from(new_sum / new_weight);
83 new_sum
84 }
85}
86
87impl Default for Centroid {
88 fn default() -> Self {
89 Centroid {
90 mean: OrderedFloat::from(0.0),
91 weight: OrderedFloat::from(1.0),
92 }
93 }
94}
95
/// A t-digest sketch for estimating quantiles of a stream of values.
///
/// Holds at most roughly `max_size` centroids (sorted by mean) plus running
/// aggregates over everything merged so far.
#[derive(Debug, PartialEq, Eq, Clone)]
#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))]
pub struct TDigest {
    // Compressed summary of the data, kept sorted by centroid mean.
    centroids: Vec<Centroid>,
    // Compression parameter: upper bound on the number of centroids.
    max_size: usize,
    // Sum of all merged values.
    sum: OrderedFloat<f64>,
    // Number of merged values (stored as f64 for weight arithmetic).
    count: OrderedFloat<f64>,
    // Largest value seen; NaN while the digest is empty.
    max: OrderedFloat<f64>,
    // Smallest value seen; NaN while the digest is empty.
    min: OrderedFloat<f64>,
}
107
108impl TDigest {
109 pub fn new_with_size(max_size: usize) -> Self {
110 TDigest {
111 centroids: Vec::new(),
112 max_size,
113 sum: OrderedFloat::from(0.0),
114 count: OrderedFloat::from(0.0),
115 max: OrderedFloat::from(std::f64::NAN),
116 min: OrderedFloat::from(std::f64::NAN),
117 }
118 }
119
120 pub fn new(centroids: Vec<Centroid>, sum: f64, count: f64, max: f64, min: f64, max_size: usize) -> Self {
121 if centroids.len() <= max_size {
122 TDigest {
123 centroids,
124 max_size,
125 sum: OrderedFloat::from(sum),
126 count: OrderedFloat::from(count),
127 max: OrderedFloat::from(max),
128 min: OrderedFloat::from(min),
129 }
130 } else {
131 let sz = centroids.len();
132 let digests: Vec<TDigest> = vec![
133 TDigest::new_with_size(100),
134 TDigest::new(centroids, sum, count, max, min, sz),
135 ];
136
137 Self::merge_digests(digests)
138 }
139 }
140
141 #[inline]
142 pub fn mean(&self) -> f64 {
143 let count_: f64 = self.count.into_inner();
144 let sum_: f64 = self.sum.into_inner();
145
146 if count_ > 0.0 {
147 sum_ / count_
148 } else {
149 0.0
150 }
151 }
152
153 #[inline]
154 pub fn sum(&self) -> f64 {
155 self.sum.into_inner()
156 }
157
158 #[inline]
159 pub fn count(&self) -> f64 {
160 self.count.into_inner()
161 }
162
163 #[inline]
164 pub fn max(&self) -> f64 {
165 self.max.into_inner()
166 }
167
168 #[inline]
169 pub fn min(&self) -> f64 {
170 self.min.into_inner()
171 }
172
173 #[inline]
174 pub fn is_empty(&self) -> bool {
175 self.centroids.is_empty()
176 }
177
178 #[inline]
179 pub fn max_size(&self) -> usize {
180 self.max_size
181 }
182}
183
184impl Default for TDigest {
185 fn default() -> Self {
186 TDigest {
187 centroids: Vec::new(),
188 max_size: 100,
189 sum: OrderedFloat::from(0.0),
190 count: OrderedFloat::from(0.0),
191 max: OrderedFloat::from(std::f64::NAN),
192 min: OrderedFloat::from(std::f64::NAN),
193 }
194 }
195}
196
197impl TDigest {
198 fn k_to_q(k: f64, d: f64) -> f64 {
199 let k_div_d = k / d;
200 if k_div_d >= 0.5 {
201 let base = 1.0 - k_div_d;
202 1.0 - 2.0 * base * base
203 } else {
204 2.0 * k_div_d * k_div_d
205 }
206 }
207
208 fn clamp(v: f64, lo: f64, hi: f64) -> f64 {
209 if v > hi {
210 hi
211 } else if v < lo {
212 lo
213 } else {
214 v
215 }
216 }
217
218 pub fn merge_unsorted(&self, unsorted_values: Vec<f64>) -> TDigest {
219 let mut sorted_values: Vec<OrderedFloat<f64>> = unsorted_values.into_iter().map(OrderedFloat::from).collect();
220 sorted_values.sort();
221 let sorted_values = sorted_values.into_iter().map(|f| f.into_inner()).collect();
222
223 self.merge_sorted(sorted_values)
224 }
225
226 pub fn merge_sorted(&self, sorted_values: Vec<f64>) -> TDigest {
227 if sorted_values.is_empty() {
228 return self.clone();
229 }
230
231 let mut result = TDigest::new_with_size(self.max_size());
232 result.count = OrderedFloat::from(self.count() + (sorted_values.len() as f64));
233
234 let maybe_min = OrderedFloat::from(*sorted_values.first().unwrap());
235 let maybe_max = OrderedFloat::from(*sorted_values.last().unwrap());
236
237 if self.count() > 0.0 {
238 result.min = std::cmp::min(self.min, maybe_min);
239 result.max = std::cmp::max(self.max, maybe_max);
240 } else {
241 result.min = maybe_min;
242 result.max = maybe_max;
243 }
244
245 let mut compressed: Vec<Centroid> = Vec::with_capacity(self.max_size);
246
247 let mut k_limit: f64 = 1.0;
248 let mut q_limit_times_count: f64 = Self::k_to_q(k_limit, self.max_size as f64) * result.count.into_inner();
249 k_limit += 1.0;
250
251 let mut iter_centroids = self.centroids.iter().peekable();
252 let mut iter_sorted_values = sorted_values.iter().peekable();
253
254 let mut curr: Centroid = if let Some(c) = iter_centroids.peek() {
255 let curr = **iter_sorted_values.peek().unwrap();
256 if c.mean() < curr {
257 iter_centroids.next().unwrap().clone()
258 } else {
259 Centroid::new(*iter_sorted_values.next().unwrap(), 1.0)
260 }
261 } else {
262 Centroid::new(*iter_sorted_values.next().unwrap(), 1.0)
263 };
264
265 let mut weight_so_far: f64 = curr.weight();
266
267 let mut sums_to_merge: f64 = 0.0;
268 let mut weights_to_merge: f64 = 0.0;
269
270 while iter_centroids.peek().is_some() || iter_sorted_values.peek().is_some() {
271 let next: Centroid = if let Some(c) = iter_centroids.peek() {
272 if iter_sorted_values.peek().is_none() || c.mean() < **iter_sorted_values.peek().unwrap() {
273 iter_centroids.next().unwrap().clone()
274 } else {
275 Centroid::new(*iter_sorted_values.next().unwrap(), 1.0)
276 }
277 } else {
278 Centroid::new(*iter_sorted_values.next().unwrap(), 1.0)
279 };
280
281 let next_sum: f64 = next.mean() * next.weight();
282 weight_so_far += next.weight();
283
284 if weight_so_far <= q_limit_times_count {
285 sums_to_merge += next_sum;
286 weights_to_merge += next.weight();
287 } else {
288 result.sum = OrderedFloat::from(result.sum.into_inner() + curr.add(sums_to_merge, weights_to_merge));
289 sums_to_merge = 0.0;
290 weights_to_merge = 0.0;
291
292 compressed.push(curr.clone());
293 q_limit_times_count = Self::k_to_q(k_limit, self.max_size as f64) * result.count();
294 k_limit += 1.0;
295 curr = next;
296 }
297 }
298
299 result.sum = OrderedFloat::from(result.sum.into_inner() + curr.add(sums_to_merge, weights_to_merge));
300 compressed.push(curr);
301 compressed.shrink_to_fit();
302 compressed.sort();
303
304 result.centroids = compressed;
305 result
306 }
307
308 fn external_merge(centroids: &mut Vec<Centroid>, first: usize, middle: usize, last: usize) {
309 let mut result: Vec<Centroid> = Vec::with_capacity(centroids.len());
310
311 let mut i = first;
312 let mut j = middle;
313
314 while i < middle && j < last {
315 match centroids[i].cmp(¢roids[j]) {
316 Ordering::Less => {
317 result.push(centroids[i].clone());
318 i += 1;
319 }
320 Ordering::Greater => {
321 result.push(centroids[j].clone());
322 j += 1;
323 }
324 Ordering::Equal => {
325 result.push(centroids[i].clone());
326 i += 1;
327 }
328 }
329 }
330
331 while i < middle {
332 result.push(centroids[i].clone());
333 i += 1;
334 }
335
336 while j < last {
337 result.push(centroids[j].clone());
338 j += 1;
339 }
340
341 i = first;
342 for centroid in result.into_iter() {
343 centroids[i] = centroid;
344 i += 1;
345 }
346 }
347
348 pub fn merge_digests(digests: Vec<TDigest>) -> TDigest {
350 let n_centroids: usize = digests.iter().map(|d| d.centroids.len()).sum();
351 if n_centroids == 0 {
352 return TDigest::default();
353 }
354
355 let max_size = digests.first().unwrap().max_size;
356 let mut centroids: Vec<Centroid> = Vec::with_capacity(n_centroids);
357 let mut starts: Vec<usize> = Vec::with_capacity(digests.len());
358
359 let mut count: f64 = 0.0;
360 let mut min = OrderedFloat::from(std::f64::INFINITY);
361 let mut max = OrderedFloat::from(std::f64::NEG_INFINITY);
362
363 let mut start: usize = 0;
364 for digest in digests.into_iter() {
365 starts.push(start);
366
367 let curr_count: f64 = digest.count();
368 if curr_count > 0.0 {
369 min = std::cmp::min(min, digest.min);
370 max = std::cmp::max(max, digest.max);
371 count += curr_count;
372 for centroid in digest.centroids {
373 centroids.push(centroid);
374 start += 1;
375 }
376 }
377 }
378
379 let mut digests_per_block: usize = 1;
380 while digests_per_block < starts.len() {
381 for i in (0..starts.len()).step_by(digests_per_block * 2) {
382 if i + digests_per_block < starts.len() {
383 let first = starts[i];
384 let middle = starts[i + digests_per_block];
385 let last = if i + 2 * digests_per_block < starts.len() {
386 starts[i + 2 * digests_per_block]
387 } else {
388 centroids.len()
389 };
390
391 debug_assert!(first <= middle && middle <= last);
392 Self::external_merge(&mut centroids, first, middle, last);
393 }
394 }
395
396 digests_per_block *= 2;
397 }
398
399 let mut result = TDigest::new_with_size(max_size);
400 let mut compressed: Vec<Centroid> = Vec::with_capacity(max_size);
401
402 let mut k_limit: f64 = 1.0;
403 let mut q_limit_times_count: f64 = Self::k_to_q(k_limit, max_size as f64) * (count as f64);
404
405 let mut iter_centroids = centroids.iter_mut();
406 let mut curr = iter_centroids.next().unwrap();
407 let mut weight_so_far: f64 = curr.weight();
408 let mut sums_to_merge: f64 = 0.0;
409 let mut weights_to_merge: f64 = 0.0;
410
411 for centroid in iter_centroids {
412 weight_so_far += centroid.weight();
413
414 if weight_so_far <= q_limit_times_count {
415 sums_to_merge += centroid.mean() * centroid.weight();
416 weights_to_merge += centroid.weight();
417 } else {
418 result.sum = OrderedFloat::from(result.sum.into_inner() + curr.add(sums_to_merge, weights_to_merge));
419 sums_to_merge = 0.0;
420 weights_to_merge = 0.0;
421 compressed.push(curr.clone());
422 q_limit_times_count = Self::k_to_q(k_limit, max_size as f64) * (count as f64);
423 k_limit += 1.0;
424 curr = centroid;
425 }
426 }
427
428 result.sum = OrderedFloat::from(result.sum.into_inner() + curr.add(sums_to_merge, weights_to_merge));
429 compressed.push(curr.clone());
430 compressed.shrink_to_fit();
431 compressed.sort();
432
433 result.count = OrderedFloat::from(count as f64);
434 result.min = min;
435 result.max = max;
436 result.centroids = compressed;
437 result
438 }
439
440 pub fn estimate_quantile(&self, q: f64) -> f64 {
442 if self.centroids.is_empty() {
443 return 0.0;
444 }
445
446 let count_: f64 = self.count.into_inner();
447 let rank: f64 = q * count_;
448
449 let mut pos: usize;
450 let mut t: f64;
451 if q > 0.5 {
452 if q >= 1.0 {
453 return self.max();
454 }
455
456 pos = 0;
457 t = count_;
458
459 for (k, centroid) in self.centroids.iter().enumerate().rev() {
460 t -= centroid.weight();
461
462 if rank >= t {
463 pos = k;
464 break;
465 }
466 }
467 } else {
468 if q <= 0.0 {
469 return self.min();
470 }
471
472 pos = self.centroids.len() - 1;
473 t = 0.0;
474
475 for (k, centroid) in self.centroids.iter().enumerate() {
476 if rank < t + centroid.weight() {
477 pos = k;
478 break;
479 }
480
481 t += centroid.weight();
482 }
483 }
484
485 let mut delta = 0.0;
486 let mut min: f64 = self.min.into_inner();
487 let mut max: f64 = self.max.into_inner();
488
489 if self.centroids.len() > 1 {
490 if pos == 0 {
491 delta = self.centroids[pos + 1].mean() - self.centroids[pos].mean();
492 max = self.centroids[pos + 1].mean();
493 } else if pos == (self.centroids.len() - 1) {
494 delta = self.centroids[pos].mean() - self.centroids[pos - 1].mean();
495 min = self.centroids[pos - 1].mean();
496 } else {
497 delta = (self.centroids[pos + 1].mean() - self.centroids[pos - 1].mean()) / 2.0;
498 min = self.centroids[pos - 1].mean();
499 max = self.centroids[pos + 1].mean();
500 }
501 }
502
503 let value = self.centroids[pos].mean() + ((rank - t) / self.centroids[pos].weight() - 0.5) * delta;
504 Self::clamp(value, min, max)
505 }
506}
507
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that the digest's estimate for quantile `q` lies within a
    /// relative `tolerance` of `expected`.
    fn assert_quantile_within(digest: &TDigest, q: f64, expected: f64, tolerance: f64) {
        let actual = digest.estimate_quantile(q);
        let relative_error = (expected - actual).abs() / expected;
        assert!(relative_error < tolerance);
    }

    /// The skewed data set shared by the skewed-distribution tests:
    /// 600k uniform values followed by 400k copies of 1,000,000.
    fn skewed_values() -> Vec<f64> {
        let mut values: Vec<f64> = (1..=600_000).map(f64::from).collect();
        for _ in 0..400_000 {
            values.push(1_000_000.0);
        }
        values
    }

    #[test]
    fn test_centroid_addition_regression() {
        // Regression test: merging values one at a time must keep centroid
        // bookkeeping consistent.
        let mut t = TDigest::new_with_size(10);
        for v in vec![1.0, 1.0, 1.0, 2.0, 1.0, 1.0] {
            t = t.merge_unsorted(vec![v]);
        }

        assert_quantile_within(&t, 0.5, 1.0, 0.01);
        assert_quantile_within(&t, 0.95, 2.0, 0.01);
    }

    #[test]
    fn test_merge_sorted_against_uniform_distro() {
        let values: Vec<f64> = (1..=1_000_000).map(f64::from).collect();
        let t = TDigest::new_with_size(100).merge_sorted(values);

        assert_quantile_within(&t, 1.0, 1_000_000.0, 0.01);
        assert_quantile_within(&t, 0.99, 990_000.0, 0.01);
        assert_quantile_within(&t, 0.01, 10_000.0, 0.01);
        assert_quantile_within(&t, 0.0, 1.0, 0.01);
        assert_quantile_within(&t, 0.5, 500_000.0, 0.01);
    }

    #[test]
    fn test_merge_unsorted_against_uniform_distro() {
        let values: Vec<f64> = (1..=1_000_000).map(f64::from).collect();
        let t = TDigest::new_with_size(100).merge_unsorted(values);

        assert_quantile_within(&t, 1.0, 1_000_000.0, 0.01);
        assert_quantile_within(&t, 0.99, 990_000.0, 0.01);
        assert_quantile_within(&t, 0.01, 10_000.0, 0.01);
        assert_quantile_within(&t, 0.0, 1.0, 0.01);
        assert_quantile_within(&t, 0.5, 500_000.0, 0.01);
    }

    #[test]
    fn test_merge_sorted_against_skewed_distro() {
        let t = TDigest::new_with_size(100).merge_sorted(skewed_values());

        assert_quantile_within(&t, 0.99, 1_000_000.0, 0.01);
        assert_quantile_within(&t, 0.01, 10_000.0, 0.01);
        assert_quantile_within(&t, 0.5, 500_000.0, 0.01);
    }

    #[test]
    fn test_merge_unsorted_against_skewed_distro() {
        let t = TDigest::new_with_size(100).merge_unsorted(skewed_values());

        assert_quantile_within(&t, 0.99, 1_000_000.0, 0.01);
        assert_quantile_within(&t, 0.01, 10_000.0, 0.01);
        assert_quantile_within(&t, 0.5, 500_000.0, 0.01);
    }

    #[test]
    fn test_merge_digests() {
        // 100 digests, each summarizing the same 1..=1000 sequence.
        let mut digests: Vec<TDigest> = Vec::new();
        for _ in 1..=100 {
            let values: Vec<f64> = (1..=1_000).map(f64::from).collect();
            digests.push(TDigest::new_with_size(100).merge_sorted(values));
        }

        let t = TDigest::merge_digests(digests);

        assert_quantile_within(&t, 1.0, 1000.0, 0.01);
        assert_quantile_within(&t, 0.99, 990.0, 0.01);
        // The low tail is less accurate after merging many digests, hence
        // the looser tolerance.
        assert_quantile_within(&t, 0.01, 10.0, 0.2);
        assert_quantile_within(&t, 0.0, 1.0, 0.01);
        assert_quantile_within(&t, 0.5, 500.0, 0.01);
    }
}