#![deny(clippy::unwrap_used)]
#![deny(clippy::expect_used)]
#![deny(clippy::panic)]
use std::cmp::Ordering;
use std::fmt;
pub mod builder;
pub mod compactor;
pub mod error;
pub mod iter;
pub mod sorted_view;
/// Comparison that always produces an [`Ordering`].
///
/// Unlike `PartialOrd::partial_cmp`, the method here returns `Ordering`
/// directly rather than `Option<Ordering>`; the sketch uses it to track the
/// minimum and maximum item.
pub trait TotalOrd {
/// Compares `self` with `other` and returns their relative [`Ordering`].
fn total_cmp(&self, other: &Self) -> Ordering;
}
/// [`TotalOrd`] for `f64`.
///
/// Ordinary values are compared with `partial_cmp` (so `-0.0` and `0.0`
/// compare as equal). Only when `partial_cmp` cannot order the operands
/// (a NaN is involved) does the comparison fall back to the inherent
/// `f64::total_cmp`.
impl TotalOrd for f64 {
    #[inline(always)]
    fn total_cmp(&self, other: &Self) -> Ordering {
        match self.partial_cmp(other) {
            Some(ordering) => ordering,
            // Unordered operands: defer to the IEEE 754 total order.
            None => f64::total_cmp(self, other),
        }
    }
}
/// [`TotalOrd`] for `f32`, delegating every comparison to the inherent
/// `f32::total_cmp`.
///
/// NOTE(review): unlike the `f64` impl in this file, there is no
/// `partial_cmp` fast path here, so `-0.0` and `0.0` are ordered rather than
/// equal — confirm the difference is intended.
impl TotalOrd for f32 {
#[inline(always)]
fn total_cmp(&self, other: &Self) -> Ordering {
f32::total_cmp(self, other)
}
}
// Generates a `TotalOrd` impl for every listed type by forwarding to the
// type's existing `Ord` implementation.
macro_rules! impl_total_ord_for_ord {
    ($($int:ty),*) => {
        $(
            impl TotalOrd for $int {
                #[inline(always)]
                fn total_cmp(&self, rhs: &Self) -> Ordering {
                    Ord::cmp(self, rhs)
                }
            }
        )*
    };
}
// All built-in signed and unsigned integer widths.
impl_total_ord_for_ord!(i8, u8, i16, u16, i32, u32, i64, u64, i128, u128, isize, usize);
/// Marker trait for cheap-to-copy, partially ordered key types.
///
/// It carries no methods of its own; it only bundles the `Copy`,
/// `PartialOrd` and `Clone` bounds.
pub trait ReqKey: Copy + PartialOrd + Clone {}

// Floating-point keys.
impl ReqKey for f64 {}
impl ReqKey for f32 {}

// Signed integer keys.
impl ReqKey for i64 {}
impl ReqKey for i32 {}

// Unsigned integer keys.
impl ReqKey for u64 {}
impl ReqKey for u32 {}
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
pub use builder::ReqSketchBuilder;
pub use error::{ReqError, Result};
pub use iter::ReqSketchIterator;
pub use sorted_view::SortedView;
/// Which end of the rank range the sketch is configured to favour.
///
/// Sketches can only be merged when this setting matches, and the rank-bound
/// computations scale their relative error term by `1.0 - rank` for
/// [`RankAccuracy::HighRank`] and by `rank` for [`RankAccuracy::LowRank`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum RankAccuracy {
    /// Default setting; used by `ReqSketch::new`.
    #[default]
    HighRank,
    /// The opposite orientation of [`RankAccuracy::HighRank`].
    LowRank,
}
/// Selects inclusive or exclusive treatment for rank, quantile, PMF and CDF
/// queries.
///
/// The value is passed through to the sorted view, which implements the
/// actual semantics.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SearchCriteria {
    /// Default criteria; used by the `*_inclusive` convenience methods.
    #[default]
    Inclusive,
    /// The exclusive counterpart of [`SearchCriteria::Inclusive`].
    Exclusive,
}
/// Quantile sketch built from a stack of compactors.
///
/// New items enter the compactor at index 0; when the retained item count
/// reaches the summed nominal capacity, full levels are compacted and their
/// survivors are promoted to the next level.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(
feature = "serde",
serde(
bound = "T: Clone + TotalOrd + PartialEq + serde::Serialize + serde::de::DeserializeOwned"
)
)]
pub struct ReqSketch<T> {
/// Size parameter handed to every new compactor; must match for a merge.
k: u16,
/// Rank-accuracy orientation; must match for a merge.
rank_accuracy: RankAccuracy,
/// Total number of items absorbed through `update` and `merge`.
total_n: u64,
/// Sum of the nominal capacities of all compactors.
max_nom_size: u32,
/// Sum of the item counts of all compactors.
num_retained: u32,
/// Compactor levels; index 0 receives newly inserted items.
compactors: Vec<compactor::Compactor<T>>,
/// Scratch buffer reused by `compress` for items promoted to the next level.
promotion_buf: Vec<T>,
/// Smallest item seen so far, `None` while the sketch is empty.
min_item: Option<T>,
/// Largest item seen so far, `None` while the sketch is empty.
max_item: Option<T>,
/// Lazily built sorted view; cleared whenever the sketch is modified.
sorted_view_cache: Option<SortedView<T>>,
}
impl<T> ReqSketch<T>
where
T: Clone + TotalOrd + PartialEq,
{
/// Creates an empty sketch with `k = 12` and [`RankAccuracy::HighRank`].
///
/// No compactor is created here; `update` and `merge` grow the level stack
/// on demand.
pub fn new() -> Self {
    // Default size parameter; the promotion buffer is pre-sized to match.
    const DEFAULT_K: u16 = 12;

    Self {
        k: DEFAULT_K,
        rank_accuracy: RankAccuracy::HighRank,
        total_n: 0,
        max_nom_size: 0,
        num_retained: 0,
        compactors: Vec::new(),
        promotion_buf: Vec::with_capacity(usize::from(DEFAULT_K)),
        min_item: None,
        max_item: None,
        sorted_view_cache: None,
    }
}
/// Creates a sketch with the given `k`, leaving every other builder setting
/// at its default.
///
/// # Errors
///
/// Propagates any error reported by the builder's `k` or `build` step.
pub fn with_k(k: u16) -> Result<Self> {
    ReqSketchBuilder::new()
        .k(k)
        .and_then(|configured| configured.build())
}
/// Returns a fresh [`ReqSketchBuilder`] for configuring a sketch step by step.
pub fn builder() -> ReqSketchBuilder<T> {
ReqSketchBuilder::new()
}
/// Returns the `k` parameter this sketch was configured with.
pub fn k(&self) -> u16 {
self.k
}
/// Returns the rank-accuracy setting this sketch was configured with.
pub fn rank_accuracy(&self) -> RankAccuracy {
self.rank_accuracy
}
/// Returns `true` when no item has been absorbed (the total count is zero).
pub fn is_empty(&self) -> bool {
self.total_n == 0
}
/// Returns the total number of items absorbed through `update` and `merge`.
///
/// This is the stream length, not the number of items currently stored;
/// see `num_retained` for the latter.
pub fn len(&self) -> u64 {
self.total_n
}
/// Returns the tracked number of items currently stored across all
/// compactors.
pub fn num_retained(&self) -> u32 {
self.num_retained
}
/// Returns `true` once the sketch holds more than one compactor level.
pub fn is_estimation_mode(&self) -> bool {
self.compactors.len() > 1
}
/// Inserts one item into the sketch.
///
/// The tracked minimum and maximum are updated first, then the item is
/// appended to the level-0 compactor (created on demand). Compression runs
/// as soon as the retained count reaches *or exceeds* the summed nominal
/// capacity, and the cached sorted view is discarded.
pub fn update(&mut self, item: T) {
    match &mut self.min_item {
        None => self.min_item = Some(item.clone()),
        Some(min) if item.total_cmp(min).is_lt() => *min = item.clone(),
        _ => {}
    }
    match &mut self.max_item {
        None => self.max_item = Some(item.clone()),
        Some(max) if item.total_cmp(max).is_gt() => *max = item.clone(),
        _ => {}
    }

    // A fresh or reset sketch has no levels yet.
    if self.compactors.is_empty() {
        self.grow();
    }
    self.compactors[0].append(item);
    self.total_n += 1;
    self.num_retained += 1;

    // `>=` rather than `==`, matching `merge`: should the retained count ever
    // sit above the nominal size, an equality test would never fire again and
    // the sketch would stop compacting.
    if self.num_retained >= self.max_nom_size {
        self.compress();
    }
    self.sorted_view_cache = None;
}
/// Merges `other` into this sketch.
///
/// # Errors
///
/// Returns [`ReqError::IncompatibleSketches`] when the two sketches differ
/// in rank accuracy or in `k`, and propagates any error raised while merging
/// an individual compactor level. The compatibility checks run before any
/// state is touched. If a level merge fails, the sketch may be left
/// partially merged, but its cached sorted view is already invalidated and
/// its size counters are refreshed so they describe the compactors as they
/// are.
///
/// Merging an empty `other` is a no-op.
pub fn merge(&mut self, other: &Self) -> Result<()> {
    if self.rank_accuracy != other.rank_accuracy {
        return Err(ReqError::IncompatibleSketches(
            "Sketches must have the same rank accuracy setting".to_string(),
        ));
    }
    if self.k != other.k {
        return Err(ReqError::IncompatibleSketches(
            "Sketches must have the same k parameter".to_string(),
        ));
    }
    if other.is_empty() {
        return Ok(());
    }

    // Everything below mutates `self`, so drop the cached view up front; an
    // early error return must not leave a stale view behind.
    self.sorted_view_cache = None;

    self.total_n += other.total_n;
    if let Some(other_min) = &other.min_item {
        match &self.min_item {
            None => self.min_item = Some(other_min.clone()),
            Some(min) if other_min.total_cmp(min).is_lt() => {
                self.min_item = Some(other_min.clone())
            }
            _ => {}
        }
    }
    if let Some(other_max) = &other.max_item {
        match &self.max_item {
            None => self.max_item = Some(other_max.clone()),
            Some(max) if other_max.total_cmp(max).is_gt() => {
                self.max_item = Some(other_max.clone())
            }
            _ => {}
        }
    }

    // Make sure every level of `other` has a counterpart to merge into.
    while self.compactors.len() < other.compactors.len() {
        self.grow();
    }

    // Merge level by level, stopping at the first failure. The outcome is
    // held back until the counters have been refreshed so that they stay
    // accurate on the error path too.
    let level_merge = other.compactors.iter().enumerate().try_for_each(
        |(level, incoming)| -> Result<()> {
            self.compactors[level].merge(incoming)?;
            Ok(())
        },
    );
    self.update_max_nom_size();
    self.update_num_retained();
    level_merge?;

    if self.num_retained >= self.max_nom_size {
        self.compress();
    }
    Ok(())
}
/// Returns the smallest item seen so far, or `None` if none has been recorded.
pub fn min_item(&self) -> Option<&T> {
self.min_item.as_ref()
}
/// Returns the largest item seen so far, or `None` if none has been recorded.
pub fn max_item(&self) -> Option<&T> {
self.max_item.as_ref()
}
/// Returns the rank of `item` according to `criteria`, as computed by the
/// sorted view (built and cached on first use).
///
/// # Errors
///
/// Returns [`ReqError::EmptySketch`] when the sketch is empty, and
/// propagates any error from the sorted view.
pub fn rank(&mut self, item: &T, criteria: SearchCriteria) -> Result<f64> {
    if self.is_empty() {
        return Err(ReqError::EmptySketch);
    }
    self.get_sorted_view()
        .and_then(|view| view.rank_no_interpolation(item, criteria))
}
/// Shorthand for `rank` with [`SearchCriteria::Inclusive`].
pub fn rank_inclusive(&mut self, item: &T) -> Result<f64> {
self.rank(item, SearchCriteria::Inclusive)
}
/// Returns the item at normalized `rank` according to `criteria`.
///
/// # Errors
///
/// Returns [`ReqError::EmptySketch`] when the sketch is empty and
/// [`ReqError::InvalidRank`] when `rank` lies outside `0.0..=1.0` (which
/// includes NaN). Errors from the sorted view are propagated.
pub fn quantile(&mut self, rank: f64, criteria: SearchCriteria) -> Result<T> {
    if self.is_empty() {
        return Err(ReqError::EmptySketch);
    }
    let rank_is_valid = (0.0..=1.0).contains(&rank);
    if !rank_is_valid {
        return Err(ReqError::InvalidRank(rank));
    }
    self.get_sorted_view()
        .and_then(|view| view.quantile(rank, criteria))
}
/// Shorthand for `quantile` with [`SearchCriteria::Inclusive`].
pub fn quantile_inclusive(&mut self, rank: f64) -> Result<T> {
self.quantile(rank, SearchCriteria::Inclusive)
}
/// Returns the quantile for every entry of `ranks`, in the same order.
///
/// # Errors
///
/// Returns [`ReqError::EmptySketch`] when the sketch is empty. Ranks are
/// processed in order and the first failure wins: a rank outside
/// `0.0..=1.0` yields [`ReqError::InvalidRank`], and any error from the
/// sorted view is propagated.
pub fn quantiles(&mut self, ranks: &[f64], criteria: SearchCriteria) -> Result<Vec<T>> {
    if self.is_empty() {
        return Err(ReqError::EmptySketch);
    }
    let view = self.get_sorted_view()?;
    // Collecting into `Result` stops at the first `Err`.
    ranks
        .iter()
        .map(|&rank| {
            if (0.0..=1.0).contains(&rank) {
                view.quantile(rank, criteria)
            } else {
                Err(ReqError::InvalidRank(rank))
            }
        })
        .collect()
}
/// Returns the probability mass function over `split_points`, as computed
/// by the sorted view.
///
/// # Errors
///
/// Returns [`ReqError::EmptySketch`] when the sketch is empty, and
/// propagates any error from the sorted view.
pub fn pmf(&mut self, split_points: &[T], criteria: SearchCriteria) -> Result<Vec<f64>> {
    if self.is_empty() {
        return Err(ReqError::EmptySketch);
    }
    self.get_sorted_view()
        .and_then(|view| view.pmf(split_points, criteria))
}
/// Returns the cumulative distribution function over `split_points`, as
/// computed by the sorted view.
///
/// # Errors
///
/// Returns [`ReqError::EmptySketch`] when the sketch is empty, and
/// propagates any error from the sorted view.
pub fn cdf(&mut self, split_points: &[T], criteria: SearchCriteria) -> Result<Vec<f64>> {
    if self.is_empty() {
        return Err(ReqError::EmptySketch);
    }
    self.get_sorted_view()
        .and_then(|view| view.cdf(split_points, criteria))
}
/// Returns an iterator over the contents of all compactor levels.
pub fn iter(&self) -> ReqSketchIterator<'_, T> {
ReqSketchIterator::new(&self.compactors)
}
/// Returns the sorted view of the sketch, building and caching it if needed.
///
/// # Errors
///
/// Returns [`ReqError::EmptySketch`] when the sketch is empty.
pub fn sorted_view(&mut self) -> Result<&SortedView<T>> {
self.get_sorted_view()
}
/// Returns the sketch to its empty state while keeping `k` and the
/// rank-accuracy setting.
///
/// All counters, the min/max items, every compactor level and the cached
/// sorted view are discarded. The promotion scratch buffer is emptied too,
/// so no item from before the reset stays alive inside the sketch; its
/// allocation is kept for reuse.
pub fn reset(&mut self) {
    self.total_n = 0;
    self.num_retained = 0;
    self.max_nom_size = 0;
    self.min_item = None;
    self.max_item = None;
    self.compactors.clear();
    // `compress` leaves the most recently promoted items in this buffer.
    self.promotion_buf.clear();
    self.sorted_view_cache = None;
}
/// Returns the cached sorted view, computing and storing it first when the
/// cache is empty.
///
/// # Errors
///
/// Returns [`ReqError::EmptySketch`] when the sketch is empty, and
/// propagates any error from building the view.
fn get_sorted_view(&mut self) -> Result<&SortedView<T>> {
    if self.is_empty() {
        return Err(ReqError::EmptySketch);
    }
    // Take the cached view out (or build one), then store it back and hand
    // out a reference to the stored value.
    let view = match self.sorted_view_cache.take() {
        Some(cached) => cached,
        None => self.compute_sorted_view()?,
    };
    let stored: &SortedView<T> = self.sorted_view_cache.insert(view);
    Ok(stored)
}
/// Builds a [`SortedView`] from every retained item, each paired with the
/// weight of the compactor level holding it.
fn compute_sorted_view(&self) -> Result<SortedView<T>> {
    let mut pairs = Vec::new();
    for level in &self.compactors {
        // Every item on a level shares that level's weight.
        let level_weight = level.weight();
        for entry in level.iter() {
            pairs.push((entry.clone(), level_weight));
        }
    }
    let view = SortedView::new(pairs);
    Ok(view)
}
/// Makes one bottom-up pass over the levels that existed on entry and
/// compacts each one that has reached its nominal capacity.
///
/// Survivors of a compaction are merged into the level above, which is
/// created first if it does not exist yet. The size counters are refreshed
/// after every compaction and the cached sorted view is dropped at the end.
fn compress(&mut self) {
    // The bound is fixed up front: a level added during this pass is not
    // itself visited.
    let levels_on_entry = self.compactors.len();
    for level in 0..levels_on_entry {
        let is_full =
            self.compactors[level].num_items() >= self.compactors[level].nominal_capacity();
        if !is_full {
            continue;
        }

        // Level 0 is filled by plain appends, so sort it before compacting.
        if level == 0 {
            self.compactors[0].sort();
        }

        let above = level + 1;
        if above >= self.compactors.len() {
            self.grow();
        }

        self.promotion_buf.clear();
        self.compactors[level].compact_into(self.rank_accuracy, &mut self.promotion_buf);
        if !self.promotion_buf.is_empty() {
            self.compactors[above].sort();
            self.compactors[above].merge_sorted(&self.promotion_buf);
        }

        self.update_max_nom_size();
        self.update_num_retained();
    }
    self.sorted_view_cache = None;
}
/// Adds a new top compactor level, numbered by the current level count, and
/// refreshes the summed nominal capacity.
fn grow(&mut self) {
    let next_level = self.compactors.len() as u8;
    self.compactors.push(compactor::Compactor::new(
        next_level,
        self.k,
        self.rank_accuracy,
    ));
    self.update_max_nom_size();
}
/// Recomputes `max_nom_size` as the sum of every compactor's nominal
/// capacity.
fn update_max_nom_size(&mut self) {
    let mut capacity_sum = 0;
    for level in &self.compactors {
        capacity_sum += level.nominal_capacity();
    }
    self.max_nom_size = capacity_sum;
}
/// Recomputes `num_retained` as the sum of every compactor's item count.
fn update_num_retained(&mut self) {
    let mut item_sum = 0;
    for level in &self.compactors {
        item_sum += level.num_items();
    }
    self.num_retained = item_sum;
}
/// Test-only snapshot of every level as
/// `(lg_weight, num_items, nominal_capacity)`, bottom level first.
#[cfg(test)]
pub(crate) fn debug_compactor_info(&self) -> Vec<(u8, u32, u32)> {
    let mut snapshot = Vec::with_capacity(self.compactors.len());
    for level in &self.compactors {
        snapshot.push((
            level.lg_weight(),
            level.num_items(),
            level.nominal_capacity(),
        ));
    }
    snapshot
}
/// Sums the item counts of all compactors directly, bypassing the cached
/// `num_retained` counter.
#[doc(hidden)]
pub fn total_retained_items(&self) -> u32 {
    let mut retained = 0;
    for level in &self.compactors {
        retained += level.num_items();
    }
    retained
}
/// Sums the nominal capacities of all compactors directly, bypassing the
/// cached `max_nom_size` counter.
#[doc(hidden)]
pub fn total_nominal_capacity(&self) -> u32 {
    let mut capacity = 0;
    for level in &self.compactors {
        capacity += level.nominal_capacity();
    }
    capacity
}
/// Describes every level as
/// `(level index, num_items, nominal_capacity, weight)`, bottom level first.
#[doc(hidden)]
pub fn level_info(&self) -> Vec<(usize, u32, u32, u64)> {
    let mut rows = Vec::with_capacity(self.compactors.len());
    for (index, level) in self.compactors.iter().enumerate() {
        rows.push((
            index,
            level.num_items(),
            level.nominal_capacity(),
            level.weight(),
        ));
    }
    rows
}
/// Returns the total weight represented by the retained items: for each
/// level, its item count times its weight, summed over all levels.
#[doc(hidden)]
pub fn computed_total_weight(&self) -> u64 {
    let mut weight_sum: u64 = 0;
    for level in &self.compactors {
        weight_sum += level.num_items() as u64 * level.weight();
    }
    weight_sum
}
/// Hidden alias for `sorted_view`; returns the same result.
#[doc(hidden)]
pub fn test_get_sorted_view(&mut self) -> Result<&SortedView<T>> {
self.sorted_view()
}
/// Returns a lower bound for `rank` at `num_std_dev` standard deviations,
/// using this sketch's `k`, level count, total count and rank-accuracy
/// setting.
///
/// The result is never below `0.0`; `rank` itself is returned when it falls
/// in the range the sketch treats as exact.
pub fn get_rank_lower_bound(&self, rank: f64, num_std_dev: u8) -> f64 {
    let num_levels = self.compactors.len() as u8;
    let hra = matches!(self.rank_accuracy, RankAccuracy::HighRank);
    self.compute_rank_lower_bound(self.k, num_levels, rank, num_std_dev, self.total_n, hra)
}
/// Returns an upper bound for `rank` at `num_std_dev` standard deviations,
/// using this sketch's `k`, level count, total count and rank-accuracy
/// setting.
///
/// The result is never above `1.0`; `rank` itself is returned when it falls
/// in the range the sketch treats as exact.
pub fn get_rank_upper_bound(&self, rank: f64, num_std_dev: u8) -> f64 {
    let num_levels = self.compactors.len() as u8;
    let hra = matches!(self.rank_accuracy, RankAccuracy::HighRank);
    self.compute_rank_upper_bound(self.k, num_levels, rank, num_std_dev, self.total_n, hra)
}
/// Numerator of the rank-independent error term; the bound computations
/// divide it by `k`.
const FIXED_RSE_FACTOR: f64 = 0.084;
/// Section count used to derive the exact-rank capacity (`k` times this
/// value) and the relative error factor.
const INIT_NUM_SECTIONS: u8 = 3;
fn relative_rse_factor() -> f64 {
(0.0512 / Self::INIT_NUM_SECTIONS as f64).sqrt()
}
/// Computes the lower rank bound for the given sketch parameters.
///
/// Returns `rank` unchanged when `is_exact_rank_threshold` reports the rank
/// as exact. Otherwise two candidates are formed — one from the relative
/// error term (scaled by `1.0 - rank` when `hra` is set, by `rank`
/// otherwise) and one from the fixed error term — and the larger of the two
/// is returned, floored at `0.0`.
fn compute_rank_lower_bound(
    &self,
    k: u16,
    num_levels: u8,
    rank: f64,
    num_std_dev: u8,
    n: u64,
    hra: bool,
) -> f64 {
    if self.is_exact_rank_threshold(k, num_levels, rank, n, hra) {
        return rank;
    }
    let k_f = k as f64;
    let deviations = num_std_dev as f64;
    let rank_scale = if hra { 1.0 - rank } else { rank };

    let relative = Self::relative_rse_factor() / k_f * rank_scale;
    let fixed = Self::FIXED_RSE_FACTOR / k_f;

    let from_relative = rank - deviations * relative;
    let from_fixed = rank - deviations * fixed;
    from_relative.max(from_fixed).max(0.0)
}
/// Computes the upper rank bound for the given sketch parameters.
///
/// Returns `rank` unchanged when `is_exact_rank_threshold` reports the rank
/// as exact. Otherwise two candidates are formed — one from the relative
/// error term (scaled by `1.0 - rank` when `hra` is set, by `rank`
/// otherwise) and one from the fixed error term — and the smaller of the two
/// is returned, capped at `1.0`.
fn compute_rank_upper_bound(
    &self,
    k: u16,
    num_levels: u8,
    rank: f64,
    num_std_dev: u8,
    n: u64,
    hra: bool,
) -> f64 {
    if self.is_exact_rank_threshold(k, num_levels, rank, n, hra) {
        return rank;
    }
    let k_f = k as f64;
    let deviations = num_std_dev as f64;
    let rank_scale = if hra { 1.0 - rank } else { rank };

    let relative = Self::relative_rse_factor() / k_f * rank_scale;
    let fixed = Self::FIXED_RSE_FACTOR / k_f;

    let from_relative = rank + deviations * relative;
    let from_fixed = rank + deviations * fixed;
    from_relative.min(from_fixed).min(1.0)
}
/// Reports whether `rank` lies in the range the sketch treats as exact.
///
/// That is always the case with a single level or when `n` does not exceed
/// `k * INIT_NUM_SECTIONS`. Otherwise the exact range is the top
/// `base_cap / n` fraction of ranks when `hra` is set and the bottom
/// fraction when it is not.
fn is_exact_rank_threshold(
    &self,
    k: u16,
    num_levels: u8,
    rank: f64,
    n: u64,
    hra: bool,
) -> bool {
    let base_cap = k as u64 * Self::INIT_NUM_SECTIONS as u64;
    let single_level = num_levels == 1;
    if single_level || n <= base_cap {
        return true;
    }

    let exact_fraction = base_cap as f64 / n as f64;
    match hra {
        true => rank >= 1.0 - exact_fraction,
        false => rank <= exact_fraction,
    }
}
}
/// The default sketch is the one produced by `ReqSketch::new`.
impl<T> Default for ReqSketch<T>
where
T: Clone + TotalOrd + PartialEq,
{
/// Delegates to `ReqSketch::new`.
fn default() -> Self {
Self::new()
}
}
/// Multi-line, human-readable summary of the sketch.
impl<T> fmt::Display for ReqSketch<T>
where
    T: fmt::Display + Clone + TotalOrd + PartialEq,
{
    /// Writes the configuration and size counters, one per line, followed by
    /// the min and max items when both are known.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        writeln!(f, "REQ Sketch Summary:")?;
        writeln!(f, " K: {}", self.k())?;
        writeln!(f, " Total items: {}", self.len())?;
        writeln!(f, " Retained items: {}", self.num_retained())?;
        writeln!(f, " Estimation mode: {}", self.is_estimation_mode())?;
        writeln!(f, " Rank accuracy: {:?}", self.rank_accuracy())?;
        writeln!(f, " Levels: {}", self.compactors.len())?;
        match (self.min_item(), self.max_item()) {
            (Some(lowest), Some(highest)) => {
                writeln!(f, " Min item: {}", lowest)?;
                writeln!(f, " Max item: {}", highest)
            }
            // Without both extremes nothing more is printed.
            _ => Ok(()),
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
/// A freshly constructed sketch is empty and carries the default settings.
#[test]
fn test_new_sketch() {
    let fresh = ReqSketch::<f64>::new();
    assert!(fresh.is_empty());
    assert_eq!(fresh.len(), 0);
    assert_eq!(fresh.k(), 12);
    assert_eq!(fresh.rank_accuracy(), RankAccuracy::HighRank);
}
/// Inserting 0..100 yields a non-empty sketch of length 100 whose tracked
/// minimum and maximum are exactly the smallest and largest inserted values.
#[test]
fn test_update_and_basic_queries() -> Result<()> {
    let mut sketch = ReqSketch::new();
    for i in 0..100 {
        sketch.update(i as f64);
    }
    assert!(!sketch.is_empty());
    assert_eq!(sketch.len(), 100);

    // Min and max are tracked on every update, independent of compaction, so
    // they must be exact; a range check would hide a wrong value.
    let min = sketch.min_item().ok_or(ReqError::EmptySketch)?;
    let max = sketch.max_item().ok_or(ReqError::EmptySketch)?;
    assert_eq!(*min, 0.0, "Min should be the smallest inserted value");
    assert_eq!(*max, 99.0, "Max should be the largest inserted value");
    Ok(())
}
/// The builder applies both `k` and the rank-accuracy setting.
#[test]
fn test_builder_pattern() -> Result<()> {
    let built: Result<ReqSketch<i32>> = match ReqSketch::builder().k(16) {
        Ok(with_k) => with_k.rank_accuracy(RankAccuracy::LowRank).build(),
        Err(err) => Err(err),
    };
    assert!(built.is_ok());
    let sketch = built?;
    assert_eq!(sketch.k(), 16);
    assert_eq!(sketch.rank_accuracy(), RankAccuracy::LowRank);
    Ok(())
}
/// After every single update, the weights reported by the iterator must add
/// up to the number of items inserted so far.
///
/// The check is a real assertion, so a weight mismatch fails the test; the
/// failure message carries the per-level details needed to diagnose it.
#[test]
fn test_weight_consistency_detailed() {
    let mut sketch = ReqSketch::new();
    for i in 0..80 {
        sketch.update(i as f64);
        let total_weight: u64 = sketch.iter().map(|(_, weight)| weight).sum();
        // The message arguments are only evaluated when the assertion fails.
        assert_eq!(
            total_weight,
            sketch.len(),
            "weight mismatch after item {}: estimation_mode={}, num_retained={}, \
             levels as (lg_weight, num_items, capacity)={:?}",
            i,
            sketch.is_estimation_mode(),
            sketch.num_retained(),
            sketch.debug_compactor_info()
        );
    }
}
/// Merging A into B and B into A gives sketches of equal length whose
/// medians are close to each other.
#[test]
fn test_merge_commutativity_debug() -> Result<()> {
    let mut sketch1a = ReqSketch::new();
    let mut sketch2a = ReqSketch::new();
    let mut sketch1b = ReqSketch::new();
    let mut sketch2b = ReqSketch::new();

    // Sketch 1 (both copies): 35 zeros followed by five distinct values.
    for _ in 0..35 {
        for copy in [&mut sketch1a, &mut sketch1b] {
            copy.update(0.0);
        }
    }
    let values = [412.275, 721.747, 731.854, 249.854, 979.752];
    for val in values.iter().copied() {
        for copy in [&mut sketch1a, &mut sketch1b] {
            copy.update(val);
        }
    }

    // Sketch 2 (both copies): four distinct values.
    let values2 = [516.453, 879.855, 244.286, 909.822];
    for val in values2.iter().copied() {
        for copy in [&mut sketch2a, &mut sketch2b] {
            copy.update(val);
        }
    }

    // Merge in both directions.
    sketch1a.merge(&sketch2a)?;
    sketch2b.merge(&sketch1b)?;

    assert!(!sketch1a.is_empty(), "Merged sketch should not be empty");
    assert!(!sketch2b.is_empty(), "Merged sketch should not be empty");
    assert_eq!(
        sketch1a.len(),
        sketch2b.len(),
        "Merged sketches should have same length"
    );

    let q1: f64 = sketch1a.quantile(0.5, SearchCriteria::Inclusive)?;
    let q2: f64 = sketch2b.quantile(0.5, SearchCriteria::Inclusive)?;
    let diff = (q1 - q2).abs();
    assert!(
        diff < 100.0,
        "Quantiles should be reasonably close: {} vs {} (diff: {})",
        q1,
        q2,
        diff
    );
    Ok(())
}
/// With ten items the sketch is exact, so inclusive and exclusive quantiles
/// must hit the expected items precisely.
#[test]
fn test_exact_quantile_debug() -> Result<()> {
    let mut sketch = ReqSketch::new();
    (1..=10).for_each(|i| sketch.update(i as f64));

    assert_eq!(sketch.len(), 10, "Sketch should contain 10 items");
    assert_eq!(sketch.total_n, 10, "Total count should be 10");
    assert!(
        !sketch.is_estimation_mode(),
        "Should be in exact mode for 10 items"
    );

    // (rank, expected item) under the inclusive criteria.
    let test_cases = [(0.0, 1.0), (0.1, 1.0), (0.5, 5.0), (0.9, 9.0), (1.0, 10.0)];
    for (rank, expected) in test_cases.iter().copied() {
        let quantile = sketch.quantile(rank, SearchCriteria::Inclusive)?;
        assert_eq!(
            quantile, expected,
            "quantile({}, Inclusive) should be {}",
            rank, expected
        );
    }

    // (rank, expected item) under the exclusive criteria.
    let exclusive_cases = [(0.0, 1.0), (0.1, 2.0), (0.5, 6.0), (0.9, 10.0), (1.0, 10.0)];
    for (rank, expected) in exclusive_cases.iter().copied() {
        let quantile = sketch.quantile(rank, SearchCriteria::Exclusive)?;
        assert_eq!(
            quantile, expected,
            "quantile({}, Exclusive) should be {}",
            rank, expected
        );
    }

    let view = sketch.get_sorted_view()?;
    assert_eq!(view.total_weight(), 10, "Total weight should be 10");
    assert_eq!(view.len(), 10, "Sorted view should have 10 items");
    Ok(())
}
/// After 1000 updates the total weight, computed per level and through the
/// iterator, equals the item count, and the sketch is in estimation mode.
#[test]
fn test_weight_debug_1000() {
    let mut sketch = ReqSketch::new();
    let n = 1000;
    (0..n).for_each(|i| sketch.update(i as f64));
    assert_eq!(
        sketch.total_n, n as u64,
        "Total count should equal number of updates"
    );

    // Weight contributed by each non-empty level: item count times weight.
    let total_weight: u64 = sketch
        .compactors
        .iter()
        .filter(|level| level.num_items() > 0)
        .map(|level| level.num_items() as u64 * level.weight())
        .sum();
    assert_eq!(
        total_weight, sketch.total_n,
        "Sum of compactor weights should equal total_n"
    );

    let iter_weight: u64 = sketch.iter().map(|(_, weight)| weight).sum();
    assert_eq!(
        iter_weight, sketch.total_n,
        "Iterator weight sum should equal total_n"
    );
    assert!(
        sketch.is_estimation_mode(),
        "Should be in estimation mode for 1000 items"
    );
}
/// Checks the capacity arithmetic of individual compactors, then the item
/// accounting of a sketch after 50 updates.
///
/// `len()` is the total number of inserted items, so it is asserted to be
/// exactly 50; the "at most 50" check applies to the retained count.
#[test]
fn test_capacity_debug() {
    let mut sketch = ReqSketch::new();
    for level in 0..5 {
        let compactor =
            crate::compactor::Compactor::<f64>::new(level, 12, RankAccuracy::HighRank);
        let section_size = compactor.section_size();
        let nominal_capacity = compactor.nominal_capacity();
        assert!(
            section_size.is_multiple_of(2),
            "Section size should be even for level {}",
            level
        );
        assert!(
            section_size >= 4,
            "Section size should be at least 4 for level {}",
            level
        );
        assert_eq!(
            nominal_capacity,
            2 * section_size * 3,
            "Nominal capacity calculation should be correct for level {}",
            level
        );
    }

    for i in 1..=50 {
        sketch.update(i as f64);
    }
    assert_eq!(sketch.len(), 50, "Total count should be 50");

    let retained = sketch.num_retained();
    assert!(retained <= 50, "Sketch should retain at most 50 items");
    if retained < 50 {
        // Fewer retained items than inserted means a compaction ran, and
        // `compress` always creates the level above before compacting.
        assert!(
            sketch.compactors.len() > 1,
            "Should have active compactors if items were compacted"
        );
    }
}
/// After 50 updates the count and total weight are consistent, the median is
/// close to the true median, and no more items are retained than inserted.
#[test]
fn test_compaction_debug() -> Result<()> {
    let mut sketch = ReqSketch::new();
    let n: u64 = 50;
    for i in 0..n {
        sketch.update(i as f64);
    }
    assert_eq!(
        sketch.total_n, n,
        "Total count should equal number of updates"
    );

    let total_weight: u64 = sketch.iter().map(|(_, weight)| weight).sum();
    assert_eq!(
        total_weight, sketch.total_n,
        "Total weight should equal total_n"
    );

    let median = sketch.quantile(0.5, SearchCriteria::Inclusive)?;
    let expected_median = (n - 1) as f64 * 0.5;
    let error = (median - expected_median).abs() / expected_median;
    assert!(
        error < 0.2,
        "Median error should be reasonable: got {}, expected {}, error: {:.2}%",
        median,
        expected_median,
        error * 100.0
    );

    // `len()` always equals `n`, so the meaningful bound is on the number of
    // items actually stored.
    assert!(
        u64::from(sketch.num_retained()) <= n,
        "Should not have more items than inserted"
    );
    Ok(())
}
/// Feeding 200 items must push the sketch into estimation mode with several
/// levels, while weights stay consistent and the retained count stays within
/// the number of inserted items.
#[test]
fn debug_section_growth() {
    let mut sketch = ReqSketch::new();
    let mut max_levels = 0;
    let mut reached_estimation_mode = false;
    for i in 0..200 {
        sketch.update(i as f64);
        max_levels = max_levels.max(sketch.compactors.len());
        reached_estimation_mode |= sketch.is_estimation_mode();
    }

    assert_eq!(sketch.total_n, 200, "Should have processed 200 items");
    assert!(
        reached_estimation_mode,
        "Should reach estimation mode with 200 items"
    );
    assert!(max_levels > 1, "Should have multiple compactor levels");

    let total_weight: u64 = sketch.iter().map(|(_, weight)| weight).sum();
    assert_eq!(total_weight, sketch.total_n, "Weight should be consistent");

    // Compare the retained count, not `len()`, which is the total count and
    // would make this check trivially true.
    assert!(
        sketch.num_retained() <= 200,
        "Should not have more retained items than total count"
    );
}
}