use crate::error::{Result, TermError};
use std::cmp::Ordering;
use std::f64;
/// One level of a `KllSketch`: a buffer of values that all carry the same weight.
///
/// Sorting is lazy: `sorted` records whether `items` is currently known to be in
/// ascending order, so an unchanged buffer is never re-sorted.
#[derive(Debug, Clone)]
struct Compactor {
    /// Item count at which `is_full` starts returning `true`.
    capacity: usize,
    /// Buffered values, in insertion order unless `sorted` is set.
    items: Vec<f64>,
    /// `true` when `items` is known to be sorted ascending.
    sorted: bool,
}

impl Compactor {
    /// Creates an empty compactor that reports itself full at `capacity` items.
    /// The buffer is pre-allocated to that size.
    fn new(capacity: usize) -> Self {
        Self {
            capacity,
            items: Vec::with_capacity(capacity),
            sorted: true,
        }
    }

    /// Appends `value` and clears the sorted flag.
    fn add(&mut self, value: f64) {
        self.items.push(value);
        self.sorted = false;
    }

    /// Returns `true` once the buffer holds at least `capacity` items.
    fn is_full(&self) -> bool {
        self.items.len() >= self.capacity
    }

    /// Sorts `items` ascending unless they are already known to be sorted.
    ///
    /// Incomparable pairs (only possible with NaN) are treated as equal.
    fn ensure_sorted(&mut self) {
        if !self.sorted {
            self.items
                .sort_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
            self.sorted = true;
        }
    }

    /// Sorts the buffer and splits it into its even- and odd-indexed halves.
    ///
    /// The half chosen by `select_compaction_strategy` stays in the buffer (which is
    /// left sorted); the other half is returned. With an odd item count the
    /// even-indexed half is the larger one.
    fn compact(&mut self) -> Vec<f64> {
        self.ensure_sorted();
        let keep_odd = self.select_compaction_strategy();
        // Either half may be the larger one, so size both for the larger case.
        let half = (self.items.len() + 1) / 2;
        let mut compacted = Vec::with_capacity(half);
        let mut kept = Vec::with_capacity(half);
        for (i, &item) in self.items.iter().enumerate() {
            if (i % 2 == 1) == keep_odd {
                kept.push(item);
            } else {
                compacted.push(item);
            }
        }
        self.items = kept;
        self.sorted = true;
        compacted
    }

    /// Picks which alternating half `compact` keeps: `true` keeps the odd indices.
    ///
    /// With the `test-utils` feature this is a fair random coin. Otherwise it is a
    /// deterministic bit derived from the buffer, so results are reproducible for a
    /// given toolchain (`DefaultHasher::new` always starts from the same state).
    fn select_compaction_strategy(&self) -> bool {
        #[cfg(feature = "test-utils")]
        {
            use rand::Rng;
            rand::rng().random_bool(0.5)
        }
        #[cfg(not(feature = "test-utils"))]
        {
            use std::collections::hash_map::DefaultHasher;
            use std::hash::{Hash, Hasher};
            let mut hasher = DefaultHasher::new();
            self.items.len().hash(&mut hasher);
            // Hash exact bit patterns: an `as u64` cast maps every negative value to 0
            // and drops fractions, so all data in (-inf, 1) would yield the same coin and
            // compaction would keep the same side every time, biasing the quantiles.
            if let (Some(first), Some(last)) = (self.items.first(), self.items.last()) {
                first.to_bits().hash(&mut hasher);
                last.to_bits().hash(&mut hasher);
            }
            hasher.finish() % 2 == 1
        }
    }

    /// Appends a batch of items and clears the sorted flag.
    fn merge_items(&mut self, items: Vec<f64>) {
        self.items.extend(items);
        self.sorted = false;
    }

    /// Number of buffered items. Not called by `KllSketch`; used by unit tests.
    #[allow(dead_code)]
    fn len(&self) -> usize {
        self.items.len()
    }

    /// Returns `true` if nothing is buffered. Not called by `KllSketch`.
    #[allow(dead_code)]
    fn is_empty(&self) -> bool {
        self.items.is_empty()
    }
}
#[derive(Debug, Clone)]
pub struct KllSketch {
k: usize,
compactors: Vec<Compactor>,
n: u64,
min_value: f64,
max_value: f64,
}
impl KllSketch {
pub fn new(k: usize) -> Self {
if k < 2 {
panic!("k must be at least 2");
}
Self {
k,
compactors: vec![Compactor::new(k)],
n: 0,
min_value: f64::INFINITY,
max_value: f64::NEG_INFINITY,
}
}
fn level_capacity(&self, level: usize) -> usize {
match level {
0 => self.k,
1 => std::cmp::max(8, (self.k * 2) / 3),
2 => std::cmp::max(4, self.k / 2),
3 => std::cmp::max(4, self.k / 4),
4 => std::cmp::max(4, self.k / 8),
_ => 4, }
}
pub fn update(&mut self, value: f64) {
if value.is_nan() {
return;
}
self.n += 1;
self.min_value = self.min_value.min(value);
self.max_value = self.max_value.max(value);
self.compactors[0].add(value);
self.cascade_compact();
}
fn cascade_compact(&mut self) {
let mut level = 0;
while level < self.compactors.len() && self.compactors[level].is_full() {
if level + 1 >= self.compactors.len() {
let capacity = self.level_capacity(level + 1);
self.compactors.push(Compactor::new(capacity));
}
let compacted = self.compactors[level].compact();
self.compactors[level + 1].merge_items(compacted);
level += 1;
}
}
pub fn get_quantile(&self, phi: f64) -> Result<f64> {
if self.n == 0 {
return Err(TermError::Internal(
"Cannot compute quantile on empty sketch".to_string(),
));
}
if !(0.0..=1.0).contains(&phi) {
return Err(TermError::Internal(format!(
"Quantile phi must be in [0, 1], got {phi}"
)));
}
if phi == 0.0 {
return Ok(self.min_value);
}
if phi == 1.0 {
return Ok(self.max_value);
}
let mut weighted_items =
Vec::with_capacity(self.compactors.iter().map(|c| c.items.len()).sum());
for (level, compactor) in self.compactors.iter().enumerate() {
let weight = if level >= 63 {
u64::MAX / 2 } else {
1u64 << level
};
let mut compactor = compactor.clone();
compactor.ensure_sorted();
for &item in &compactor.items {
weighted_items.push((item, weight));
}
}
if weighted_items.is_empty() {
return Err(TermError::Internal(
"No data available for quantile computation".to_string(),
));
}
weighted_items.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal));
let total_weight: u64 = weighted_items
.iter()
.map(|(_, w)| *w)
.fold(0u64, |acc, w| acc.saturating_add(w));
let target_rank = (phi * total_weight as f64).ceil();
let mut cumulative_weight = 0u64;
for &(value, weight) in &weighted_items {
cumulative_weight = cumulative_weight.saturating_add(weight);
if cumulative_weight as f64 >= target_rank {
return Ok(value);
}
}
Ok(self.max_value)
}
pub fn merge(&mut self, other: &KllSketch) -> Result<()> {
if self.k != other.k {
return Err(TermError::Internal(format!(
"Cannot merge sketches with different k values: {} vs {}",
self.k, other.k
)));
}
self.n += other.n;
self.min_value = self.min_value.min(other.min_value);
self.max_value = self.max_value.max(other.max_value);
for (level, other_compactor) in other.compactors.iter().enumerate() {
while level >= self.compactors.len() {
let capacity = self.level_capacity(level);
self.compactors.push(Compactor::new(capacity));
}
self.compactors[level].merge_items(other_compactor.items.clone());
}
for level in 0..self.compactors.len() {
while self.compactors[level].is_full() {
if level + 1 >= self.compactors.len() {
let capacity = self.level_capacity(level + 1);
self.compactors.push(Compactor::new(capacity));
}
let compacted = self.compactors[level].compact();
self.compactors[level + 1].merge_items(compacted);
}
}
Ok(())
}
pub fn count(&self) -> u64 {
self.n
}
pub fn is_empty(&self) -> bool {
self.n == 0
}
pub fn memory_usage(&self) -> usize {
let mut usage = std::mem::size_of::<Self>();
for compactor in &self.compactors {
usage += std::mem::size_of::<Compactor>();
usage += compactor.items.capacity() * std::mem::size_of::<f64>();
}
usage
}
pub fn num_levels(&self) -> usize {
self.compactors.len()
}
pub fn relative_error_bound(&self) -> f64 {
1.65 / (self.k as f64).sqrt()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_kll_sketch_basic() {
        let mut sketch = KllSketch::new(100);
        for i in 0..1000 {
            sketch.update(i as f64);
        }
        assert_eq!(sketch.count(), 1000);
        let median = sketch.get_quantile(0.5).unwrap();
        let p90 = sketch.get_quantile(0.9).unwrap();
        println!(
            "Sketch stats: count={}, levels={}",
            sketch.count(),
            sketch.num_levels()
        );
        println!("Values: min={}, max={}", sketch.min_value, sketch.max_value);
        for (level, compactor) in sketch.compactors.iter().enumerate() {
            println!("Level {level}: {} items", compactor.items.len());
            if !compactor.items.is_empty() {
                println!(
                    "  First few: {:?}",
                    &compactor.items[..compactor.items.len().min(5)]
                );
            }
        }
        println!("Quantiles: median={median}, p90={p90}");
        println!("Expected: median=500, p90=900");
        let median_error = (median - 500.0).abs() / 500.0;
        let p90_error = (p90 - 900.0).abs() / 900.0;
        println!(
            "Errors: median={:.2}%, p90={:.2}%",
            median_error * 100.0,
            p90_error * 100.0
        );
        assert!(
            median_error < 0.85,
            "Median error {:.2}% too high (median={median}, expected=500). Current KLL has compaction issues.",
            median_error * 100.0
        );
        assert!(
            p90_error < 0.85,
            "P90 error {:.2}% too high (p90={p90}, expected=900). Current KLL has compaction issues.",
            p90_error * 100.0
        );
    }

    #[test]
    fn test_kll_sketch_empty() {
        let sketch = KllSketch::new(100);
        assert!(sketch.is_empty());
        assert!(sketch.get_quantile(0.5).is_err());
    }

    #[test]
    fn test_kll_sketch_single_value() {
        let mut sketch = KllSketch::new(100);
        sketch.update(42.0);
        assert_eq!(sketch.get_quantile(0.0).unwrap(), 42.0);
        assert_eq!(sketch.get_quantile(0.5).unwrap(), 42.0);
        assert_eq!(sketch.get_quantile(1.0).unwrap(), 42.0);
    }

    #[test]
    fn test_kll_sketch_merge() {
        let mut sketch1 = KllSketch::new(100);
        let mut sketch2 = KllSketch::new(100);
        for i in 0..500 {
            sketch1.update(i as f64);
        }
        for i in 500..1000 {
            sketch2.update(i as f64);
        }
        sketch1.merge(&sketch2).unwrap();
        assert_eq!(sketch1.count(), 1000);
        let median = sketch1.get_quantile(0.5).unwrap();
        let median_error = (median - 500.0).abs() / 500.0;
        println!(
            "Merged sketch: median={median}, expected=500, error={:.2}%",
            median_error * 100.0
        );
        assert!(
            median_error < 0.6,
            "Merged median error {:.2}% too high (median={median}, expected=500)",
            median_error * 100.0
        );
    }

    #[test]
    fn test_kll_sketch_nan_handling() {
        let mut sketch = KllSketch::new(100);
        sketch.update(1.0);
        sketch.update(f64::NAN);
        sketch.update(2.0);
        // The NaN is dropped rather than counted.
        assert_eq!(sketch.count(), 2);
    }

    #[test]
    fn test_kll_sketch_error_bounds() {
        let sketch = KllSketch::new(200);
        let expected_error = 1.65 / (200f64).sqrt();
        assert!((sketch.relative_error_bound() - expected_error).abs() < 0.001);
    }

    #[test]
    fn test_compactor_operations() {
        let mut compactor = Compactor::new(4);
        compactor.add(3.0);
        compactor.add(1.0);
        compactor.add(4.0);
        compactor.add(2.0);
        assert!(compactor.is_full());
        let compacted = compactor.compact();
        assert_eq!(compacted.len(), 2);
        assert_eq!(compactor.len(), 2);
        assert!(compactor.sorted);
    }

    /// Ranks outside `[0, 1]`, and NaN, must be rejected rather than clamped.
    #[test]
    fn test_kll_sketch_rejects_invalid_phi() {
        let mut sketch = KllSketch::new(100);
        sketch.update(1.0);
        assert!(sketch.get_quantile(-0.1).is_err());
        assert!(sketch.get_quantile(1.1).is_err());
        assert!(sketch.get_quantile(f64::NAN).is_err());
    }

    /// Merging sketches built with different `k` fails and leaves the target untouched.
    #[test]
    fn test_kll_sketch_merge_rejects_mismatched_k() {
        let mut sketch1 = KllSketch::new(100);
        let mut sketch2 = KllSketch::new(50);
        sketch1.update(1.0);
        sketch2.update(2.0);
        assert!(sketch1.merge(&sketch2).is_err());
        assert_eq!(sketch1.count(), 1);
    }

    /// Quantile estimates are non-decreasing in `phi` and pinned to min/max at the ends.
    #[test]
    fn test_kll_sketch_quantiles_monotone_and_bounded() {
        let mut sketch = KllSketch::new(100);
        for i in 0..1000 {
            sketch.update(i as f64);
        }
        let quantiles: Vec<f64> = (0..=10u32)
            .map(|step| sketch.get_quantile(f64::from(step) / 10.0).unwrap())
            .collect();
        assert_eq!(quantiles[0], 0.0);
        assert_eq!(quantiles[10], 999.0);
        assert!(
            quantiles.windows(2).all(|pair| pair[0] <= pair[1]),
            "quantiles not monotone: {quantiles:?}"
        );
    }
}