use alloc::format;
use alloc::vec::Vec;
#[cfg(not(feature = "std"))]
#[allow(unused_imports)]
use num_traits::Float;
use crate::error::{RcfError, RcfResult};
/// Compression used by [`TDigest::with_default_compression`].
pub const DEFAULT_COMPRESSION: f64 = 100.0;
/// Multiplier on `compression` giving the number of raw samples buffered
/// before they are folded into the centroids in a single pass.
const BUFFER_MULT: usize = 10;
/// A cluster of nearby samples, summarized by their weighted mean and total weight.
#[derive(Debug, Clone, Copy, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Centroid {
    /// Weighted mean of the samples absorbed into this centroid.
    pub mean: f64,
    /// Total weight represented; each recorded sample contributes `1.0`.
    pub weight: f64,
}
/// Streaming quantile sketch that keeps a bounded list of weighted centroids
/// plus a small buffer of raw samples awaiting compression.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TDigest {
    /// Size/accuracy knob; `TDigest::new` only accepts finite values in `[2, 10000]`.
    compression: f64,
    /// Compressed centroids, kept sorted by ascending mean after every flush.
    centroids: Vec<Centroid>,
    /// Raw samples recorded since the last flush, not yet folded into `centroids`.
    buffer: Vec<f64>,
    /// Sum of centroid weights as of the last flush; excludes `buffer`.
    total_weight: f64,
    /// Smallest finite value seen; `+∞` while the digest is empty.
    min: f64,
    /// Largest finite value seen; `-∞` while the digest is empty.
    max: f64,
}
impl TDigest {
    /// Creates an empty digest with the given `compression`.
    ///
    /// Larger values retain more centroids, trading memory for accuracy.
    ///
    /// # Errors
    ///
    /// Returns [`RcfError::InvalidConfig`] when `compression` is NaN, infinite,
    /// or outside `[2, 10000]`.
    pub fn new(compression: f64) -> RcfResult<Self> {
        if !compression.is_finite() || !(2.0..=10_000.0).contains(&compression) {
            return Err(RcfError::InvalidConfig(
                format!("TDigest: compression must be finite in [2, 10000], got {compression}")
                    .into(),
            ));
        }
        Ok(Self::empty(compression))
    }

    /// Creates an empty digest using [`DEFAULT_COMPRESSION`].
    #[must_use]
    pub fn with_default_compression() -> Self {
        Self::empty(DEFAULT_COMPRESSION)
    }

    /// Builds an empty digest; `compression` is assumed to be already validated.
    fn empty(compression: f64) -> Self {
        Self {
            compression,
            centroids: Vec::new(),
            buffer: Vec::new(),
            total_weight: 0.0,
            // ±∞ sentinels make the first recorded value win both comparisons.
            min: f64::INFINITY,
            max: f64::NEG_INFINITY,
        }
    }

    /// Returns the compression parameter this digest was built with.
    #[must_use]
    pub fn compression(&self) -> f64 {
        self.compression
    }

    /// Returns the total recorded weight, including samples that are still
    /// buffered and not yet compressed into centroids.
    #[must_use]
    pub fn total_weight(&self) -> f64 {
        #[allow(clippy::cast_precision_loss)]
        let pending = self.buffer.len() as f64;
        self.total_weight + pending
    }

    /// Returns the number of compressed centroids (buffered samples are not
    /// counted until the next flush).
    #[must_use]
    pub fn centroid_count(&self) -> usize {
        self.centroids.len()
    }

    /// Returns the smallest value observed, or `None` if the digest is empty.
    #[must_use]
    pub fn min(&self) -> Option<f64> {
        self.min.is_finite().then_some(self.min)
    }

    /// Returns the largest value observed, or `None` if the digest is empty.
    #[must_use]
    pub fn max(&self) -> Option<f64> {
        self.max.is_finite().then_some(self.max)
    }

    /// Records a single observation with unit weight.
    ///
    /// Non-finite values (NaN, ±∞) are silently ignored. Samples are buffered
    /// and folded into the centroids once `compression * BUFFER_MULT` of them
    /// have accumulated.
    pub fn record(&mut self, value: f64) {
        if !value.is_finite() {
            return;
        }
        if value < self.min {
            self.min = value;
        }
        if value > self.max {
            self.max = value;
        }
        self.buffer.push(value);
        if self.buffer.len() >= self.buffer_limit() {
            self.flush_buffer();
        }
    }

    /// Number of buffered samples that triggers an automatic flush.
    fn buffer_limit(&self) -> usize {
        // Truncation is intended; `as` saturates, so odd values cannot panic.
        #[allow(
            clippy::cast_possible_truncation,
            clippy::cast_sign_loss,
            clippy::cast_precision_loss
        )]
        let scaled = self.compression as usize;
        scaled.saturating_mul(BUFFER_MULT)
    }

    /// Compresses any buffered samples into the centroid list immediately.
    pub fn flush(&mut self) {
        self.flush_buffer();
    }

    /// Estimates the value at quantile `q` in `0.0..=1.0`.
    ///
    /// Pending samples are flushed first. `q == 0` returns the exact minimum
    /// and `q == 1` the exact maximum. Interior quantiles interpolate linearly
    /// between adjacent centroid centers; the tails interpolate between the
    /// outermost centroid and the observed min/max.
    ///
    /// Returns `None` when `q` is NaN or outside `[0, 1]`, or when nothing has
    /// been recorded.
    #[must_use]
    pub fn quantile(&mut self, q: f64) -> Option<f64> {
        if !q.is_finite() || !(0.0..=1.0).contains(&q) {
            return None;
        }
        self.flush_buffer();
        let first = *self.centroids.first()?;
        if q <= 0.0 {
            return Some(self.min);
        }
        if q >= 1.0 {
            return Some(self.max);
        }
        let target = q * self.total_weight;

        // Left tail: between the observed minimum and the first centroid's center.
        let first_center = first.weight / 2.0;
        if target < first_center {
            if first.weight <= 1.0 || first_center <= 0.0 {
                return Some(first.mean);
            }
            let frac = target / first_center;
            return Some(self.min + frac * (first.mean - self.min));
        }

        // Interior: between the centers of each adjacent pair of centroids.
        // `cum` is the cumulative weight up to and including `prev`.
        let mut cum = first.weight;
        for pair in self.centroids.windows(2) {
            let (prev, cur) = (pair[0], pair[1]);
            let prev_center = cum - prev.weight / 2.0;
            let cur_center = cum + cur.weight / 2.0;
            if target < cur_center {
                let span = cur_center - prev_center;
                if span <= 0.0 {
                    return Some(prev.mean);
                }
                let frac = (target - prev_center) / span;
                return Some(prev.mean + frac * (cur.mean - prev.mean));
            }
            cum += cur.weight;
        }

        // Right tail: between the last centroid's center and the observed maximum.
        let last = *self.centroids.last()?;
        let last_center = self.total_weight - last.weight / 2.0;
        let span = self.total_weight - last_center;
        if span <= 0.0 {
            return Some(last.mean);
        }
        let frac = ((target - last_center) / span).clamp(0.0, 1.0);
        Some(last.mean + frac * (self.max - last.mean))
    }

    /// Estimates the `p`-th percentile; equivalent to `quantile(p / 100.0)`.
    #[must_use]
    pub fn percentile(&mut self, p: f64) -> Option<f64> {
        self.quantile(p / 100.0)
    }

    /// Folds `other` into `self`.
    ///
    /// `other`'s centroids are merged as weighted points, so the cost is
    /// proportional to the number of centroids and buffered samples involved,
    /// not to the total weight they represent. `other` is not modified, and
    /// `self` ends with an empty buffer.
    ///
    /// # Errors
    ///
    /// Returns [`RcfError::InvalidConfig`] if the digests use different
    /// compression values; `self` is left untouched in that case.
    pub fn merge(&mut self, other: &Self) -> RcfResult<()> {
        #[allow(clippy::float_cmp)]
        let compat = self.compression == other.compression;
        if !compat {
            return Err(RcfError::InvalidConfig(
                format!(
                    "TDigest merge: compression mismatch ({} vs {})",
                    self.compression, other.compression
                )
                .into(),
            ));
        }
        if other.min < self.min {
            self.min = other.min;
        }
        if other.max > self.max {
            self.max = other.max;
        }
        // Raw samples from both sides enter as unit-weight points; `other`'s
        // centroids keep their weight (degenerate ones are skipped rather than
        // being inflated to weight 1).
        let mut pending = core::mem::take(&mut self.buffer);
        pending.extend_from_slice(&other.buffer);
        let incoming = pending
            .drain(..)
            .filter(|v| v.is_finite())
            .map(|mean| Centroid { mean, weight: 1.0 })
            .chain(other.centroids.iter().copied().filter(|c| {
                c.mean.is_finite() && c.weight.is_finite() && c.weight > 0.0
            }));
        self.absorb(incoming);
        // `pending` is empty now; hand its allocation back to the buffer.
        self.buffer = pending;
        Ok(())
    }

    /// Discards all recorded data, keeping the compression setting and the
    /// existing allocations.
    pub fn reset(&mut self) {
        self.centroids.clear();
        self.buffer.clear();
        self.total_weight = 0.0;
        self.min = f64::INFINITY;
        self.max = f64::NEG_INFINITY;
    }

    /// Returns the compressed centroids, sorted by ascending mean.
    ///
    /// Samples still in the internal buffer are not included; call
    /// [`flush`](Self::flush) first for a complete view.
    #[must_use]
    pub fn centroids(&self) -> &[Centroid] {
        &self.centroids
    }

    /// Moves every buffered sample into the centroid list as a unit-weight point.
    fn flush_buffer(&mut self) {
        if self.buffer.is_empty() {
            return;
        }
        let mut pending = core::mem::take(&mut self.buffer);
        self.absorb(pending.drain(..).map(|mean| Centroid { mean, weight: 1.0 }));
        // Reuse the (now empty) allocation for future samples.
        self.buffer = pending;
    }

    /// Combines `incoming` points (in any order) with the existing centroids
    /// and recompresses; a no-op when `incoming` yields nothing.
    fn absorb<I>(&mut self, incoming: I)
    where
        I: IntoIterator<Item = Centroid>,
    {
        let mut points = core::mem::take(&mut self.centroids);
        let existing = points.len();
        points.extend(incoming);
        if points.len() == existing {
            self.centroids = points;
            return;
        }
        // Stable sort keeps existing centroids ahead of incoming points with an
        // identical mean. `total_cmp` is a true total order, unlike
        // `partial_cmp(..).unwrap_or(Equal)`, on which the standard sort is
        // allowed to panic if a NaN slips in (e.g. via deserialized state).
        points.sort_by(|a, b| a.mean.total_cmp(&b.mean));
        self.compress(points);
    }

    /// Greedily merges neighbours of a mean-sorted centroid list while the
    /// merged cluster stays within the `q_limit_for` size bound, then stores
    /// the result and its total weight.
    fn compress(&mut self, sorted: Vec<Centroid>) {
        let total: f64 = sorted.iter().map(|c| c.weight).sum();
        self.total_weight = total;
        if total <= 0.0 {
            self.centroids = sorted;
            return;
        }
        let mut out: Vec<Centroid> = Vec::with_capacity(sorted.len());
        let mut rest = sorted.into_iter();
        let Some(mut current) = rest.next() else {
            self.centroids = out;
            return;
        };
        // Cumulative weight up to and including `current`.
        let mut cum = current.weight;
        for next in rest {
            // Quantile range `current` would cover if `next` were folded into it.
            let q0 = (cum - current.weight) / total;
            let q1 = (cum + next.weight) / total;
            if q1 <= q_limit_for(q0, self.compression) {
                let merged_weight = current.weight + next.weight;
                current.mean = (current.mean * current.weight + next.mean * next.weight)
                    / merged_weight;
                current.weight = merged_weight;
            } else {
                out.push(current);
                current = next;
            }
            cum += next.weight;
        }
        out.push(current);
        self.centroids = out;
    }
}
/// Largest cumulative quantile that a centroid whose left edge sits at `q`
/// may extend to under the given `compression`.
///
/// Uses the arcsine scale `k(q) = compression / (2π) · asin(2q − 1)`: `q` is
/// mapped to `k`, stepped one unit to `k + 1`, and mapped back through
/// `(sin(2π(k + 1) / compression) + 1) / 2`. If that round trip is not finite
/// or does not land strictly above `q`, a fallback step of `1 / compression`
/// is taken instead. The returned bound is capped at `1.0`.
fn q_limit_for(q: f64, compression: f64) -> f64 {
    use core::f64::consts::TAU;

    let q = q.clamp(0.0, 1.0);
    let k_next = (compression / TAU) * (2.0 * q - 1.0).asin() + 1.0;
    let candidate = f64::midpoint((TAU * k_next / compression).sin(), 1.0);

    let advances = candidate.is_finite() && candidate > q;
    if !advances {
        return (q + 1.0 / compression).min(1.0);
    }
    candidate.min(1.0)
}
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::panic,
    clippy::float_cmp,
    clippy::cast_precision_loss,
    clippy::cast_lossless
)]
mod tests {
    // Unit tests for `TDigest`: constructor validation, min/max tracking,
    // quantile accuracy on uniform integer streams, merging, and reset.
    use super::*;
    // `new` must reject values below 2, NaN, and values above 10_000.
    #[test]
    fn new_rejects_bad_compression() {
        assert!(TDigest::new(0.0).is_err());
        assert!(TDigest::new(1.0).is_err());
        assert!(TDigest::new(f64::NAN).is_err());
        assert!(TDigest::new(1.0e6).is_err());
    }
    // With no samples there is nothing to estimate.
    #[test]
    fn empty_quantile_returns_none() {
        let mut d = TDigest::with_default_compression();
        assert!(d.quantile(0.5).is_none());
    }
    // min/max are tracked eagerly on `record`, before any flush.
    #[test]
    fn record_updates_min_max() {
        let mut d = TDigest::with_default_compression();
        d.record(5.0);
        d.record(2.0);
        d.record(8.0);
        assert_eq!(d.min(), Some(2.0));
        assert_eq!(d.max(), Some(8.0));
    }
    // Non-finite inputs are dropped; only the finite sample adds weight.
    #[test]
    fn record_ignores_nan_and_inf() {
        let mut d = TDigest::with_default_compression();
        d.record(f64::NAN);
        d.record(f64::INFINITY);
        d.record(1.0);
        assert_eq!(d.total_weight(), 1.0);
    }
    // Exact median of 0..10_000 is 4999.5; the tolerance covers sketch error.
    #[test]
    fn median_of_uniform_stream() {
        let mut d = TDigest::with_default_compression();
        for i in 0..10_000 {
            d.record(i as f64);
        }
        let median = d.quantile(0.5).unwrap();
        assert!((median - 4999.5).abs() < 150.0, "median = {median}");
    }
    // Expected tail values are roughly q * 9_999 for the stream 0..10_000.
    #[test]
    fn tail_quantiles_accurate_on_uniform() {
        let mut d = TDigest::new(200.0).unwrap();
        for i in 0..10_000 {
            d.record(i as f64);
        }
        let p99 = d.quantile(0.99).unwrap();
        let p999 = d.quantile(0.999).unwrap();
        assert!((p99 - 9899.0).abs() < 100.0, "p99 = {p99}");
        assert!((p999 - 9989.0).abs() < 100.0, "p99.9 = {p999}");
    }
    // `percentile(p)` must be exactly `quantile(p / 100)`.
    #[test]
    fn percentile_is_quantile_over_100() {
        let mut d = TDigest::with_default_compression();
        for i in 0..1000 {
            d.record(i as f64);
        }
        let q50 = d.quantile(0.5).unwrap();
        let p50 = d.percentile(50.0).unwrap();
        assert_eq!(q50, p50);
    }
    // The endpoints return the exact observed extremes, even with an outlier.
    #[test]
    fn quantile_0_returns_min_quantile_1_returns_max() {
        let mut d = TDigest::with_default_compression();
        for v in &[1.0, 2.0, 3.0, 100.0] {
            d.record(*v);
        }
        assert_eq!(d.quantile(0.0), Some(1.0));
        assert_eq!(d.quantile(1.0), Some(100.0));
    }
    // Merging two halves of 0..10_000 should recover the full-range median
    // and the extremes of both inputs.
    #[test]
    fn merge_two_digests_preserves_quantiles() {
        let mut a = TDigest::new(200.0).unwrap();
        let mut b = TDigest::new(200.0).unwrap();
        for i in 0..5_000 {
            a.record(i as f64);
        }
        for i in 5_000..10_000 {
            b.record(i as f64);
        }
        a.merge(&b).unwrap();
        let median = a.quantile(0.5).unwrap();
        assert!((median - 4999.5).abs() < 200.0, "median = {median}");
        assert_eq!(a.min(), Some(0.0));
        assert_eq!(a.max(), Some(9999.0));
    }
    // Digests built with different compression values cannot be merged.
    #[test]
    fn merge_rejects_compression_mismatch() {
        let mut a = TDigest::new(100.0).unwrap();
        let b = TDigest::new(200.0).unwrap();
        assert!(a.merge(&b).is_err());
    }
    // After `reset` the digest behaves like a freshly constructed one.
    #[test]
    fn reset_drops_state() {
        let mut d = TDigest::with_default_compression();
        for i in 0..100 {
            d.record(i as f64);
        }
        d.reset();
        assert_eq!(d.total_weight(), 0.0);
        assert!(d.min().is_none());
        assert!(d.max().is_none());
        assert!(d.quantile(0.5).is_none());
    }
    // Compression 100 must keep the centroid count far below the 50_000
    // inputs; 250 is the ceiling this test allows.
    #[test]
    fn centroid_count_bounded_by_compression() {
        let mut d = TDigest::new(100.0).unwrap();
        for i in 0..50_000 {
            d.record(i as f64);
        }
        d.flush();
        assert!(
            d.centroid_count() <= 250,
            "centroids = {}",
            d.centroid_count()
        );
    }
    // Quantiles outside [0, 1], and NaN, yield `None` rather than clamping.
    #[test]
    fn quantile_rejects_out_of_range() {
        let mut d = TDigest::with_default_compression();
        d.record(1.0);
        assert!(d.quantile(-0.1).is_none());
        assert!(d.quantile(1.1).is_none());
        assert!(d.quantile(f64::NAN).is_none());
    }
    // Only compiled with both `serde` and `postcard` enabled: a serialized and
    // deserialized digest must answer quantile queries identically.
    #[cfg(all(feature = "serde", feature = "postcard"))]
    #[test]
    fn postcard_roundtrip_preserves_quantiles() {
        let mut d = TDigest::new(200.0).unwrap();
        for i in 0..2_000 {
            d.record(i as f64);
        }
        d.flush();
        let bytes = postcard::to_allocvec(&d).unwrap();
        let mut back: TDigest = postcard::from_bytes(&bytes).unwrap();
        let before = d.quantile(0.9).unwrap();
        let after = back.quantile(0.9).unwrap();
        assert_eq!(before, after);
    }
}