use alloc::boxed::Box;
use alloc::vec;
use alloc::vec::Vec;
use crate::histogram::{BinEdges, BinningStrategy};
#[derive(Debug, Clone)]
struct GKTuple {
value: f64,
gap: u64,
delta: u64,
}
#[derive(Debug, Clone)]
pub struct QuantileBinning {
summary: Vec<GKTuple>,
count: u64,
epsilon: f64,
inserts_since_compress: u64,
}
impl QuantileBinning {
pub fn new() -> Self {
Self::with_epsilon(0.01)
}
pub fn with_epsilon(epsilon: f64) -> Self {
assert!(
epsilon > 0.0 && epsilon < 1.0,
"epsilon must be in (0.0, 1.0), got {epsilon}"
);
Self {
summary: Vec::new(),
count: 0,
epsilon,
inserts_since_compress: 0,
}
}
#[inline]
fn band_width(&self) -> u64 {
libm::floor(2.0 * self.epsilon * self.count as f64) as u64
}
#[inline]
fn compress_interval(&self) -> u64 {
libm::floor(1.0 / (2.0 * self.epsilon)) as u64
}
fn insert(&mut self, value: f64) {
self.count += 1;
if self.summary.is_empty() {
self.summary.push(GKTuple {
value,
gap: 1,
delta: 0,
});
self.inserts_since_compress += 1;
return;
}
let pos = self.summary.partition_point(|t| t.value < value);
let delta = if pos == 0 || pos == self.summary.len() {
0
} else {
self.band_width().saturating_sub(1)
};
self.summary.insert(
pos,
GKTuple {
value,
gap: 1,
delta,
},
);
self.inserts_since_compress += 1;
let interval = self.compress_interval().max(1);
if self.inserts_since_compress >= interval {
self.compress();
self.inserts_since_compress = 0;
}
}
fn compress(&mut self) {
if self.summary.len() < 3 {
return;
}
let threshold = self.band_width();
let mut i = self.summary.len() - 2;
while i >= 1 {
let merge_cost =
self.summary[i].gap + self.summary[i + 1].gap + self.summary[i + 1].delta;
if merge_cost <= threshold {
let absorbed_gap = self.summary[i].gap;
self.summary[i + 1].gap += absorbed_gap;
self.summary.remove(i);
}
if i == 0 {
break;
}
i -= 1;
}
}
pub fn quantile(&self, phi: f64) -> Option<f64> {
if self.summary.is_empty() {
return None;
}
let phi = phi.clamp(0.0, 1.0);
let target_rank = libm::ceil(phi * self.count as f64) as u64;
let tolerance = libm::floor(self.epsilon * self.count as f64) as u64;
let mut rank: u64 = 0;
let mut best_idx = 0;
for (i, tuple) in self.summary.iter().enumerate() {
rank += tuple.gap;
if rank + tuple.delta > target_rank + tolerance {
return Some(self.summary[best_idx].value);
}
best_idx = i;
}
Some(self.summary[best_idx].value)
}
#[inline]
pub fn count(&self) -> u64 {
self.count
}
#[inline]
pub fn summary_len(&self) -> usize {
self.summary.len()
}
}
impl Default for QuantileBinning {
fn default() -> Self {
Self::new()
}
}
impl BinningStrategy for QuantileBinning {
fn observe(&mut self, value: f64) {
self.insert(value);
}
fn compute_edges(&self, n_bins: usize) -> BinEdges {
if self.summary.is_empty() || n_bins <= 1 {
return BinEdges { edges: vec![] };
}
let mut edges = Vec::with_capacity(n_bins - 1);
for i in 1..n_bins {
let phi = i as f64 / n_bins as f64;
if let Some(val) = self.quantile(phi) {
if edges
.last()
.map_or(true, |&last: &f64| (val - last).abs() > f64::EPSILON)
{
edges.push(val);
}
}
}
BinEdges { edges }
}
fn reset(&mut self) {
self.summary.clear();
self.count = 0;
self.inserts_since_compress = 0;
}
fn clone_fresh(&self) -> Box<dyn BinningStrategy> {
Box::new(QuantileBinning::with_epsilon(self.epsilon))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn new_starts_empty() {
let q = QuantileBinning::new();
assert_eq!(q.count(), 0);
assert_eq!(q.summary_len(), 0);
assert!((q.epsilon - 0.01).abs() < f64::EPSILON);
}
#[test]
fn with_epsilon_stores_value() {
let q = QuantileBinning::with_epsilon(0.05);
assert!((q.epsilon - 0.05).abs() < f64::EPSILON);
}
#[test]
#[should_panic(expected = "epsilon must be in (0.0, 1.0)")]
fn with_epsilon_zero_panics() {
QuantileBinning::with_epsilon(0.0);
}
#[test]
#[should_panic(expected = "epsilon must be in (0.0, 1.0)")]
fn with_epsilon_one_panics() {
QuantileBinning::with_epsilon(1.0);
}
#[test]
#[should_panic(expected = "epsilon must be in (0.0, 1.0)")]
fn with_epsilon_negative_panics() {
QuantileBinning::with_epsilon(-0.1);
}
#[test]
fn single_value_quantile() {
let mut q = QuantileBinning::new();
q.observe(42.0);
assert_eq!(q.quantile(0.0), Some(42.0));
assert_eq!(q.quantile(0.5), Some(42.0));
assert_eq!(q.quantile(1.0), Some(42.0));
}
#[test]
fn empty_state_quantile_returns_none() {
let q = QuantileBinning::new();
assert_eq!(q.quantile(0.5), None);
}
#[test]
fn two_values() {
let mut q = QuantileBinning::new();
q.observe(10.0);
q.observe(20.0);
assert_eq!(q.count(), 2);
let med = q.quantile(0.5).unwrap();
assert!(med == 10.0 || med == 20.0);
}
#[test]
fn sequential_1000_median_close_to_500() {
let mut q = QuantileBinning::new(); for v in 0..1000 {
q.observe(v as f64);
}
let median = q.quantile(0.5).unwrap();
assert!(
(median - 500.0).abs() <= 10.0,
"median = {median}, expected ~500.0 (within +/-10)"
);
}
#[test]
fn sequential_1000_quartiles() {
let mut q = QuantileBinning::new();
for v in 0..1000 {
q.observe(v as f64);
}
let tolerance = 0.01 * 1000.0;
let q25 = q.quantile(0.25).unwrap();
assert!(
(q25 - 250.0).abs() <= tolerance,
"q25 = {q25}, expected ~250 (tol {tolerance})"
);
let q75 = q.quantile(0.75).unwrap();
assert!(
(q75 - 750.0).abs() <= tolerance,
"q75 = {q75}, expected ~750 (tol {tolerance})"
);
}
#[test]
fn reverse_insertion_median() {
let mut q = QuantileBinning::new();
for v in (0..1000).rev() {
q.observe(v as f64);
}
let median = q.quantile(0.5).unwrap();
assert!(
(median - 500.0).abs() <= 10.0,
"median = {median}, expected ~500.0 (within +/-10)"
);
}
#[test]
fn compression_keeps_summary_bounded() {
let mut q = QuantileBinning::new(); for v in 0..10_000 {
q.observe(v as f64);
}
let len = q.summary_len();
assert!(
len < 2000,
"summary length {len} should be much less than 10000"
);
assert!(
len < 10_000 / 3,
"summary length {len} should be at most ~1/3 of N"
);
}
#[test]
fn compression_large_epsilon() {
let mut q = QuantileBinning::with_epsilon(0.1);
for v in 0..10_000 {
q.observe(v as f64);
}
let len = q.summary_len();
assert!(
len < 500,
"with epsilon=0.1, summary length {len} should be < 500"
);
}
#[test]
fn compute_edges_4_bins_roughly_quartiles() {
let mut q = QuantileBinning::new();
for v in 0..1000 {
q.observe(v as f64);
}
let edges = q.compute_edges(4);
assert!(!edges.edges.is_empty(), "should produce at least one edge");
assert!(
edges.edges.len() <= 3,
"4 bins -> at most 3 edges, got {}",
edges.edges.len()
);
let tolerance = 0.01 * 1000.0;
if edges.edges.len() == 3 {
assert!(
(edges.edges[0] - 250.0).abs() <= tolerance,
"first edge {} should be ~250",
edges.edges[0]
);
assert!(
(edges.edges[1] - 500.0).abs() <= tolerance,
"second edge {} should be ~500",
edges.edges[1]
);
assert!(
(edges.edges[2] - 750.0).abs() <= tolerance,
"third edge {} should be ~750",
edges.edges[2]
);
}
}
#[test]
fn compute_edges_sorted() {
let mut q = QuantileBinning::new();
for v in 0..500 {
q.observe(v as f64);
}
let edges = q.compute_edges(10);
for i in 1..edges.edges.len() {
assert!(
edges.edges[i] > edges.edges[i - 1],
"edges must be strictly increasing: edges[{}]={} <= edges[{}]={}",
i,
edges.edges[i],
i - 1,
edges.edges[i - 1]
);
}
}
#[test]
fn compute_edges_empty_sketch() {
let q = QuantileBinning::new();
let edges = q.compute_edges(4);
assert!(edges.edges.is_empty());
}
#[test]
fn compute_edges_single_bin() {
let mut q = QuantileBinning::new();
q.observe(1.0);
q.observe(2.0);
let edges = q.compute_edges(1);
assert!(edges.edges.is_empty(), "1 bin means 0 edges");
}
#[test]
fn compute_edges_deduplicates_identical_values() {
let mut q = QuantileBinning::new();
for _ in 0..1000 {
q.observe(7.0);
}
let edges = q.compute_edges(4);
assert!(
edges.edges.len() <= 1,
"all-same input should dedup to <=1 edge, got {}",
edges.edges.len()
);
}
#[test]
fn compute_edges_deduplicates_two_clusters() {
let mut q = QuantileBinning::new();
for _ in 0..500 {
q.observe(1.0);
}
for _ in 0..500 {
q.observe(100.0);
}
let edges = q.compute_edges(10);
for i in 1..edges.edges.len() {
assert!(
(edges.edges[i] - edges.edges[i - 1]).abs() > f64::EPSILON,
"adjacent edges should not be equal"
);
}
}
#[test]
fn error_within_epsilon_005() {
let mut q = QuantileBinning::with_epsilon(0.05);
let n = 2000u64;
for v in 0..n {
q.observe(v as f64);
}
let tolerance = 0.05 * n as f64;
for &phi in &[0.1, 0.25, 0.5, 0.75, 0.9] {
let result = q.quantile(phi).unwrap();
let expected = phi * n as f64;
assert!(
(result - expected).abs() <= tolerance,
"phi={phi}: result={result}, expected~{expected}, tolerance={tolerance}"
);
}
}
#[test]
fn error_within_epsilon_001_large_n() {
let mut q = QuantileBinning::with_epsilon(0.01);
let n = 5000u64;
for v in 0..n {
q.observe(v as f64);
}
let tolerance = 0.01 * n as f64;
for &phi in &[0.1, 0.25, 0.5, 0.75, 0.9] {
let result = q.quantile(phi).unwrap();
let expected = phi * n as f64;
assert!(
(result - expected).abs() <= tolerance,
"phi={phi}: result={result}, expected~{expected}, tolerance={tolerance}"
);
}
}
#[test]
fn quantile_zero_returns_minimum() {
let mut q = QuantileBinning::new();
for v in 10..110 {
q.observe(v as f64);
}
let min_approx = q.quantile(0.0).unwrap();
assert!(
(min_approx - 10.0).abs() <= 1.0,
"quantile(0.0) = {min_approx}, expected ~10.0"
);
}
#[test]
fn quantile_one_returns_maximum() {
let mut q = QuantileBinning::new();
for v in 10..110 {
q.observe(v as f64);
}
let max_approx = q.quantile(1.0).unwrap();
assert!(
(max_approx - 109.0).abs() <= 1.0,
"quantile(1.0) = {max_approx}, expected ~109.0"
);
}
#[test]
fn reset_clears_state() {
let mut q = QuantileBinning::new();
for v in 0..100 {
q.observe(v as f64);
}
assert_eq!(q.count(), 100);
assert!(q.summary_len() > 0);
q.reset();
assert_eq!(q.count(), 0);
assert_eq!(q.summary_len(), 0);
assert_eq!(q.quantile(0.5), None);
}
#[test]
fn clone_fresh_preserves_epsilon() {
let mut q = QuantileBinning::with_epsilon(0.05);
for v in 0..100 {
q.observe(v as f64);
}
let fresh = q.clone_fresh();
let edges = fresh.compute_edges(4);
assert!(edges.edges.is_empty(), "fresh instance should be empty");
}
#[test]
fn find_bin_with_quantile_edges() {
let mut q = QuantileBinning::new();
for v in 0..1000 {
q.observe(v as f64);
}
let edges = q.compute_edges(4);
assert_eq!(edges.find_bin(0.0), 0);
assert_eq!(edges.find_bin(999.0), edges.n_bins() - 1);
}
#[test]
fn shuffled_input_median() {
let mut q = QuantileBinning::new();
let n = 1000u64;
let stride = 397; let mut val = 0u64;
for _ in 0..n {
q.observe(val as f64);
val = (val + stride) % n;
}
let median = q.quantile(0.5).unwrap();
let tolerance = 0.01 * n as f64;
assert!(
(median - 500.0).abs() <= tolerance,
"shuffled median = {median}, expected ~500 (tol {tolerance})"
);
}
#[test]
fn negative_range_quantiles() {
let mut q = QuantileBinning::new();
for v in -500..500 {
q.observe(v as f64);
}
let median = q.quantile(0.5).unwrap();
let tolerance = 0.01 * 1000.0;
assert!(
median.abs() <= tolerance,
"median = {median}, expected ~0.0 (tol {tolerance})"
);
}
#[test]
fn floating_point_values() {
let mut q = QuantileBinning::new();
for i in 0..1000 {
q.observe(i as f64 * 0.001); }
let median = q.quantile(0.5).unwrap();
let tolerance = 0.01 * 1.0; assert!(
(median - 0.5).abs() <= tolerance,
"median = {median}, expected ~0.5 (tol {tolerance})"
);
}
#[test]
fn many_bins_fine_grained() {
let mut q = QuantileBinning::with_epsilon(0.001);
for v in 0..10_000 {
q.observe(v as f64);
}
let edges = q.compute_edges(100);
assert!(
edges.edges.len() >= 90,
"100 bins with 10000 distinct values (eps=0.001) should give ~99 edges, got {}",
edges.edges.len()
);
for i in 1..edges.edges.len() {
assert!(
edges.edges[i] > edges.edges[i - 1],
"edges must be strictly increasing"
);
}
}
}