use crate::{Config, Error, Histogram};
use core::sync::atomic::{AtomicU64, Ordering};
/// A histogram whose per-bucket counters are stored as [`AtomicU64`] values.
///
/// All recording and reading methods in this file take `&self`; the counters
/// are updated through atomic operations rather than through `&mut` access.
pub struct AtomicHistogram {
// Bucketing parameters; used to translate a recorded value into an index
// into `buckets` and copied into every `Histogram` snapshot.
config: Config,
// One counter per bucket; allocated with `config.total_buckets()` entries
// when the histogram is constructed.
buckets: Box<[AtomicU64]>,
}
impl AtomicHistogram {
    /// Builds an empty histogram from a grouping power and a maximum value
    /// power.
    ///
    /// Both parameters are handed unchanged to [`Config::new`]; see that
    /// function for their meaning and accepted ranges.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by [`Config::new`].
    pub fn new(grouping_power: u8, max_value_power: u8) -> Result<Self, Error> {
        Ok(Self::with_config(&Config::new(
            grouping_power,
            max_value_power,
        )?))
    }

    /// Builds an empty histogram that uses a copy of `config`.
    ///
    /// One zero-initialised counter is allocated for each of the
    /// `config.total_buckets()` buckets.
    pub fn with_config(config: &Config) -> Self {
        let buckets: Box<[AtomicU64]> = (0..config.total_buckets())
            .map(|_| AtomicU64::new(0))
            .collect();

        Self {
            config: *config,
            buckets,
        }
    }

    /// Records a single observation of `value`.
    ///
    /// Shorthand for [`add`](Self::add) with a count of one.
    ///
    /// # Errors
    ///
    /// Same as [`add`](Self::add).
    pub fn increment(&self, value: u64) -> Result<(), Error> {
        self.add(value, 1)
    }

    /// Adds `count` observations of `value` to the matching bucket.
    ///
    /// The counter is updated with a relaxed `fetch_add`, which wraps around
    /// on overflow.
    ///
    /// # Errors
    ///
    /// Propagates the error from `Config::value_to_index` when `value` cannot
    /// be mapped to a bucket; no counter is modified in that case.
    pub fn add(&self, value: u64, count: u64) -> Result<(), Error> {
        let counter = &self.buckets[self.config.value_to_index(value)?];
        counter.fetch_add(count, Ordering::Relaxed);
        Ok(())
    }

    /// Moves the current counts into a new [`Histogram`], leaving every
    /// counter in this histogram at zero.
    ///
    /// Each counter is swapped with zero individually (relaxed ordering), so
    /// the result is not a single atomic view across all buckets.
    #[cfg(target_has_atomic = "64")]
    pub fn drain(&self) -> Histogram {
        self.snapshot_with(|counter| counter.swap(0, Ordering::Relaxed))
    }

    /// Returns a copy of the configuration this histogram was built with.
    pub fn config(&self) -> Config {
        self.config
    }

    /// Copies the current counts into a new [`Histogram`] without modifying
    /// this histogram.
    ///
    /// Each counter is loaded individually (relaxed ordering), so the result
    /// is not a single atomic view across all buckets.
    pub fn load(&self) -> Histogram {
        self.snapshot_with(|counter| counter.load(Ordering::Relaxed))
    }

    /// Builds a [`Histogram`] by applying `read` to every counter in bucket
    /// order and pairing the resulting counts with a copy of the config.
    fn snapshot_with(&self, read: impl Fn(&AtomicU64) -> u64) -> Histogram {
        let mut counts: Vec<u64> = Vec::with_capacity(self.buckets.len());
        counts.extend(self.buckets.iter().map(read));

        Histogram {
            config: self.config,
            buckets: counts.into(),
        }
    }
}
/// Debug formatting for [`AtomicHistogram`].
///
/// Only the `config` field is printed; the bucket counters are not included.
/// The output ends with `..` so readers can tell that fields were left out.
impl std::fmt::Debug for AtomicHistogram {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AtomicHistogram")
            .field("config", &self.config)
            // `buckets` is intentionally skipped here, so mark the struct as
            // non-exhaustively printed instead of pretending it is complete.
            .finish_non_exhaustive()
    }
}
#[cfg(test)]
mod tests {
    use crate::*;

    // Pins the in-memory size of the struct on 64-bit targets so that an
    // accidental size change is noticed.
    #[cfg(target_pointer_width = "64")]
    #[test]
    fn size() {
        assert_eq!(std::mem::size_of::<AtomicHistogram>(), 48);
    }

    // `drain` must return the recorded counts and leave the histogram empty.
    #[cfg(target_has_atomic = "64")]
    #[test]
    fn drain() {
        let histogram = AtomicHistogram::new(7, 64).unwrap();
        for i in 0..=100 {
            // A failed increment would invalidate every assertion below, so
            // fail right here instead of discarding the result.
            histogram.increment(i).unwrap();
        }

        let snapshot = histogram.drain();
        let result = snapshot.quantile(0.50).unwrap().unwrap();
        assert_eq!(
            result.get(&Quantile::new(0.50).unwrap()),
            Some(&Bucket {
                count: 1,
                range: 50..=50,
            })
        );

        // After the drain only this one observation remains, so the median
        // of the next snapshot must be the bucket that contains 1000.
        histogram.increment(1000).unwrap();
        let snapshot = histogram.drain();
        let result = snapshot.quantile(0.50).unwrap().unwrap();
        assert_eq!(
            result.get(&Quantile::new(0.50).unwrap()),
            Some(&Bucket {
                count: 1,
                range: 1000..=1003,
            })
        );
    }

    // Quantile queries against snapshots taken with `load`.
    #[test]
    fn quantiles() {
        let histogram = AtomicHistogram::new(7, 64).unwrap();
        let qs = [0.25, 0.50, 0.75, 0.90, 0.99];

        // An empty histogram has no quantiles to report.
        assert_eq!(histogram.load().quantiles(&qs).unwrap(), None);
        assert_eq!(histogram.load().quantile(0.5).unwrap(), None);

        for i in 0..=100 {
            histogram.increment(i).unwrap();

            // The minimum stays at 0 while the maximum follows the most
            // recently recorded value.
            let result = histogram.load().quantile(0.0).unwrap().unwrap();
            assert_eq!(
                result.get(&Quantile::new(0.0).unwrap()),
                Some(&Bucket {
                    count: 1,
                    range: 0..=0,
                })
            );
            let result = histogram.load().quantile(1.0).unwrap().unwrap();
            assert_eq!(
                result.get(&Quantile::new(1.0).unwrap()),
                Some(&Bucket {
                    count: 1,
                    range: i..=i,
                })
            );
        }

        for q in qs {
            let result = histogram.load().quantile(q).unwrap().unwrap();
            let bucket = result.get(&Quantile::new(q).unwrap()).unwrap();
            assert_eq!(bucket.end(), (q * 100.0) as u64);
        }

        let result = histogram.load().quantile(0.999).unwrap().unwrap();
        assert_eq!(
            result.get(&Quantile::new(0.999).unwrap()).unwrap().end(),
            100
        );

        // Quantiles outside of 0.0..=1.0 are rejected.
        assert_eq!(
            histogram.load().quantiles(&[-1.0]),
            Err(Error::InvalidQuantile)
        );
        assert_eq!(
            histogram.load().quantiles(&[1.01]),
            Err(Error::InvalidQuantile)
        );

        let result = histogram
            .load()
            .quantiles(&[0.5, 0.9, 0.99, 0.999])
            .unwrap()
            .unwrap();
        let values: Vec<(f64, u64)> = result
            .entries()
            .iter()
            .map(|(q, b)| (q.as_f64(), b.end()))
            .collect();
        assert_eq!(values, vec![(0.5, 50), (0.9, 90), (0.99, 99), (0.999, 100)]);

        // A single large observation becomes the new upper tail.
        histogram.increment(1024).unwrap();
        let result = histogram.load().quantile(0.999).unwrap().unwrap();
        assert_eq!(
            result.get(&Quantile::new(0.999).unwrap()),
            Some(&Bucket {
                count: 1,
                range: 1024..=1031,
            })
        );
    }
}