Skip to main content

anomstream_core/
tdigest.rs

1//! Streaming quantile estimator — Ted Dunning's t-digest
2//! (Computing Extremely Accurate Quantiles Using t-Digests, 2019).
3//!
4//! [`crate::ScoreHistogram`] bins values in a fixed number of
5//! equal-width buckets — fine for central percentiles, lossy at the
6//! tails where SOC SLOs typically live (p99, p99.9). [`TDigest`]
7//! maintains a small set of **centroids** whose weight grows near
8//! the distribution tails and stays tight in the centre, giving
9//! sub-percent error on tail quantiles for `O(δ)` memory where `δ`
10//! is the compression parameter.
11//!
12//! # Scale function
13//!
14//! This implementation uses Dunning's **scale function 1**
15//! (`k_1(q) = (δ / 2π) · asin(2q − 1)`), which gives near-uniform
16//! error across the quantile range. Centroids may grow up to a
17//! weight of `total · (q_limit(q) − q)` where
18//! `q_limit(q) = (sin(2π · (k_1(q) + 1) / δ) + 1) / 2`, i.e. the
19//! quantile a single scale-function step above `q`.
20//!
21//! # Merging variant
22//!
23//! `record(x)` pushes into an **unsorted buffer**. When the buffer
24//! length exceeds `compression · 10`, or a quantile is queried, the
25//! buffer is flushed: sorted, then merged with the existing
26//! centroids via one linear pass that respects the scale-function
27//! weight bound. This amortises per-record cost to `O(1)` and keeps
28//! query latency bounded in `O(δ)`.
29
30use alloc::format;
31use alloc::vec::Vec;
32
33#[cfg(not(feature = "std"))]
34#[allow(unused_imports)]
35use num_traits::Float;
36
37use crate::error::{RcfError, RcfResult};
38
39/// Default compression parameter — 100 balances accuracy and
40/// memory, matches Dunning's reference implementation.
41pub const DEFAULT_COMPRESSION: f64 = 100.0;
42
43/// Buffer-flush trigger — when pending inserts exceed
44/// `compression · BUFFER_MULT`, flush and merge.
45const BUFFER_MULT: usize = 10;
46
47/// One centroid: mean plus weight. Weight is `f64` because
48/// compaction merges centroids; non-integer accumulations are
49/// native.
50#[derive(Debug, Clone, Copy, PartialEq)]
51#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
52pub struct Centroid {
53    /// Centroid mean — running average of every value that landed
54    /// in this centroid during compaction.
55    pub mean: f64,
56    /// Centroid weight — number of `record` values this centroid
57    /// summarises (sub-1 weights are possible in principle but this
58    /// implementation always starts them at 1.0).
59    pub weight: f64,
60}
61
62/// Streaming quantile estimator with tight-tail accuracy.
63///
64/// `TDigest` is the streaming analogue of a percentile sketch —
65/// `record(x)` is `O(1)` amortised, `quantile(q)` is `O(δ)`, and
66/// the maximum centroid count is `~2 · δ`.
67#[derive(Debug, Clone)]
68#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
69pub struct TDigest {
70    /// Compression — larger values → more centroids → more
71    /// accurate (especially on tails) at higher memory / merge
72    /// cost. Typical range `[20, 1000]`; `100` is a sane default.
73    compression: f64,
74    /// Sorted centroids (ascending `mean`). Always coherent after
75    /// [`Self::flush_buffer`].
76    centroids: Vec<Centroid>,
77    /// Unsorted insertion buffer. Drained by `flush_buffer`.
78    buffer: Vec<f64>,
79    /// Cached total weight — `centroids.weight_sum + buffer.len`
80    /// once pending inserts are flushed. Surfaced by
81    /// [`Self::total_weight`] for diagnostics.
82    total_weight: f64,
83    /// Running minimum across every `record` call. Lower-tail
84    /// queries (`quantile(q)` with very small `q`) extrapolate
85    /// between `min` and the first centroid.
86    min: f64,
87    /// Running maximum. Symmetric role on the upper tail.
88    max: f64,
89}
90
91impl TDigest {
92    /// Build a fresh digest with caller-chosen compression `δ`.
93    ///
94    /// # Errors
95    ///
96    /// Returns [`RcfError::InvalidConfig`] when `compression` is
97    /// non-finite or out of `[2, 10_000]`.
98    pub fn new(compression: f64) -> RcfResult<Self> {
99        if !compression.is_finite() || !(2.0..=10_000.0).contains(&compression) {
100            return Err(RcfError::InvalidConfig(
101                format!("TDigest: compression must be finite in [2, 10000], got {compression}")
102                    .into(),
103            ));
104        }
105        Ok(Self {
106            compression,
107            centroids: Vec::new(),
108            buffer: Vec::new(),
109            total_weight: 0.0,
110            min: f64::INFINITY,
111            max: f64::NEG_INFINITY,
112        })
113    }
114
115    /// Convenience: default compression ([`DEFAULT_COMPRESSION`] =
116    /// `100`).
117    #[must_use]
118    pub fn with_default_compression() -> Self {
119        // `DEFAULT_COMPRESSION` is in range by construction.
120        Self {
121            compression: DEFAULT_COMPRESSION,
122            centroids: Vec::new(),
123            buffer: Vec::new(),
124            total_weight: 0.0,
125            min: f64::INFINITY,
126            max: f64::NEG_INFINITY,
127        }
128    }
129
130    /// Compression parameter `δ`.
131    #[must_use]
132    pub fn compression(&self) -> f64 {
133        self.compression
134    }
135
136    /// Total weight across every `record` call (including pending
137    /// buffer entries).
138    #[must_use]
139    pub fn total_weight(&self) -> f64 {
140        #[allow(clippy::cast_precision_loss)]
141        let pending = self.buffer.len() as f64;
142        self.total_weight + pending
143    }
144
145    /// Number of centroids — bounded by `~2·compression` after a
146    /// flush.
147    #[must_use]
148    pub fn centroid_count(&self) -> usize {
149        self.centroids.len()
150    }
151
152    /// Observed running minimum. `None` when no values have been
153    /// recorded yet.
154    #[must_use]
155    pub fn min(&self) -> Option<f64> {
156        if self.min.is_finite() {
157            Some(self.min)
158        } else {
159            None
160        }
161    }
162
163    /// Observed running maximum. `None` when no values have been
164    /// recorded yet.
165    #[must_use]
166    pub fn max(&self) -> Option<f64> {
167        if self.max.is_finite() {
168            Some(self.max)
169        } else {
170            None
171        }
172    }
173
174    /// Fold a single observation into the digest. Non-finite
175    /// values are silently ignored — the digest has no way to
176    /// surface an error per-call and silently dropping matches
177    /// [`crate::ScoreHistogram::record`] semantics.
178    pub fn record(&mut self, value: f64) {
179        if !value.is_finite() {
180            return;
181        }
182        if value < self.min {
183            self.min = value;
184        }
185        if value > self.max {
186            self.max = value;
187        }
188        self.buffer.push(value);
189        #[allow(
190            clippy::cast_possible_truncation,
191            clippy::cast_sign_loss,
192            clippy::cast_precision_loss
193        )]
194        let threshold = (self.compression as usize).saturating_mul(BUFFER_MULT);
195        if self.buffer.len() >= threshold {
196            self.flush_buffer();
197        }
198    }
199
200    /// Force-flush the pending buffer. Callers normally don't need
201    /// this — [`Self::quantile`] flushes transparently — but it
202    /// helps bound memory in high-churn scenarios where quantiles
203    /// are queried rarely.
204    pub fn flush(&mut self) {
205        self.flush_buffer();
206    }
207
208    /// Quantile `q` in `[0, 1]`. Returns `None` when the digest is
209    /// empty; returns `min` at `q = 0` and `max` at `q = 1`.
210    #[must_use]
211    pub fn quantile(&mut self, q: f64) -> Option<f64> {
212        if !q.is_finite() || !(0.0..=1.0).contains(&q) {
213            return None;
214        }
215        self.flush_buffer();
216        if self.centroids.is_empty() {
217            return None;
218        }
219        if q <= 0.0 {
220            return Some(self.min);
221        }
222        if q >= 1.0 {
223            return Some(self.max);
224        }
225        let target = q * self.total_weight;
226
227        // Walk centroids, tracking cumulative weight. Interpolate
228        // between the two centroids that straddle `target`.
229        let mut cum = 0.0_f64;
230        let first = &self.centroids[0];
231        // Left of the first centroid → interpolate between `min`
232        // and first.mean.
233        let first_center = first.weight / 2.0;
234        if target < first_center {
235            if first.weight <= 1.0 || first_center <= 0.0 {
236                return Some(first.mean);
237            }
238            let frac = target / first_center;
239            return Some(self.min + frac * (first.mean - self.min));
240        }
241        cum += first.weight;
242
243        for i in 1..self.centroids.len() {
244            let prev = &self.centroids[i - 1];
245            let cur = &self.centroids[i];
246            let prev_center = cum - prev.weight / 2.0;
247            let cur_center = cum + cur.weight / 2.0;
248            if target < cur_center {
249                let span = cur_center - prev_center;
250                if span <= 0.0 {
251                    return Some(prev.mean);
252                }
253                let frac = (target - prev_center) / span;
254                return Some(prev.mean + frac * (cur.mean - prev.mean));
255            }
256            cum += cur.weight;
257        }
258
259        // Right of the last centroid → interpolate toward `max`.
260        let last = self.centroids.last()?;
261        let last_center = self.total_weight - last.weight / 2.0;
262        let span = self.total_weight - last_center;
263        if span <= 0.0 {
264            return Some(last.mean);
265        }
266        let frac = ((target - last_center) / span).clamp(0.0, 1.0);
267        Some(last.mean + frac * (self.max - last.mean))
268    }
269
270    /// Percentile — shorthand for `quantile(p / 100.0)`.
271    #[must_use]
272    pub fn percentile(&mut self, p: f64) -> Option<f64> {
273        self.quantile(p / 100.0)
274    }
275
276    /// Merge `other` into `self`. Both digests must share the same
277    /// compression parameter; the merge preserves distributional
278    /// accuracy by re-running the scale-function compression pass.
279    ///
280    /// # Errors
281    ///
282    /// Returns [`RcfError::InvalidConfig`] when `other.compression`
283    /// does not match `self.compression`.
284    pub fn merge(&mut self, other: &Self) -> RcfResult<()> {
285        #[allow(clippy::float_cmp)]
286        let compat = self.compression == other.compression;
287        if !compat {
288            return Err(RcfError::InvalidConfig(
289                format!(
290                    "TDigest merge: compression mismatch ({} vs {})",
291                    self.compression, other.compression
292                )
293                .into(),
294            ));
295        }
296        self.flush_buffer();
297        // Fold other's centroids + buffer into self's buffer, then
298        // flush — simplest path that round-trips through the same
299        // scale-function compaction.
300        for c in &other.centroids {
301            // Expand centroid back to `weight` copies of its mean —
302            // close-enough approximation because `mean` is the
303            // centroid's summary value.
304            #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
305            let n = c.weight.round() as usize;
306            for _ in 0..n.max(1) {
307                self.buffer.push(c.mean);
308            }
309        }
310        for v in &other.buffer {
311            self.buffer.push(*v);
312        }
313        if other.min < self.min {
314            self.min = other.min;
315        }
316        if other.max > self.max {
317            self.max = other.max;
318        }
319        self.flush_buffer();
320        Ok(())
321    }
322
323    /// Drop every recorded value — digest goes back to its empty
324    /// post-construction state. Compression is preserved.
325    pub fn reset(&mut self) {
326        self.centroids.clear();
327        self.buffer.clear();
328        self.total_weight = 0.0;
329        self.min = f64::INFINITY;
330        self.max = f64::NEG_INFINITY;
331    }
332
333    /// Immutable view of the current centroid set — exposed for
334    /// diagnostics / persistence helpers. Empty until the first
335    /// flush.
336    #[must_use]
337    pub fn centroids(&self) -> &[Centroid] {
338        &self.centroids
339    }
340
341    /// Flush the buffer — sort buffer, merge with centroids using
342    /// scale-function 1 weight bounds.
343    fn flush_buffer(&mut self) {
344        if self.buffer.is_empty() {
345            return;
346        }
347        // Merge buffer + existing centroids into a sorted list
348        // sorted ascending by mean.
349        self.buffer
350            .sort_by(|a, b| a.partial_cmp(b).unwrap_or(core::cmp::Ordering::Equal));
351
352        // Combine buffer entries (unit weight) with prior centroids
353        // in a single sorted merge pass.
354        let mut combined: Vec<Centroid> =
355            Vec::with_capacity(self.centroids.len() + self.buffer.len());
356        let mut i = 0_usize;
357        let mut j = 0_usize;
358        while i < self.centroids.len() && j < self.buffer.len() {
359            let c = self.centroids[i];
360            let v = self.buffer[j];
361            if c.mean <= v {
362                combined.push(c);
363                i += 1;
364            } else {
365                combined.push(Centroid {
366                    mean: v,
367                    weight: 1.0,
368                });
369                j += 1;
370            }
371        }
372        while i < self.centroids.len() {
373            combined.push(self.centroids[i]);
374            i += 1;
375        }
376        while j < self.buffer.len() {
377            combined.push(Centroid {
378                mean: self.buffer[j],
379                weight: 1.0,
380            });
381            j += 1;
382        }
383        self.buffer.clear();
384
385        // Recompute total weight.
386        let total: f64 = combined.iter().map(|c| c.weight).sum();
387        self.total_weight = total;
388        if total <= 0.0 {
389            self.centroids = combined;
390            return;
391        }
392
393        // Compact via scale function 1.
394        let mut out: Vec<Centroid> = Vec::with_capacity(combined.len());
395        let mut cum = 0.0_f64;
396        let mut current = combined[0];
397        cum += current.weight;
398        for centroid in &combined[1..] {
399            let q0 = (cum - current.weight) / total;
400            let q1 = (cum + centroid.weight) / total;
401            let q_limit = q_limit_for(q0, self.compression);
402            // Merge into the current centroid when the resulting
403            // centroid's cumulative-weight upper bound stays within
404            // the quantile limit; else seal `current` and start a
405            // new one.
406            if q1 <= q_limit {
407                let new_weight = current.weight + centroid.weight;
408                current.mean =
409                    (current.mean * current.weight + centroid.mean * centroid.weight) / new_weight;
410                current.weight = new_weight;
411            } else {
412                out.push(current);
413                current = *centroid;
414            }
415            cum += centroid.weight;
416        }
417        out.push(current);
418        self.centroids = out;
419    }
420}
421
422/// Scale function 1 upper bound:
423/// `q_limit(q, δ) = (sin(2π · (k_1(q) + 1) / δ) + 1) / 2`, where
424/// `k_1(q) = (δ / 2π) · asin(2q − 1)`. Returns `q + 1/δ` as a
425/// fallback when the k-scale saturates.
426fn q_limit_for(q: f64, compression: f64) -> f64 {
427    use core::f64::consts::PI;
428    let clamped = q.clamp(0.0, 1.0);
429    let k = (compression / (2.0 * PI)) * (2.0 * clamped - 1.0).asin();
430    let next = (2.0 * PI * (k + 1.0) / compression).sin();
431    let limit = f64::midpoint(next, 1.0);
432    if limit.is_finite() && limit > clamped {
433        limit.min(1.0)
434    } else {
435        (clamped + 1.0 / compression).min(1.0)
436    }
437}
438
439#[cfg(test)]
440#[allow(
441    clippy::unwrap_used,
442    clippy::panic,
443    clippy::float_cmp,
444    clippy::cast_precision_loss,
445    clippy::cast_lossless
446)]
447mod tests {
448    use super::*;
449
450    #[test]
451    fn new_rejects_bad_compression() {
452        assert!(TDigest::new(0.0).is_err());
453        assert!(TDigest::new(1.0).is_err());
454        assert!(TDigest::new(f64::NAN).is_err());
455        assert!(TDigest::new(1.0e6).is_err());
456    }
457
458    #[test]
459    fn empty_quantile_returns_none() {
460        let mut d = TDigest::with_default_compression();
461        assert!(d.quantile(0.5).is_none());
462    }
463
464    #[test]
465    fn record_updates_min_max() {
466        let mut d = TDigest::with_default_compression();
467        d.record(5.0);
468        d.record(2.0);
469        d.record(8.0);
470        assert_eq!(d.min(), Some(2.0));
471        assert_eq!(d.max(), Some(8.0));
472    }
473
474    #[test]
475    fn record_ignores_nan_and_inf() {
476        let mut d = TDigest::with_default_compression();
477        d.record(f64::NAN);
478        d.record(f64::INFINITY);
479        d.record(1.0);
480        assert_eq!(d.total_weight(), 1.0);
481    }
482
483    #[test]
484    fn median_of_uniform_stream() {
485        let mut d = TDigest::with_default_compression();
486        for i in 0..10_000 {
487            d.record(i as f64);
488        }
489        let median = d.quantile(0.5).unwrap();
490        // True median is 4999.5; t-digest target is ~1 % error.
491        assert!((median - 4999.5).abs() < 150.0, "median = {median}");
492    }
493
494    #[test]
495    fn tail_quantiles_accurate_on_uniform() {
496        let mut d = TDigest::new(200.0).unwrap();
497        for i in 0..10_000 {
498            d.record(i as f64);
499        }
500        let p99 = d.quantile(0.99).unwrap();
501        let p999 = d.quantile(0.999).unwrap();
502        // True p99 = 9899, p99.9 = 9989. Allow < 1% absolute error
503        // on the uniform [0, 9999] range.
504        assert!((p99 - 9899.0).abs() < 100.0, "p99 = {p99}");
505        assert!((p999 - 9989.0).abs() < 100.0, "p99.9 = {p999}");
506    }
507
508    #[test]
509    fn percentile_is_quantile_over_100() {
510        let mut d = TDigest::with_default_compression();
511        for i in 0..1000 {
512            d.record(i as f64);
513        }
514        let q50 = d.quantile(0.5).unwrap();
515        let p50 = d.percentile(50.0).unwrap();
516        assert_eq!(q50, p50);
517    }
518
519    #[test]
520    fn quantile_0_returns_min_quantile_1_returns_max() {
521        let mut d = TDigest::with_default_compression();
522        for v in &[1.0, 2.0, 3.0, 100.0] {
523            d.record(*v);
524        }
525        assert_eq!(d.quantile(0.0), Some(1.0));
526        assert_eq!(d.quantile(1.0), Some(100.0));
527    }
528
529    #[test]
530    fn merge_two_digests_preserves_quantiles() {
531        let mut a = TDigest::new(200.0).unwrap();
532        let mut b = TDigest::new(200.0).unwrap();
533        for i in 0..5_000 {
534            a.record(i as f64);
535        }
536        for i in 5_000..10_000 {
537            b.record(i as f64);
538        }
539        a.merge(&b).unwrap();
540        let median = a.quantile(0.5).unwrap();
541        assert!((median - 4999.5).abs() < 200.0, "median = {median}");
542        assert_eq!(a.min(), Some(0.0));
543        assert_eq!(a.max(), Some(9999.0));
544    }
545
546    #[test]
547    fn merge_rejects_compression_mismatch() {
548        let mut a = TDigest::new(100.0).unwrap();
549        let b = TDigest::new(200.0).unwrap();
550        assert!(a.merge(&b).is_err());
551    }
552
553    #[test]
554    fn reset_drops_state() {
555        let mut d = TDigest::with_default_compression();
556        for i in 0..100 {
557            d.record(i as f64);
558        }
559        d.reset();
560        assert_eq!(d.total_weight(), 0.0);
561        assert!(d.min().is_none());
562        assert!(d.max().is_none());
563        assert!(d.quantile(0.5).is_none());
564    }
565
566    #[test]
567    fn centroid_count_bounded_by_compression() {
568        let mut d = TDigest::new(100.0).unwrap();
569        for i in 0..50_000 {
570            d.record(i as f64);
571        }
572        d.flush();
573        // Scale-function-1 bound gives <= ~ 2·δ centroids post-
574        // compaction. Allow a small slack for implementation
575        // rounding.
576        assert!(
577            d.centroid_count() <= 250,
578            "centroids = {}",
579            d.centroid_count()
580        );
581    }
582
583    #[test]
584    fn quantile_rejects_out_of_range() {
585        let mut d = TDigest::with_default_compression();
586        d.record(1.0);
587        assert!(d.quantile(-0.1).is_none());
588        assert!(d.quantile(1.1).is_none());
589        assert!(d.quantile(f64::NAN).is_none());
590    }
591
592    #[cfg(all(feature = "serde", feature = "postcard"))]
593    #[test]
594    fn postcard_roundtrip_preserves_quantiles() {
595        let mut d = TDigest::new(200.0).unwrap();
596        for i in 0..2_000 {
597            d.record(i as f64);
598        }
599        d.flush();
600        let bytes = postcard::to_allocvec(&d).unwrap();
601        let mut back: TDigest = postcard::from_bytes(&bytes).unwrap();
602        let before = d.quantile(0.9).unwrap();
603        let after = back.quantile(0.9).unwrap();
604        assert_eq!(before, after);
605    }
606}