use std::sync::atomic::{AtomicU64, Ordering};
/// Number of positive buckets: one per possible position of the highest set
/// bit in a non-zero `u64` (bit 0 through bit 63).
const POSITIVE_BUCKETS: usize = 64;
pub struct ExpBucketsSnapshot {
pub positive: [u64; POSITIVE_BUCKETS],
pub zero_count: u64,
pub sum: u64,
pub count: u64,
}
impl ExpBucketsSnapshot {
pub fn bucket_midpoint(i: usize) -> u64 {
if i == 0 { 1 } else { 3u64 << (i - 1) }
}
pub fn iter_samples(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
let zero = (self.zero_count > 0).then_some((0u64, self.zero_count));
let positive = self.positive.iter().enumerate().filter_map(|(i, &count)| {
if count > 0 {
Some((Self::bucket_midpoint(i), count))
} else {
None
}
});
zero.into_iter().chain(positive)
}
pub fn min(&self) -> Option<u64> {
if self.zero_count > 0 {
return Some(0);
}
for (i, &c) in self.positive.iter().enumerate() {
if c > 0 {
return Some(1u64 << i);
}
}
None
}
pub fn max(&self) -> Option<u64> {
for i in (0..POSITIVE_BUCKETS).rev() {
if self.positive[i] > 0 {
if i == 63 {
return Some(u64::MAX);
}
return Some((1u64 << (i + 1)) - 1);
}
}
if self.zero_count > 0 {
return Some(0);
}
None
}
}
/// Live histogram counters stored in atomics, so `record` can update them
/// through a shared reference.
pub(crate) struct ExpBuckets {
/// `positive[i]` counts recorded non-zero values whose highest set bit is bit `i`.
positive: [AtomicU64; POSITIVE_BUCKETS],
/// Number of recorded values that were exactly zero.
zero_count: AtomicU64,
/// Running sum of every recorded value (atomic add, wraps on overflow).
sum: AtomicU64,
/// Total number of recorded values, zeros included.
count: AtomicU64,
}
impl ExpBuckets {
    /// Creates a histogram with every counter set to zero.
    pub(crate) fn new() -> Self {
        let fresh = || AtomicU64::new(0);
        Self {
            positive: [(); POSITIVE_BUCKETS].map(|()| fresh()),
            zero_count: fresh(),
            sum: fresh(),
            count: fresh(),
        }
    }

    /// Records one observation.
    ///
    /// Zero goes to the dedicated zero counter; any other value goes to the
    /// positive bucket chosen by `bucket_index`. The running sum and total
    /// count are then updated. Each counter is bumped with its own relaxed
    /// atomic add, so the updates are not ordered relative to one another and
    /// a concurrent reader may see them partially applied.
    #[inline]
    pub(crate) fn record(&self, value: u64) {
        let slot = match value {
            0 => &self.zero_count,
            nonzero => &self.positive[bucket_index(nonzero)],
        };
        slot.fetch_add(1, Ordering::Relaxed);
        self.sum.fetch_add(value, Ordering::Relaxed);
        self.count.fetch_add(1, Ordering::Relaxed);
    }

    /// Current total number of recorded observations.
    pub(crate) fn get_count(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }

    /// Current running sum of recorded values.
    pub(crate) fn get_sum(&self) -> u64 {
        self.sum.load(Ordering::Relaxed)
    }

    /// Current number of recorded zero observations.
    pub(crate) fn get_zero_count(&self) -> u64 {
        self.zero_count.load(Ordering::Relaxed)
    }

    /// Copies the positive bucket counters into a plain array.
    ///
    /// Buckets are loaded one at a time in ascending index order, so the
    /// result is not an atomic snapshot of the whole array.
    pub(crate) fn get_positive_buckets(&self) -> [u64; POSITIVE_BUCKETS] {
        std::array::from_fn(|slot| self.positive[slot].load(Ordering::Relaxed))
    }
}
/// Maps a non-zero value to its positive bucket: the position of its highest
/// set bit, so `2^k ..= 2^(k+1) - 1` all land in bucket `k`.
///
/// Zero has no set bit and must be handled by the caller; debug builds assert
/// on it.
#[inline]
fn bucket_index(value: u64) -> usize {
    debug_assert!(value > 0, "bucket_index called with zero");
    let top_bit = (u64::BITS - 1) as usize;
    let unused_high_bits = value.leading_zeros() as usize;
    top_bit - unused_high_bits
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Exact powers of two open a new bucket.
    #[test]
    fn bucket_index_powers_of_two() {
        let cases: [(u64, usize); 5] = [(1, 0), (2, 1), (4, 2), (1024, 10), (1 << 63, 63)];
        for (value, expected) in cases {
            assert_eq!(bucket_index(value), expected, "value = {}", value);
        }
    }

    /// Values between powers of two stay in the bucket of the lower power.
    #[test]
    fn bucket_index_non_powers() {
        let cases: [(u64, usize); 6] = [(3, 1), (5, 2), (7, 2), (255, 7), (256, 8), (u64::MAX, 63)];
        for (value, expected) in cases {
            assert_eq!(bucket_index(value), expected, "value = {}", value);
        }
    }

    /// Zeros only touch the zero counter and the total count.
    #[test]
    fn record_zero() {
        let hist = ExpBuckets::new();
        hist.record(0);
        hist.record(0);

        assert_eq!(hist.get_zero_count(), 2);
        assert_eq!(hist.get_count(), 2);
        assert_eq!(hist.get_sum(), 0);

        let positive = hist.get_positive_buckets();
        assert!(positive.iter().all(|&c| c == 0));
    }

    /// Non-zero values land in the bucket of their highest set bit.
    #[test]
    fn record_nonzero() {
        let hist = ExpBuckets::new();
        for value in [1u64, 2, 3, 100] {
            hist.record(value);
        }

        assert_eq!(hist.get_count(), 4);
        assert_eq!(hist.get_sum(), 106);
        assert_eq!(hist.get_zero_count(), 0);

        let positive = hist.get_positive_buckets();
        assert_eq!(positive[0], 1);
        assert_eq!(positive[1], 2);
        assert_eq!(positive[6], 1);
    }

    /// `min`/`max` report the outer edges of the occupied bucket range.
    #[test]
    fn snapshot_min_max() {
        let mut positive = [0u64; 64];
        positive[3] = 5;
        positive[7] = 2;
        let snap = ExpBucketsSnapshot {
            positive,
            zero_count: 0,
            sum: 500,
            count: 7,
        };

        assert_eq!(snap.min(), Some(8));
        assert_eq!(snap.max(), Some(255));
    }

    /// With only zero observations both bounds are zero.
    #[test]
    fn snapshot_min_max_with_zero() {
        let snap = ExpBucketsSnapshot {
            positive: [0u64; 64],
            zero_count: 3,
            sum: 0,
            count: 3,
        };

        assert_eq!(snap.min(), Some(0));
        assert_eq!(snap.max(), Some(0));
    }

    /// An empty snapshot has no bounds.
    #[test]
    fn snapshot_empty() {
        let snap = ExpBucketsSnapshot {
            positive: [0u64; 64],
            zero_count: 0,
            sum: 0,
            count: 0,
        };

        assert_eq!(snap.min(), None);
        assert_eq!(snap.max(), None);
    }

    /// Concurrent writers must not lose any increments.
    #[test]
    fn concurrent_recording() {
        use std::sync::Arc;

        const THREADS: u64 = 8;
        const PER_THREAD: u64 = 1000;

        let shared = Arc::new(ExpBuckets::new());
        let mut handles = Vec::new();
        for _ in 0..THREADS {
            let worker = Arc::clone(&shared);
            handles.push(std::thread::spawn(move || {
                (1..=PER_THREAD).for_each(|value| worker.record(value));
            }));
        }
        for handle in handles {
            handle.join().expect("thread panicked");
        }

        assert_eq!(shared.get_count(), THREADS * PER_THREAD);
        assert_eq!(
            shared.get_sum(),
            THREADS * (PER_THREAD * (PER_THREAD + 1) / 2)
        );
    }
}