// metrics_runtime/data/histogram.rs

use crate::common::{Delta, ValueHandle};
use crate::helper::duration_as_nanos;
use atomic_shim::AtomicU64;
use crossbeam_utils::Backoff;
use metrics_util::{AtomicBucket, StreamingIntegers};
use quanta::Clock;
use std::cmp;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
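/// A reference for recording values into a histogram.
///
/// Values can be recorded directly with [`record_value`](Histogram::record_value), or as the
/// delta between two instants with [`record_timing`](Histogram::record_timing).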
#[derive(Clone)]
pub struct Histogram {
    handle: ValueHandle,
}

impl Histogram {
    /// Records a timing, computed as the delta between `start` and `end`.
    pub fn record_timing<D: Delta>(&self, start: D, end: D) {
        let value = end.delta(start);
        self.handle.update_histogram(value);
    }

    /// Records a single value.
    pub fn record_value(&self, value: u64) {
        self.handle.update_histogram(value);
    }
}

impl From<ValueHandle> for Histogram {
    fn from(handle: ValueHandle) -> Self {
        Self { handle }
    }
}

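/// A bucketed histogram that only retains values for a rolling window of time.
///
/// Values are stored in a ring of atomic buckets, one per `granularity` slice of the window,
/// plus a spare bucket that is kept clear ahead of the write position.  As time passes, expired
/// buckets are cleared and the write index advances, so a snapshot only ever reflects values
/// recorded within the most recent window.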
#[derive(Debug)]
pub struct AtomicWindowedHistogram {
    buckets: Vec<AtomicBucket<u64>>,
    bucket_count: usize,
    granularity: u64,
    upkeep_index: AtomicUsize,
    index: AtomicUsize,
    next_upkeep: AtomicU64,
    clock: Clock,
}

impl AtomicWindowedHistogram {
    /// Creates a new [`AtomicWindowedHistogram`] covering `window`, bucketed by `granularity`.
    pub fn new(window: Duration, granularity: Duration, clock: Clock) -> Self {
        let window_ns = duration_as_nanos(window);
        let granularity_ns = duration_as_nanos(granularity);
        assert!(window_ns > granularity_ns);
        let now = clock.recent();

        // One bucket per granularity slice of the window, plus a spare bucket that is kept
        // clear ahead of the write position.
        let bucket_count = ((window_ns / granularity_ns) as usize) + 1;
        let mut buckets = Vec::new();
        for _ in 0..bucket_count {
            buckets.push(AtomicBucket::new());
        }

        let next_upkeep = now + granularity_ns;

        AtomicWindowedHistogram {
            buckets,
            bucket_count,
            granularity: granularity_ns,
            upkeep_index: AtomicUsize::new(0),
            index: AtomicUsize::new(0),
            next_upkeep: AtomicU64::new(next_upkeep),
            clock,
        }
    }

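    /// Takes a snapshot of the histogram, compressing all currently live values into a
    /// [`StreamingIntegers`].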
    pub fn snapshot(&self) -> StreamingIntegers {
        // Run upkeep first so expired buckets are cleared and we know the current write index.
        let index = self.upkeep();

        let mut streaming = StreamingIntegers::new();

        // Walk the ring starting just past the write index and ending at the write bucket, so
        // older buckets are compressed before newer ones.
        for i in 0..self.bucket_count {
            let bucket_index = (index + i + 1) % self.bucket_count;
            let bucket = &self.buckets[bucket_index];
            bucket.data_with(|block| streaming.compress(block));
        }
        streaming
    }

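    /// Records a value into the bucket for the current window slice.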
    pub fn record(&self, value: u64) {
        let index = self.upkeep();
        self.buckets[index].push(value);
    }

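    /// Performs upkeep: clears any buckets whose values have fallen outside the window and
    /// returns the index of the bucket that writes should currently target.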
    fn upkeep(&self) -> usize {
        let backoff = Backoff::new();

        loop {
            let now = self.clock.recent();

            // If we're still within the current granularity slice, no rotation is needed:
            // simply return the current write bucket.
            let next_upkeep = self.next_upkeep.load(Ordering::Acquire);
            if now <= next_upkeep {
                let index = self.index.load(Ordering::Acquire);
                let actual_index = index % self.bucket_count;

                return actual_index;
            }

            // Upkeep is needed.  If another caller is already rotating buckets (the write
            // index has advanced past the published upkeep index), wait for it to finish and
            // then re-evaluate from the top.
            let mut upkeep_in_progress = false;
            let mut index;
            loop {
                index = self.index.load(Ordering::Acquire);
                let upkeep_index = self.upkeep_index.load(Ordering::Acquire);
                if index == upkeep_index {
                    break;
                }

                upkeep_in_progress = true;
                backoff.snooze();
            }

            if upkeep_in_progress {
                continue;
            }

            // Work out how many buckets have expired since the last upkeep, capped at the
            // total bucket count, plus one for the spare bucket kept clear ahead of the
            // write position.
            let delta = now - next_upkeep;
            let bucket_depth = cmp::min((delta / self.granularity) as usize, self.bucket_count) + 1;

            // Try to claim the upkeep by advancing the write index; if another caller wins
            // the race, loop back around and re-evaluate.
            let new_index = index + bucket_depth;
            let prev_index = self
                .index
                .compare_and_swap(index, new_index, Ordering::SeqCst);
            if prev_index == index {
                // We own the upkeep.  Clear the bucket at the new write position so fresh
                // values land in an empty bucket.
                let clear_index = new_index % self.bucket_count;
                self.buckets[clear_index].clear();

                let now = self.clock.now();
                let next_upkeep = now + self.granularity;
                self.next_upkeep.store(next_upkeep, Ordering::Release);

                // Clear every bucket between the old and new write positions, since their
                // values have fallen outside the window.
                let last_index = new_index - 1;
                while index < last_index {
                    index += 1;
                    let clear_index = index % self.bucket_count;
                    self.buckets[clear_index].clear();
                }

                // Publish the new upkeep index so other callers can proceed.
                self.upkeep_index.store(new_index, Ordering::Release);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{AtomicWindowedHistogram, Clock};
    use crossbeam_utils::thread;
    use std::time::Duration;

    #[test]
    fn test_histogram_simple_update() {
        let (clock, _ctl) = Clock::mock();
        let h = AtomicWindowedHistogram::new(Duration::from_secs(5), Duration::from_secs(1), clock);

        h.record(1245);

        let snapshot = h.snapshot();
        assert_eq!(snapshot.len(), 1);

        let values = snapshot.decompress();
        assert_eq!(values.len(), 1);
        assert_eq!(values.get(0).unwrap(), &1245);
    }

    #[test]
    fn test_histogram_complex_update() {
        let (clock, _ctl) = Clock::mock();
        let h = AtomicWindowedHistogram::new(Duration::from_secs(5), Duration::from_secs(1), clock);

        h.record(1245);
        h.record(213);
        h.record(1022);
        h.record(1248);

        let snapshot = h.snapshot();
        assert_eq!(snapshot.len(), 4);

        let values = snapshot.decompress();
        assert_eq!(values.len(), 4);
        assert_eq!(values.get(0).unwrap(), &1245);
        assert_eq!(values.get(1).unwrap(), &213);
        assert_eq!(values.get(2).unwrap(), &1022);
        assert_eq!(values.get(3).unwrap(), &1248);
    }

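    // Walks a mock clock forward across bucket boundaries to verify that values expire once
    // they fall outside the configured window.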
    #[test]
    fn test_windowed_histogram_rollover() {
        let (clock, ctl) = Clock::mock();

        // Use a 999ms granularity so that each whole-second increment of the mock clock below
        // is guaranteed to cross a bucket boundary.
        let h =
            AtomicWindowedHistogram::new(Duration::from_secs(5), Duration::from_millis(999), clock);

        // Histogram starts out empty.
        let snapshot = h.snapshot();
        assert_eq!(snapshot.len(), 0);

        h.record(1);
        h.record(2);
        let snapshot = h.snapshot();
        assert_eq!(snapshot.len(), 2);
        let total: u64 = snapshot.decompress().iter().sum();
        assert_eq!(total, 3);

        // Roll the clock forward; the values remain visible while they are still inside the
        // window.
        ctl.increment(Duration::from_secs(3));
        let snapshot = h.snapshot();
        assert_eq!(snapshot.len(), 2);
        let total: u64 = snapshot.decompress().iter().sum();
        assert_eq!(total, 3);

        ctl.increment(Duration::from_secs(1));
        let snapshot = h.snapshot();
        assert_eq!(snapshot.len(), 2);
        let total: u64 = snapshot.decompress().iter().sum();
        assert_eq!(total, 3);

        ctl.increment(Duration::from_secs(1));
        let snapshot = h.snapshot();
        assert_eq!(snapshot.len(), 2);
        let total: u64 = snapshot.decompress().iter().sum();
        assert_eq!(total, 3);

        // Record more values into the current bucket.
        h.record(3);
        h.record(4);
        h.record(5);

        let snapshot = h.snapshot();
        assert_eq!(snapshot.len(), 5);
        let total: u64 = snapshot.decompress().iter().sum();
        assert_eq!(total, 15);

        // Another step pushes the oldest bucket (holding 1 and 2) out of the window.
        ctl.increment(Duration::from_secs(1));
        let snapshot = h.snapshot();
        assert_eq!(snapshot.len(), 3);
        let total: u64 = snapshot.decompress().iter().sum();
        assert_eq!(total, 12);

        ctl.increment(Duration::from_secs(4));
        let snapshot = h.snapshot();
        assert_eq!(snapshot.len(), 3);
        let total: u64 = snapshot.decompress().iter().sum();
        assert_eq!(total, 12);

        // One more second and the remaining values expire as well.
        ctl.increment(Duration::from_secs(1));
        let snapshot = h.snapshot();
        assert_eq!(snapshot.len(), 0);

        h.record(42);

        let snapshot = h.snapshot();
        assert_eq!(snapshot.len(), 1);
        let total: u64 = snapshot.decompress().iter().sum();
        assert_eq!(total, 42);

        // A jump far past the window clears everything out.
        ctl.increment(Duration::from_secs(1000));
        let snapshot = h.snapshot();
        assert_eq!(snapshot.len(), 0);
    }

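    // Hammers the histogram from multiple threads against a real clock to verify that no
    // recorded values are lost during concurrent upkeep.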
    #[test]
    fn test_histogram_write_gauntlet_mt() {
        let clock = Clock::new();
        let clock2 = clock.clone();
        let target = clock.now() + Duration::from_secs(5).as_nanos() as u64;
        let h = AtomicWindowedHistogram::new(
            Duration::from_secs(20),
            Duration::from_millis(500),
            clock,
        );

        thread::scope(|s| {
            let t1 = s.spawn(|_| {
                let mut total = 0;
                while clock2.now() < target {
                    h.record(42);
                    total += 1;
                }
                total
            });
            let t2 = s.spawn(|_| {
                let mut total = 0;
                while clock2.now() < target {
                    h.record(42);
                    total += 1;
                }
                total
            });
            let t3 = s.spawn(|_| {
                let mut total = 0;
                while clock2.now() < target {
                    h.record(42);
                    total += 1;
                }
                total
            });

            let t1_total = t1.join().expect("thread 1 panicked during test");
            let t2_total = t2.join().expect("thread 2 panicked during test");
            let t3_total = t3.join().expect("thread 3 panicked during test");

            let total = t1_total + t2_total + t3_total;
            let snap = h.snapshot();
            assert_eq!(total, snap.len());
        })
        .unwrap();
    }
}