anomstream_core/
tdigest.rs1use alloc::format;
31use alloc::vec::Vec;
32
33#[cfg(not(feature = "std"))]
34#[allow(unused_imports)]
35use num_traits::Float;
36
37use crate::error::{RcfError, RcfResult};
38
39pub const DEFAULT_COMPRESSION: f64 = 100.0;
42
43const BUFFER_MULT: usize = 10;
46
47#[derive(Debug, Clone, Copy, PartialEq)]
51#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
52pub struct Centroid {
53 pub mean: f64,
56 pub weight: f64,
60}
61
62#[derive(Debug, Clone)]
68#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
69pub struct TDigest {
70 compression: f64,
74 centroids: Vec<Centroid>,
77 buffer: Vec<f64>,
79 total_weight: f64,
83 min: f64,
87 max: f64,
89}
90
91impl TDigest {
92 pub fn new(compression: f64) -> RcfResult<Self> {
99 if !compression.is_finite() || !(2.0..=10_000.0).contains(&compression) {
100 return Err(RcfError::InvalidConfig(
101 format!("TDigest: compression must be finite in [2, 10000], got {compression}")
102 .into(),
103 ));
104 }
105 Ok(Self {
106 compression,
107 centroids: Vec::new(),
108 buffer: Vec::new(),
109 total_weight: 0.0,
110 min: f64::INFINITY,
111 max: f64::NEG_INFINITY,
112 })
113 }
114
115 #[must_use]
118 pub fn with_default_compression() -> Self {
119 Self {
121 compression: DEFAULT_COMPRESSION,
122 centroids: Vec::new(),
123 buffer: Vec::new(),
124 total_weight: 0.0,
125 min: f64::INFINITY,
126 max: f64::NEG_INFINITY,
127 }
128 }
129
130 #[must_use]
132 pub fn compression(&self) -> f64 {
133 self.compression
134 }
135
136 #[must_use]
139 pub fn total_weight(&self) -> f64 {
140 #[allow(clippy::cast_precision_loss)]
141 let pending = self.buffer.len() as f64;
142 self.total_weight + pending
143 }
144
145 #[must_use]
148 pub fn centroid_count(&self) -> usize {
149 self.centroids.len()
150 }
151
152 #[must_use]
155 pub fn min(&self) -> Option<f64> {
156 if self.min.is_finite() {
157 Some(self.min)
158 } else {
159 None
160 }
161 }
162
163 #[must_use]
166 pub fn max(&self) -> Option<f64> {
167 if self.max.is_finite() {
168 Some(self.max)
169 } else {
170 None
171 }
172 }
173
174 pub fn record(&mut self, value: f64) {
179 if !value.is_finite() {
180 return;
181 }
182 if value < self.min {
183 self.min = value;
184 }
185 if value > self.max {
186 self.max = value;
187 }
188 self.buffer.push(value);
189 #[allow(
190 clippy::cast_possible_truncation,
191 clippy::cast_sign_loss,
192 clippy::cast_precision_loss
193 )]
194 let threshold = (self.compression as usize).saturating_mul(BUFFER_MULT);
195 if self.buffer.len() >= threshold {
196 self.flush_buffer();
197 }
198 }
199
200 pub fn flush(&mut self) {
205 self.flush_buffer();
206 }
207
208 #[must_use]
211 pub fn quantile(&mut self, q: f64) -> Option<f64> {
212 if !q.is_finite() || !(0.0..=1.0).contains(&q) {
213 return None;
214 }
215 self.flush_buffer();
216 if self.centroids.is_empty() {
217 return None;
218 }
219 if q <= 0.0 {
220 return Some(self.min);
221 }
222 if q >= 1.0 {
223 return Some(self.max);
224 }
225 let target = q * self.total_weight;
226
227 let mut cum = 0.0_f64;
230 let first = &self.centroids[0];
231 let first_center = first.weight / 2.0;
234 if target < first_center {
235 if first.weight <= 1.0 || first_center <= 0.0 {
236 return Some(first.mean);
237 }
238 let frac = target / first_center;
239 return Some(self.min + frac * (first.mean - self.min));
240 }
241 cum += first.weight;
242
243 for i in 1..self.centroids.len() {
244 let prev = &self.centroids[i - 1];
245 let cur = &self.centroids[i];
246 let prev_center = cum - prev.weight / 2.0;
247 let cur_center = cum + cur.weight / 2.0;
248 if target < cur_center {
249 let span = cur_center - prev_center;
250 if span <= 0.0 {
251 return Some(prev.mean);
252 }
253 let frac = (target - prev_center) / span;
254 return Some(prev.mean + frac * (cur.mean - prev.mean));
255 }
256 cum += cur.weight;
257 }
258
259 let last = self.centroids.last()?;
261 let last_center = self.total_weight - last.weight / 2.0;
262 let span = self.total_weight - last_center;
263 if span <= 0.0 {
264 return Some(last.mean);
265 }
266 let frac = ((target - last_center) / span).clamp(0.0, 1.0);
267 Some(last.mean + frac * (self.max - last.mean))
268 }
269
270 #[must_use]
272 pub fn percentile(&mut self, p: f64) -> Option<f64> {
273 self.quantile(p / 100.0)
274 }
275
276 pub fn merge(&mut self, other: &Self) -> RcfResult<()> {
285 #[allow(clippy::float_cmp)]
286 let compat = self.compression == other.compression;
287 if !compat {
288 return Err(RcfError::InvalidConfig(
289 format!(
290 "TDigest merge: compression mismatch ({} vs {})",
291 self.compression, other.compression
292 )
293 .into(),
294 ));
295 }
296 self.flush_buffer();
297 for c in &other.centroids {
301 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
305 let n = c.weight.round() as usize;
306 for _ in 0..n.max(1) {
307 self.buffer.push(c.mean);
308 }
309 }
310 for v in &other.buffer {
311 self.buffer.push(*v);
312 }
313 if other.min < self.min {
314 self.min = other.min;
315 }
316 if other.max > self.max {
317 self.max = other.max;
318 }
319 self.flush_buffer();
320 Ok(())
321 }
322
323 pub fn reset(&mut self) {
326 self.centroids.clear();
327 self.buffer.clear();
328 self.total_weight = 0.0;
329 self.min = f64::INFINITY;
330 self.max = f64::NEG_INFINITY;
331 }
332
333 #[must_use]
337 pub fn centroids(&self) -> &[Centroid] {
338 &self.centroids
339 }
340
341 fn flush_buffer(&mut self) {
344 if self.buffer.is_empty() {
345 return;
346 }
347 self.buffer
350 .sort_by(|a, b| a.partial_cmp(b).unwrap_or(core::cmp::Ordering::Equal));
351
352 let mut combined: Vec<Centroid> =
355 Vec::with_capacity(self.centroids.len() + self.buffer.len());
356 let mut i = 0_usize;
357 let mut j = 0_usize;
358 while i < self.centroids.len() && j < self.buffer.len() {
359 let c = self.centroids[i];
360 let v = self.buffer[j];
361 if c.mean <= v {
362 combined.push(c);
363 i += 1;
364 } else {
365 combined.push(Centroid {
366 mean: v,
367 weight: 1.0,
368 });
369 j += 1;
370 }
371 }
372 while i < self.centroids.len() {
373 combined.push(self.centroids[i]);
374 i += 1;
375 }
376 while j < self.buffer.len() {
377 combined.push(Centroid {
378 mean: self.buffer[j],
379 weight: 1.0,
380 });
381 j += 1;
382 }
383 self.buffer.clear();
384
385 let total: f64 = combined.iter().map(|c| c.weight).sum();
387 self.total_weight = total;
388 if total <= 0.0 {
389 self.centroids = combined;
390 return;
391 }
392
393 let mut out: Vec<Centroid> = Vec::with_capacity(combined.len());
395 let mut cum = 0.0_f64;
396 let mut current = combined[0];
397 cum += current.weight;
398 for centroid in &combined[1..] {
399 let q0 = (cum - current.weight) / total;
400 let q1 = (cum + centroid.weight) / total;
401 let q_limit = q_limit_for(q0, self.compression);
402 if q1 <= q_limit {
407 let new_weight = current.weight + centroid.weight;
408 current.mean =
409 (current.mean * current.weight + centroid.mean * centroid.weight) / new_weight;
410 current.weight = new_weight;
411 } else {
412 out.push(current);
413 current = *centroid;
414 }
415 cum += centroid.weight;
416 }
417 out.push(current);
418 self.centroids = out;
419 }
420}
421
422fn q_limit_for(q: f64, compression: f64) -> f64 {
427 use core::f64::consts::PI;
428 let clamped = q.clamp(0.0, 1.0);
429 let k = (compression / (2.0 * PI)) * (2.0 * clamped - 1.0).asin();
430 let next = (2.0 * PI * (k + 1.0) / compression).sin();
431 let limit = f64::midpoint(next, 1.0);
432 if limit.is_finite() && limit > clamped {
433 limit.min(1.0)
434 } else {
435 (clamped + 1.0 / compression).min(1.0)
436 }
437}
438
439#[cfg(test)]
440#[allow(
441 clippy::unwrap_used,
442 clippy::panic,
443 clippy::float_cmp,
444 clippy::cast_precision_loss,
445 clippy::cast_lossless
446)]
447mod tests {
448 use super::*;
449
450 #[test]
451 fn new_rejects_bad_compression() {
452 assert!(TDigest::new(0.0).is_err());
453 assert!(TDigest::new(1.0).is_err());
454 assert!(TDigest::new(f64::NAN).is_err());
455 assert!(TDigest::new(1.0e6).is_err());
456 }
457
458 #[test]
459 fn empty_quantile_returns_none() {
460 let mut d = TDigest::with_default_compression();
461 assert!(d.quantile(0.5).is_none());
462 }
463
464 #[test]
465 fn record_updates_min_max() {
466 let mut d = TDigest::with_default_compression();
467 d.record(5.0);
468 d.record(2.0);
469 d.record(8.0);
470 assert_eq!(d.min(), Some(2.0));
471 assert_eq!(d.max(), Some(8.0));
472 }
473
474 #[test]
475 fn record_ignores_nan_and_inf() {
476 let mut d = TDigest::with_default_compression();
477 d.record(f64::NAN);
478 d.record(f64::INFINITY);
479 d.record(1.0);
480 assert_eq!(d.total_weight(), 1.0);
481 }
482
483 #[test]
484 fn median_of_uniform_stream() {
485 let mut d = TDigest::with_default_compression();
486 for i in 0..10_000 {
487 d.record(i as f64);
488 }
489 let median = d.quantile(0.5).unwrap();
490 assert!((median - 4999.5).abs() < 150.0, "median = {median}");
492 }
493
494 #[test]
495 fn tail_quantiles_accurate_on_uniform() {
496 let mut d = TDigest::new(200.0).unwrap();
497 for i in 0..10_000 {
498 d.record(i as f64);
499 }
500 let p99 = d.quantile(0.99).unwrap();
501 let p999 = d.quantile(0.999).unwrap();
502 assert!((p99 - 9899.0).abs() < 100.0, "p99 = {p99}");
505 assert!((p999 - 9989.0).abs() < 100.0, "p99.9 = {p999}");
506 }
507
508 #[test]
509 fn percentile_is_quantile_over_100() {
510 let mut d = TDigest::with_default_compression();
511 for i in 0..1000 {
512 d.record(i as f64);
513 }
514 let q50 = d.quantile(0.5).unwrap();
515 let p50 = d.percentile(50.0).unwrap();
516 assert_eq!(q50, p50);
517 }
518
519 #[test]
520 fn quantile_0_returns_min_quantile_1_returns_max() {
521 let mut d = TDigest::with_default_compression();
522 for v in &[1.0, 2.0, 3.0, 100.0] {
523 d.record(*v);
524 }
525 assert_eq!(d.quantile(0.0), Some(1.0));
526 assert_eq!(d.quantile(1.0), Some(100.0));
527 }
528
529 #[test]
530 fn merge_two_digests_preserves_quantiles() {
531 let mut a = TDigest::new(200.0).unwrap();
532 let mut b = TDigest::new(200.0).unwrap();
533 for i in 0..5_000 {
534 a.record(i as f64);
535 }
536 for i in 5_000..10_000 {
537 b.record(i as f64);
538 }
539 a.merge(&b).unwrap();
540 let median = a.quantile(0.5).unwrap();
541 assert!((median - 4999.5).abs() < 200.0, "median = {median}");
542 assert_eq!(a.min(), Some(0.0));
543 assert_eq!(a.max(), Some(9999.0));
544 }
545
546 #[test]
547 fn merge_rejects_compression_mismatch() {
548 let mut a = TDigest::new(100.0).unwrap();
549 let b = TDigest::new(200.0).unwrap();
550 assert!(a.merge(&b).is_err());
551 }
552
553 #[test]
554 fn reset_drops_state() {
555 let mut d = TDigest::with_default_compression();
556 for i in 0..100 {
557 d.record(i as f64);
558 }
559 d.reset();
560 assert_eq!(d.total_weight(), 0.0);
561 assert!(d.min().is_none());
562 assert!(d.max().is_none());
563 assert!(d.quantile(0.5).is_none());
564 }
565
566 #[test]
567 fn centroid_count_bounded_by_compression() {
568 let mut d = TDigest::new(100.0).unwrap();
569 for i in 0..50_000 {
570 d.record(i as f64);
571 }
572 d.flush();
573 assert!(
577 d.centroid_count() <= 250,
578 "centroids = {}",
579 d.centroid_count()
580 );
581 }
582
583 #[test]
584 fn quantile_rejects_out_of_range() {
585 let mut d = TDigest::with_default_compression();
586 d.record(1.0);
587 assert!(d.quantile(-0.1).is_none());
588 assert!(d.quantile(1.1).is_none());
589 assert!(d.quantile(f64::NAN).is_none());
590 }
591
592 #[cfg(all(feature = "serde", feature = "postcard"))]
593 #[test]
594 fn postcard_roundtrip_preserves_quantiles() {
595 let mut d = TDigest::new(200.0).unwrap();
596 for i in 0..2_000 {
597 d.record(i as f64);
598 }
599 d.flush();
600 let bytes = postcard::to_allocvec(&d).unwrap();
601 let mut back: TDigest = postcard::from_bytes(&bytes).unwrap();
602 let before = d.quantile(0.9).unwrap();
603 let after = back.quantile(0.9).unwrap();
604 assert_eq!(before, after);
605 }
606}