metrics_runtime/data/
histogram.rs

1use crate::common::{Delta, ValueHandle};
2use crate::helper::duration_as_nanos;
3use atomic_shim::AtomicU64;
4use crossbeam_utils::Backoff;
5use metrics_util::{AtomicBucket, StreamingIntegers};
6use quanta::Clock;
7use std::cmp;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::Duration;
10
11/// A reference to a [`Histogram`].
12///
13/// A [`Histogram`] is used for directly updating a gauge, without any lookup overhead.
14#[derive(Clone)]
15pub struct Histogram {
16    handle: ValueHandle,
17}
18
19impl Histogram {
20    /// Records a timing for the histogram.
21    pub fn record_timing<D: Delta>(&self, start: D, end: D) {
22        let value = end.delta(start);
23        self.handle.update_histogram(value);
24    }
25
26    /// Records a value for the histogram.
27    pub fn record_value(&self, value: u64) {
28        self.handle.update_histogram(value);
29    }
30}
31
32impl From<ValueHandle> for Histogram {
33    fn from(handle: ValueHandle) -> Self {
34        Self { handle }
35    }
36}
37
38/// An atomic windowed histogram.
39///
40/// This histogram provides a windowed view of values that rolls forward over time, dropping old
41/// values as they exceed the window of the histogram.  Writes into the histogram are lock-free, as
42/// well as snapshots of the histogram.
43#[derive(Debug)]
44pub struct AtomicWindowedHistogram {
45    buckets: Vec<AtomicBucket<u64>>,
46    bucket_count: usize,
47    granularity: u64,
48    upkeep_index: AtomicUsize,
49    index: AtomicUsize,
50    next_upkeep: AtomicU64,
51    clock: Clock,
52}
53
54impl AtomicWindowedHistogram {
55    /// Creates a new [`AtomicWindowedHistogram`].
56    ///
57    /// Internally, a number of buckets will be created, based on how many times `granularity` goes
58    /// into `window`.  As time passes, buckets will be cleared to avoid values older than the
59    /// `window` duration.
60    ///
61    /// As buckets will hold values represneting a period of time up to `granularity`, the
62    /// granularity can be lowered or raised to roll values off more precisely, or less precisely,
63    /// against the provided clock.
64    ///
65    /// # Panics
66    /// Panics if `granularity` is larger than `window`.
67    pub fn new(window: Duration, granularity: Duration, clock: Clock) -> Self {
68        let window_ns = duration_as_nanos(window);
69        let granularity_ns = duration_as_nanos(granularity);
70        assert!(window_ns > granularity_ns);
71        let now = clock.recent();
72
73        let bucket_count = ((window_ns / granularity_ns) as usize) + 1;
74        let mut buckets = Vec::new();
75        for _ in 0..bucket_count {
76            buckets.push(AtomicBucket::new());
77        }
78
79        let next_upkeep = now + granularity_ns;
80
81        AtomicWindowedHistogram {
82            buckets,
83            bucket_count,
84            granularity: granularity_ns,
85            upkeep_index: AtomicUsize::new(0),
86            index: AtomicUsize::new(0),
87            next_upkeep: AtomicU64::new(next_upkeep),
88            clock,
89        }
90    }
91
92    /// Takes a snapshot of the current histogram.
93    ///
94    /// Returns a [`StreamingIntegers`] value, representing all observed values in the
95    /// histogram.  As writes happen concurrently, along with buckets being cleared, a snapshot is
96    /// not guaranteed to have all values present at the time the method was called.
97    pub fn snapshot(&self) -> StreamingIntegers {
98        // Run upkeep to make sure our window reflects any time passage since the last write.
99        let index = self.upkeep();
100
101        let mut streaming = StreamingIntegers::new();
102
103        // Start from the bucket ahead of the currently-being-written-to-bucket so that we outrace
104        // any upkeep and get access to more of the data.
105        for i in 0..self.bucket_count {
106            let bucket_index = (index + i + 1) % self.bucket_count;
107            let bucket = &self.buckets[bucket_index];
108            bucket.data_with(|block| streaming.compress(block));
109        }
110        streaming
111    }
112
113    /// Records a value to the histogram.
114    pub fn record(&self, value: u64) {
115        let index = self.upkeep();
116        self.buckets[index].push(value);
117    }
118
119    fn upkeep(&self) -> usize {
120        let backoff = Backoff::new();
121
122        loop {
123            // Start by figuring out if the histogram needs to perform upkeep.
124            let now = self.clock.recent();
125            let next_upkeep = self.next_upkeep.load(Ordering::Acquire);
126            if now <= next_upkeep {
127                let index = self.index.load(Ordering::Acquire);
128                let actual_index = index % self.bucket_count;
129
130                return actual_index;
131            }
132
133            // We do need to perform upkeep, but someone *else* might actually be doing it already,
134            // so go ahead and wait until the index is caught up with the upkeep index: the upkeep
135            // index will be ahead of index until upkeep is complete.
136            let mut upkeep_in_progress = false;
137            let mut index;
138            loop {
139                index = self.index.load(Ordering::Acquire);
140                let upkeep_index = self.upkeep_index.load(Ordering::Acquire);
141                if index == upkeep_index {
142                    break;
143                }
144
145                upkeep_in_progress = true;
146                backoff.snooze();
147            }
148
149            // If we waited for another upkeep operation to complete, then there's the chance that
150            // enough time has passed that we're due for upkeep again, so restart our loop.
151            if upkeep_in_progress {
152                continue;
153            }
154
155            // Figure out how many buckets, up to the maximum, need to be cleared based on the
156            // delta between the target upkeep time and the actual time.  We always clear at least
157            // one bucket, but may need to clear them all.
158            let delta = now - next_upkeep;
159            let bucket_depth = cmp::min((delta / self.granularity) as usize, self.bucket_count) + 1;
160
161            // Now that we we know how many buckets we need to clear, update the index to pointer
162            // writers at the next bucket past the last one that we will be clearing.
163            let new_index = index + bucket_depth;
164            let prev_index = self
165                .index
166                .compare_and_swap(index, new_index, Ordering::SeqCst);
167            if prev_index == index {
168                // Clear the target bucket first, and then update the upkeep target time so new
169                // writers can proceed.  We may still have other buckets to clean up if we had
170                // multiple rounds worth of upkeep to do, but this will let new writes proceed as
171                // soon as possible.
172                let clear_index = new_index % self.bucket_count;
173                self.buckets[clear_index].clear();
174
175                let now = self.clock.now();
176                let next_upkeep = now + self.granularity;
177                self.next_upkeep.store(next_upkeep, Ordering::Release);
178
179                // Now that we've cleared the actual bucket that writers will use going forward, we
180                // have to clear any older buckets that we skipped over.  If our granularity was 1
181                // second, and we skipped over 4 seconds worth of buckets, we would still have
182                // 3 buckets to clear, etc.
183                let last_index = new_index - 1;
184                while index < last_index {
185                    index += 1;
186                    let clear_index = index % self.bucket_count;
187                    self.buckets[clear_index].clear();
188                }
189
190                // We've cleared the old buckets, so upkeep is done.  Push our upkeep index forward
191                // so that writers who were blocked waiting for upkeep to conclude can restart.
192                self.upkeep_index.store(new_index, Ordering::Release);
193            }
194        }
195    }
196}
197
#[cfg(test)]
mod tests {
    use super::{AtomicWindowedHistogram, Clock};
    use crossbeam_utils::thread;
    use std::time::Duration;

    /// Asserts that a fresh snapshot of `h` holds `expected_len` values summing to
    /// `expected_sum`.
    fn assert_contents(h: &AtomicWindowedHistogram, expected_len: usize, expected_sum: u64) {
        let snapshot = h.snapshot();
        assert_eq!(snapshot.len(), expected_len);
        let total: u64 = snapshot.decompress().iter().sum();
        assert_eq!(total, expected_sum);
    }

    /// Asserts that a fresh snapshot of `h` holds no values at all.
    fn assert_empty(h: &AtomicWindowedHistogram) {
        let snapshot = h.snapshot();
        assert_eq!(snapshot.len(), 0);
    }

    /// Records `42` into `h` until `clock` reaches `deadline`, returning the number of writes.
    fn write_until(h: &AtomicWindowedHistogram, clock: &Clock, deadline: u64) -> usize {
        let mut total = 0;
        while clock.now() < deadline {
            h.record(42);
            total += 1;
        }
        total
    }

    #[test]
    fn test_histogram_simple_update() {
        let (clock, _ctl) = Clock::mock();
        let h = AtomicWindowedHistogram::new(Duration::from_secs(5), Duration::from_secs(1), clock);

        h.record(1245);

        let snapshot = h.snapshot();
        assert_eq!(snapshot.len(), 1);

        let values = snapshot.decompress();
        assert_eq!(values.len(), 1);
        assert_eq!(values[0], 1245);
    }

    #[test]
    fn test_histogram_complex_update() {
        let (clock, _ctl) = Clock::mock();
        let h = AtomicWindowedHistogram::new(Duration::from_secs(5), Duration::from_secs(1), clock);

        let inputs: [u64; 4] = [1245, 213, 1022, 1248];
        for value in inputs.iter() {
            h.record(*value);
        }

        let snapshot = h.snapshot();
        assert_eq!(snapshot.len(), 4);

        // Values must come back in the order they were written.
        let values = snapshot.decompress();
        assert_eq!(values.len(), 4);
        for (position, expected) in inputs.iter().enumerate() {
            assert_eq!(&values[position], expected);
        }
    }

    #[test]
    fn test_windowed_histogram_rollover() {
        let (clock, ctl) = Clock::mock();

        // Set our granularity at right below a second, so that when we add a second, we don't
        // land on the same exact value, and our "now" time should always be ahead of the upkeep
        // time when we expect it to be.
        let h =
            AtomicWindowedHistogram::new(Duration::from_secs(5), Duration::from_millis(999), clock);

        // Histogram is empty, snapshot is empty.
        assert_empty(&h);

        // Immediately add two values, and observe the histogram and snapshot having two values.
        h.record(1);
        h.record(2);
        assert_contents(&h, 2, 3);

        // Roll forward 3 seconds, should still have everything.
        ctl.increment(Duration::from_secs(3));
        assert_contents(&h, 2, 3);

        // Roll forward 1 second, should still have everything.
        ctl.increment(Duration::from_secs(1));
        assert_contents(&h, 2, 3);

        // Roll forward 1 second, should still have everything.
        ctl.increment(Duration::from_secs(1));
        assert_contents(&h, 2, 3);

        // Pump in some new values.  We should have a total of 5 values now.
        h.record(3);
        h.record(4);
        h.record(5);
        assert_contents(&h, 5, 15);

        // Roll forward 6 seconds, in increments.  The first one rolls over a single bucket, and
        // cleans bucket #0, the first one we wrote to.  The second and third ones get us right up
        // to the last three values, and then clear them out.
        ctl.increment(Duration::from_secs(1));
        assert_contents(&h, 3, 12);

        ctl.increment(Duration::from_secs(4));
        assert_contents(&h, 3, 12);

        ctl.increment(Duration::from_secs(1));
        assert_empty(&h);

        // We should also be able to advance by vast periods of time and observe not only old
        // values going away but no weird overflow issues or index or anything.  This ensures that
        // our upkeep code functions not just for under-load single bucket rollovers but also "been
        // idle for a while and just got a write" scenarios.
        h.record(42);
        assert_contents(&h, 1, 42);

        ctl.increment(Duration::from_secs(1000));
        assert_empty(&h);
    }

    #[test]
    fn test_histogram_write_gauntlet_mt() {
        let clock = Clock::new();
        let clock2 = clock.clone();
        let target = clock.now() + Duration::from_secs(5).as_nanos() as u64;
        let h = AtomicWindowedHistogram::new(
            Duration::from_secs(20),
            Duration::from_millis(500),
            clock,
        );

        // Three writers hammer the histogram for five seconds.  The window is far longer than
        // that, so every single write must still be visible in the final snapshot.
        thread::scope(|s| {
            let t1 = s.spawn(|_| write_until(&h, &clock2, target));
            let t2 = s.spawn(|_| write_until(&h, &clock2, target));
            let t3 = s.spawn(|_| write_until(&h, &clock2, target));

            let t1_total = t1.join().expect("thread 1 panicked during test");
            let t2_total = t2.join().expect("thread 2 panicked during test");
            let t3_total = t3.join().expect("thread 3 panicked during test");

            let total = t1_total + t2_total + t3_total;
            let snap = h.snapshot();
            assert_eq!(total, snap.len());
        })
        .unwrap();
    }
}