use crate::{SketchError, splitmix64};
#[derive(Debug, Clone)]
pub struct KllSketch {
k: usize,
levels: Vec<Vec<f64>>,
count: u64,
rng_state: u64,
}
impl KllSketch {
pub fn new(k: usize) -> Result<Self, SketchError> {
if k < 2 {
return Err(SketchError::InvalidParameter(
"k must be greater than or equal to 2",
));
}
Ok(Self {
k,
levels: vec![Vec::new()],
count: 0,
rng_state: 0xD1B5_4A32_C192_ED03,
})
}
pub fn with_error_rate(rank_error: f64) -> Result<Self, SketchError> {
if !rank_error.is_finite() || rank_error <= 0.0 || rank_error >= 1.0 {
return Err(SketchError::InvalidParameter(
"rank_error must be finite and strictly between 0 and 1",
));
}
let k = (2.0 / rank_error).ceil() as usize;
Self::new(k.max(2))
}
pub fn k(&self) -> usize {
self.k
}
pub fn count(&self) -> u64 {
self.count
}
pub fn is_empty(&self) -> bool {
self.count == 0
}
pub fn add(&mut self, value: f64) {
if !value.is_finite() {
return;
}
self.levels[0].push(value);
self.count = self.count.saturating_add(1);
self.compact_all_levels();
}
pub fn quantile(&self, q: f64) -> Result<f64, SketchError> {
if !q.is_finite() || !(0.0..=1.0).contains(&q) {
return Err(SketchError::InvalidParameter(
"q must be finite and in [0, 1]",
));
}
if self.count == 0 {
return Err(SketchError::InvalidParameter(
"quantile is undefined for an empty sketch",
));
}
let mut weighted_values = Vec::new();
for (level, values) in self.levels.iter().enumerate() {
let weight = 1_u64.checked_shl(level as u32).unwrap_or(u64::MAX);
for &value in values {
weighted_values.push((value, weight));
}
}
weighted_values.sort_unstable_by(|left, right| left.0.total_cmp(&right.0));
let total_weight: u128 = weighted_values
.iter()
.map(|(_, weight)| *weight as u128)
.sum();
let target_rank = ((total_weight.saturating_sub(1)) as f64 * q).round() as u128;
let mut cumulative = 0_u128;
for (value, weight) in weighted_values {
cumulative = cumulative.saturating_add(weight as u128);
if cumulative > target_rank {
return Ok(value);
}
}
Err(SketchError::InvalidParameter(
"unable to compute quantile from current state",
))
}
pub fn merge(&mut self, other: &Self) -> Result<(), SketchError> {
if self.k != other.k {
return Err(SketchError::IncompatibleSketches(
"k must match for merge",
));
}
if self.levels.len() < other.levels.len() {
self.levels.resize_with(other.levels.len(), Vec::new);
}
for (level, values) in other.levels.iter().enumerate() {
self.levels[level].extend(values.iter().copied());
}
self.count = self.count.saturating_add(other.count);
self.compact_all_levels();
Ok(())
}
pub fn clear(&mut self) {
self.levels.clear();
self.levels.push(Vec::new());
self.count = 0;
}
fn level_capacity(&self, level: usize) -> usize {
let decay = 0.75_f64.powi(level as i32);
(self.k as f64 * decay).ceil().max(2.0) as usize
}
fn compact_all_levels(&mut self) {
let mut level = 0;
while level < self.levels.len() {
let capacity = self.level_capacity(level);
if self.levels[level].len() > capacity {
self.compact_level(level);
}
level += 1;
}
}
fn compact_level(&mut self, level: usize) {
if level + 1 == self.levels.len() {
self.levels.push(Vec::new());
}
let mut values = std::mem::take(&mut self.levels[level]);
values.sort_unstable_by(f64::total_cmp);
let carry = if values.len() % 2 == 1 {
values.pop()
} else {
None
};
let offset = self.next_u64() as usize & 1;
for index in (offset..values.len()).step_by(2) {
self.levels[level + 1].push(values[index]);
}
if let Some(value) = carry {
self.levels[level].push(value);
}
}
fn next_u64(&mut self) -> u64 {
self.rng_state = splitmix64(self.rng_state.wrapping_add(0x9E37_79B9_7F4A_7C15));
self.rng_state
}
}
#[cfg(test)]
mod tests {
use super::KllSketch;
#[test]
fn constructor_validates_k() {
assert!(KllSketch::new(1).is_err());
assert!(KllSketch::new(2).is_ok());
}
#[test]
fn quantile_rejects_empty_sketch() {
let kll = KllSketch::new(64).unwrap();
assert!(kll.quantile(0.5).is_err());
}
#[test]
fn median_estimate_is_reasonable() {
let mut kll = KllSketch::new(200).unwrap();
for value in 0_u64..10_000 {
kll.add(value as f64);
}
let p50 = kll.quantile(0.5).unwrap();
assert!(p50 > 4_300.0 && p50 < 5_700.0, "p50={p50}");
}
#[test]
fn quantiles_are_monotonic() {
let mut kll = KllSketch::new(128).unwrap();
for value in 0_u64..20_000 {
kll.add(value as f64);
}
let p50 = kll.quantile(0.5).unwrap();
let p90 = kll.quantile(0.9).unwrap();
assert!(p50 <= p90, "p50={p50} p90={p90}");
}
#[test]
fn merge_combines_streams() {
let mut left = KllSketch::new(160).unwrap();
let mut right = KllSketch::new(160).unwrap();
for value in 0_u64..10_000 {
left.add(value as f64);
}
for value in 10_000_u64..20_000 {
right.add(value as f64);
}
left.merge(&right).unwrap();
let p95 = left.quantile(0.95).unwrap();
assert!(p95 > 17_000.0 && p95 < 20_000.0, "p95={p95}");
}
#[test]
fn merge_rejects_different_k() {
let mut left = KllSketch::new(100).unwrap();
let right = KllSketch::new(101).unwrap();
assert!(left.merge(&right).is_err());
}
#[test]
fn clear_resets_state() {
let mut kll = KllSketch::new(128).unwrap();
kll.add(1.0);
kll.add(2.0);
kll.clear();
assert!(kll.is_empty());
assert!(kll.quantile(0.5).is_err());
}
}