use std::sync::atomic::{AtomicU64, Ordering};
pub const DEFAULT_BUCKETS_US: &[u64] = &[
10, 50, 100, 500, 1_000, 5_000, 10_000, 50_000, 100_000, 500_000, 1_000_000, 5_000_000, 10_000_000, ];
pub struct AtomicHistogram {
boundaries: &'static [u64],
buckets: Vec<AtomicU64>,
count: AtomicU64,
sum: AtomicU64,
}
impl AtomicHistogram {
pub fn new() -> Self {
Self::with_buckets(DEFAULT_BUCKETS_US)
}
pub fn with_buckets(boundaries: &'static [u64]) -> Self {
let buckets = (0..boundaries.len()).map(|_| AtomicU64::new(0)).collect();
Self {
boundaries,
buckets,
count: AtomicU64::new(0),
sum: AtomicU64::new(0),
}
}
pub fn observe(&self, value_us: u64) {
self.count.fetch_add(1, Ordering::Relaxed);
self.sum.fetch_add(value_us, Ordering::Relaxed);
for (i, &boundary) in self.boundaries.iter().enumerate() {
if value_us <= boundary {
self.buckets[i].fetch_add(1, Ordering::Relaxed);
return;
}
}
}
pub fn count(&self) -> u64 {
self.count.load(Ordering::Relaxed)
}
pub fn sum_us(&self) -> u64 {
self.sum.load(Ordering::Relaxed)
}
pub fn percentile(&self, p: f64) -> u64 {
let total = self.count.load(Ordering::Relaxed);
if total == 0 {
return 0;
}
let target = (p * total as f64) as u64;
let mut cumulative = 0u64;
let mut prev_boundary = 0u64;
for (i, &boundary) in self.boundaries.iter().enumerate() {
let bucket_count = self.buckets[i].load(Ordering::Relaxed);
cumulative += bucket_count;
if cumulative >= target {
let bucket_start = prev_boundary;
let bucket_width = boundary - bucket_start;
if bucket_count == 0 {
return boundary;
}
let fraction = if cumulative > target {
(bucket_count - (cumulative - target)) as f64 / bucket_count as f64
} else {
1.0
};
return bucket_start + (fraction * bucket_width as f64) as u64;
}
prev_boundary = boundary;
}
self.boundaries.last().copied().unwrap_or(0)
}
pub fn write_prometheus(&self, out: &mut String, name: &str, help: &str) {
use std::fmt::Write;
let _ = writeln!(out, "# HELP {name} {help}");
let _ = writeln!(out, "# TYPE {name} histogram");
let mut cumulative = 0u64;
for (i, &boundary) in self.boundaries.iter().enumerate() {
cumulative += self.buckets[i].load(Ordering::Relaxed);
let le = format!("{}", boundary as f64 / 1_000_000.0);
let _ = writeln!(out, "{name}_bucket{{le=\"{le}\"}} {cumulative}");
}
let total = self.count.load(Ordering::Relaxed);
let _ = writeln!(out, "{name}_bucket{{le=\"+Inf\"}} {total}");
let _ = writeln!(out, "{name}_sum {}", self.sum_us() as f64 / 1_000_000.0);
let _ = writeln!(out, "{name}_count {total}");
}
}
impl Default for AtomicHistogram {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for AtomicHistogram {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AtomicHistogram")
.field("count", &self.count.load(Ordering::Relaxed))
.field("sum_us", &self.sum.load(Ordering::Relaxed))
.field("buckets", &self.boundaries.len())
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn basic_observation() {
let h = AtomicHistogram::new();
h.observe(50); h.observe(500); h.observe(5000); assert_eq!(h.count(), 3);
assert_eq!(h.sum_us(), 5550);
}
#[test]
fn percentile_estimation() {
let h = AtomicHistogram::new();
for _ in 0..100 {
h.observe(800); }
let p50 = h.percentile(0.5);
assert!((500..=1000).contains(&p50), "p50={p50}");
}
#[test]
fn prometheus_output() {
let h = AtomicHistogram::new();
h.observe(100);
h.observe(5000);
let mut out = String::new();
h.write_prometheus(&mut out, "nodedb_query_latency_seconds", "Query latency");
assert!(out.contains("# TYPE nodedb_query_latency_seconds histogram"));
assert!(out.contains("nodedb_query_latency_seconds_count 2"));
assert!(out.contains("le=\"+Inf\""));
}
#[test]
fn overflow_beyond_all_buckets() {
let h = AtomicHistogram::new();
h.observe(99_000_000); assert_eq!(h.count(), 1);
let mut found_in_bucket = false;
for i in 0..DEFAULT_BUCKETS_US.len() {
if h.buckets[i].load(Ordering::Relaxed) > 0 {
found_in_bucket = true;
}
}
assert!(!found_in_bucket);
}
#[test]
fn empty_histogram() {
let h = AtomicHistogram::new();
assert_eq!(h.count(), 0);
assert_eq!(h.percentile(0.5), 0);
}
}