use std::collections::BTreeMap;
const MIN_INDEXABLE_VALUE: f64 = 1e-300;
const MAX_INDEXABLE_VALUE: f64 = 1e300;
#[derive(Debug, Clone)]
pub struct DDSketch {
alpha: f64,
gamma: f64,
log_gamma: f64,
positive_buckets: BTreeMap<i32, u64>,
negative_buckets: BTreeMap<i32, u64>,
count: u64,
zero_count: u64,
min: f64,
max: f64,
sum: f64,
}
impl DDSketch {
pub fn new(alpha: f64) -> Self {
assert!(alpha > 0.0 && alpha < 1.0, "Alpha must be between 0 and 1");
let gamma = (1.0 + alpha) / (1.0 - alpha);
Self {
alpha,
gamma,
log_gamma: gamma.ln(),
positive_buckets: BTreeMap::new(),
negative_buckets: BTreeMap::new(),
count: 0,
zero_count: 0,
min: f64::MAX,
max: f64::MIN,
sum: 0.0,
}
}
pub fn default_accuracy() -> Self {
Self::new(0.01)
}
#[inline]
fn index(&self, value: f64) -> i32 {
debug_assert!(value > 0.0);
let clamped = value.clamp(MIN_INDEXABLE_VALUE, MAX_INDEXABLE_VALUE);
(clamped.ln() / self.log_gamma).ceil() as i32
}
#[inline]
fn bucket_value(&self, index: i32) -> f64 {
self.gamma.powf(index as f64 - 0.5)
}
#[inline]
pub fn add(&mut self, value: f64) {
self.count += 1;
self.sum += value;
self.min = self.min.min(value);
self.max = self.max.max(value);
if value == 0.0 {
self.zero_count += 1;
} else if value > 0.0 {
let idx = self.index(value);
*self.positive_buckets.entry(idx).or_default() += 1;
} else {
let idx = self.index(-value);
*self.negative_buckets.entry(idx).or_default() += 1;
}
}
pub fn add_count(&mut self, value: f64, count: u64) {
self.count += count;
self.sum += value * count as f64;
self.min = self.min.min(value);
self.max = self.max.max(value);
if value == 0.0 {
self.zero_count += count;
} else if value > 0.0 {
let idx = self.index(value);
*self.positive_buckets.entry(idx).or_default() += count;
} else {
let idx = self.index(-value);
*self.negative_buckets.entry(idx).or_default() += count;
}
}
pub fn quantile(&self, q: f64) -> f64 {
if self.count == 0 {
return 0.0;
}
let q = q.clamp(0.0, 1.0);
let target_rank = (q * self.count as f64).ceil() as u64;
if target_rank == 0 {
return self.min;
}
let mut cumulative = 0u64;
for (&idx, &count) in self.negative_buckets.iter().rev() {
cumulative += count;
if cumulative >= target_rank {
return -self.bucket_value(idx);
}
}
cumulative += self.zero_count;
if cumulative >= target_rank {
return 0.0;
}
for (&idx, &count) in &self.positive_buckets {
cumulative += count;
if cumulative >= target_rank {
return self.bucket_value(idx);
}
}
self.max
}
pub fn percentiles(&self) -> DDSketchPercentiles {
DDSketchPercentiles {
p50: self.quantile(0.50),
p75: self.quantile(0.75),
p90: self.quantile(0.90),
p95: self.quantile(0.95),
p99: self.quantile(0.99),
min: if self.count > 0 { self.min } else { 0.0 },
max: if self.count > 0 { self.max } else { 0.0 },
mean: self.mean(),
count: self.count,
}
}
pub fn count(&self) -> u64 {
self.count
}
pub fn alpha(&self) -> f64 {
self.alpha
}
pub fn mean(&self) -> f64 {
if self.count == 0 {
0.0
} else {
self.sum / self.count as f64
}
}
pub fn sum(&self) -> f64 {
self.sum
}
pub fn min(&self) -> f64 {
if self.count > 0 { self.min } else { 0.0 }
}
pub fn max(&self) -> f64 {
if self.count > 0 { self.max } else { 0.0 }
}
pub fn merge(&mut self, other: &DDSketch) {
if other.count == 0 {
return;
}
for (&idx, &count) in &other.positive_buckets {
*self.positive_buckets.entry(idx).or_default() += count;
}
for (&idx, &count) in &other.negative_buckets {
*self.negative_buckets.entry(idx).or_default() += count;
}
self.count += other.count;
self.zero_count += other.zero_count;
self.sum += other.sum;
if other.count > 0 {
self.min = self.min.min(other.min);
self.max = self.max.max(other.max);
}
}
pub fn is_empty(&self) -> bool {
self.count == 0
}
pub fn memory_usage(&self) -> usize {
std::mem::size_of::<Self>()
+ self.positive_buckets.len()
* (std::mem::size_of::<i32>() + std::mem::size_of::<u64>())
+ self.negative_buckets.len()
* (std::mem::size_of::<i32>() + std::mem::size_of::<u64>())
}
pub fn clear(&mut self) {
self.positive_buckets.clear();
self.negative_buckets.clear();
self.count = 0;
self.zero_count = 0;
self.min = f64::MAX;
self.max = f64::MIN;
self.sum = 0.0;
}
}
impl Default for DDSketch {
fn default() -> Self {
Self::default_accuracy()
}
}
#[derive(Debug, Clone, Copy, Default)]
pub struct DDSketchPercentiles {
pub p50: f64,
pub p75: f64,
pub p90: f64,
pub p95: f64,
pub p99: f64,
pub min: f64,
pub max: f64,
pub mean: f64,
pub count: u64,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_basic_operations() {
let mut sketch = DDSketch::new(0.01);
for i in 1..=100 {
sketch.add(i as f64);
}
assert_eq!(sketch.count(), 100);
assert!((sketch.mean() - 50.5).abs() < 0.01);
let p50 = sketch.quantile(0.50);
assert!((p50 - 50.0).abs() < 5.0, "P50 was {}", p50);
let p99 = sketch.quantile(0.99);
assert!((p99 - 99.0).abs() < 5.0, "P99 was {}", p99);
}
#[test]
fn test_merge() {
let mut sketch1 = DDSketch::new(0.01);
let mut sketch2 = DDSketch::new(0.01);
for i in 1..=50 {
sketch1.add(i as f64);
}
for i in 51..=100 {
sketch2.add(i as f64);
}
sketch1.merge(&sketch2);
assert_eq!(sketch1.count(), 100);
assert!((sketch1.mean() - 50.5).abs() < 0.01);
}
#[test]
fn test_empty_sketch() {
let sketch = DDSketch::new(0.01);
assert_eq!(sketch.count(), 0);
assert_eq!(sketch.quantile(0.50), 0.0);
assert_eq!(sketch.mean(), 0.0);
}
#[test]
fn test_latency_distribution() {
let mut sketch = DDSketch::new(0.01);
for _ in 0..900 {
sketch.add(5_000.0); }
for _ in 0..90 {
sketch.add(30_000.0); }
for _ in 0..10 {
sketch.add(150_000.0); }
let percentiles = sketch.percentiles();
assert!(
percentiles.p50 < 15_000.0,
"P50 {} too high",
percentiles.p50
);
assert!(percentiles.p99 > 5_000.0, "P99 {} too low", percentiles.p99);
}
}