use std::collections::BTreeMap;
use crate::quantile::{Quantile, QuantilesResult, SampleQuantiles};
use crate::{Bucket, Config, Error, Histogram};
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct SparseHistogram {
pub(crate) config: Config,
pub(crate) index: Vec<u32>,
pub(crate) count: Vec<u64>,
}
impl SparseHistogram {
pub fn new(grouping_power: u8, max_value_power: u8) -> Result<Self, Error> {
let config = Config::new(grouping_power, max_value_power)?;
Ok(Self::with_config(&config))
}
pub fn with_config(config: &Config) -> Self {
Self {
config: *config,
index: Vec::new(),
count: Vec::new(),
}
}
pub fn from_parts(config: Config, index: Vec<u32>, count: Vec<u64>) -> Result<Self, Error> {
if index.len() != count.len() {
return Err(Error::IncompatibleParameters);
}
let total_buckets = config.total_buckets();
let mut prev = None;
for &idx in &index {
if idx as usize >= total_buckets {
return Err(Error::OutOfRange);
}
if let Some(p) = prev {
if idx <= p {
return Err(Error::IncompatibleParameters);
}
}
prev = Some(idx);
}
for &c in &count {
if c == 0 {
return Err(Error::IncompatibleParameters);
}
}
Ok(Self {
config,
index,
count,
})
}
pub fn into_parts(self) -> (Config, Vec<u32>, Vec<u64>) {
(self.config, self.index, self.count)
}
pub fn config(&self) -> Config {
self.config
}
pub fn index(&self) -> &[u32] {
&self.index
}
pub fn count(&self) -> &[u64] {
&self.count
}
fn add_bucket(&mut self, idx: u32, n: u64) {
if n != 0 {
self.index.push(idx);
self.count.push(n);
}
}
#[allow(clippy::comparison_chain)]
pub fn checked_add(&self, h: &SparseHistogram) -> Result<SparseHistogram, Error> {
if self.config != h.config {
return Err(Error::IncompatibleParameters);
}
let mut histogram = SparseHistogram::with_config(&self.config);
let (mut i, mut j) = (0, 0);
while i < self.index.len() && j < h.index.len() {
let (k1, v1) = (self.index[i], self.count[i]);
let (k2, v2) = (h.index[j], h.count[j]);
if k1 == k2 {
let v = v1.checked_add(v2).ok_or(Error::Overflow)?;
histogram.add_bucket(k1, v);
(i, j) = (i + 1, j + 1);
} else if k1 < k2 {
histogram.add_bucket(k1, v1);
i += 1;
} else {
histogram.add_bucket(k2, v2);
j += 1;
}
}
if i < self.index.len() {
histogram.index.extend(&self.index[i..]);
histogram.count.extend(&self.count[i..]);
}
if j < h.index.len() {
histogram.index.extend(&h.index[j..]);
histogram.count.extend(&h.count[j..]);
}
Ok(histogram)
}
#[allow(clippy::comparison_chain)]
pub fn wrapping_add(&self, h: &SparseHistogram) -> Result<SparseHistogram, Error> {
if self.config != h.config {
return Err(Error::IncompatibleParameters);
}
let mut histogram = SparseHistogram::with_config(&self.config);
let (mut i, mut j) = (0, 0);
while i < self.index.len() && j < h.index.len() {
let (k1, v1) = (self.index[i], self.count[i]);
let (k2, v2) = (h.index[j], h.count[j]);
if k1 == k2 {
histogram.add_bucket(k1, v1.wrapping_add(v2));
(i, j) = (i + 1, j + 1);
} else if k1 < k2 {
histogram.add_bucket(k1, v1);
i += 1;
} else {
histogram.add_bucket(k2, v2);
j += 1;
}
}
if i < self.index.len() {
histogram.index.extend(&self.index[i..self.index.len()]);
histogram.count.extend(&self.count[i..self.count.len()]);
}
if j < h.index.len() {
histogram.index.extend(&h.index[j..h.index.len()]);
histogram.count.extend(&h.count[j..h.count.len()]);
}
Ok(histogram)
}
#[allow(clippy::comparison_chain)]
pub fn checked_sub(&self, h: &SparseHistogram) -> Result<SparseHistogram, Error> {
if self.config != h.config {
return Err(Error::IncompatibleParameters);
}
let mut histogram = SparseHistogram::with_config(&self.config);
let (mut i, mut j) = (0, 0);
while i < self.index.len() && j < h.index.len() {
let (k1, v1) = (self.index[i], self.count[i]);
let (k2, v2) = (h.index[j], h.count[j]);
if k1 == k2 {
let v = v1.checked_sub(v2).ok_or(Error::Underflow)?;
if v != 0 {
histogram.add_bucket(k1, v);
}
(i, j) = (i + 1, j + 1);
} else if k1 < k2 {
histogram.add_bucket(k1, v1);
i += 1;
} else {
return Err(Error::InvalidSubset);
}
}
if j < h.index.len() {
return Err(Error::InvalidSubset);
}
if i < self.index.len() {
histogram.index.extend(&self.index[i..self.index.len()]);
histogram.count.extend(&self.count[i..self.count.len()]);
}
Ok(histogram)
}
#[allow(clippy::comparison_chain)]
pub fn wrapping_sub(&self, h: &SparseHistogram) -> Result<SparseHistogram, Error> {
if self.config != h.config {
return Err(Error::IncompatibleParameters);
}
let mut histogram = SparseHistogram::with_config(&self.config);
let (mut i, mut j) = (0, 0);
while i < self.index.len() && j < h.index.len() {
let (k1, v1) = (self.index[i], self.count[i]);
let (k2, v2) = (h.index[j], h.count[j]);
if k1 == k2 {
histogram.add_bucket(k1, v1.wrapping_sub(v2));
(i, j) = (i + 1, j + 1);
} else if k1 < k2 {
histogram.add_bucket(k1, v1);
i += 1;
} else {
return Err(Error::InvalidSubset);
}
}
if i < self.index.len() {
histogram.index.extend(&self.index[i..]);
histogram.count.extend(&self.count[i..]);
}
if j < h.index.len() {
return Err(Error::InvalidSubset);
}
Ok(histogram)
}
#[deprecated(note = "Use the SampleQuantiles trait")]
#[allow(deprecated)]
pub fn percentiles(&self, percentiles: &[f64]) -> Result<Option<Vec<(f64, Bucket)>>, Error> {
Ok(SampleQuantiles::quantiles(self, percentiles)
.map_err(|e| match e {
Error::InvalidQuantile => Error::InvalidPercentile,
other => other,
})?
.map(|qr| {
qr.entries()
.iter()
.map(|(q, b)| (q.as_f64(), b.clone()))
.collect()
}))
}
#[deprecated(note = "Use the SampleQuantiles trait")]
pub fn percentile(&self, percentile: f64) -> Result<Option<Bucket>, Error> {
#[allow(deprecated)]
self.percentiles(&[percentile])
.map(|v| v.map(|x| x.first().unwrap().1.clone()))
}
pub fn downsample(&self, grouping_power: u8) -> Result<SparseHistogram, Error> {
if grouping_power >= self.config.grouping_power() {
return Err(Error::IncompatibleParameters);
}
let config = Config::new(grouping_power, self.config.max_value_power())?;
let mut histogram = SparseHistogram::with_config(&config);
let mut aggregating_idx: u32 = 0;
let mut aggregating_count: u64 = 0;
for (idx, n) in self.index.iter().zip(self.count.iter()) {
let new_idx =
config.value_to_index(self.config.index_to_lower_bound(*idx as usize))? as u32;
if new_idx == aggregating_idx {
aggregating_count = aggregating_count.wrapping_add(*n);
continue;
}
histogram.add_bucket(aggregating_idx, aggregating_count);
aggregating_idx = new_idx;
aggregating_count = *n;
}
histogram.add_bucket(aggregating_idx, aggregating_count);
Ok(histogram)
}
pub fn iter(&self) -> Iter<'_> {
Iter {
index: 0,
histogram: self,
}
}
}
impl SampleQuantiles for SparseHistogram {
fn quantiles(&self, quantiles: &[f64]) -> Result<Option<QuantilesResult>, Error> {
for q in quantiles {
if !(0.0..=1.0).contains(q) {
return Err(Error::InvalidQuantile);
}
}
let total_count: u128 = self.count.iter().map(|v| *v as u128).sum();
if total_count == 0 {
return Ok(None);
}
let mut sorted: Vec<Quantile> = quantiles
.iter()
.map(|&q| Quantile::new(q).unwrap())
.collect();
sorted.sort();
sorted.dedup();
let min = Bucket {
count: self.count[0],
range: self.config.index_to_range(self.index[0] as usize),
};
let last = self.index.len() - 1;
let max = Bucket {
count: self.count[last],
range: self.config.index_to_range(self.index[last] as usize),
};
let mut idx = 0;
let mut partial_sum = self.count[0] as u128;
let mut entries = BTreeMap::new();
for quantile in &sorted {
let count = std::cmp::max(1, (quantile.as_f64() * total_count as f64).ceil() as u128);
loop {
if partial_sum >= count {
entries.insert(
*quantile,
Bucket {
count: self.count[idx],
range: self.config.index_to_range(self.index[idx] as usize),
},
);
break;
}
if idx == (self.index.len() - 1) {
break;
}
idx += 1;
partial_sum += self.count[idx] as u128;
}
}
Ok(Some(QuantilesResult::new(entries, total_count, min, max)))
}
}
impl<'a> IntoIterator for &'a SparseHistogram {
type Item = Bucket;
type IntoIter = Iter<'a>;
fn into_iter(self) -> Self::IntoIter {
Iter {
index: 0,
histogram: self,
}
}
}
pub struct Iter<'a> {
index: usize,
histogram: &'a SparseHistogram,
}
impl Iterator for Iter<'_> {
type Item = Bucket;
fn next(&mut self) -> Option<<Self as std::iter::Iterator>::Item> {
if self.index >= self.histogram.index.len() {
return None;
}
let bucket = Bucket {
count: self.histogram.count[self.index],
range: self
.histogram
.config
.index_to_range(self.histogram.index[self.index] as usize),
};
self.index += 1;
Some(bucket)
}
}
impl ExactSizeIterator for Iter<'_> {
fn len(&self) -> usize {
self.histogram.index.len() - self.index
}
}
impl std::iter::FusedIterator for Iter<'_> {}
impl From<&Histogram> for SparseHistogram {
fn from(histogram: &Histogram) -> Self {
let mut index = Vec::new();
let mut count = Vec::new();
for (idx, n) in histogram.as_slice().iter().enumerate() {
if *n > 0 {
index.push(idx as u32);
count.push(*n);
}
}
Self {
config: histogram.config(),
index,
count,
}
}
}
#[cfg(test)]
mod tests {
use rand::RngExt;
use std::collections::HashMap;
use super::*;
use crate::standard::Histogram;
#[test]
fn checked_add() {
let config = Config::new(7, 32).unwrap();
let h1 = SparseHistogram::from_parts(config, vec![1, 3, 5], vec![6, 12, 7]).unwrap();
let h2 = SparseHistogram::from_parts(config, vec![2, 3], vec![5, 7]).unwrap();
let h = h1.checked_add(&h2).unwrap();
assert_eq!(h.index(), &[1, 2, 3, 5]);
assert_eq!(h.count(), &[6, 5, 19, 7]);
let h_max = SparseHistogram::from_parts(config, vec![3], vec![u64::MAX]).unwrap();
assert_eq!(h1.checked_add(&h_max), Err(Error::Overflow));
}
#[test]
fn wrapping_add() {
let config = Config::new(7, 32).unwrap();
let h1 = SparseHistogram::from_parts(config, vec![1, 3, 5], vec![6, 12, 7]).unwrap();
let h2 = SparseHistogram::with_config(&config);
let h3 = SparseHistogram::from_parts(config, vec![2, 3, 6, 11, 13], vec![5, 7, 3, 15, 6])
.unwrap();
let hdiff = SparseHistogram::new(6, 16).unwrap();
let h = h1.wrapping_add(&hdiff);
assert_eq!(h, Err(Error::IncompatibleParameters));
let h = h1.wrapping_add(&h2).unwrap();
assert_eq!(h.index(), &[1, 3, 5]);
assert_eq!(h.count(), &[6, 12, 7]);
let h = h2.wrapping_add(&h3).unwrap();
assert_eq!(h.index(), &[2, 3, 6, 11, 13]);
assert_eq!(h.count(), &[5, 7, 3, 15, 6]);
let h = h1.wrapping_add(&h3).unwrap();
assert_eq!(h.index(), &[1, 2, 3, 5, 6, 11, 13]);
assert_eq!(h.count(), &[6, 5, 19, 7, 3, 15, 6]);
}
#[test]
fn checked_sub() {
let config = Config::new(7, 32).unwrap();
let h1 = SparseHistogram::from_parts(config, vec![1, 3, 5], vec![6, 12, 7]).unwrap();
let hparams = SparseHistogram::new(6, 16).unwrap();
let h = h1.checked_sub(&hparams);
assert_eq!(h, Err(Error::IncompatibleParameters));
let hempty = SparseHistogram::with_config(&config);
let h = h1.checked_sub(&hempty).unwrap();
assert_eq!(h.index(), &[1, 3, 5]);
assert_eq!(h.count(), &[6, 12, 7]);
let hclone = h1.clone();
let h = h1.checked_sub(&hclone).unwrap();
assert!(h.index().is_empty());
assert!(h.count().is_empty());
let hlarger = SparseHistogram::from_parts(config, vec![1, 3, 5], vec![4, 13, 7]).unwrap();
let h = h1.checked_sub(&hlarger);
assert_eq!(h, Err(Error::Underflow));
let hmore = SparseHistogram::from_parts(config, vec![1, 5, 7], vec![4, 7, 1]).unwrap();
let h = h1.checked_sub(&hmore);
assert_eq!(h, Err(Error::InvalidSubset));
let hdiff = SparseHistogram::from_parts(config, vec![1, 2, 5], vec![4, 1, 7]).unwrap();
let h = h1.checked_sub(&hdiff);
assert_eq!(h, Err(Error::InvalidSubset));
let hsubset = SparseHistogram::from_parts(config, vec![1, 3], vec![5, 9]).unwrap();
let h = h1.checked_sub(&hsubset).unwrap();
assert_eq!(h.index(), &[1, 3, 5]);
assert_eq!(h.count(), &[1, 3, 7]);
}
#[test]
fn wrapping_sub() {
let config = Config::new(7, 32).unwrap();
let h1 = SparseHistogram::from_parts(config, vec![1, 3, 5], vec![6, 12, 7]).unwrap();
let h2 = SparseHistogram::from_parts(config, vec![1, 3], vec![4, 5]).unwrap();
let h = h1.wrapping_sub(&h2).unwrap();
assert_eq!(h.index(), &[1, 3, 5]);
assert_eq!(h.count(), &[2, 7, 7]);
let h3 = SparseHistogram::from_parts(config, vec![1], vec![10]).unwrap();
let h = h1.wrapping_sub(&h3).unwrap();
assert_eq!(h.count()[0], 6u64.wrapping_sub(10));
let h4 = SparseHistogram::from_parts(config, vec![2], vec![1]).unwrap();
assert_eq!(h1.wrapping_sub(&h4), Err(Error::InvalidSubset));
}
#[test]
fn wrapping_add_overflow() {
let config = Config::new(7, 32).unwrap();
let h1 = SparseHistogram::from_parts(config, vec![1], vec![u64::MAX]).unwrap();
let h2 = SparseHistogram::from_parts(config, vec![1], vec![1]).unwrap();
let h = h1.wrapping_add(&h2).unwrap();
assert!(h.index().is_empty());
}
#[allow(deprecated)]
#[test]
fn percentiles() {
let mut hstandard = Histogram::new(4, 10).unwrap();
let hempty = SparseHistogram::from(&hstandard);
for v in 1..1024 {
let _ = hstandard.increment(v);
}
let hsparse = SparseHistogram::from(&hstandard);
let percentiles = [0.01, 0.10, 0.25, 0.50, 0.75, 0.90, 0.99, 0.999];
for percentile in percentiles {
let bempty = hempty.percentile(percentile).unwrap();
let bstandard = hstandard.percentile(percentile).unwrap();
let bsparse = hsparse.percentile(percentile).unwrap();
assert_eq!(bempty, None);
assert_eq!(bsparse, bstandard);
}
assert_eq!(hempty.percentiles(&percentiles), Ok(None));
assert_eq!(
hstandard.percentiles(&percentiles).unwrap(),
hsparse.percentiles(&percentiles).unwrap()
);
}
#[allow(deprecated)]
#[test]
fn min() {
let mut histogram = Histogram::new(7, 64).unwrap();
let h = SparseHistogram::from(&histogram);
assert_eq!(h.percentile(0.0).unwrap(), None);
let _ = histogram.increment(10);
let h = SparseHistogram::from(&histogram);
assert_eq!(h.percentile(0.0).map(|b| b.unwrap().end()), Ok(10));
let _ = histogram.increment(4);
let h = SparseHistogram::from(&histogram);
assert_eq!(h.percentile(0.0).map(|b| b.unwrap().end()), Ok(4));
}
fn compare_histograms(hstandard: &Histogram, hsparse: &SparseHistogram) {
assert_eq!(hstandard.config(), hsparse.config());
let mut buckets: HashMap<u32, u64> = HashMap::new();
for (idx, count) in hsparse.index().iter().zip(hsparse.count().iter()) {
let _ = buckets.insert(*idx, *count);
}
for (idx, count) in hstandard.as_slice().iter().enumerate() {
if *count > 0 {
let v = buckets.get(&(idx as u32)).unwrap();
assert_eq!(*v, *count);
}
}
}
#[test]
fn snapshot() {
let mut hstandard = Histogram::new(5, 10).unwrap();
for v in 1..1024 {
let _ = hstandard.increment(v);
}
let hsparse = SparseHistogram::from(&hstandard);
compare_histograms(&hstandard, &hsparse);
}
#[test]
fn downsample() {
let mut histogram = Histogram::new(8, 32).unwrap();
let mut rng = rand::rng();
for _ in 0..10000 {
let v: u64 = rng.random_range(1..2_u64.pow(histogram.config.max_value_power() as u32));
let _ = histogram.increment(v);
}
let hsparse = SparseHistogram::from(&histogram);
compare_histograms(&histogram, &hsparse);
let grouping_power = histogram.config.grouping_power();
for factor in 1..grouping_power {
let reduced_gp = grouping_power - factor;
let h1 = histogram.downsample(reduced_gp).unwrap();
let h2 = hsparse.downsample(reduced_gp).unwrap();
compare_histograms(&h1, &h2);
}
}
}