tdigest/
lib.rs

1//! T-Digest algorithm in rust
2//!
3//! ## Installation
4//!
5//! Add this to your `Cargo.toml`:
6//!
7//! ```toml
8//! [dependencies]
9//! tdigest = "0.2"
10//! ```
11//!
//! then you are good to go. If you are using Rust 2015, you also need to add `extern crate tdigest;` to your crate root.
13//!
14//! ## Example
15//!
16//! ```rust
17//! use tdigest::TDigest;
18//!
19//! let t = TDigest::new_with_size(100);
20//! let values: Vec<f64> = (1..=1_000_000).map(f64::from).collect();
21//!
22//! let t = t.merge_sorted(values);
23//!
24//! let ans = t.estimate_quantile(0.99);
25//! let expected: f64 = 990_000.0;
26//!
27//! let percentage: f64 = (expected - ans).abs() / expected;
28//! assert!(percentage < 0.01);
29//! ```
30
31use ordered_float::OrderedFloat;
32use std::cmp::Ordering;
33
34#[cfg(feature = "use_serde")]
35use serde::{Deserialize, Serialize};
36
/// Centroid implementation to the cluster mentioned in the paper.
///
/// A centroid summarises a group of nearby samples by their weighted mean and
/// the total weight (sample count) it represents. Both fields are wrapped in
/// `OrderedFloat` so the derived `Eq` and the `Ord` impl below are total.
///
/// NOTE(review): the derived `PartialEq` compares both fields, while the
/// manual `Ord` below compares `mean` only — two centroids with equal means
/// but different weights are `Ordering::Equal` yet not `==`. Verify no caller
/// relies on `Ord`/`Eq` agreeing.
#[derive(Debug, PartialEq, Eq, Clone)]
#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))]
pub struct Centroid {
    // Weighted mean of all samples merged into this cluster.
    mean: OrderedFloat<f64>,
    // Total weight (number of samples) represented by this cluster.
    weight: OrderedFloat<f64>,
}
44
45impl PartialOrd for Centroid {
46    fn partial_cmp(&self, other: &Centroid) -> Option<Ordering> {
47        Some(self.cmp(other))
48    }
49}
50
51impl Ord for Centroid {
52    fn cmp(&self, other: &Centroid) -> Ordering {
53        self.mean.cmp(&other.mean)
54    }
55}
56
57impl Centroid {
58    pub fn new(mean: f64, weight: f64) -> Self {
59        Centroid {
60            mean: OrderedFloat::from(mean),
61            weight: OrderedFloat::from(weight),
62        }
63    }
64
65    #[inline]
66    pub fn mean(&self) -> f64 {
67        self.mean.into_inner()
68    }
69
70    #[inline]
71    pub fn weight(&self) -> f64 {
72        self.weight.into_inner()
73    }
74
75    pub fn add(&mut self, sum: f64, weight: f64) -> f64 {
76        let weight_: f64 = self.weight.into_inner();
77        let mean_: f64 = self.mean.into_inner();
78
79        let new_sum: f64 = sum + weight_ * mean_;
80        let new_weight: f64 = weight_ + weight;
81        self.weight = OrderedFloat::from(new_weight);
82        self.mean = OrderedFloat::from(new_sum / new_weight);
83        new_sum
84    }
85}
86
87impl Default for Centroid {
88    fn default() -> Self {
89        Centroid {
90            mean: OrderedFloat::from(0.0),
91            weight: OrderedFloat::from(1.0),
92        }
93    }
94}
95
/// T-Digest to be operated on.
///
/// Immutable in use: the merge operations return a freshly compressed digest
/// rather than mutating `self`.
#[derive(Debug, PartialEq, Eq, Clone)]
#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))]
pub struct TDigest {
    // Compressed clusters, kept sorted by mean.
    centroids: Vec<Centroid>,
    // Compression parameter: upper bound on the number of retained centroids.
    max_size: usize,
    // Running sum of all ingested values.
    sum: OrderedFloat<f64>,
    // Total number of ingested values (kept as f64 for the quantile math).
    count: OrderedFloat<f64>,
    // Largest / smallest ingested value; NaN while the digest is empty.
    max: OrderedFloat<f64>,
    min: OrderedFloat<f64>,
}
107
108impl TDigest {
109    pub fn new_with_size(max_size: usize) -> Self {
110        TDigest {
111            centroids: Vec::new(),
112            max_size,
113            sum: OrderedFloat::from(0.0),
114            count: OrderedFloat::from(0.0),
115            max: OrderedFloat::from(std::f64::NAN),
116            min: OrderedFloat::from(std::f64::NAN),
117        }
118    }
119
120    pub fn new(centroids: Vec<Centroid>, sum: f64, count: f64, max: f64, min: f64, max_size: usize) -> Self {
121        if centroids.len() <= max_size {
122            TDigest {
123                centroids,
124                max_size,
125                sum: OrderedFloat::from(sum),
126                count: OrderedFloat::from(count),
127                max: OrderedFloat::from(max),
128                min: OrderedFloat::from(min),
129            }
130        } else {
131            let sz = centroids.len();
132            let digests: Vec<TDigest> = vec![
133                TDigest::new_with_size(100),
134                TDigest::new(centroids, sum, count, max, min, sz),
135            ];
136
137            Self::merge_digests(digests)
138        }
139    }
140
141    #[inline]
142    pub fn mean(&self) -> f64 {
143        let count_: f64 = self.count.into_inner();
144        let sum_: f64 = self.sum.into_inner();
145
146        if count_ > 0.0 {
147            sum_ / count_
148        } else {
149            0.0
150        }
151    }
152
153    #[inline]
154    pub fn sum(&self) -> f64 {
155        self.sum.into_inner()
156    }
157
158    #[inline]
159    pub fn count(&self) -> f64 {
160        self.count.into_inner()
161    }
162
163    #[inline]
164    pub fn max(&self) -> f64 {
165        self.max.into_inner()
166    }
167
168    #[inline]
169    pub fn min(&self) -> f64 {
170        self.min.into_inner()
171    }
172
173    #[inline]
174    pub fn is_empty(&self) -> bool {
175        self.centroids.is_empty()
176    }
177
178    #[inline]
179    pub fn max_size(&self) -> usize {
180        self.max_size
181    }
182}
183
184impl Default for TDigest {
185    fn default() -> Self {
186        TDigest {
187            centroids: Vec::new(),
188            max_size: 100,
189            sum: OrderedFloat::from(0.0),
190            count: OrderedFloat::from(0.0),
191            max: OrderedFloat::from(std::f64::NAN),
192            min: OrderedFloat::from(std::f64::NAN),
193        }
194    }
195}
196
197impl TDigest {
198    fn k_to_q(k: f64, d: f64) -> f64 {
199        let k_div_d = k / d;
200        if k_div_d >= 0.5 {
201            let base = 1.0 - k_div_d;
202            1.0 - 2.0 * base * base
203        } else {
204            2.0 * k_div_d * k_div_d
205        }
206    }
207
208    fn clamp(v: f64, lo: f64, hi: f64) -> f64 {
209        if v > hi {
210            hi
211        } else if v < lo {
212            lo
213        } else {
214            v
215        }
216    }
217
218    pub fn merge_unsorted(&self, unsorted_values: Vec<f64>) -> TDigest {
219        let mut sorted_values: Vec<OrderedFloat<f64>> = unsorted_values.into_iter().map(OrderedFloat::from).collect();
220        sorted_values.sort();
221        let sorted_values = sorted_values.into_iter().map(|f| f.into_inner()).collect();
222
223        self.merge_sorted(sorted_values)
224    }
225
    /// Merges a batch of values (which must be in ascending order) into a
    /// freshly compressed digest; `self` is not modified.
    ///
    /// Streams the existing centroids and the new values together in sorted
    /// order, accumulating neighbours into one centroid until the cumulative
    /// weight crosses the current cell boundary given by `k_to_q`.
    pub fn merge_sorted(&self, sorted_values: Vec<f64>) -> TDigest {
        if sorted_values.is_empty() {
            return self.clone();
        }

        let mut result = TDigest::new_with_size(self.max_size());
        // New total count: everything already in `self` plus the batch.
        result.count = OrderedFloat::from(self.count() + (sorted_values.len() as f64));

        // Batch is sorted, so its bounds are its first and last elements.
        let maybe_min = OrderedFloat::from(*sorted_values.first().unwrap());
        let maybe_max = OrderedFloat::from(*sorted_values.last().unwrap());

        if self.count() > 0.0 {
            result.min = std::cmp::min(self.min, maybe_min);
            result.max = std::cmp::max(self.max, maybe_max);
        } else {
            // Empty digest: its NaN sentinels must not participate in min/max.
            result.min = maybe_min;
            result.max = maybe_max;
        }

        let mut compressed: Vec<Centroid> = Vec::with_capacity(self.max_size);

        // `q_limit_times_count` is the cumulative-weight boundary of the
        // current compression cell; `k_limit` is advanced so the next
        // recomputation (inside the loop) uses the following cell.
        let mut k_limit: f64 = 1.0;
        let mut q_limit_times_count: f64 = Self::k_to_q(k_limit, self.max_size as f64) * result.count.into_inner();
        k_limit += 1.0;

        let mut iter_centroids = self.centroids.iter().peekable();
        let mut iter_sorted_values = sorted_values.iter().peekable();

        // Seed `curr` with the smaller head of the two sorted streams.
        let mut curr: Centroid = if let Some(c) = iter_centroids.peek() {
            let curr = **iter_sorted_values.peek().unwrap();
            if c.mean() < curr {
                iter_centroids.next().unwrap().clone()
            } else {
                Centroid::new(*iter_sorted_values.next().unwrap(), 1.0)
            }
        } else {
            Centroid::new(*iter_sorted_values.next().unwrap(), 1.0)
        };

        let mut weight_so_far: f64 = curr.weight();

        // Pending neighbours waiting to be folded into `curr` in one go.
        let mut sums_to_merge: f64 = 0.0;
        let mut weights_to_merge: f64 = 0.0;

        while iter_centroids.peek().is_some() || iter_sorted_values.peek().is_some() {
            // Take the smaller head of the two streams as the next element.
            let next: Centroid = if let Some(c) = iter_centroids.peek() {
                if iter_sorted_values.peek().is_none() || c.mean() < **iter_sorted_values.peek().unwrap() {
                    iter_centroids.next().unwrap().clone()
                } else {
                    Centroid::new(*iter_sorted_values.next().unwrap(), 1.0)
                }
            } else {
                Centroid::new(*iter_sorted_values.next().unwrap(), 1.0)
            };

            let next_sum: f64 = next.mean() * next.weight();
            weight_so_far += next.weight();

            if weight_so_far <= q_limit_times_count {
                // Still inside the current cell: defer the actual merge.
                sums_to_merge += next_sum;
                weights_to_merge += next.weight();
            } else {
                // Cell boundary crossed: flush pending mass into `curr`,
                // emit it, and start a new cell with `next`.
                result.sum = OrderedFloat::from(result.sum.into_inner() + curr.add(sums_to_merge, weights_to_merge));
                sums_to_merge = 0.0;
                weights_to_merge = 0.0;

                compressed.push(curr.clone());
                q_limit_times_count = Self::k_to_q(k_limit, self.max_size as f64) * result.count();
                k_limit += 1.0;
                curr = next;
            }
        }

        // Flush the final centroid.
        result.sum = OrderedFloat::from(result.sum.into_inner() + curr.add(sums_to_merge, weights_to_merge));
        compressed.push(curr);
        compressed.shrink_to_fit();
        compressed.sort();

        result.centroids = compressed;
        result
    }
307
308    fn external_merge(centroids: &mut Vec<Centroid>, first: usize, middle: usize, last: usize) {
309        let mut result: Vec<Centroid> = Vec::with_capacity(centroids.len());
310
311        let mut i = first;
312        let mut j = middle;
313
314        while i < middle && j < last {
315            match centroids[i].cmp(&centroids[j]) {
316                Ordering::Less => {
317                    result.push(centroids[i].clone());
318                    i += 1;
319                }
320                Ordering::Greater => {
321                    result.push(centroids[j].clone());
322                    j += 1;
323                }
324                Ordering::Equal => {
325                    result.push(centroids[i].clone());
326                    i += 1;
327                }
328            }
329        }
330
331        while i < middle {
332            result.push(centroids[i].clone());
333            i += 1;
334        }
335
336        while j < last {
337            result.push(centroids[j].clone());
338            j += 1;
339        }
340
341        i = first;
342        for centroid in result.into_iter() {
343            centroids[i] = centroid;
344            i += 1;
345        }
346    }
347
348    // Merge multiple T-Digests
349    pub fn merge_digests(digests: Vec<TDigest>) -> TDigest {
350        let n_centroids: usize = digests.iter().map(|d| d.centroids.len()).sum();
351        if n_centroids == 0 {
352            return TDigest::default();
353        }
354
355        let max_size = digests.first().unwrap().max_size;
356        let mut centroids: Vec<Centroid> = Vec::with_capacity(n_centroids);
357        let mut starts: Vec<usize> = Vec::with_capacity(digests.len());
358
359        let mut count: f64 = 0.0;
360        let mut min = OrderedFloat::from(std::f64::INFINITY);
361        let mut max = OrderedFloat::from(std::f64::NEG_INFINITY);
362
363        let mut start: usize = 0;
364        for digest in digests.into_iter() {
365            starts.push(start);
366
367            let curr_count: f64 = digest.count();
368            if curr_count > 0.0 {
369                min = std::cmp::min(min, digest.min);
370                max = std::cmp::max(max, digest.max);
371                count += curr_count;
372                for centroid in digest.centroids {
373                    centroids.push(centroid);
374                    start += 1;
375                }
376            }
377        }
378
379        let mut digests_per_block: usize = 1;
380        while digests_per_block < starts.len() {
381            for i in (0..starts.len()).step_by(digests_per_block * 2) {
382                if i + digests_per_block < starts.len() {
383                    let first = starts[i];
384                    let middle = starts[i + digests_per_block];
385                    let last = if i + 2 * digests_per_block < starts.len() {
386                        starts[i + 2 * digests_per_block]
387                    } else {
388                        centroids.len()
389                    };
390
391                    debug_assert!(first <= middle && middle <= last);
392                    Self::external_merge(&mut centroids, first, middle, last);
393                }
394            }
395
396            digests_per_block *= 2;
397        }
398
399        let mut result = TDigest::new_with_size(max_size);
400        let mut compressed: Vec<Centroid> = Vec::with_capacity(max_size);
401
402        let mut k_limit: f64 = 1.0;
403        let mut q_limit_times_count: f64 = Self::k_to_q(k_limit, max_size as f64) * (count as f64);
404
405        let mut iter_centroids = centroids.iter_mut();
406        let mut curr = iter_centroids.next().unwrap();
407        let mut weight_so_far: f64 = curr.weight();
408        let mut sums_to_merge: f64 = 0.0;
409        let mut weights_to_merge: f64 = 0.0;
410
411        for centroid in iter_centroids {
412            weight_so_far += centroid.weight();
413
414            if weight_so_far <= q_limit_times_count {
415                sums_to_merge += centroid.mean() * centroid.weight();
416                weights_to_merge += centroid.weight();
417            } else {
418                result.sum = OrderedFloat::from(result.sum.into_inner() + curr.add(sums_to_merge, weights_to_merge));
419                sums_to_merge = 0.0;
420                weights_to_merge = 0.0;
421                compressed.push(curr.clone());
422                q_limit_times_count = Self::k_to_q(k_limit, max_size as f64) * (count as f64);
423                k_limit += 1.0;
424                curr = centroid;
425            }
426        }
427
428        result.sum = OrderedFloat::from(result.sum.into_inner() + curr.add(sums_to_merge, weights_to_merge));
429        compressed.push(curr.clone());
430        compressed.shrink_to_fit();
431        compressed.sort();
432
433        result.count = OrderedFloat::from(count as f64);
434        result.min = min;
435        result.max = max;
436        result.centroids = compressed;
437        result
438    }
439
    /// To estimate the value located at `q` quantile
    ///
    /// Returns 0.0 for an empty digest. `q >= 1.0` returns the observed max
    /// and `q <= 0.0` the observed min; in between, the result is a linear
    /// interpolation inside the centroid covering rank `q * count`, clamped
    /// to its neighbours' means.
    pub fn estimate_quantile(&self, q: f64) -> f64 {
        if self.centroids.is_empty() {
            return 0.0;
        }

        let count_: f64 = self.count.into_inner();
        // Target rank of the requested quantile, in units of weight.
        let rank: f64 = q * count_;

        let mut pos: usize;
        let mut t: f64;
        if q > 0.5 {
            if q >= 1.0 {
                return self.max();
            }

            // Upper half: walk down from the top; after the break, `t` is the
            // cumulative weight strictly below centroid `pos`.
            pos = 0;
            t = count_;

            for (k, centroid) in self.centroids.iter().enumerate().rev() {
                t -= centroid.weight();

                if rank >= t {
                    pos = k;
                    break;
                }
            }
        } else {
            if q <= 0.0 {
                return self.min();
            }

            // Lower half: walk up from the bottom; `t` accumulates the weight
            // strictly below the candidate centroid.
            pos = self.centroids.len() - 1;
            t = 0.0;

            for (k, centroid) in self.centroids.iter().enumerate() {
                if rank < t + centroid.weight() {
                    pos = k;
                    break;
                }

                t += centroid.weight();
            }
        }

        // Interpolation window around the chosen centroid: spans half-way to
        // each neighbouring mean, falling back to the digest's min/max at the
        // extreme positions.
        let mut delta = 0.0;
        let mut min: f64 = self.min.into_inner();
        let mut max: f64 = self.max.into_inner();

        if self.centroids.len() > 1 {
            if pos == 0 {
                delta = self.centroids[pos + 1].mean() - self.centroids[pos].mean();
                max = self.centroids[pos + 1].mean();
            } else if pos == (self.centroids.len() - 1) {
                delta = self.centroids[pos].mean() - self.centroids[pos - 1].mean();
                min = self.centroids[pos - 1].mean();
            } else {
                delta = (self.centroids[pos + 1].mean() - self.centroids[pos - 1].mean()) / 2.0;
                min = self.centroids[pos - 1].mean();
                max = self.centroids[pos + 1].mean();
            }
        }

        // Linear interpolation by the fraction of the centroid's weight that
        // lies below `rank`, then clamped into the window.
        let value = self.centroids[pos].mean() + ((rank - t) / self.centroids[pos].weight() - 0.5) * delta;
        Self::clamp(value, min, max)
    }
506}
507
#[cfg(test)]
mod tests {
    use super::*;

    // Regression: merging values one at a time must keep centroid means
    // correct (the original bug double-counted weight in `Centroid::add`).
    #[test]
    fn test_centroid_addition_regression() {
        //https://github.com/MnO2/t-digest/pull/1

        let vals = vec![1.0, 1.0, 1.0, 2.0, 1.0, 1.0];
        let mut t = TDigest::new_with_size(10);

        for v in vals {
            t = t.merge_unsorted(vec![v]);
        }

        let ans = t.estimate_quantile(0.5);
        let expected: f64 = 1.0;
        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.95);
        let expected: f64 = 2.0;
        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);
    }

    // Uniform 1..=1_000_000: all probed quantiles within 1% relative error.
    #[test]
    fn test_merge_sorted_against_uniform_distro() {
        let t = TDigest::new_with_size(100);
        let values: Vec<f64> = (1..=1_000_000).map(f64::from).collect();

        let t = t.merge_sorted(values);

        let ans = t.estimate_quantile(1.0);
        let expected: f64 = 1_000_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.99);
        let expected: f64 = 990_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.01);
        let expected: f64 = 10_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.0);
        let expected: f64 = 1.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.5);
        let expected: f64 = 500_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);
    }

    // Same uniform distribution, but exercising the sorting path of
    // `merge_unsorted`.
    #[test]
    fn test_merge_unsorted_against_uniform_distro() {
        let t = TDigest::new_with_size(100);
        let values: Vec<f64> = (1..=1_000_000).map(f64::from).collect();

        let t = t.merge_unsorted(values);

        let ans = t.estimate_quantile(1.0);
        let expected: f64 = 1_000_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.99);
        let expected: f64 = 990_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.01);
        let expected: f64 = 10_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.0);
        let expected: f64 = 1.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.5);
        let expected: f64 = 500_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);
    }

    // Skewed input: a 60% ramp plus a heavy point mass at 1_000_000.
    #[test]
    fn test_merge_sorted_against_skewed_distro() {
        let t = TDigest::new_with_size(100);
        let mut values: Vec<f64> = (1..=600_000).map(f64::from).collect();
        for _ in 0..400_000 {
            values.push(1_000_000.0);
        }

        let t = t.merge_sorted(values);

        let ans = t.estimate_quantile(0.99);
        let expected: f64 = 1_000_000.0;
        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.01);
        let expected: f64 = 10_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.5);
        let expected: f64 = 500_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);
    }

    // Same skewed distribution through the unsorted path.
    #[test]
    fn test_merge_unsorted_against_skewed_distro() {
        let t = TDigest::new_with_size(100);
        let mut values: Vec<f64> = (1..=600_000).map(f64::from).collect();
        for _ in 0..400_000 {
            values.push(1_000_000.0);
        }

        let t = t.merge_unsorted(values);

        let ans = t.estimate_quantile(0.99);
        let expected: f64 = 1_000_000.0;
        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.01);
        let expected: f64 = 10_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.5);
        let expected: f64 = 500_000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);
    }

    // Merging 100 identical digests of 1..=1000 should behave like a single
    // digest of the same distribution (the low tail gets a looser tolerance).
    #[test]
    fn test_merge_digests() {
        let mut digests: Vec<TDigest> = Vec::new();

        for _ in 1..=100 {
            let t = TDigest::new_with_size(100);
            let values: Vec<f64> = (1..=1_000).map(f64::from).collect();
            let t = t.merge_sorted(values);
            digests.push(t)
        }

        let t = TDigest::merge_digests(digests);

        let ans = t.estimate_quantile(1.0);
        let expected: f64 = 1000.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.99);
        let expected: f64 = 990.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.01);
        let expected: f64 = 10.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.2);

        let ans = t.estimate_quantile(0.0);
        let expected: f64 = 1.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);

        let ans = t.estimate_quantile(0.5);
        let expected: f64 = 500.0;

        let percentage: f64 = (expected - ans).abs() / expected;
        assert!(percentage < 0.01);
    }
}