use atomic::Atomic;
use std::fmt::{Display, Formatter};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
const MAXI64: i64 = i64::MAX;
#[derive(Debug)]
pub struct Histogram {
bounds: Arc<Vec<Atomic<f64>>>,
count: AtomicI64,
count_per_bucket: Arc<Vec<AtomicI64>>,
min: AtomicI64,
max: AtomicI64,
sum: AtomicI64,
}
impl Histogram {
pub fn new(bounds: Vec<f64>) -> Self {
let bounds = bounds.into_iter().map(Atomic::new).collect::<Vec<_>>();
let mut cpb = init_cpb(bounds.len() + 1);
cpb.shrink_to_fit();
Histogram {
bounds: Arc::new(bounds),
count: AtomicI64::new(0),
count_per_bucket: Arc::new(cpb),
min: AtomicI64::new(MAXI64),
max: AtomicI64::new(0),
sum: AtomicI64::new(0),
}
}
pub fn update(&self, val: i64) {
let _ = self
.max
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |max| {
if val > max {
Some(val)
} else {
None
}
});
let _ = self
.min
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |min| {
if val < min {
Some(val)
} else {
None
}
});
self.sum.fetch_add(val, Ordering::SeqCst);
self.count.fetch_add(1, Ordering::SeqCst);
for idx in 0..=self.bounds.len() {
if idx == self.bounds.len() {
self.count_per_bucket[idx].fetch_add(1, Ordering::SeqCst);
break;
}
if val < (self.bounds[idx].load(Ordering::SeqCst) as i64) {
self.count_per_bucket[idx].fetch_add(1, Ordering::SeqCst);
break;
}
}
}
#[allow(dead_code)]
pub fn mean(&self) -> f64 {
if self.count.load(Ordering::SeqCst) == 0 {
0f64
} else {
(self.sum.load(Ordering::SeqCst) as f64) / (self.count.load(Ordering::SeqCst) as f64)
}
}
pub fn percentile(&self, p: f64) -> f64 {
let count = self.count.load(Ordering::SeqCst);
if count == 0 {
self.bounds[0].load(Ordering::SeqCst)
} else {
let mut pval = ((count as f64) * p) as i64;
for (idx, val) in self.count_per_bucket.iter().enumerate() {
pval -= val.load(Ordering::SeqCst);
if pval <= 0 {
if idx == self.bounds.len() {
break;
}
return self.bounds[idx].load(Ordering::SeqCst);
}
}
self.bounds[self.bounds.len() - 1].load(Ordering::SeqCst)
}
}
pub fn clear(&self) {
self.count.store(0, Ordering::SeqCst);
self.count_per_bucket
.iter()
.for_each(|val| val.store(0, Ordering::SeqCst));
self.sum.store(0, Ordering::SeqCst);
self.max.store(0, Ordering::SeqCst);
self.min.store(0, Ordering::SeqCst);
}
}
impl Display for Histogram {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut buf = Vec::<u8>::new();
buf.extend("\n -- Histogram:\n".as_bytes());
buf.extend(format!("Min value: {}\n", self.min.load(Ordering::SeqCst)).as_bytes());
buf.extend(format!("Max value: {}\n", self.max.load(Ordering::SeqCst)).as_bytes());
buf.extend(format!("Count: {}\n", self.count.load(Ordering::SeqCst)).as_bytes());
buf.extend(format!("50p: {}\n", self.percentile(0.5)).as_bytes());
buf.extend(format!("75p: {}\n", self.percentile(0.75)).as_bytes());
buf.extend(format!("90p: {}\n", self.percentile(0.9)).as_bytes());
let num_bounds = self.bounds.len();
let count = self.count.load(Ordering::SeqCst);
let mut cum = 0f64;
for (idx, ct) in self.count_per_bucket.iter().enumerate() {
let ct = ct.load(Ordering::SeqCst);
if ct == 0 {
continue;
}
if idx == self.count_per_bucket.len() - 1 {
let lb = self.bounds[num_bounds - 1].load(Ordering::SeqCst) as u64;
let page = (ct * 100) as f64 / (count as f64);
cum += page;
buf.extend(
format!("[{}, {}) {} {:.2}% {:.2}%\n", lb, "infinity", ct, page, cum)
.as_bytes(),
);
continue;
}
let ub = self.bounds[idx].load(Ordering::SeqCst) as u64;
let mut lb = 0u64;
if idx > 0 {
lb = self.bounds[idx - 1].load(Ordering::SeqCst) as u64;
}
let page = (ct * 100) as f64 / (count as f64);
cum += page;
buf.extend(format!("[{}, {}) {} {:.2}% {:.2}%\n", lb, ub, ct, page, cum).as_bytes())
}
buf.extend(" --\n".as_bytes());
write!(f, "{}", String::from_utf8(buf).unwrap())
}
}
impl Clone for Histogram {
fn clone(&self) -> Self {
Self {
bounds: self.bounds.clone(),
count: AtomicI64::new(self.count.load(Ordering::SeqCst)),
count_per_bucket: self.count_per_bucket.clone(),
min: AtomicI64::new(self.min.load(Ordering::SeqCst)),
max: AtomicI64::new(self.max.load(Ordering::SeqCst)),
sum: AtomicI64::new(self.sum.load(Ordering::SeqCst)),
}
}
}
fn init_cpb(num: usize) -> Vec<AtomicI64> {
vec![0; num]
.into_iter()
.map(AtomicI64::new)
.collect::<Vec<_>>()
}
#[cfg(test)]
mod test {
use crate::histogram::Histogram;
struct PercentileTestCase {
upper_bound: i64,
lower_bound: i64,
step: i64,
loops: u64,
percent: f64,
expect: f64,
}
fn init_histogram(lb: f64, ub: f64, step: f64) -> Histogram {
let size = ((ub - lb) / step).ceil() as usize;
let mut bounds = vec![0f64; size + 1];
let mut prev = 0f64;
bounds.iter_mut().enumerate().for_each(|(idx, val)| {
if idx == 0 {
*val = lb;
prev = lb;
} else if idx == size {
*val = ub;
} else {
*val = prev + step;
prev = *val;
}
});
Histogram::new(bounds)
}
fn assert_histogram_percentiles(ps: Vec<PercentileTestCase>) {
let h = init_histogram(32.0, 514.0, 4.0);
ps.iter().for_each(|case| {
(case.lower_bound..=case.upper_bound)
.filter(|x| *x % case.step == 0)
.for_each(|v| {
(0..case.loops).for_each(|_| {
h.update(v);
})
});
assert_eq!(
h.percentile(case.percent),
case.expect,
"bad: p: {}",
case.percent
);
h.clear();
});
}
#[test]
fn test_mean() {
let h = init_histogram(0.0, 16.0, 4.0);
(0..=16).filter(|x| *x % 4 == 0).for_each(|v| {
h.update(v);
});
assert_eq!(h.mean(), 40f64 / 5f64);
}
#[test]
fn test_percentile() {
let cases = vec![
PercentileTestCase {
upper_bound: 1024,
lower_bound: 0,
step: 4,
loops: 1000,
percent: 0.0,
expect: 32.0,
},
PercentileTestCase {
upper_bound: 512,
lower_bound: 0,
step: 4,
loops: 1000,
percent: 0.99,
expect: 512.0,
},
PercentileTestCase {
upper_bound: 1024,
lower_bound: 0,
step: 4,
loops: 1000,
percent: 1.0,
expect: 514.0,
},
];
assert_histogram_percentiles(cases);
}
#[test]
fn test_fmt() {
let h = init_histogram(0.0, 16.0, 4.0);
(0..=16).filter(|x| *x % 4 == 0).for_each(|v| {
h.update(v);
});
let f = "\n -- Histogram:
Min value: 0
Max value: 16
Count: 5
50p: 8
75p: 12
90p: 16
[0, 4) 1 20.00% 20.00%
[4, 8) 1 20.00% 40.00%
[8, 12) 1 20.00% 60.00%
[12, 16) 1 20.00% 80.00%
[16, infinity) 1 20.00% 100.00%
--\n";
assert_eq!(format!("{}", h), f)
}
}