1use ordered_float::OrderedFloat;
32use std::cmp::Ordering;
33
34#[cfg(feature = "use_serde")]
35use serde::{Deserialize, Serialize};
36
/// A weighted point in a t-digest: a cluster of absorbed samples
/// summarized by its mean value and total weight (sample count).
#[derive(Debug, PartialEq, Eq, Clone)]
#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))]
pub struct Centroid {
    // OrderedFloat imposes a total order on f64 so Centroid can derive Eq
    // and be sorted even when values are NaN.
    mean: OrderedFloat<f64>,
    weight: OrderedFloat<f64>,
}
44
45impl PartialOrd for Centroid {
46 fn partial_cmp(&self, other: &Centroid) -> Option<Ordering> {
47 Some(self.cmp(other))
48 }
49}
50
51impl Ord for Centroid {
52 fn cmp(&self, other: &Centroid) -> Ordering {
53 self.mean.cmp(&other.mean)
54 }
55}
56
57impl Centroid {
58 pub fn new(mean: f64, weight: f64) -> Self {
59 Centroid {
60 mean: OrderedFloat::from(mean),
61 weight: OrderedFloat::from(weight),
62 }
63 }
64
65 #[inline]
66 pub fn mean(&self) -> f64 {
67 self.mean.into_inner()
68 }
69
70 #[inline]
71 pub fn weight(&self) -> f64 {
72 self.weight.into_inner()
73 }
74
75 pub fn add(&mut self, sum: f64, weight: f64) -> f64 {
76 let weight_: f64 = self.weight.into_inner();
77 let mean_: f64 = self.mean.into_inner();
78
79 let new_sum: f64 = sum + weight_ * mean_;
80 let new_weight: f64 = weight_ + weight;
81 self.weight = OrderedFloat::from(new_weight);
82 self.mean = OrderedFloat::from(new_sum / new_weight);
83 new_sum
84 }
85}
86
87impl Default for Centroid {
88 fn default() -> Self {
89 Centroid {
90 mean: OrderedFloat::from(0.0),
91 weight: OrderedFloat::from(1.0),
92 }
93 }
94}
95
/// A t-digest: a compact, mergeable sketch of a distribution that supports
/// approximate quantile queries with high accuracy near the tails.
#[derive(Debug, PartialEq, Eq, Clone)]
#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))]
pub struct TDigest {
    // Centroids kept sorted by mean.
    centroids: Vec<Centroid>,
    // Compression parameter: maximum number of centroids retained.
    max_size: usize,
    // Running sum and count of all merged values.
    sum: OrderedFloat<f64>,
    count: OrderedFloat<f64>,
    // Observed extremes; NaN while the digest is empty.
    max: OrderedFloat<f64>,
    min: OrderedFloat<f64>,
}
107
108impl TDigest {
109 pub fn new_with_size(max_size: usize) -> Self {
110 TDigest {
111 centroids: Vec::new(),
112 max_size,
113 sum: OrderedFloat::from(0.0),
114 count: OrderedFloat::from(0.0),
115 max: OrderedFloat::from(std::f64::NAN),
116 min: OrderedFloat::from(std::f64::NAN),
117 }
118 }
119
120 pub fn new(centroids: Vec<Centroid>, sum: f64, count: f64, max: f64, min: f64, max_size: usize) -> Self {
121 if centroids.len() <= max_size {
122 TDigest {
123 centroids,
124 max_size,
125 sum: OrderedFloat::from(sum),
126 count: OrderedFloat::from(count),
127 max: OrderedFloat::from(max),
128 min: OrderedFloat::from(min),
129 }
130 } else {
131 let sz = centroids.len();
132 let digests: Vec<TDigest> = vec![
133 TDigest::new_with_size(100),
134 TDigest::new(centroids, sum, count, max, min, sz),
135 ];
136
137 Self::merge_digests(digests)
138 }
139 }
140
141 #[inline]
142 pub fn mean(&self) -> f64 {
143 let count_: f64 = self.count.into_inner();
144 let sum_: f64 = self.sum.into_inner();
145
146 if count_ > 0.0 {
147 sum_ / count_
148 } else {
149 0.0
150 }
151 }
152
153 #[inline]
154 pub fn sum(&self) -> f64 {
155 self.sum.into_inner()
156 }
157
158 #[inline]
159 pub fn centroids(&self) -> Vec<Centroid> {
160 self.centroids.clone()
161 }
162
163 #[inline]
164 pub fn count(&self) -> f64 {
165 self.count.into_inner()
166 }
167
168 #[inline]
169 pub fn max(&self) -> f64 {
170 self.max.into_inner()
171 }
172
173 #[inline]
174 pub fn min(&self) -> f64 {
175 self.min.into_inner()
176 }
177
178 #[inline]
179 pub fn is_empty(&self) -> bool {
180 self.centroids.is_empty()
181 }
182
183 #[inline]
184 pub fn max_size(&self) -> usize {
185 self.max_size
186 }
187}
188
189impl Default for TDigest {
190 fn default() -> Self {
191 TDigest {
192 centroids: Vec::new(),
193 max_size: 100,
194 sum: OrderedFloat::from(0.0),
195 count: OrderedFloat::from(0.0),
196 max: OrderedFloat::from(std::f64::NAN),
197 min: OrderedFloat::from(std::f64::NAN),
198 }
199 }
200}
201
202impl TDigest {
203 fn k_to_q(k: f64, d: f64) -> f64 {
204 let k_div_d = k / d;
205 if k_div_d >= 0.5 {
206 let base = 1.0 - k_div_d;
207 1.0 - 2.0 * base * base
208 } else {
209 2.0 * k_div_d * k_div_d
210 }
211 }
212
213 fn clamp(v: f64, lo: f64, hi: f64) -> f64 {
214 if v > hi {
215 hi
216 } else if v < lo {
217 lo
218 } else {
219 v
220 }
221 }
222
223 pub fn merge_unsorted(&self, unsorted_values: Vec<f64>) -> TDigest {
224 let mut sorted_values: Vec<OrderedFloat<f64>> = unsorted_values.into_iter().map(OrderedFloat::from).collect();
225 sorted_values.sort();
226 let sorted_values = sorted_values.into_iter().map(|f| f.into_inner()).collect();
227
228 self.merge_sorted(sorted_values)
229 }
230
    /// Merges `sorted_values` (must be ascending) into a new digest,
    /// streaming existing centroids and new values together in mean order
    /// and compressing buckets sized by the k-scale function.
    /// Returns a clone of `self` when the input is empty.
    pub fn merge_sorted(&self, sorted_values: Vec<f64>) -> TDigest {
        if sorted_values.is_empty() {
            return self.clone();
        }

        let mut result = TDigest::new_with_size(self.max_size());
        result.count = OrderedFloat::from(self.count() + (sorted_values.len() as f64));

        // New extremes can only come from the ends of the sorted input.
        let maybe_min = OrderedFloat::from(*sorted_values.first().unwrap());
        let maybe_max = OrderedFloat::from(*sorted_values.last().unwrap());

        if self.count() > 0.0 {
            result.min = std::cmp::min(self.min, maybe_min);
            result.max = std::cmp::max(self.max, maybe_max);
        } else {
            // Empty digest: stored min/max are NaN, so take the input's bounds.
            result.min = maybe_min;
            result.max = maybe_max;
        }

        let mut compressed: Vec<Centroid> = Vec::with_capacity(self.max_size);

        // q_limit_times_count is the cumulative-weight boundary of the
        // current bucket; k_to_q makes buckets small near the tails.
        let mut k_limit: f64 = 1.0;
        let mut q_limit_times_count: f64 = Self::k_to_q(k_limit, self.max_size as f64) * result.count.into_inner();
        k_limit += 1.0;

        // Two-way merge of existing centroids and incoming unit-weight values.
        let mut iter_centroids = self.centroids.iter().peekable();
        let mut iter_sorted_values = sorted_values.iter().peekable();

        // Seed `curr` with whichever stream starts at the lower mean.
        let mut curr: Centroid = if let Some(c) = iter_centroids.peek() {
            let curr = **iter_sorted_values.peek().unwrap();
            if c.mean() < curr {
                iter_centroids.next().unwrap().clone()
            } else {
                Centroid::new(*iter_sorted_values.next().unwrap(), 1.0)
            }
        } else {
            Centroid::new(*iter_sorted_values.next().unwrap(), 1.0)
        };

        let mut weight_so_far: f64 = curr.weight();

        // Mass accumulated for the current bucket, folded into `curr` in one
        // step when the bucket boundary is crossed.
        let mut sums_to_merge: f64 = 0.0;
        let mut weights_to_merge: f64 = 0.0;

        while iter_centroids.peek().is_some() || iter_sorted_values.peek().is_some() {
            // Take the next element in mean order from either stream.
            let next: Centroid = if let Some(c) = iter_centroids.peek() {
                if iter_sorted_values.peek().is_none() || c.mean() < **iter_sorted_values.peek().unwrap() {
                    iter_centroids.next().unwrap().clone()
                } else {
                    Centroid::new(*iter_sorted_values.next().unwrap(), 1.0)
                }
            } else {
                Centroid::new(*iter_sorted_values.next().unwrap(), 1.0)
            };

            let next_sum: f64 = next.mean() * next.weight();
            weight_so_far += next.weight();

            if weight_so_far <= q_limit_times_count {
                // Still inside the current bucket: accumulate.
                sums_to_merge += next_sum;
                weights_to_merge += next.weight();
            } else {
                // Boundary crossed: seal `curr` and open a new bucket at `next`.
                result.sum = OrderedFloat::from(result.sum.into_inner() + curr.add(sums_to_merge, weights_to_merge));
                sums_to_merge = 0.0;
                weights_to_merge = 0.0;

                compressed.push(curr.clone());
                q_limit_times_count = Self::k_to_q(k_limit, self.max_size as f64) * result.count();
                k_limit += 1.0;
                curr = next;
            }
        }

        // Flush the final in-progress centroid.
        result.sum = OrderedFloat::from(result.sum.into_inner() + curr.add(sums_to_merge, weights_to_merge));
        compressed.push(curr);
        compressed.shrink_to_fit();
        compressed.sort();

        result.centroids = compressed;
        result
    }
312
313 fn external_merge(centroids: &mut Vec<Centroid>, first: usize, middle: usize, last: usize) {
314 let mut result: Vec<Centroid> = Vec::with_capacity(centroids.len());
315
316 let mut i = first;
317 let mut j = middle;
318
319 while i < middle && j < last {
320 match centroids[i].cmp(¢roids[j]) {
321 Ordering::Less => {
322 result.push(centroids[i].clone());
323 i += 1;
324 }
325 Ordering::Greater => {
326 result.push(centroids[j].clone());
327 j += 1;
328 }
329 Ordering::Equal => {
330 result.push(centroids[i].clone());
331 i += 1;
332 }
333 }
334 }
335
336 while i < middle {
337 result.push(centroids[i].clone());
338 i += 1;
339 }
340
341 while j < last {
342 result.push(centroids[j].clone());
343 j += 1;
344 }
345
346 i = first;
347 for centroid in result.into_iter() {
348 centroids[i] = centroid;
349 i += 1;
350 }
351 }
352
353 pub fn merge_digests(digests: Vec<TDigest>) -> TDigest {
355 let n_centroids: usize = digests.iter().map(|d| d.centroids.len()).sum();
356 if n_centroids == 0 {
357 return TDigest::default();
358 }
359
360 let max_size = digests.first().unwrap().max_size;
361 let mut centroids: Vec<Centroid> = Vec::with_capacity(n_centroids);
362 let mut starts: Vec<usize> = Vec::with_capacity(digests.len());
363
364 let mut count: f64 = 0.0;
365 let mut min = OrderedFloat::from(std::f64::INFINITY);
366 let mut max = OrderedFloat::from(std::f64::NEG_INFINITY);
367
368 let mut start: usize = 0;
369 for digest in digests.into_iter() {
370 starts.push(start);
371
372 let curr_count: f64 = digest.count();
373 if curr_count > 0.0 {
374 min = std::cmp::min(min, digest.min);
375 max = std::cmp::max(max, digest.max);
376 count += curr_count;
377 for centroid in digest.centroids {
378 centroids.push(centroid);
379 start += 1;
380 }
381 }
382 }
383
384 let mut digests_per_block: usize = 1;
385 while digests_per_block < starts.len() {
386 for i in (0..starts.len()).step_by(digests_per_block * 2) {
387 if i + digests_per_block < starts.len() {
388 let first = starts[i];
389 let middle = starts[i + digests_per_block];
390 let last = if i + 2 * digests_per_block < starts.len() {
391 starts[i + 2 * digests_per_block]
392 } else {
393 centroids.len()
394 };
395
396 debug_assert!(first <= middle && middle <= last);
397 Self::external_merge(&mut centroids, first, middle, last);
398 }
399 }
400
401 digests_per_block *= 2;
402 }
403
404 let mut result = TDigest::new_with_size(max_size);
405 let mut compressed: Vec<Centroid> = Vec::with_capacity(max_size);
406
407 let mut k_limit: f64 = 1.0;
408 let mut q_limit_times_count: f64 = Self::k_to_q(k_limit, max_size as f64) * (count as f64);
409
410 let mut iter_centroids = centroids.iter_mut();
411 let mut curr = iter_centroids.next().unwrap();
412 let mut weight_so_far: f64 = curr.weight();
413 let mut sums_to_merge: f64 = 0.0;
414 let mut weights_to_merge: f64 = 0.0;
415
416 for centroid in iter_centroids {
417 weight_so_far += centroid.weight();
418
419 if weight_so_far <= q_limit_times_count {
420 sums_to_merge += centroid.mean() * centroid.weight();
421 weights_to_merge += centroid.weight();
422 } else {
423 result.sum = OrderedFloat::from(result.sum.into_inner() + curr.add(sums_to_merge, weights_to_merge));
424 sums_to_merge = 0.0;
425 weights_to_merge = 0.0;
426 compressed.push(curr.clone());
427 q_limit_times_count = Self::k_to_q(k_limit, max_size as f64) * (count as f64);
428 k_limit += 1.0;
429 curr = centroid;
430 }
431 }
432
433 result.sum = OrderedFloat::from(result.sum.into_inner() + curr.add(sums_to_merge, weights_to_merge));
434 compressed.push(curr.clone());
435 compressed.shrink_to_fit();
436 compressed.sort();
437
438 result.count = OrderedFloat::from(count as f64);
439 result.min = min;
440 result.max = max;
441 result.centroids = compressed;
442 result
443 }
444
    /// Estimates the value at quantile `q` (0.0..=1.0) by locating the
    /// centroid covering rank `q * count` and interpolating linearly
    /// between its neighbours. Returns 0.0 for an empty digest; q <= 0
    /// returns `min`, q >= 1 returns `max`.
    pub fn estimate_quantile(&self, q: f64) -> f64 {
        if self.centroids.is_empty() {
            return 0.0;
        }

        let count_: f64 = self.count.into_inner();
        // Target rank: cumulative weight at the requested quantile.
        let rank: f64 = q * count_;

        let mut pos: usize;
        let mut t: f64;
        if q > 0.5 {
            if q >= 1.0 {
                return self.max();
            }

            // Upper half: scan from the top; `t` ends as the cumulative
            // weight strictly below centroid `pos`.
            pos = 0;
            t = count_;

            for (k, centroid) in self.centroids.iter().enumerate().rev() {
                t -= centroid.weight();

                if rank >= t {
                    pos = k;
                    break;
                }
            }
        } else {
            if q <= 0.0 {
                return self.min();
            }

            // Lower half: scan from the bottom with the same invariant on `t`.
            pos = self.centroids.len() - 1;
            t = 0.0;

            for (k, centroid) in self.centroids.iter().enumerate() {
                if rank < t + centroid.weight() {
                    pos = k;
                    break;
                }

                t += centroid.weight();
            }
        }

        // Interpolation width: half the gap to each neighbouring centroid,
        // with the digest's observed min/max bounding the end centroids.
        let mut delta = 0.0;
        let mut min: f64 = self.min.into_inner();
        let mut max: f64 = self.max.into_inner();

        if self.centroids.len() > 1 {
            if pos == 0 {
                delta = self.centroids[pos + 1].mean() - self.centroids[pos].mean();
                max = self.centroids[pos + 1].mean();
            } else if pos == (self.centroids.len() - 1) {
                delta = self.centroids[pos].mean() - self.centroids[pos - 1].mean();
                min = self.centroids[pos - 1].mean();
            } else {
                delta = (self.centroids[pos + 1].mean() - self.centroids[pos - 1].mean()) / 2.0;
                min = self.centroids[pos - 1].mean();
                max = self.centroids[pos + 1].mean();
            }
        }

        // Linear interpolation within centroid `pos`, clamped to the
        // neighbour-derived bounds.
        let value = self.centroids[pos].mean() + ((rank - t) / self.centroids[pos].weight() - 0.5) * delta;
        Self::clamp(value, min, max)
    }
512
#[cfg(test)]
mod tests {
    use super::*;

    /// Regression test: incrementally merging one value at a time must
    /// still produce sensible medians/tails for a tiny dataset.
    #[test]
    fn test_centroid_addition_regression() {
        let vals = vec![1.0, 1.0, 1.0, 2.0, 1.0, 1.0];
        let mut t = TDigest::new_with_size(10);

        for v in vals {
            t = t.merge_unsorted(vec![v]);
        }

        let ans = t.estimate_quantile(0.5);
        let expected: f64 = 1.0;
        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.95);
        let expected: f64 = 2.0;
        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);
    }

    /// Quantile estimates on 1..=1_000_000 (uniform) must be within 1%
    /// at the tails and the median when merged pre-sorted.
    #[test]
    fn test_merge_sorted_against_uniform_distro() {
        let t = TDigest::new_with_size(100);
        let values: Vec<f64> = (1..=1_000_000).map(f64::from).collect();

        let t = t.merge_sorted(values);

        let ans = t.estimate_quantile(1.0);
        let expected: f64 = 1_000_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.99);
        let expected: f64 = 990_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.01);
        let expected: f64 = 10_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.0);
        let expected: f64 = 1.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.5);
        let expected: f64 = 500_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);
    }

    /// Same uniform distribution, but fed unsorted (exercises the sort path).
    #[test]
    fn test_merge_unsorted_against_uniform_distro() {
        let t = TDigest::new_with_size(100);
        let values: Vec<f64> = (1..=1_000_000).map(f64::from).collect();

        let t = t.merge_unsorted(values);

        let ans = t.estimate_quantile(1.0);
        let expected: f64 = 1_000_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.99);
        let expected: f64 = 990_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.01);
        let expected: f64 = 10_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.0);
        let expected: f64 = 1.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.5);
        let expected: f64 = 500_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);
    }

    /// Skewed input: 600k uniform values plus a 400k-heavy point mass at
    /// 1_000_000; tail and median estimates must stay within 1%.
    #[test]
    fn test_merge_sorted_against_skewed_distro() {
        let t = TDigest::new_with_size(100);
        let mut values: Vec<f64> = (1..=600_000).map(f64::from).collect();
        for _ in 0..400_000 {
            values.push(1_000_000.0);
        }

        let t = t.merge_sorted(values);

        let ans = t.estimate_quantile(0.99);
        let expected: f64 = 1_000_000.0;
        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.01);
        let expected: f64 = 10_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.5);
        let expected: f64 = 500_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);
    }

    /// Same skewed distribution through the unsorted entry point.
    #[test]
    fn test_merge_unsorted_against_skewed_distro() {
        let t = TDigest::new_with_size(100);
        let mut values: Vec<f64> = (1..=600_000).map(f64::from).collect();
        for _ in 0..400_000 {
            values.push(1_000_000.0);
        }

        let t = t.merge_unsorted(values);

        let ans = t.estimate_quantile(0.99);
        let expected: f64 = 1_000_000.0;
        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.01);
        let expected: f64 = 10_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.5);
        let expected: f64 = 500_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);
    }

    /// Merging 100 identical digests of 1..=1000 must preserve quantiles.
    /// NOTE: the 0.01-quantile tolerance is looser (20%) — low-tail error
    /// grows when many digests are merged.
    #[test]
    fn test_merge_digests() {
        let mut digests: Vec<TDigest> = Vec::new();

        for _ in 1..=100 {
            let t = TDigest::new_with_size(100);
            let values: Vec<f64> = (1..=1_000).map(f64::from).collect();
            let t = t.merge_sorted(values);
            digests.push(t)
        }

        let t = TDigest::merge_digests(digests);

        let ans = t.estimate_quantile(1.0);
        let expected: f64 = 1000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.99);
        let expected: f64 = 990.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.01);
        let expected: f64 = 10.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.2);

        let ans = t.estimate_quantile(0.0);
        let expected: f64 = 1.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.5);
        let expected: f64 = 500.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);
    }
}