use serde::{Deserialize, Serialize};
use crate::sketches::Sketch;
use crate::{Error, Result};
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct EquiDepthHistogram {
boundaries: Vec<f64>,
counts: Vec<u64>,
total: u64,
}
const MIN_BUCKETS: usize = 2;
impl EquiDepthHistogram {
pub fn try_new(buckets: usize) -> Result<Self> {
if buckets < MIN_BUCKETS {
return Err(Error::InvalidSketch(format!(
"EquiDepthHistogram requires buckets >= {MIN_BUCKETS} \
(Ioannidis-Poosala 1996, Jagadish 1998); got {buckets}"
)));
}
Ok(Self {
boundaries: vec![0.0; buckets + 1],
counts: vec![0u64; buckets],
total: 0,
})
}
pub fn from_values(values: &[f64], buckets: usize) -> Result<Self> {
if buckets < MIN_BUCKETS {
return Err(Error::InvalidSketch(format!(
"EquiDepthHistogram requires buckets >= {MIN_BUCKETS} \
(Ioannidis-Poosala 1996, Jagadish 1998); got {buckets}"
)));
}
if values.len() < MIN_BUCKETS {
return Err(Error::InvalidSketch(format!(
"EquiDepthHistogram requires values.len() >= {MIN_BUCKETS} \
to populate a non-trivial equi-depth partition; got {}",
values.len()
)));
}
let mut sorted: Vec<f64> = values.to_vec();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let n = sorted.len();
let buckets = buckets.min(n);
let per_bucket = n / buckets;
let mut remainder = n % buckets;
let mut boundaries = Vec::with_capacity(buckets + 1);
let mut counts = Vec::with_capacity(buckets);
boundaries.push(sorted[0]);
let mut cursor = 0usize;
for _ in 0..buckets {
let mut take = per_bucket;
if remainder > 0 {
take += 1;
remainder -= 1;
}
cursor += take;
let edge_idx = cursor.min(n) - 1;
boundaries.push(sorted[edge_idx]);
counts.push(take as u64);
}
Ok(Self {
boundaries,
counts,
total: n as u64,
})
}
pub fn estimate_range(&self, lo: f64, hi: f64) -> u64 {
if lo > hi || self.counts.is_empty() {
return 0;
}
let nb = self.counts.len();
let mut estimate = 0.0f64;
for i in 0..nb {
let b_lo = self.boundaries[i];
let b_hi = self.boundaries[i + 1];
let cnt = self.counts[i] as f64;
if b_hi < lo || b_lo > hi {
continue;
}
if b_lo >= lo && b_hi <= hi {
estimate += cnt;
continue;
}
let overlap_lo = b_lo.max(lo);
let overlap_hi = b_hi.min(hi);
let bucket_width = (b_hi - b_lo).max(f64::EPSILON);
let overlap_width = (overlap_hi - overlap_lo).max(0.0);
estimate += cnt * (overlap_width / bucket_width);
}
estimate.max(0.0) as u64
}
pub fn total(&self) -> u64 {
self.total
}
pub fn buckets(&self) -> usize {
self.counts.len()
}
fn validate(&self) -> Result<()> {
if self.counts.len() < MIN_BUCKETS {
return Err(Error::InvalidSketch(format!(
"EquiDepthHistogram requires >= {MIN_BUCKETS} buckets \
(Ioannidis-Poosala 1996, Jagadish 1998); got {}",
self.counts.len()
)));
}
if self.boundaries.len() != self.counts.len() + 1 {
return Err(Error::InvalidSketch(format!(
"EquiDepthHistogram boundaries.len() {} != counts.len()+1 = {}",
self.boundaries.len(),
self.counts.len() + 1
)));
}
for (i, b) in self.boundaries.iter().enumerate() {
if b.is_nan() {
return Err(Error::InvalidSketch(format!(
"EquiDepthHistogram boundary[{i}] is NaN"
)));
}
}
for w in self.boundaries.windows(2) {
if w[0] > w[1] {
return Err(Error::InvalidSketch(format!(
"EquiDepthHistogram boundaries not non-decreasing: {} > {}",
w[0], w[1]
)));
}
}
let sum: u64 = self.counts.iter().fold(0u64, |a, &c| a.saturating_add(c));
if sum != self.total {
return Err(Error::InvalidSketch(format!(
"EquiDepthHistogram total {} != sum(counts) {}",
self.total, sum
)));
}
Ok(())
}
}
impl Sketch for EquiDepthHistogram {
const KIND: &'static str = "samkhya.histogram-equidepth-v1";
fn to_bytes(&self) -> Result<Vec<u8>> {
bincode::serialize(self).map_err(Into::into)
}
fn from_bytes(bytes: &[u8]) -> Result<Self> {
let s: Self = bincode::deserialize(bytes).map_err(Error::from)?;
s.validate()?;
Ok(s)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn estimates_whole_range_correctly() {
let values: Vec<f64> = (0..1000).map(|i| i as f64).collect();
let h = EquiDepthHistogram::from_values(&values, 10).unwrap();
assert_eq!(h.estimate_range(0.0, 999.0), 1000);
}
#[test]
fn estimates_half_range_approximately() {
let values: Vec<f64> = (0..1000).map(|i| i as f64).collect();
let h = EquiDepthHistogram::from_values(&values, 10).unwrap();
let est = h.estimate_range(0.0, 500.0);
assert!(
(400..=600).contains(&est),
"half-range estimate {est} too far"
);
}
#[test]
fn empty_range_returns_zero() {
let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
let h = EquiDepthHistogram::from_values(&values, 5).unwrap();
assert_eq!(h.estimate_range(50.0, 40.0), 0);
assert_eq!(h.estimate_range(200.0, 300.0), 0);
}
#[test]
fn empty_population_rejected() {
assert!(EquiDepthHistogram::from_values(&[], 5).is_err());
assert!(EquiDepthHistogram::from_values(&[42.0], 5).is_err());
}
#[test]
fn round_trip() {
let values: Vec<f64> = (0..500).map(|i| (i as f64) * 0.3).collect();
let h = EquiDepthHistogram::from_values(&values, 16).unwrap();
let bytes = h.to_bytes().unwrap();
let h2 = EquiDepthHistogram::from_bytes(&bytes).unwrap();
assert_eq!(h.total, h2.total);
assert_eq!(h.counts, h2.counts);
assert_eq!(h.boundaries, h2.boundaries);
}
#[test]
fn zero_buckets_errors() {
let values = vec![1.0, 2.0, 3.0];
assert!(EquiDepthHistogram::from_values(&values, 0).is_err());
}
#[test]
fn one_bucket_rejected_by_from_values() {
let values = vec![1.0, 2.0, 3.0, 4.0];
assert!(EquiDepthHistogram::from_values(&values, 1).is_err());
}
#[test]
fn try_new_rejects_zero_buckets() {
assert!(EquiDepthHistogram::try_new(0).is_err());
}
#[test]
fn try_new_rejects_one_bucket() {
assert!(EquiDepthHistogram::try_new(1).is_err());
}
#[test]
fn try_new_allocates_empty_shell() {
let h = EquiDepthHistogram::try_new(8).unwrap();
assert_eq!(h.buckets(), 8);
assert_eq!(h.total(), 0);
}
#[test]
fn from_bytes_rejects_one_bucket_payload() {
#[derive(serde::Serialize)]
struct Wire {
boundaries: Vec<f64>,
counts: Vec<u64>,
total: u64,
}
let bad = Wire {
boundaries: vec![0.0, 1.0],
counts: vec![5],
total: 5,
};
let bytes = bincode::serialize(&bad).unwrap();
assert!(
EquiDepthHistogram::from_bytes(&bytes).is_err(),
"single-bucket payload accepted by from_bytes"
);
}
#[test]
fn from_bytes_rejects_all_zero_payload() {
for n in [4usize, 8, 16, 32, 128, 1024, 4 * 1024 * 1024] {
let zeros = vec![0u8; n];
assert!(
EquiDepthHistogram::from_bytes(&zeros).is_err(),
"all-zero len {n} accepted by from_bytes"
);
}
}
#[test]
fn from_bytes_rejects_non_monotone_boundaries() {
#[derive(serde::Serialize)]
struct Wire {
boundaries: Vec<f64>,
counts: Vec<u64>,
total: u64,
}
let bad = Wire {
boundaries: vec![5.0, 3.0, 7.0], counts: vec![1, 1],
total: 2,
};
let bytes = bincode::serialize(&bad).unwrap();
assert!(
EquiDepthHistogram::from_bytes(&bytes).is_err(),
"non-monotone boundaries accepted"
);
}
#[test]
fn from_bytes_rejects_nan_boundary() {
#[derive(serde::Serialize)]
struct Wire {
boundaries: Vec<f64>,
counts: Vec<u64>,
total: u64,
}
let bad = Wire {
boundaries: vec![0.0, f64::NAN, 1.0],
counts: vec![1, 1],
total: 2,
};
let bytes = bincode::serialize(&bad).unwrap();
assert!(
EquiDepthHistogram::from_bytes(&bytes).is_err(),
"NaN bin edge accepted"
);
}
#[test]
fn from_bytes_rejects_length_mismatch() {
#[derive(serde::Serialize)]
struct Wire {
boundaries: Vec<f64>,
counts: Vec<u64>,
total: u64,
}
let bad = Wire {
boundaries: vec![0.0, 1.0, 2.0],
counts: vec![5], total: 5,
};
let bytes = bincode::serialize(&bad).unwrap();
assert!(
EquiDepthHistogram::from_bytes(&bytes).is_err(),
"boundaries/counts length mismatch accepted"
);
}
#[test]
fn from_bytes_accepts_valid_payload() {
let values: Vec<f64> = (0..500).map(|i| (i as f64) * 0.3).collect();
let h = EquiDepthHistogram::from_values(&values, 16).unwrap();
let bytes = h.to_bytes().unwrap();
let h2 = EquiDepthHistogram::from_bytes(&bytes).unwrap();
assert_eq!(h.buckets(), h2.buckets());
assert_eq!(h.total(), h2.total());
}
}