1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
//! Lock-free histogram implementation for latency tracking.
//!
//! # PERF-001: Lock-Free Histogram
//!
//! Uses atomic counters in logarithmic buckets to avoid mutex contention.
//! Provides approximate percentiles without allocations in the hot path.
//!
//! # Design
//!
//! - 64 buckets covering 1µs to ~18 hours (log2 scale)
//! - Each bucket is an `AtomicU64` counter
//! - `record()` is wait-free (single atomic increment)
//! - `percentile()` requires reading all buckets (still lock-free)
use std::sync::atomic::{AtomicU64, Ordering};
/// Number of histogram buckets (covers 1µs to ~18h with log2 scale).
const NUM_BUCKETS: usize = 64;
/// Lock-free histogram for latency measurements.
///
/// Uses logarithmic bucketing for memory efficiency while maintaining
/// accuracy across a wide range of latencies (1µs to hours).
#[derive(Debug)]
pub struct LockFreeHistogram {
/// Bucket counters (log2 scale, each bucket is 2x the previous)
buckets: [AtomicU64; NUM_BUCKETS],
/// Total count of all recorded values
count: AtomicU64,
/// Sum of all recorded values (for mean calculation)
sum: AtomicU64,
/// Minimum value seen
min: AtomicU64,
/// Maximum value seen
max: AtomicU64,
}
impl Default for LockFreeHistogram {
fn default() -> Self {
Self::new()
}
}
impl LockFreeHistogram {
/// Creates a new empty histogram.
#[must_use]
pub fn new() -> Self {
Self {
buckets: std::array::from_fn(|_| AtomicU64::new(0)),
count: AtomicU64::new(0),
sum: AtomicU64::new(0),
min: AtomicU64::new(u64::MAX),
max: AtomicU64::new(0),
}
}
/// Records a value in microseconds. Wait-free operation.
#[inline]
pub fn record(&self, value_us: u64) {
let bucket = Self::bucket_for(value_us);
self.buckets[bucket].fetch_add(1, Ordering::Relaxed);
self.count.fetch_add(1, Ordering::Relaxed);
self.sum.fetch_add(value_us, Ordering::Relaxed);
// Update min (CAS loop)
let mut current_min = self.min.load(Ordering::Relaxed);
while value_us < current_min {
match self.min.compare_exchange_weak(
current_min,
value_us,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(actual) => current_min = actual,
}
}
// Update max (CAS loop)
let mut current_max = self.max.load(Ordering::Relaxed);
while value_us > current_max {
match self.max.compare_exchange_weak(
current_max,
value_us,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(actual) => current_max = actual,
}
}
}
/// Returns the bucket index for a value (log2 scale).
#[inline]
fn bucket_for(value_us: u64) -> usize {
if value_us == 0 {
0
} else {
// log2(value) clamped to bucket range
(64 - value_us.leading_zeros()) as usize - 1
}
.min(NUM_BUCKETS - 1)
}
/// Returns the approximate value for a bucket (midpoint).
#[inline]
fn value_for_bucket(bucket: usize) -> u64 {
if bucket == 0 {
1
} else {
// Midpoint of bucket range: 2^bucket + 2^(bucket-1)
(1u64 << bucket) + (1u64 << (bucket.saturating_sub(1)))
}
}
/// Returns the total count of recorded values.
#[must_use]
pub fn count(&self) -> u64 {
self.count.load(Ordering::Relaxed)
}
/// Returns true if no values have been recorded.
#[must_use]
pub fn is_empty(&self) -> bool {
self.count.load(Ordering::Relaxed) == 0
}
/// Returns the minimum recorded value.
#[must_use]
pub fn min(&self) -> u64 {
let min = self.min.load(Ordering::Relaxed);
if min == u64::MAX {
0
} else {
min
}
}
/// Returns the maximum recorded value.
#[must_use]
pub fn max(&self) -> u64 {
self.max.load(Ordering::Relaxed)
}
/// Returns the mean of recorded values.
#[must_use]
pub fn mean(&self) -> u64 {
let count = self.count.load(Ordering::Relaxed);
if count == 0 {
return 0;
}
self.sum.load(Ordering::Relaxed) / count
}
/// Returns an approximate percentile value (0-100).
///
/// Uses linear interpolation within buckets for accuracy.
/// Result is capped by the actual max value recorded.
#[must_use]
pub fn percentile(&self, p: u8) -> u64 {
let total = self.count.load(Ordering::Relaxed);
if total == 0 {
return 0;
}
let max_val = self.max();
#[allow(clippy::cast_possible_truncation)]
let target = (u128::from(total) * u128::from(p.min(100)) / 100) as u64;
let mut cumulative = 0u64;
for (i, bucket) in self.buckets.iter().enumerate() {
let bucket_count = bucket.load(Ordering::Relaxed);
cumulative += bucket_count;
if cumulative >= target {
// Cap by actual max to avoid bucket approximation exceeding real max
return Self::value_for_bucket(i).min(max_val);
}
}
max_val
}
/// Resets all counters to zero.
pub fn reset(&self) {
for bucket in &self.buckets {
bucket.store(0, Ordering::Relaxed);
}
self.count.store(0, Ordering::Relaxed);
self.sum.store(0, Ordering::Relaxed);
self.min.store(u64::MAX, Ordering::Relaxed);
self.max.store(0, Ordering::Relaxed);
}
}