#![allow(clippy::cast_precision_loss)]
use serde::{Deserialize, Serialize};
pub(crate) fn next_after(v: f64) -> f64 {
if v.is_nan() || v == f64::INFINITY {
return v;
}
if v == 0.0 {
return f64::from_bits(1);
}
let bits = v.to_bits();
let next_bits = if v > 0.0 { bits + 1 } else { bits - 1 };
f64::from_bits(next_bits)
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub struct HistogramBucket {
pub lower_bound: f64,
pub upper_bound: f64,
pub count: u64,
#[serde(default)]
pub distinct_count: u64,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub struct Histogram {
pub buckets: Vec<HistogramBucket>,
#[serde(default)]
pub total_count: u64,
#[serde(default)]
pub incremental_updates: u64,
#[serde(default)]
pub stale: bool,
}
impl Histogram {
#[must_use]
pub fn find_bucket(&self, value: f64) -> Option<usize> {
let buckets = &self.buckets;
if buckets.is_empty() {
return None;
}
let idx = buckets.partition_point(|b| b.lower_bound <= value);
if idx == 0 {
return None;
}
let candidate = idx - 1;
if value < buckets[candidate].upper_bound {
Some(candidate)
} else {
None
}
}
#[must_use]
pub fn estimate_eq_selectivity(&self, value: f64) -> f64 {
let total = self.bucket_sum();
if total == 0 {
return 0.0;
}
let sel = if let Some(idx) = self.find_bucket(value) {
let bucket = &self.buckets[idx];
if bucket.distinct_count > 0 {
bucket.count as f64 / (bucket.distinct_count as f64 * total as f64)
} else {
1.0 / total.max(1) as f64
}
} else {
1.0 / total.max(1) as f64
};
sel.clamp(0.0, 1.0)
}
#[must_use]
pub fn estimate_lt_selectivity(&self, value: f64) -> f64 {
let total = self.bucket_sum();
if self.buckets.is_empty() || total == 0 {
return 0.0;
}
if value <= self.buckets[0].lower_bound {
return 0.0;
}
if value >= self.buckets[self.buckets.len() - 1].upper_bound {
return 1.0;
}
let count_below = accumulate_lt_count(&self.buckets, value);
(count_below / total as f64).clamp(0.0, 1.0)
}
#[must_use]
pub fn estimate_range_selectivity(&self, low: f64, high: f64) -> f64 {
if let Some(shortcut) = self.range_selectivity_shortcut(low, high) {
return shortcut;
}
let total = self.bucket_sum();
let mut count_in_range: f64 = 0.0;
for bucket in &self.buckets {
if let Some(fraction) = bucket_range_fraction(bucket, low, high) {
count_in_range += bucket.count as f64 * fraction;
}
}
(count_in_range / total as f64).clamp(0.0, 1.0)
}
fn range_selectivity_shortcut(&self, low: f64, high: f64) -> Option<f64> {
if low > high || self.buckets.is_empty() || self.bucket_sum() == 0 {
return Some(0.0);
}
let first_lower = self.buckets[0].lower_bound;
let last_upper = self.buckets[self.buckets.len() - 1].upper_bound;
if low >= last_upper || high <= first_lower {
return Some(0.0);
}
if low <= first_lower && high >= last_upper {
return Some(1.0);
}
None
}
pub fn increment_bucket(&mut self, value: f64) {
if let Some(idx) = self.find_bucket(value) {
self.buckets[idx].count += 1;
self.incremental_updates += 1;
self.check_staleness();
}
}
pub fn decrement_bucket(&mut self, value: f64) {
if let Some(idx) = self.find_bucket(value) {
self.buckets[idx].count = self.buckets[idx].count.saturating_sub(1);
self.incremental_updates += 1;
self.check_staleness();
}
}
fn bucket_sum(&self) -> u64 {
self.buckets.iter().map(|b| b.count).sum()
}
fn check_staleness(&mut self) {
if self.total_count > 0 && self.incremental_updates > self.total_count / 5 {
self.stale = true;
}
}
}
fn accumulate_lt_count(buckets: &[HistogramBucket], value: f64) -> f64 {
let mut count_below: f64 = 0.0;
for bucket in buckets {
if bucket.upper_bound <= value {
count_below += bucket.count as f64;
} else if bucket.lower_bound < value {
let width = bucket.upper_bound - bucket.lower_bound;
if width > 0.0 {
count_below += bucket.count as f64 * ((value - bucket.lower_bound) / width);
}
break;
} else {
break;
}
}
count_below
}
fn bucket_range_fraction(bucket: &HistogramBucket, low: f64, high: f64) -> Option<f64> {
if bucket.upper_bound <= low || bucket.lower_bound >= high {
return None;
}
let width = bucket.upper_bound - bucket.lower_bound;
if width <= 0.0 {
return None;
}
let eff_low = low.max(bucket.lower_bound);
let eff_high = high.min(bucket.upper_bound);
Some((eff_high - eff_low) / width)
}
const DEFAULT_NUM_BUCKETS: usize = 64;
pub(crate) struct HistogramBuilder {
num_buckets: usize,
}
impl HistogramBuilder {
#[must_use]
pub fn new(num_buckets: usize) -> Self {
Self {
num_buckets: if num_buckets == 0 {
DEFAULT_NUM_BUCKETS
} else {
num_buckets
},
}
}
#[must_use]
pub fn build(&self, values: &mut [f64]) -> Histogram {
let valid_len = partition_nan(values);
let valid = &mut values[..valid_len];
if valid.is_empty() {
return Histogram::default();
}
valid.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let distinct = count_distinct(valid);
let buckets = if distinct == 1 {
build_single_value_buckets(valid)
} else if distinct < self.num_buckets {
build_per_distinct_buckets(valid, distinct)
} else {
build_equidepth_buckets(valid, self.num_buckets)
};
Histogram {
buckets,
total_count: valid_len as u64,
incremental_updates: 0,
stale: false,
}
}
}
fn partition_nan(values: &mut [f64]) -> usize {
let mut valid = 0;
for i in 0..values.len() {
if !values[i].is_nan() {
values.swap(valid, i);
valid += 1;
}
}
valid
}
#[allow(clippy::float_cmp)]
fn count_distinct(sorted: &[f64]) -> usize {
if sorted.is_empty() {
return 0;
}
1 + sorted.windows(2).filter(|w| w[0] != w[1]).count()
}
fn slice_distinct_count(sorted: &[f64]) -> u64 {
count_distinct(sorted) as u64
}
fn build_single_value_buckets(sorted: &[f64]) -> Vec<HistogramBucket> {
let val = sorted[0];
vec![HistogramBucket {
lower_bound: val,
upper_bound: next_after(val),
count: sorted.len() as u64,
distinct_count: 1,
}]
}
#[allow(clippy::float_cmp)]
fn build_per_distinct_buckets(sorted: &[f64], distinct: usize) -> Vec<HistogramBucket> {
let mut buckets = Vec::with_capacity(distinct);
let mut i = 0;
while i < sorted.len() {
let val = sorted[i];
let start = i;
while i < sorted.len() && sorted[i] == val {
i += 1;
}
let next_bound = if i < sorted.len() {
sorted[i]
} else {
next_after(val)
};
buckets.push(HistogramBucket {
lower_bound: val,
upper_bound: next_bound,
count: (i - start) as u64,
distinct_count: 1,
});
}
buckets
}
fn build_equidepth_buckets(sorted: &[f64], num_buckets: usize) -> Vec<HistogramBucket> {
let chunk_size = sorted.len().div_ceil(num_buckets);
let mut buckets = Vec::with_capacity(num_buckets);
for chunk in sorted.chunks(chunk_size) {
let lower = chunk[0];
let upper = upper_bound_for_chunk(chunk, sorted, &buckets);
buckets.push(HistogramBucket {
lower_bound: lower,
upper_bound: upper,
count: chunk.len() as u64,
distinct_count: slice_distinct_count(chunk),
});
}
merge_zero_width_buckets(buckets)
}
#[allow(clippy::float_cmp)]
pub(crate) fn merge_zero_width_buckets(buckets: Vec<HistogramBucket>) -> Vec<HistogramBucket> {
if buckets.is_empty() {
return buckets;
}
let mut pending_count: u64 = 0;
let mut pending_distinct: u64 = 0;
let mut result: Vec<HistogramBucket> = Vec::with_capacity(buckets.len());
for bucket in buckets {
if bucket.lower_bound == bucket.upper_bound {
pending_count += bucket.count;
pending_distinct = pending_distinct.max(bucket.distinct_count);
} else {
let mut merged = bucket;
merged.count += pending_count;
merged.distinct_count = merged.distinct_count.max(pending_distinct);
pending_count = 0;
pending_distinct = 0;
result.push(merged);
}
}
if pending_count > 0 {
if let Some(last) = result.last_mut() {
last.count += pending_count;
last.distinct_count = last.distinct_count.max(pending_distinct);
}
}
result
}
fn upper_bound_for_chunk(chunk: &[f64], sorted: &[f64], existing: &[HistogramBucket]) -> f64 {
let chunk_end_offset = existing.iter().map(|b| b.count as usize).sum::<usize>() + chunk.len();
if chunk_end_offset < sorted.len() {
sorted[chunk_end_offset]
} else {
next_after(chunk[chunk.len() - 1])
}
}