use crate::store::Store;
/// Errors reported by sketch construction, quantile queries and merging.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DDSketchError {
    /// `alpha` was not a finite value strictly between 0 and 1.
    InvalidAlpha,
    /// A requested quantile lay outside `[0, 1]`.
    InvalidQuantile,
    /// Two sketches with different accuracy parameters were merged.
    AlphaMismatch,
    /// NOTE(review): not returned by any code in this file; its message
    /// describes a bin count exceeding the allowed maximum.
    BinCountMismatch,
}

impl std::fmt::Display for DDSketchError {
    /// Writes the fixed human-readable message for each variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::InvalidAlpha => "Alpha must be in range (0, 1)",
            Self::InvalidQuantile => "Quantile must be in range [0, 1]",
            Self::AlphaMismatch => "Cannot merge sketches with different alpha values",
            Self::BinCountMismatch => "Bin count exceeds maximum allowed",
        };
        f.write_str(message)
    }
}

impl std::error::Error for DDSketchError {}
/// Serializes a finite `f64` as `Some(value)` and a non-finite one (±inf or NaN)
/// as `None`.
#[cfg(feature = "serde")]
fn serialize_f64_option<S>(value: &f64, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    // Presumably because formats such as JSON cannot represent inf/NaN; the
    // matching deserializers map `None` back to the empty-sketch sentinels.
    match value.is_finite() {
        true => serializer.serialize_some(value),
        false => serializer.serialize_none(),
    }
}
/// Deserializes an optional `f64` for the `min` field, mapping `None` to
/// `f64::INFINITY` (the value an empty sketch uses for `min`).
#[cfg(feature = "serde")]
fn deserialize_min_f64<'de, D>(deserializer: D) -> Result<f64, D::Error>
where
    D: serde::Deserializer<'de>,
{
    <Option<f64> as serde::Deserialize>::deserialize(deserializer)
        .map(|maybe_min| maybe_min.unwrap_or(f64::INFINITY))
}
/// Deserializes an optional `f64` for the `max` field, mapping `None` to
/// `f64::NEG_INFINITY` (the value an empty sketch uses for `max`).
#[cfg(feature = "serde")]
fn deserialize_max_f64<'de, D>(deserializer: D) -> Result<f64, D::Error>
where
    D: serde::Deserializer<'de>,
{
    <Option<f64> as serde::Deserialize>::deserialize(deserializer)
        .map(|maybe_max| maybe_max.unwrap_or(f64::NEG_INFINITY))
}
/// A mergeable quantile sketch with relative-accuracy parameter `alpha`.
///
/// Positive values and the magnitudes of negative values are counted in two
/// separate bucket stores. Values whose magnitude is below
/// `min_indexable_value` are counted in `zero_count`. `sum`, `min` and `max`
/// are tracked exactly.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(deny_unknown_fields))]
pub struct DDSketch {
    /// Relative-accuracy parameter; validated to lie in (0, 1) at construction.
    alpha: f64,
    /// Bucket growth factor, `1 + 2*alpha / (1 - alpha)`.
    gamma: f64,
    /// Cached `1 / ln(gamma)`, used when computing bucket keys.
    inv_ln_gamma: f64,
    /// Key offset; always 0 as constructed by `new` and the builder.
    offset: i32,
    /// Smallest magnitude that is bucketed rather than counted as zero.
    min_indexable_value: f64,
    /// Bucket counts for positive values.
    positive_store: Store,
    /// Bucket counts for negative values, keyed by magnitude.
    negative_store: Store,
    /// Number of values too close to zero to be bucketed.
    zero_count: u64,
    /// Exact sum of all recorded values.
    sum: f64,
    /// Exact minimum recorded value; `+inf` while empty.
    /// With `serde`, a non-finite value is written as `None`.
    #[cfg_attr(
        feature = "serde",
        serde(
            serialize_with = "serialize_f64_option",
            deserialize_with = "deserialize_min_f64"
        )
    )]
    min: f64,
    /// Exact maximum recorded value; `-inf` while empty.
    /// With `serde`, a non-finite value is written as `None`.
    #[cfg_attr(
        feature = "serde",
        serde(
            serialize_with = "serialize_f64_option",
            deserialize_with = "deserialize_max_f64"
        )
    )]
    max: f64,
    /// Bin limit passed to each `Store::new`.
    max_bins: usize,
}
impl DDSketch {
pub fn new(alpha: f64) -> Result<Self, DDSketchError> {
if !alpha.is_finite() || alpha <= 0.0 || alpha >= 1.0 {
return Err(DDSketchError::InvalidAlpha);
}
let gamma = 1.0 + (2.0 * alpha) / (1.0 - alpha);
let gamma_ln = ((2.0 * alpha) / (1.0 - alpha)).ln_1p();
let inv_ln_gamma = 1.0 / gamma_ln;
let offset = 0i32;
let min_indexable_from_range = ((i32::MIN - offset) as f64 / inv_ln_gamma + 1.0).exp();
let min_indexable_from_normal = f64::MIN_POSITIVE * gamma;
let min_indexable_value = min_indexable_from_range.max(min_indexable_from_normal);
let max_bins = 4096;
Ok(Self {
alpha,
gamma,
inv_ln_gamma,
offset,
min_indexable_value,
positive_store: Store::new(max_bins),
negative_store: Store::new(max_bins),
zero_count: 0,
sum: 0.0,
min: f64::INFINITY,
max: f64::NEG_INFINITY,
max_bins,
})
}
#[inline]
pub fn key(&self, value: f64) -> i32 {
crate::mapping::value_to_key_i32(value, self.inv_ln_gamma)
}
#[inline]
pub fn min_possible(&self) -> f64 {
self.min_indexable_value
}
#[inline]
pub fn value(&self, key: i32) -> f64 {
crate::mapping::key_to_value_i32(key, self.gamma, self.inv_ln_gamma.recip())
}
#[inline]
pub fn add(&mut self, value: f64) {
if !value.is_finite() {
return;
}
if value >= self.min_indexable_value {
let key = (value.ln() * self.inv_ln_gamma).ceil() as i32;
self.positive_store.add(key);
} else if value <= -self.min_indexable_value {
let key = ((-value).ln() * self.inv_ln_gamma).ceil() as i32;
self.negative_store.add(key);
} else {
self.zero_count += 1;
}
self.sum += value;
if value < self.min {
self.min = value;
}
if value > self.max {
self.max = value;
}
}
pub fn merge(&mut self, other: &Self) -> Result<(), DDSketchError> {
if (self.gamma - other.gamma).abs() > 1e-10 {
return Err(DDSketchError::AlphaMismatch);
}
if other.count() == 0 {
return Ok(());
}
self.positive_store.merge(&other.positive_store);
self.negative_store.merge(&other.negative_store);
self.zero_count += other.zero_count;
self.sum += other.sum;
self.min = self.min.min(other.min);
self.max = self.max.max(other.max);
Ok(())
}
#[inline]
pub fn count(&self) -> u64 {
self.positive_store.count() + self.negative_store.count() + self.zero_count
}
#[inline]
pub fn len(&self) -> usize {
self.count() as usize
}
#[inline]
pub fn is_empty(&self) -> bool {
self.count() == 0
}
#[inline]
pub fn sum(&self) -> f64 {
self.sum
}
#[inline]
pub fn mean(&self) -> f64 {
if self.count() == 0 {
0.0
} else {
self.sum / (self.count() as f64)
}
}
#[cfg(test)]
pub fn zero_count(&self) -> u64 {
self.zero_count
}
#[cfg(test)]
pub fn min_indexable_value(&self) -> f64 {
self.min_indexable_value
}
#[cfg(test)]
pub fn debug_key_to_value(&self, key: i64) -> f64 {
self.value(key as i32)
}
#[cfg(test)]
pub fn bins(&self) -> Vec<u64> {
let mut combined_bins = Vec::new();
if !self.negative_store.is_empty() {
for _i in 0..self.negative_store.length() {
combined_bins.push(0); }
}
if !self.positive_store.is_empty() {
for _i in 0..self.positive_store.length() {
combined_bins.push(0); }
}
combined_bins
}
#[inline]
pub fn min(&self) -> f64 {
if self.count() == 0 {
return f64::INFINITY;
}
self.quantile(0.0).unwrap_or(f64::INFINITY)
}
#[inline]
pub fn max(&self) -> f64 {
if self.count() == 0 {
return f64::NEG_INFINITY;
}
self.quantile(1.0).unwrap_or(f64::NEG_INFINITY)
}
#[inline]
pub fn alpha(&self) -> f64 {
(self.gamma - 1.0) / (self.gamma + 1.0)
}
pub fn clear(&mut self) {
self.sum = 0.0;
self.min = f64::INFINITY;
self.max = f64::NEG_INFINITY;
self.positive_store = Store::new(self.max_bins);
self.negative_store = Store::new(self.max_bins);
self.zero_count = 0;
}
pub fn quantile(&self, q: f64) -> Result<f64, DDSketchError> {
if !(0.0..=1.0).contains(&q) {
return Err(DDSketchError::InvalidQuantile);
}
let total_count = self.count();
if total_count == 0 {
return Ok(0.0);
}
if total_count == 1 {
return Ok(self.min); }
let rank = (q * (total_count as f64 - 1.0)) as u64;
let neg_count = self.negative_store.count();
let zero_count = self.zero_count;
if rank < neg_count {
let reversed_rank = neg_count - 1 - rank;
let key = self.negative_store.key_at_rank(reversed_rank);
Ok(-self.value(key)) } else if rank < zero_count + neg_count {
Ok(0.0)
} else {
let positive_rank = rank - zero_count - neg_count;
let key = self.positive_store.key_at_rank(positive_rank);
Ok(self.value(key))
}
}
pub fn quantile_opt(&self, q: f64) -> Result<Option<f64>, DDSketchError> {
if !(0.0..=1.0).contains(&q) {
return Err(DDSketchError::InvalidQuantile);
}
if self.count() == 0 {
return Ok(None);
}
Ok(Some(self.quantile(q)?))
}
pub fn positive_store_count(&self) -> u64 {
self.positive_store.count()
}
pub fn get_zero_count(&self) -> u64 {
self.zero_count
}
pub fn negative_store_count(&self) -> u64 {
self.negative_store.count()
}
pub fn positive_key_at_rank(&self, rank: u64) -> i32 {
self.positive_store.key_at_rank(rank)
}
pub fn percentiles(&self) -> Option<(f64, f64, f64, f64)> {
if self.count() == 0 {
return None;
}
Some((
self.quantile(0.5).unwrap(),
self.quantile(0.9).unwrap(),
self.quantile(0.95).unwrap(),
self.quantile(0.99).unwrap(),
))
}
#[inline]
pub fn add_batch<I>(&mut self, values: I)
where
I: IntoIterator<Item = f64>,
{
for value in values {
self.add(value);
}
}
}
impl Default for DDSketch {
    /// Builds an empty sketch with `alpha = 0.01` and the default bin limit.
    fn default() -> Self {
        const DEFAULT_ALPHA: f64 = 0.01;
        Self::new(DEFAULT_ALPHA).expect("Default alpha should be valid")
    }
}
impl std::fmt::Display for DDSketch {
    /// One-line summary: count, alpha, the tracked min/max fields and the mean,
    /// with floats shown to three decimals.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let total = self.count();
        let accuracy = self.alpha();
        let average = self.mean();
        let (lowest, highest) = (self.min, self.max);
        write!(
            f,
            "DDSketch(count={}, alpha={:.3}, min={:.3}, max={:.3}, mean={:.3})",
            total, accuracy, lowest, highest, average
        )
    }
}
impl FromIterator<f64> for DDSketch {
    /// Collects values into a sketch created with [`DDSketch::default`].
    fn from_iter<T: IntoIterator<Item = f64>>(iter: T) -> Self {
        iter.into_iter().fold(Self::default(), |mut acc, sample| {
            acc.add(sample);
            acc
        })
    }
}
impl Extend<f64> for DDSketch {
    /// Records each yielded value in order via `add`.
    fn extend<T: IntoIterator<Item = f64>>(&mut self, iter: T) {
        iter.into_iter().for_each(|sample| self.add(sample));
    }
}
/// Step-by-step constructor for `DDSketch` with an optional custom bin limit.
#[derive(Debug, Clone)]
pub struct DDSketchBuilder {
    /// Requested relative accuracy; validated only when `build` is called.
    alpha: f64,
    /// Per-store bin limit; `None` selects the default of 4096.
    max_bins: Option<usize>,
}
impl DDSketchBuilder {
pub fn new(alpha: f64) -> Self {
Self {
alpha,
max_bins: None,
}
}
pub fn max_bins(mut self, max_bins: usize) -> Self {
self.max_bins = Some(max_bins);
self
}
pub fn build(self) -> Result<DDSketch, DDSketchError> {
if !self.alpha.is_finite() || self.alpha <= 0.0 || self.alpha >= 1.0 {
return Err(DDSketchError::InvalidAlpha);
}
let gamma = 1.0 + (2.0 * self.alpha) / (1.0 - self.alpha);
let gamma_ln = ((2.0 * self.alpha) / (1.0 - self.alpha)).ln_1p();
let inv_ln_gamma = 1.0 / gamma_ln;
let offset = 0i32;
let min_indexable_from_range = ((i32::MIN - offset) as f64 / inv_ln_gamma + 1.0).exp();
let min_indexable_from_normal = f64::MIN_POSITIVE * gamma;
let min_indexable_value = min_indexable_from_range.max(min_indexable_from_normal);
let max_bins = self.max_bins.unwrap_or(4096);
Ok(DDSketch {
alpha: self.alpha,
gamma,
inv_ln_gamma,
offset,
min_indexable_value,
positive_store: Store::new(max_bins),
negative_store: Store::new(max_bins),
zero_count: 0,
sum: 0.0,
min: f64::INFINITY,
max: f64::NEG_INFINITY,
max_bins,
})
}
}
impl DDSketch {
    /// Returns a [`DDSketchBuilder`] preset with relative accuracy `alpha`.
    pub fn builder(alpha: f64) -> DDSketchBuilder {
        DDSketchBuilder::new(alpha)
    }

    /// Creates an empty sketch with relative accuracy `alpha` and a custom
    /// per-store bin limit.
    ///
    /// # Errors
    ///
    /// Returns [`DDSketchError::InvalidAlpha`] for an out-of-range `alpha`.
    pub fn with_max_bins(alpha: f64, max_bins: usize) -> Result<Self, DDSketchError> {
        let configured = DDSketchBuilder::new(alpha).max_bins(max_bins);
        configured.build()
    }
}