const DEFAULT_COMPRESSION: f64 = 100.0;
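
/// A single centroid: the running mean of the points it has absorbed and
/// their combined weight.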
#[derive(Debug, Clone, Copy, PartialEq)]
struct Centroid {
    mean: f64,
    weight: f64,
}
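
/// A t-digest sketch for streaming quantile estimation.
///
/// New points are staged in a buffer and periodically merged into the
/// sorted centroid list, keeping `add` amortized cheap and memory bounded
/// by the compression factor.
///
/// # Example
///
/// ```ignore
/// // Sketch only: assumes `TDigest` is imported from this crate.
/// let mut d = TDigest::new();
/// for i in 0..1_000 {
///     d.add(i as f64);
/// }
/// // Estimates are approximate; this tolerance is deliberately loose.
/// assert!((d.quantile(0.5) - 500.0).abs() < 50.0);
/// ```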
#[derive(Debug, Clone)]
pub struct TDigest {
    compression: f64,
    centroids: Vec<Centroid>,
    buffer: Vec<Centroid>,
    buffer_limit: usize,
    total_weight: f64,
    min: f64,
    max: f64,
}

impl Default for TDigest {
    fn default() -> Self {
        Self::with_compression(DEFAULT_COMPRESSION)
    }
}

impl TDigest {
    pub fn new() -> Self {
        Self::default()
    }
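
    /// Creates a digest with the given compression factor. Larger values
    /// keep more centroids (better accuracy, more memory); values below 10
    /// are clamped up so the sketch stays well-formed.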
    pub fn with_compression(compression: f64) -> Self {
        let c = compression.max(10.0);
        Self {
            compression: c,
            centroids: Vec::new(),
            buffer: Vec::new(),
            // Buffer a few multiples of the compression factor before
            // paying for a sort-and-compact pass.
            buffer_limit: (6.0 * c) as usize + 10,
            total_weight: 0.0,
            min: f64::INFINITY,
            max: f64::NEG_INFINITY,
        }
    }

    pub fn is_empty(&self) -> bool {
        self.total_weight == 0.0
    }

    /// Total weight added so far (the count, for unit-weight points).
    pub fn count(&self) -> f64 {
        self.total_weight
    }

    /// Smallest value seen; `f64::INFINITY` while the digest is empty.
    pub fn min(&self) -> f64 {
        self.min
    }

    /// Largest value seen; `f64::NEG_INFINITY` while the digest is empty.
    pub fn max(&self) -> f64 {
        self.max
    }
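
    /// Adds a single point with unit weight.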
    pub fn add(&mut self, value: f64) {
        self.add_weighted(value, 1.0);
    }

    /// Adds `value` with the given weight. Non-finite values or weights
    /// and non-positive weights are silently ignored.
    pub fn add_weighted(&mut self, value: f64, weight: f64) {
        // Reject NaN and infinite weights explicitly: `weight <= 0.0` alone
        // would let a NaN weight through and poison every later sum.
        if !value.is_finite() || !weight.is_finite() || weight <= 0.0 {
            return;
        }
        self.buffer.push(Centroid { mean: value, weight });
        self.total_weight += weight;
        if value < self.min {
            self.min = value;
        }
        if value > self.max {
            self.max = value;
        }
        if self.buffer.len() >= self.buffer_limit {
            self.flush();
        }
    }
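
    /// Drains the staging buffer: concatenates buffered points with the
    /// existing centroids, sorts everything by mean, and re-compacts.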
    fn flush(&mut self) {
        if self.buffer.is_empty() {
            return;
        }
        let mut merged = Vec::with_capacity(self.centroids.len() + self.buffer.len());
        merged.extend_from_slice(&self.centroids);
        merged.extend_from_slice(&self.buffer);
        // Means are finite by construction, so total_cmp is a plain sort here.
        merged.sort_by(|a, b| a.mean.total_cmp(&b.mean));
        self.centroids = compact(&merged, self.compression, self.total_weight);
        self.buffer.clear();
    }
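
    /// Absorbs another digest, including its unflushed buffer, so no staged
    /// points are lost. Useful for parallel aggregation: build one digest
    /// per shard, then merge.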
    pub fn merge(&mut self, other: &TDigest) {
        if other.is_empty() {
            return;
        }
        self.total_weight += other.total_weight;
        if other.min < self.min {
            self.min = other.min;
        }
        if other.max > self.max {
            self.max = other.max;
        }
        self.buffer.extend_from_slice(&other.centroids);
        self.buffer.extend_from_slice(&other.buffer);
        self.flush();
    }
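
    /// Estimates the value at quantile `q` in [0, 1]. Returns NaN for an
    /// empty digest; `q <= 0` and `q >= 1` return the exact min and max.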
    pub fn quantile(&self, q: f64) -> f64 {
        if self.is_empty() {
            return f64::NAN;
        }
        // Flush into a clone so buffered points count without `&mut self`.
        let mut snapshot = self.clone();
        snapshot.flush();
        if snapshot.centroids.is_empty() || q <= 0.0 {
            return snapshot.min;
        }
        if q >= 1.0 {
            return snapshot.max;
        }
        let target = q * snapshot.total_weight;
        let mut cumulative = 0.0;
        // Previous centroid's (mean, cumulative weight at its midpoint).
        let mut prev: Option<(f64, f64)> = None;
        for c in &snapshot.centroids {
            // Treat each centroid's mean as sitting at the midpoint of its
            // weight and interpolate linearly between adjacent midpoints.
            let midpoint = cumulative + c.weight / 2.0;
            if target <= midpoint {
                if let Some((pm, pc)) = prev {
                    let span = midpoint - pc;
                    if span <= 0.0 {
                        return c.mean;
                    }
                    let frac = ((target - pc) / span).clamp(0.0, 1.0);
                    return pm + (c.mean - pm) * frac;
                }
                // Before the first midpoint there is nothing to our left.
                return c.mean;
            }
            cumulative += c.weight;
            prev = Some((c.mean, midpoint));
        }
        snapshot.max
    }
}
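
/// Compacts a mean-sorted centroid list: walk left to right, merging each
/// centroid into the current one while the combined weight stays within a
/// quantile-dependent bound that is tightest near q = 0 and q = 1, which is
/// what preserves tail accuracy.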
fn compact(sorted: &[Centroid], compression: f64, total: f64) -> Vec<Centroid> {
    let mut out: Vec<Centroid> = Vec::new();
    if sorted.is_empty() || total <= 0.0 {
        return out;
    }
    let mut cumulative = 0.0;
    let mut current = sorted[0];
    cumulative += current.weight;
    for next in &sorted[1..] {
        // Quantile at the left edge of `current` and at the right edge the
        // merged centroid would reach.
        let q0 = (cumulative - current.weight) / total;
        let q1 = (cumulative + next.weight) / total;
        // Size bound: the weight allowed near quantile q shrinks toward the
        // tails as min(q, 1 - q) and with higher compression. Floor it at
        // one unit of weight so a lone point can always form a centroid.
        let edge = |q: f64| q.min(1.0 - q).max(0.0);
        let max_weight = (4.0 * total * edge(q0).min(edge(q1)) / compression).max(1.0);
        let combined_weight = current.weight + next.weight;
        if combined_weight <= max_weight {
            // Weighted mean keeps the merged centroid's mean exact.
            current.mean += (next.mean - current.mean) * (next.weight / combined_weight);
            current.weight = combined_weight;
        } else {
            out.push(current);
            current = *next;
        }
        cumulative += next.weight;
    }
    out.push(current);
    out
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn empty_digest_returns_nan() {
        let d = TDigest::new();
        assert!(d.quantile(0.5).is_nan());
    }

    #[test]
    fn single_value_is_every_quantile() {
        let mut d = TDigest::new();
        d.add(42.0);
        assert_eq!(d.quantile(0.0), 42.0);
        assert_eq!(d.quantile(0.5), 42.0);
        assert_eq!(d.quantile(1.0), 42.0);
    }

    #[test]
    fn median_of_uniform_is_near_half() {
        let mut d = TDigest::new();
        for i in 0..10_000 {
            d.add(i as f64);
        }
        let m = d.quantile(0.5);
        assert!(m > 2000.0 && m < 8000.0, "median was {m}");
    }

    #[test]
    fn tail_quantiles_are_better_than_20pct_error() {
        let mut d = TDigest::new();
        for i in 0..100_000 {
            d.add(i as f64);
        }
        let p99 = d.quantile(0.99);
        let expected = 99_000.0;
        let err = ((p99 - expected).abs() / expected) * 100.0;
        assert!(err < 20.0, "p99 error {err}% (got {p99})");
    }

    #[test]
    fn min_max_are_preserved_exactly() {
        let mut d = TDigest::new();
        for x in [3.14, 2.71, 1.41, 10.0, -5.0] {
            d.add(x);
        }
        assert_eq!(d.min(), -5.0);
        assert_eq!(d.max(), 10.0);
    }

    #[test]
    fn merge_is_associative_enough_for_parallel_agg() {
        let mut left = TDigest::new();
        let mut right = TDigest::new();
        for i in 0..5_000 {
            left.add(i as f64);
        }
        for i in 5_000..10_000 {
            right.add(i as f64);
        }
        let mut combined = TDigest::new();
        for i in 0..10_000 {
            combined.add(i as f64);
        }
        left.merge(&right);
        let m_combined = combined.quantile(0.5);
        let m_merged = left.quantile(0.5);
        assert!(
            (m_combined - m_merged).abs() < 200.0,
            "merge median drift: {m_combined} vs {m_merged}"
        );
        assert_eq!(left.count(), 10_000.0);
    }

    #[test]
    fn non_finite_and_non_positive_weight_are_ignored() {
        let mut d = TDigest::new();
        d.add(f64::NAN);
        d.add(f64::INFINITY);
        d.add_weighted(1.0, 0.0);
        d.add_weighted(1.0, -3.0);
        // NaN weights must be rejected too, not just non-positive ones.
        d.add_weighted(1.0, f64::NAN);
        assert!(d.is_empty());
    }
}