use crate::DataInput;
use crate::common::input::data_input_to_f64;
use crate::common::numerical::NumericalValue;
use crate::common::structures::Vector1D;
use rmp_serde::decode::Error as RmpDecodeError;
use rmp_serde::encode::Error as RmpEncodeError;
use rmp_serde::{from_slice, to_vec_named};
use serde::{Deserialize, Serialize};
const GROW_CHUNK: usize = 128;
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Buckets {
counts: Vector1D<u64>,
offset: i32,
}
impl Buckets {
fn new() -> Self {
Self {
counts: Vector1D::from_vec(Vec::new()),
offset: 0,
}
}
#[inline(always)]
fn is_empty(&self) -> bool {
self.counts.is_empty()
}
#[inline(always)]
fn range(&self) -> Option<(i32, i32)> {
if self.counts.is_empty() {
None
} else {
let left = self.offset;
let right = self.offset + self.counts.len() as i32 - 1;
Some((left, right))
}
}
#[inline(always)]
fn ensure(&mut self, k: i32) {
if self.counts.is_empty() {
self.counts = Vector1D::from_vec(vec![0u64; GROW_CHUNK]);
self.offset = k - (GROW_CHUNK as i32 / 2);
return;
}
let (left, right) = self.range().unwrap();
if k < left {
let needed = (left - k) as usize;
let grow = needed.max(GROW_CHUNK);
let mut v = vec![0u64; grow];
v.extend_from_slice(self.counts.as_slice());
self.counts = Vector1D::from_vec(v);
self.offset -= grow as i32;
} else if k > right {
let needed = (k - right) as usize;
let grow = needed.max(GROW_CHUNK);
let mut v = self.counts.clone().into_vec();
v.resize(v.len() + grow, 0);
self.counts = Vector1D::from_vec(v);
}
}
#[inline(always)]
fn add_one(&mut self, k: i32) {
let idx_i32 = k - self.offset;
if idx_i32 >= 0 {
let idx = idx_i32 as usize;
let slice = self.counts.as_mut_slice();
if idx < slice.len() {
unsafe {
*slice.as_mut_ptr().add(idx) += 1;
}
return;
}
}
self.ensure(k);
let idx = (k - self.offset) as usize;
self.counts.as_mut_slice()[idx] += 1;
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct DDSketch {
alpha: f64,
gamma: f64,
log_gamma: f64,
inv_log_gamma: f64,
store: Buckets,
count: u64,
sum: f64,
min: f64,
max: f64,
}
impl DDSketch {
pub fn new(alpha: f64) -> Self {
assert!((0.0..1.0).contains(&alpha), "alpha must be in (0,1)");
let gamma = (1.0 + alpha) / (1.0 - alpha);
let log_gamma = gamma.ln();
let inv_log_gamma = 1.0 / log_gamma;
Self {
alpha,
gamma,
log_gamma,
inv_log_gamma,
store: Buckets::new(),
count: 0,
sum: 0.0,
min: f64::INFINITY,
max: f64::NEG_INFINITY,
}
}
pub fn serialize_to_bytes(&self) -> Result<Vec<u8>, RmpEncodeError> {
to_vec_named(self)
}
pub fn deserialize_from_bytes(bytes: &[u8]) -> Result<Self, RmpDecodeError> {
from_slice(bytes)
}
#[inline(always)]
pub fn add<T: NumericalValue>(&mut self, val: &T) {
let v = val.to_f64();
if !(v.is_finite() && v > 0.0) {
return;
}
self.count += 1;
self.sum += v;
if v < self.min {
self.min = v;
}
if v > self.max {
self.max = v;
}
let k = self.key_for(v);
self.store.add_one(k);
}
pub fn get_value_at_quantile(&self, q: f64) -> Option<f64> {
if self.count == 0 || q.is_nan() {
return None;
}
if q <= 0.0 {
return Some(self.min);
}
if q >= 1.0 {
return Some(self.max);
}
let rank = (q * self.count as f64).ceil() as u64;
let mut seen = 0u64;
let slice = self.store.counts.as_slice();
let offset = self.store.offset;
for (i, &c) in slice.iter().enumerate() {
if c == 0 {
continue;
}
seen += c;
if seen >= rank {
let bin = offset + i as i32;
let mut v = self.bin_representative(bin);
if v < self.min {
v = self.min;
}
if v > self.max {
v = self.max;
}
return Some(v);
}
}
Some(self.max)
}
pub fn get_count(&self) -> u64 {
self.count
}
pub fn min(&self) -> Option<f64> {
if self.count == 0 {
None
} else {
Some(self.min)
}
}
pub fn max(&self) -> Option<f64> {
if self.count == 0 {
None
} else {
Some(self.max)
}
}
pub fn alpha(&self) -> f64 {
self.alpha
}
pub fn sum(&self) -> f64 {
self.sum
}
pub fn store_counts(&self) -> &[u64] {
self.store.counts.as_slice()
}
pub fn store_offset(&self) -> i32 {
self.store.offset
}
pub fn merge(&mut self, other: &DDSketch) {
debug_assert!((self.alpha - other.alpha).abs() < 1e-12);
debug_assert!((self.gamma - other.gamma).abs() < 1e-12);
if other.count == 0 {
return;
}
if self.count == 0 {
*self = other.clone();
return;
}
self.count += other.count;
self.sum += other.sum;
if other.min < self.min {
self.min = other.min;
}
if other.max > self.max {
self.max = other.max;
}
self.merge_buckets_from(other);
}
#[inline(always)]
fn key_for(&self, v: f64) -> i32 {
debug_assert!(v > 0.0);
(v.ln() * self.inv_log_gamma).floor() as i32
}
#[inline]
fn bin_representative(&self, k: i32) -> f64 {
self.gamma.powf(k as f64 + 0.5)
}
fn merge_buckets_from(&mut self, other: &DDSketch) {
if other.store.is_empty() {
return;
}
if self.store.is_empty() {
self.store = other.store.clone();
return;
}
let (self_l, self_r) = self.store.range().unwrap();
let (other_l, other_r) = other.store.range().unwrap();
let new_l = self_l.min(other_l);
let new_r = self_r.max(other_r);
let new_len = (new_r - new_l + 1) as usize;
let mut merged = vec![0u64; new_len];
for (i, &c) in self.store.counts.as_slice().iter().enumerate() {
let k = self_l + i as i32;
merged[(k - new_l) as usize] += c;
}
for (i, &c) in other.store.counts.as_slice().iter().enumerate() {
let k = other_l + i as i32;
merged[(k - new_l) as usize] += c;
}
self.store.counts = Vector1D::from_vec(merged);
self.store.offset = new_l;
}
}
impl Clone for DDSketch {
fn clone(&self) -> Self {
Self {
alpha: self.alpha,
gamma: self.gamma,
log_gamma: self.log_gamma,
inv_log_gamma: self.inv_log_gamma,
store: self.store.clone(),
count: self.count,
sum: self.sum,
min: self.min,
max: self.max,
}
}
}
impl DDSketch {
#[inline(always)]
pub fn add_input(&mut self, v: &DataInput) -> Result<(), &'static str> {
let value = data_input_to_f64(v).map_err(|_| "DDSketch only accepts numeric inputs")?;
self.add(&value);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::{
sample_exponential_f64, sample_normal_f64, sample_uniform_f64, sample_zipf_f64,
};
fn rel_err(a: f64, b: f64) -> f64 {
if a == 0.0 && b == 0.0 {
0.0
} else {
(a - b).abs() / f64::max(1e-30, b.abs())
}
}
fn true_quantile(sorted: &[f64], p: f64) -> f64 {
if sorted.is_empty() {
return f64::NAN;
}
if p <= 0.0 {
return sorted[0];
}
if p >= 1.0 {
return sorted[sorted.len() - 1];
}
let n = sorted.len();
let k = ((p * n as f64).ceil() as usize).clamp(1, n) - 1;
sorted[k]
}
#[test]
fn insert_and_query_basic() {
let mut s = DDSketch::new(0.01);
let vals = [0.0, -5.0, 1.0, 2.0, 3.0, 10.0, 50.0, 100.0, 1000.0];
for &v in &vals {
s.add(&v);
}
assert_eq!(s.get_count(), 7);
let ps = [0.0, 0.5, 0.9, 0.99, 1.0];
let mut prev = f64::NEG_INFINITY;
for &p in &ps {
let q = s.get_value_at_quantile(p).expect("quantile");
assert!(q >= prev - 1e-12, "non-monotone at p={p}: {q} < {prev}");
assert!(q <= s.max().unwrap() + 1e-12);
assert!(q >= s.min().unwrap() - 1e-12);
prev = q;
}
}
#[test]
fn empty_quantile_returns_none() {
let s = DDSketch::new(0.01);
assert!(s.get_value_at_quantile(0.5).is_none());
assert!(s.get_value_at_quantile(0.0).is_none());
assert!(s.get_value_at_quantile(1.0).is_none());
assert_eq!(s.get_count(), 0);
}
#[test]
fn dds_uniform_distribution_quantiles() {
const ALPHA: f64 = 0.01;
const QUANTILES: &[(f64, &str)] = &[
(0.0, "min"),
(0.10, "p10"),
(0.25, "p25"),
(0.50, "p50"),
(0.75, "p75"),
(0.90, "p90"),
(1.0, "max"),
];
fn build_dds_with_uniform(
alpha: f64,
n: usize,
min: f64,
max: f64,
seed: u64,
) -> (DDSketch, Vec<f64>) {
let mut vals = sample_uniform_f64(min, max, n, seed);
vals.retain(|v| v.is_finite() && *v > 0.0);
let mut sk = DDSketch::new(alpha);
for &x in &vals {
sk.add(&x);
}
(sk, vals)
}
fn assert_quantiles_within_error_dds(
sk: &DDSketch,
sorted_vals: &[f64],
qs: &[(f64, &str)],
tol: f64,
) {
for &(p, name) in qs {
let got = sk.get_value_at_quantile(p).expect("quantile");
let want = true_quantile(sorted_vals, p);
let err = rel_err(got, want);
assert!(
err <= tol,
"quantile {} (p={:.2}) relerr={:.4} got={} want={} tol={}",
name,
p,
err,
got,
want,
tol
);
}
}
for (idx, n) in [1_000usize, 5_000usize, 20_000usize]
.into_iter()
.enumerate()
{
let seed = 0xA5A5_0000_u64 + idx as u64;
let (sketch, mut values) =
build_dds_with_uniform(ALPHA, n, 1_000_000.0, 10_000_000.0, seed);
values.sort_by(|a, b| a.partial_cmp(b).unwrap());
assert_quantiles_within_error_dds(&sketch, &values, QUANTILES, ALPHA);
}
}
#[test]
fn dds_zipf_distribution_quantiles() {
const ALPHA: f64 = 0.01;
const QUANTILES: &[(f64, &str)] = &[
(0.0, "min"),
(0.10, "p10"),
(0.25, "p25"),
(0.50, "p50"),
(0.75, "p75"),
(0.90, "p90"),
(1.0, "max"),
];
fn build_dds_with_zipf(
alpha: f64,
n: usize,
min: f64,
max: f64,
domain: usize,
exponent: f64,
seed: u64,
) -> (DDSketch, Vec<f64>) {
let mut vals = sample_zipf_f64(min, max, domain, exponent, n, seed);
vals.retain(|v| v.is_finite() && *v > 0.0);
let mut sk = DDSketch::new(alpha);
for &x in &vals {
sk.add(&x);
}
(sk, vals)
}
fn assert_quantiles_within_error_dds(
sk: &DDSketch,
sorted_vals: &[f64],
qs: &[(f64, &str)],
tol: f64,
) {
for &(p, name) in qs {
let got = sk.get_value_at_quantile(p).expect("quantile");
let want = true_quantile(sorted_vals, p);
let err = rel_err(got, want);
assert!(
err <= tol,
"quantile {} (p={:.2}) relerr={:.4} got={} want={} tol={}",
name,
p,
err,
got,
want,
tol
);
}
}
for (idx, n) in [1_000usize, 5_000usize, 20_000usize]
.into_iter()
.enumerate()
{
let seed = 0xB4B4_0000_u64 + idx as u64;
let (sketch, mut values) =
build_dds_with_zipf(ALPHA, n, 1_000_000.0, 10_000_000.0, 8_192, 1.1, seed);
values.sort_by(|a, b| a.partial_cmp(b).unwrap());
assert_quantiles_within_error_dds(&sketch, &values, QUANTILES, ALPHA);
}
}
#[test]
fn dds_normal_distribution_quantiles() {
const ALPHA: f64 = 0.01;
const QUANTILES: &[(f64, &str)] = &[
(0.0, "min"),
(0.10, "p10"),
(0.25, "p25"),
(0.50, "p50"),
(0.75, "p75"),
(0.90, "p90"),
(1.0, "max"),
];
fn build_dds_with_normal(
alpha: f64,
n: usize,
mean: f64,
std: f64,
seed: u64,
) -> (DDSketch, Vec<f64>) {
let vals = sample_normal_f64(mean, std, n, seed)
.into_iter()
.filter(|v| v.is_finite() && *v > 0.0)
.collect::<Vec<_>>();
let mut sk = DDSketch::new(alpha);
for &x in &vals {
sk.add(&x);
}
(sk, vals)
}
fn assert_quantiles_within_error_dds(
sk: &DDSketch,
mut vals: Vec<f64>,
qs: &[(f64, &str)],
tol: f64,
) {
vals.sort_by(|a, b| a.partial_cmp(b).unwrap());
for &(p, name) in qs {
let got = sk.get_value_at_quantile(p).expect("quantile");
let want = true_quantile(&vals, p);
let err = rel_err(got, want);
assert!(
err <= tol,
"quantile {} (p={:.2}) relerr={:.4} got={} want={} tol={}",
name,
p,
err,
got,
want,
tol
);
}
}
let mean = 1_000.0;
let std = 100.0;
for (idx, n) in [1_000usize, 5_000usize, 20_000usize]
.into_iter()
.enumerate()
{
let seed = 0xC0DE_0000_u64 + idx as u64;
let (sketch, values) = build_dds_with_normal(ALPHA, n, mean, std, seed);
assert_quantiles_within_error_dds(&sketch, values, QUANTILES, ALPHA);
}
}
#[test]
fn dds_exponential_distribution_quantiles() {
const ALPHA: f64 = 0.01;
const LAMBDA: f64 = 1e-3; const QUANTILES: &[(f64, &str)] = &[
(0.0, "min"),
(0.10, "p10"),
(0.25, "p25"),
(0.50, "p50"),
(0.75, "p75"),
(0.90, "p90"),
(1.0, "max"),
];
fn build_dds_with_exponential(
alpha: f64,
n: usize,
lambda: f64,
seed: u64,
) -> (DDSketch, Vec<f64>) {
let vals = sample_exponential_f64(lambda, n, seed);
let mut sk = DDSketch::new(alpha);
for &x in &vals {
sk.add(&x);
}
(sk, vals)
}
fn assert_quantiles_within_error_dds(
sk: &DDSketch,
vals: &[f64],
qs: &[(f64, &str)],
tol: f64,
) {
let mut sorted = vals.to_vec();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
for &(p, name) in qs {
let got = sk.get_value_at_quantile(p).expect("quantile");
let want = true_quantile(&sorted, p);
let err = rel_err(got, want);
assert!(
err <= tol + 1e-9,
"quantile {} (p={:.2}) relerr={:.4} got={} want={} tol={}",
name,
p,
err,
got,
want,
tol
);
}
}
for (idx, n) in [1_000usize, 5_000usize, 20_000usize]
.into_iter()
.enumerate()
{
let seed = 0xE3E3_0000_u64 + idx as u64;
let (sketch, values) = build_dds_with_exponential(ALPHA, n, LAMBDA, seed);
assert_quantiles_within_error_dds(&sketch, &values, QUANTILES, 0.011); }
}
#[test]
fn merge_two_sketches_combines_counts_and_bounds() {
const ALPHA: f64 = 0.01;
let mut s1 = DDSketch::new(ALPHA);
let mut s2 = DDSketch::new(ALPHA);
let vals1 = [1.0, 2.0, 3.0, 4.0];
let vals2 = [5.0, 10.0, 20.0];
for v in vals1 {
s1.add(&v);
}
for v in vals2 {
s2.add(&v);
}
s1.merge(&s2);
assert_eq!(s1.get_count(), (vals1.len() + vals2.len()) as u64);
assert_eq!(s1.min().unwrap(), 1.0);
assert_eq!(s1.max().unwrap(), 20.0);
assert_eq!(s1.get_value_at_quantile(0.0).unwrap(), 1.0);
assert_eq!(s1.get_value_at_quantile(1.0).unwrap(), 20.0);
let mid = s1.get_value_at_quantile(0.5).unwrap();
assert!((1.0..=20.0).contains(&mid));
}
#[test]
fn dds_serialization_round_trip() {
let mut s = DDSketch::new(0.01);
let vals = [1.0, 2.0, 3.0, 10.0, 50.0, 100.0, 1000.0];
for v in vals {
s.add(&v);
}
let encoded = s.serialize_to_bytes().expect("DDSketch serialization fail"); assert!(
!encoded.is_empty(),
"encoded bytes should not be empty for DDSketch"
);
let decoded =
DDSketch::deserialize_from_bytes(&encoded).expect("DDSketch deserialization fail");
assert_eq!(decoded.get_count(), s.get_count()); assert_eq!(decoded.min(), s.min()); assert_eq!(decoded.max(), s.max());
for q in [0.0, 0.1, 0.5, 0.9, 1.0] {
let a = s.get_value_at_quantile(q).unwrap();
let b = decoded.get_value_at_quantile(q).unwrap();
assert_eq!(a, b, "quantile mismatch at p={}", q);
}
}
}