// tdigest_rs/lib.rs

//! T-Digest algorithm in Rust
//!
//! ## Installation
//!
//! Add this to your `Cargo.toml`:
//!
//! ```toml
//! [dependencies]
//! tdigest = "0.2"
//! ```
//!
//! then you are good to go. If you are using Rust 2015 you have to add ``extern crate tdigest`` to your crate root as well.
//!
//! ## Example
//!
//! ```rust
//! use tdigest::TDigest;
//!
//! let t = TDigest::new_with_size(100);
//! let values: Vec<f64> = (1..=1_000_000).map(f64::from).collect();
//!
//! let t = t.merge_sorted(values);
//!
//! let ans = t.estimate_quantile(0.99);
//! let expected: f64 = 990_000.0;
//!
//! let percentage: f64 = (expected - ans).abs() / expected;
//! assert!(percentage < 0.01);
//! ```
use ordered_float::OrderedFloat;
use std::cmp::Ordering;

#[cfg(feature = "use_serde")]
use serde::{Deserialize, Serialize};

/// Centroid implementation to the cluster mentioned in the paper.
#[derive(Debug, PartialEq, Eq, Clone)]
#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))]
pub struct Centroid {
    // Mean of the values assigned to this cluster. `OrderedFloat` provides a
    // total order so centroids can be sorted and compared for equality.
    mean: OrderedFloat<f64>,
    // Total weight of the cluster (the number of values, for unit-weight input).
    weight: OrderedFloat<f64>,
}

45impl PartialOrd for Centroid {
46    fn partial_cmp(&self, other: &Centroid) -> Option<Ordering> {
47        Some(self.cmp(other))
48    }
49}
50
51impl Ord for Centroid {
52    fn cmp(&self, other: &Centroid) -> Ordering {
53        self.mean.cmp(&other.mean)
54    }
55}
56
57impl Centroid {
58    pub fn new(mean: f64, weight: f64) -> Self {
59        Centroid {
60            mean: OrderedFloat::from(mean),
61            weight: OrderedFloat::from(weight),
62        }
63    }
64
65    #[inline]
66    pub fn mean(&self) -> f64 {
67        self.mean.into_inner()
68    }
69
70    #[inline]
71    pub fn weight(&self) -> f64 {
72        self.weight.into_inner()
73    }
74
75    pub fn add(&mut self, sum: f64, weight: f64) -> f64 {
76        let weight_: f64 = self.weight.into_inner();
77        let mean_: f64 = self.mean.into_inner();
78
79        let new_sum: f64 = sum + weight_ * mean_;
80        let new_weight: f64 = weight_ + weight;
81        self.weight = OrderedFloat::from(new_weight);
82        self.mean = OrderedFloat::from(new_sum / new_weight);
83        new_sum
84    }
85}
86
87impl Default for Centroid {
88    fn default() -> Self {
89        Centroid {
90            mean: OrderedFloat::from(0.0),
91            weight: OrderedFloat::from(1.0),
92        }
93    }
94}
95
/// T-Digest to be operated on.
#[derive(Debug, PartialEq, Eq, Clone)]
#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))]
pub struct TDigest {
    // Compressed clusters, kept sorted by mean.
    centroids: Vec<Centroid>,
    // Compression parameter: upper bound on the number of centroids kept.
    max_size: usize,
    // Sum of all values merged into the digest.
    sum: OrderedFloat<f64>,
    // Total weight (count of values) merged into the digest.
    count: OrderedFloat<f64>,
    // Largest value seen; NaN while the digest is empty.
    max: OrderedFloat<f64>,
    // Smallest value seen; NaN while the digest is empty.
    min: OrderedFloat<f64>,
}

108impl TDigest {
109    pub fn new_with_size(max_size: usize) -> Self {
110        TDigest {
111            centroids: Vec::new(),
112            max_size,
113            sum: OrderedFloat::from(0.0),
114            count: OrderedFloat::from(0.0),
115            max: OrderedFloat::from(std::f64::NAN),
116            min: OrderedFloat::from(std::f64::NAN),
117        }
118    }
119
120    pub fn new(centroids: Vec<Centroid>, sum: f64, count: f64, max: f64, min: f64, max_size: usize) -> Self {
121        if centroids.len() <= max_size {
122            TDigest {
123                centroids,
124                max_size,
125                sum: OrderedFloat::from(sum),
126                count: OrderedFloat::from(count),
127                max: OrderedFloat::from(max),
128                min: OrderedFloat::from(min),
129            }
130        } else {
131            let sz = centroids.len();
132            let digests: Vec<TDigest> = vec![
133                TDigest::new_with_size(100),
134                TDigest::new(centroids, sum, count, max, min, sz),
135            ];
136
137            Self::merge_digests(digests)
138        }
139    }
140
141    #[inline]
142    pub fn mean(&self) -> f64 {
143        let count_: f64 = self.count.into_inner();
144        let sum_: f64 = self.sum.into_inner();
145
146        if count_ > 0.0 {
147            sum_ / count_
148        } else {
149            0.0
150        }
151    }
152
153    #[inline]
154    pub fn sum(&self) -> f64 {
155        self.sum.into_inner()
156    }
157
158    #[inline]
159    pub fn centroids(&self) -> Vec<Centroid> {
160        self.centroids.clone()
161    }
162
163    #[inline]
164    pub fn count(&self) -> f64 {
165        self.count.into_inner()
166    }
167
168    #[inline]
169    pub fn max(&self) -> f64 {
170        self.max.into_inner()
171    }
172
173    #[inline]
174    pub fn min(&self) -> f64 {
175        self.min.into_inner()
176    }
177
178    #[inline]
179    pub fn is_empty(&self) -> bool {
180        self.centroids.is_empty()
181    }
182
183    #[inline]
184    pub fn max_size(&self) -> usize {
185        self.max_size
186    }
187}
188
189impl Default for TDigest {
190    fn default() -> Self {
191        TDigest {
192            centroids: Vec::new(),
193            max_size: 100,
194            sum: OrderedFloat::from(0.0),
195            count: OrderedFloat::from(0.0),
196            max: OrderedFloat::from(std::f64::NAN),
197            min: OrderedFloat::from(std::f64::NAN),
198        }
199    }
200}
201
202impl TDigest {
203    fn k_to_q(k: f64, d: f64) -> f64 {
204        let k_div_d = k / d;
205        if k_div_d >= 0.5 {
206            let base = 1.0 - k_div_d;
207            1.0 - 2.0 * base * base
208        } else {
209            2.0 * k_div_d * k_div_d
210        }
211    }
212
213    fn clamp(v: f64, lo: f64, hi: f64) -> f64 {
214        if v > hi {
215            hi
216        } else if v < lo {
217            lo
218        } else {
219            v
220        }
221    }
222
223    pub fn merge_unsorted(&self, unsorted_values: Vec<f64>) -> TDigest {
224        let mut sorted_values: Vec<OrderedFloat<f64>> = unsorted_values.into_iter().map(OrderedFloat::from).collect();
225        sorted_values.sort();
226        let sorted_values = sorted_values.into_iter().map(|f| f.into_inner()).collect();
227
228        self.merge_sorted(sorted_values)
229    }
230
231    pub fn merge_sorted(&self, sorted_values: Vec<f64>) -> TDigest {
232        if sorted_values.is_empty() {
233            return self.clone();
234        }
235
236        let mut result = TDigest::new_with_size(self.max_size());
237        result.count = OrderedFloat::from(self.count() + (sorted_values.len() as f64));
238
239        let maybe_min = OrderedFloat::from(*sorted_values.first().unwrap());
240        let maybe_max = OrderedFloat::from(*sorted_values.last().unwrap());
241
242        if self.count() > 0.0 {
243            result.min = std::cmp::min(self.min, maybe_min);
244            result.max = std::cmp::max(self.max, maybe_max);
245        } else {
246            result.min = maybe_min;
247            result.max = maybe_max;
248        }
249
250        let mut compressed: Vec<Centroid> = Vec::with_capacity(self.max_size);
251
252        let mut k_limit: f64 = 1.0;
253        let mut q_limit_times_count: f64 = Self::k_to_q(k_limit, self.max_size as f64) * result.count.into_inner();
254        k_limit += 1.0;
255
256        let mut iter_centroids = self.centroids.iter().peekable();
257        let mut iter_sorted_values = sorted_values.iter().peekable();
258
259        let mut curr: Centroid = if let Some(c) = iter_centroids.peek() {
260            let curr = **iter_sorted_values.peek().unwrap();
261            if c.mean() < curr {
262                iter_centroids.next().unwrap().clone()
263            } else {
264                Centroid::new(*iter_sorted_values.next().unwrap(), 1.0)
265            }
266        } else {
267            Centroid::new(*iter_sorted_values.next().unwrap(), 1.0)
268        };
269
270        let mut weight_so_far: f64 = curr.weight();
271
272        let mut sums_to_merge: f64 = 0.0;
273        let mut weights_to_merge: f64 = 0.0;
274
275        while iter_centroids.peek().is_some() || iter_sorted_values.peek().is_some() {
276            let next: Centroid = if let Some(c) = iter_centroids.peek() {
277                if iter_sorted_values.peek().is_none() || c.mean() < **iter_sorted_values.peek().unwrap() {
278                    iter_centroids.next().unwrap().clone()
279                } else {
280                    Centroid::new(*iter_sorted_values.next().unwrap(), 1.0)
281                }
282            } else {
283                Centroid::new(*iter_sorted_values.next().unwrap(), 1.0)
284            };
285
286            let next_sum: f64 = next.mean() * next.weight();
287            weight_so_far += next.weight();
288
289            if weight_so_far <= q_limit_times_count {
290                sums_to_merge += next_sum;
291                weights_to_merge += next.weight();
292            } else {
293                result.sum = OrderedFloat::from(result.sum.into_inner() + curr.add(sums_to_merge, weights_to_merge));
294                sums_to_merge = 0.0;
295                weights_to_merge = 0.0;
296
297                compressed.push(curr.clone());
298                q_limit_times_count = Self::k_to_q(k_limit, self.max_size as f64) * result.count();
299                k_limit += 1.0;
300                curr = next;
301            }
302        }
303
304        result.sum = OrderedFloat::from(result.sum.into_inner() + curr.add(sums_to_merge, weights_to_merge));
305        compressed.push(curr);
306        compressed.shrink_to_fit();
307        compressed.sort();
308
309        result.centroids = compressed;
310        result
311    }
312
313    fn external_merge(centroids: &mut Vec<Centroid>, first: usize, middle: usize, last: usize) {
314        let mut result: Vec<Centroid> = Vec::with_capacity(centroids.len());
315
316        let mut i = first;
317        let mut j = middle;
318
319        while i < middle && j < last {
320            match centroids[i].cmp(&centroids[j]) {
321                Ordering::Less => {
322                    result.push(centroids[i].clone());
323                    i += 1;
324                }
325                Ordering::Greater => {
326                    result.push(centroids[j].clone());
327                    j += 1;
328                }
329                Ordering::Equal => {
330                    result.push(centroids[i].clone());
331                    i += 1;
332                }
333            }
334        }
335
336        while i < middle {
337            result.push(centroids[i].clone());
338            i += 1;
339        }
340
341        while j < last {
342            result.push(centroids[j].clone());
343            j += 1;
344        }
345
346        i = first;
347        for centroid in result.into_iter() {
348            centroids[i] = centroid;
349            i += 1;
350        }
351    }
352
353    // Merge multiple T-Digests
354    pub fn merge_digests(digests: Vec<TDigest>) -> TDigest {
355        let n_centroids: usize = digests.iter().map(|d| d.centroids.len()).sum();
356        if n_centroids == 0 {
357            return TDigest::default();
358        }
359
360        let max_size = digests.first().unwrap().max_size;
361        let mut centroids: Vec<Centroid> = Vec::with_capacity(n_centroids);
362        let mut starts: Vec<usize> = Vec::with_capacity(digests.len());
363
364        let mut count: f64 = 0.0;
365        let mut min = OrderedFloat::from(std::f64::INFINITY);
366        let mut max = OrderedFloat::from(std::f64::NEG_INFINITY);
367
368        let mut start: usize = 0;
369        for digest in digests.into_iter() {
370            starts.push(start);
371
372            let curr_count: f64 = digest.count();
373            if curr_count > 0.0 {
374                min = std::cmp::min(min, digest.min);
375                max = std::cmp::max(max, digest.max);
376                count += curr_count;
377                for centroid in digest.centroids {
378                    centroids.push(centroid);
379                    start += 1;
380                }
381            }
382        }
383
384        let mut digests_per_block: usize = 1;
385        while digests_per_block < starts.len() {
386            for i in (0..starts.len()).step_by(digests_per_block * 2) {
387                if i + digests_per_block < starts.len() {
388                    let first = starts[i];
389                    let middle = starts[i + digests_per_block];
390                    let last = if i + 2 * digests_per_block < starts.len() {
391                        starts[i + 2 * digests_per_block]
392                    } else {
393                        centroids.len()
394                    };
395
396                    debug_assert!(first <= middle && middle <= last);
397                    Self::external_merge(&mut centroids, first, middle, last);
398                }
399            }
400
401            digests_per_block *= 2;
402        }
403
404        let mut result = TDigest::new_with_size(max_size);
405        let mut compressed: Vec<Centroid> = Vec::with_capacity(max_size);
406
407        let mut k_limit: f64 = 1.0;
408        let mut q_limit_times_count: f64 = Self::k_to_q(k_limit, max_size as f64) * (count as f64);
409
410        let mut iter_centroids = centroids.iter_mut();
411        let mut curr = iter_centroids.next().unwrap();
412        let mut weight_so_far: f64 = curr.weight();
413        let mut sums_to_merge: f64 = 0.0;
414        let mut weights_to_merge: f64 = 0.0;
415
416        for centroid in iter_centroids {
417            weight_so_far += centroid.weight();
418
419            if weight_so_far <= q_limit_times_count {
420                sums_to_merge += centroid.mean() * centroid.weight();
421                weights_to_merge += centroid.weight();
422            } else {
423                result.sum = OrderedFloat::from(result.sum.into_inner() + curr.add(sums_to_merge, weights_to_merge));
424                sums_to_merge = 0.0;
425                weights_to_merge = 0.0;
426                compressed.push(curr.clone());
427                q_limit_times_count = Self::k_to_q(k_limit, max_size as f64) * (count as f64);
428                k_limit += 1.0;
429                curr = centroid;
430            }
431        }
432
433        result.sum = OrderedFloat::from(result.sum.into_inner() + curr.add(sums_to_merge, weights_to_merge));
434        compressed.push(curr.clone());
435        compressed.shrink_to_fit();
436        compressed.sort();
437
438        result.count = OrderedFloat::from(count as f64);
439        result.min = min;
440        result.max = max;
441        result.centroids = compressed;
442        result
443    }
444
445    /// To estimate the value located at `q` quantile
446    pub fn estimate_quantile(&self, q: f64) -> f64 {
447        if self.centroids.is_empty() {
448            return 0.0;
449        }
450
451        let count_: f64 = self.count.into_inner();
452        let rank: f64 = q * count_;
453
454        let mut pos: usize;
455        let mut t: f64;
456        if q > 0.5 {
457            if q >= 1.0 {
458                return self.max();
459            }
460
461            pos = 0;
462            t = count_;
463
464            for (k, centroid) in self.centroids.iter().enumerate().rev() {
465                t -= centroid.weight();
466
467                if rank >= t {
468                    pos = k;
469                    break;
470                }
471            }
472        } else {
473            if q <= 0.0 {
474                return self.min();
475            }
476
477            pos = self.centroids.len() - 1;
478            t = 0.0;
479
480            for (k, centroid) in self.centroids.iter().enumerate() {
481                if rank < t + centroid.weight() {
482                    pos = k;
483                    break;
484                }
485
486                t += centroid.weight();
487            }
488        }
489
490        let mut delta = 0.0;
491        let mut min: f64 = self.min.into_inner();
492        let mut max: f64 = self.max.into_inner();
493
494        if self.centroids.len() > 1 {
495            if pos == 0 {
496                delta = self.centroids[pos + 1].mean() - self.centroids[pos].mean();
497                max = self.centroids[pos + 1].mean();
498            } else if pos == (self.centroids.len() - 1) {
499                delta = self.centroids[pos].mean() - self.centroids[pos - 1].mean();
500                min = self.centroids[pos - 1].mean();
501            } else {
502                delta = (self.centroids[pos + 1].mean() - self.centroids[pos - 1].mean()) / 2.0;
503                min = self.centroids[pos - 1].mean();
504                max = self.centroids[pos + 1].mean();
505            }
506        }
507
508        let value = self.centroids[pos].mean() + ((rank - t) / self.centroids[pos].weight() - 0.5) * delta;
509        Self::clamp(value, min, max)
510    }
511}
512
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `ans` is within relative error `tol` of `expected`.
    fn assert_rel_err(ans: f64, expected: f64, tol: f64) {
        let rel_err: f64 = (expected - ans).abs() / expected;
        assert!(
            rel_err < tol,
            "expected ~{} (tol {}), got {} (rel err {})",
            expected,
            tol,
            ans,
            rel_err
        );
    }

    #[test]
    fn test_centroid_addition_regression() {
        // https://github.com/MnO2/t-digest/pull/1
        let vals = vec![1.0, 1.0, 1.0, 2.0, 1.0, 1.0];
        let mut t = TDigest::new_with_size(10);

        for v in vals {
            t = t.merge_unsorted(vec![v]);
        }

        assert_rel_err(t.estimate_quantile(0.5), 1.0, 0.01);
        assert_rel_err(t.estimate_quantile(0.95), 2.0, 0.01);
    }

    #[test]
    fn test_merge_sorted_against_uniform_distro() {
        let t = TDigest::new_with_size(100);
        let values: Vec<f64> = (1..=1_000_000).map(f64::from).collect();

        let t = t.merge_sorted(values);

        assert_rel_err(t.estimate_quantile(1.0), 1_000_000.0, 0.01);
        assert_rel_err(t.estimate_quantile(0.99), 990_000.0, 0.01);
        assert_rel_err(t.estimate_quantile(0.01), 10_000.0, 0.01);
        assert_rel_err(t.estimate_quantile(0.0), 1.0, 0.01);
        assert_rel_err(t.estimate_quantile(0.5), 500_000.0, 0.01);
    }

    #[test]
    fn test_merge_unsorted_against_uniform_distro() {
        let t = TDigest::new_with_size(100);
        let values: Vec<f64> = (1..=1_000_000).map(f64::from).collect();

        let t = t.merge_unsorted(values);

        assert_rel_err(t.estimate_quantile(1.0), 1_000_000.0, 0.01);
        assert_rel_err(t.estimate_quantile(0.99), 990_000.0, 0.01);
        assert_rel_err(t.estimate_quantile(0.01), 10_000.0, 0.01);
        assert_rel_err(t.estimate_quantile(0.0), 1.0, 0.01);
        assert_rel_err(t.estimate_quantile(0.5), 500_000.0, 0.01);
    }

    #[test]
    fn test_merge_sorted_against_skewed_distro() {
        let t = TDigest::new_with_size(100);
        // 600k uniform values plus a heavy spike at 1,000,000.
        let mut values: Vec<f64> = (1..=600_000).map(f64::from).collect();
        for _ in 0..400_000 {
            values.push(1_000_000.0);
        }

        let t = t.merge_sorted(values);

        assert_rel_err(t.estimate_quantile(0.99), 1_000_000.0, 0.01);
        assert_rel_err(t.estimate_quantile(0.01), 10_000.0, 0.01);
        assert_rel_err(t.estimate_quantile(0.5), 500_000.0, 0.01);
    }

    #[test]
    fn test_merge_unsorted_against_skewed_distro() {
        let t = TDigest::new_with_size(100);
        // 600k uniform values plus a heavy spike at 1,000,000.
        let mut values: Vec<f64> = (1..=600_000).map(f64::from).collect();
        for _ in 0..400_000 {
            values.push(1_000_000.0);
        }

        let t = t.merge_unsorted(values);

        assert_rel_err(t.estimate_quantile(0.99), 1_000_000.0, 0.01);
        assert_rel_err(t.estimate_quantile(0.01), 10_000.0, 0.01);
        assert_rel_err(t.estimate_quantile(0.5), 500_000.0, 0.01);
    }

    #[test]
    fn test_merge_digests() {
        let mut digests: Vec<TDigest> = Vec::new();

        // 100 digests over the same 1..=1000 distribution.
        for _ in 1..=100 {
            let t = TDigest::new_with_size(100);
            let values: Vec<f64> = (1..=1_000).map(f64::from).collect();
            let t = t.merge_sorted(values);
            digests.push(t)
        }

        let t = TDigest::merge_digests(digests);

        assert_rel_err(t.estimate_quantile(1.0), 1000.0, 0.01);
        assert_rel_err(t.estimate_quantile(0.99), 990.0, 0.01);
        // The lower tail of a merged digest is less accurate; allow 20%.
        assert_rel_err(t.estimate_quantile(0.01), 10.0, 0.2);
        assert_rel_err(t.estimate_quantile(0.0), 1.0, 0.01);
        assert_rel_err(t.estimate_quantile(0.5), 500.0, 0.01);
    }
}