prometheus_utils/
percentile.rs

1use crate::{LabelValues, Labels};
2use num_traits::Zero;
3use parking_lot::Mutex;
4use std::sync::atomic::{AtomicUsize, Ordering};
5
/// /!\ Magic number warning /!\
///
/// 4 is arbitrary. The code would function with only one window, but any
/// datapoints recorded while that window is being sampled would be lost. Two
/// windows are sufficient to keep capturing data while the old window is
/// sampled: bumping `current_window` lets recording continue in the next one.
/// Four windows are only additionally meaningful if we ever want to keep
/// actual samples of prior sampling windows.
///
/// NOTE: [`Windowing::new`] spells out one initializer per window, so that
/// array literal must be kept in sync whenever this value changes.
const SAMPLING_WINDOWS: usize = 4;

/// [`Windowing`] is a mechanism for rotating between different observations.
/// It provides an accessor [`Windowing::current`] for the current
/// observation, and a method [`Windowing::cycle_windows`] which makes the
/// next observation in the ring current, and returns the observation which
/// was current prior to the call.
pub struct Windowing<P> {
    /// Index into `windows` of the observation that is current right now.
    current_window: AtomicUsize,
    /// Fixed ring of observations; `current_window` walks it round-robin.
    windows: [Box<P>; SAMPLING_WINDOWS],
}

impl<P: Default> Windowing<P> {
    /// Constructor. Initializes its owned ring of `P`s using
    /// [`Default::default()`]; the first window in the ring starts out as the
    /// current one.
    pub fn new() -> Self {
        Self {
            current_window: AtomicUsize::new(0),
            // One initializer per window: must match `SAMPLING_WINDOWS`.
            windows: [
                Box::new(P::default()),
                Box::new(P::default()),
                Box::new(P::default()),
                Box::new(P::default()),
            ],
        }
    }

    /// Get the current collection. The underlying `P` is expected to be
    /// cycled on some regular interval.
    ///
    /// Data integrity guarantees are weak. In some circumstances, the
    /// returned `P` window may be for the prior interval, if whatever wants
    /// to write a datapoint races with something replacing the current `P`.
    /// It is even possible (if extremely unlikely) for a value to be written
    /// into an old `P` collection, if the writer races with a reader and
    /// writes after the reader has emptied the collection and released its
    /// lock.
    pub fn current(&self) -> &P {
        &self.windows[self.current_window.load(Ordering::SeqCst)]
    }

    /// Cycle to the next window. Returns the window which was
    /// active before the call.
    ///
    /// The index is advanced with a single atomic read-modify-write, so
    /// concurrent callers each move the ring forward by exactly one step and
    /// each receive a different "previous" window. A separate `load` followed
    /// by a `store` would let two racing callers observe the same index and
    /// advance the ring only once.
    pub fn cycle_windows(&self) -> &P {
        let ring_len = self.windows.len();
        let old_idx = self
            .current_window
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |idx| {
                Some((idx + 1) % ring_len)
            })
            .expect("the update closure never returns None");
        &self.windows[old_idx]
    }
}
63
64/// Since this is a constant shared for all ObservationSet, it currently must be tuned for the
65/// busiest stat so as to not drop samples. An appropriate value for `WINDOW_SIZE` must be decided
66/// in conjunction with the window sampling rate - currently at 15 seconds, this means the busiest
67/// `ObservationSet` can handle ~4369 (65536 / 15) events per second.
68const WINDOW_SIZE: usize = 65536;
69
70struct ObservationSet<T: Ord + Zero + Copy> {
71    idx: usize,
72    wraps: usize,
73    data: Box<[T]>,
74}
75
76impl<T: Ord + Zero + Copy> ObservationSet<T> {
77    pub fn new() -> Self {
78        Self {
79            idx: 0,
80            wraps: 0,
81            // Construct in a manner that doesnt use stack space - Box::new([0; WINDOW_SIZE]) would
82            data: vec![T::zero(); WINDOW_SIZE].into_boxed_slice(),
83        }
84    }
85
86    /// Empty this ring buffer. The underlying data remains unchanged, but will be overwritten by
87    /// at least `idx` entries when asking for the next sample, so it will not be visible in the
88    /// future.
89    fn clear(&mut self) {
90        self.idx = 0;
91        self.wraps = 0;
92    }
93
94    fn wraps(&self) -> usize {
95        self.wraps
96    }
97
98    fn sorted_data(&mut self) -> &[T] {
99        let data = &mut self.data[..self.idx];
100        data.sort_unstable();
101        data
102    }
103
104    fn add(&mut self, observation: T) {
105        self.data[self.idx] = observation;
106
107        self.idx = (self.idx + 1) % WINDOW_SIZE;
108        if self.idx == 0 {
109            // next_idx starts at 0, which means if we just added one and see zero, the index
110            // wrapped.
111            self.wraps = self.wraps.saturating_add(1);
112        }
113    }
114
115    // While not currently used, `size` has a not-exactly-obvious implementation and is left here
116    // in case a curious reader needs it.
117    #[allow(dead_code)]
118    fn size(&self) -> usize {
119        if self.wraps > 0 {
120            // the index wrapped at least once, so the ring buffer is definitely full.
121            self.data.len()
122        } else {
123            // index hasn't wrapped yet, so it counts the number of samples recorded in this buffer.
124            self.idx
125        }
126    }
127}
128
129/// A sample of the state in [`Observations`].
130#[derive(Debug, PartialEq, Eq)]
131pub struct Sample<T: Ord + Zero + Copy> {
132    /// Number of observations dropped due to lock contention
133    pub dropped: usize,
134    /// Number of times the observation window wrapped around
135    pub wraps: usize,
136    /// 25th percentile observation
137    pub p25: T,
138    /// 50th percentile observation
139    pub p50: T,
140    /// 75th percentile observation
141    pub p75: T,
142    /// 90th percentile observation
143    pub p90: T,
144    /// 95th percentile observation
145    pub p95: T,
146    /// 99th percentile observation
147    pub p99: T,
148    /// 99.9th percentile observation
149    pub p99p9: T,
150    /// Maximum observation
151    pub max: T,
152    /// Number of observations
153    pub count: usize,
154}
155
156/// Collect observations, which are sampled as a [`Sample`].
157pub struct Observations<T: Ord + Zero + Copy> {
158    observations: Mutex<ObservationSet<T>>,
159    drops: AtomicUsize,
160    name: &'static str,
161}
162
163impl<T: Ord + Zero + Copy> Observations<T> {
164    /// Constructor. The `name` parameter has no semantic meaning, and is only
165    /// exposed by [`Observations::name()`].
166    pub fn new(name: &'static str) -> Self {
167        Self {
168            observations: Mutex::new(ObservationSet::new()),
169            drops: AtomicUsize::new(0),
170            name,
171        }
172    }
173
174    /// Name associated with the observations, as provided in constructor.
175    pub fn name(&self) -> &'static str {
176        self.name
177    }
178
179    /// Take a sample of the observations. Calculates a [`Sample`] corresponding to the current
180    /// state, and then clears that state.
181    pub fn sample(&self) -> Sample<T> {
182        let mut observations = self.observations.lock();
183        let wraps = observations.wraps();
184        let sorted = observations.sorted_data();
185
186        fn percentile<T: Ord + Zero + Copy>(sorted_ts: &[T], p: f64) -> T {
187            if sorted_ts.len() == 0 {
188                T::zero()
189            } else {
190                let percentile_idx = ((sorted_ts.len() as f64 * p) / 100.0) as usize;
191                sorted_ts[percentile_idx]
192            }
193        }
194        let p25 = percentile(&sorted, 25.0);
195        let p50 = percentile(&sorted, 50.0);
196        let p75 = percentile(&sorted, 75.0);
197        let p90 = percentile(&sorted, 90.0);
198        let p95 = percentile(&sorted, 95.0);
199        let p99 = percentile(&sorted, 99.0);
200        let p99p9 = percentile(&sorted, 99.9);
201        let max = sorted.last().map(|x| *x).unwrap_or_else(|| T::zero());
202        let count = sorted.len();
203        observations.clear();
204        std::mem::drop(observations);
205
206        // now that we've unblocked writing new observations, no more will be dropped, and we can
207        // reset the drop count to 0
208        let dropped = self.drops.swap(0, Ordering::SeqCst);
209        Sample {
210            dropped,
211            wraps,
212            p25,
213            p50,
214            p75,
215            p90,
216            p95,
217            p99,
218            p99p9,
219            max,
220            count,
221        }
222    }
223
224    /// Attempt to record this `T` as part of the collection of observations. "Attempt", because if
225    /// a reader is currently using this `ObservationSet`, the observation is dropped. This
226    /// prevents recording from being a blocking operation
227    pub fn record(&self, observation: T) {
228        if let Some(mut observations) = self.observations.try_lock() {
229            observations.add(observation);
230        } else {
231            // something else is using the data right now, just drop the observation
232            self.drops.fetch_add(1, Ordering::SeqCst);
233        }
234    }
235}
236
237crate::label_enum! {
238    /// Labels corresponding to the fields in [`Sample`]
239    pub enum TimingBucket {
240        /// 25th percentile observation
241        P25,
242        /// 50th percentile observation
243        P50,
244        /// 75th percentile observation
245        P75,
246        /// 90th percentile observation
247        P90,
248        /// 95th percentile observation
249        P95,
250        /// 99th percentile observation
251        P99,
252        /// 99.9th percentile observation
253        P99P9,
254        /// Maximum observation
255        Max,
256        /// Number of observations
257        Count,
258    }
259}
260
261impl Labels for TimingBucket {
262    fn label_names() -> Vec<&'static str> {
263        vec!["bucket"]
264    }
265    fn possible_label_values() -> Vec<LabelValues<'static>> {
266        Self::all_variants()
267            .into_iter()
268            .map(|b| vec![b.as_str()])
269            .collect()
270    }
271    fn label_values(&self) -> LabelValues {
272        vec![self.as_str()]
273    }
274}
275
276impl<T: Ord + Zero + Copy + Into<i64>> Sample<T> {
277    /// Returns each member of the struct along with its [`TimingBucket`]
278    /// label.  Each percentile is given as an i64.
279    pub fn as_bucket_pairs(&self) -> Vec<(TimingBucket, i64)> {
280        vec![
281            (TimingBucket::P25, self.p25.into()),
282            (TimingBucket::P50, self.p50.into()),
283            (TimingBucket::P75, self.p75.into()),
284            (TimingBucket::P90, self.p90.into()),
285            (TimingBucket::P95, self.p95.into()),
286            (TimingBucket::P99, self.p99.into()),
287            (TimingBucket::P99P9, self.p99p9.into()),
288            (TimingBucket::Max, self.max.into()),
289            (TimingBucket::Count, self.count as i64),
290        ]
291    }
292
293    /// Returns the number of observations dropped due to the observation lock
294    /// being held.
295    pub fn dropped(&self) -> usize {
296        self.dropped
297    }
298
299    /// Returns the number of times the observation count exceeded the available
300    /// window size.
301    pub fn wraps(&self) -> usize {
302        self.wraps
303    }
304}
305
#[cfg(test)]
mod tests {
    use super::{Observations, Sample, WINDOW_SIZE};

    /// Filling the window and then writing a few more values is reported as a wrap, not as
    /// dropped observations.
    #[test]
    fn test_wraps_are_reported() {
        let obs = Observations::new("test");

        (0..WINDOW_SIZE).for_each(|value| obs.record(value));
        for extra in 500..=503 {
            obs.record(extra);
        }

        let first = obs.sample();

        // `dropped` counts the number of samples that didn't make it to the underlying ring
        // buffer, but excessive samples are not dropped! overflows start writing over the start of
        // the buffer, and increment `wraps`.
        assert_eq!(first.dropped, 0);
        assert_eq!(first.wraps, 1);

        // A second sample must be all zeroes: sampling cleared the state, and nothing has been
        // recorded (or wrapped) since.
        let second = obs.sample();
        let all_zero = Sample {
            dropped: 0,
            wraps: 0,
            p25: 0,
            p50: 0,
            p75: 0,
            p90: 0,
            p95: 0,
            p99: 0,
            p99p9: 0,
            max: 0,
            count: 0,
        };

        assert_eq!(second, all_zero);
    }

    /// Recording 1 through 99 yields the matching percentile values.
    #[test]
    fn test_percentiles_are_reported() {
        let obs = Observations::new("test");

        // The values 1, 2, ..., 99 in ascending order.
        for datum in 1..=99 {
            obs.record(datum);
        }

        let expected = Sample {
            dropped: 0,
            wraps: 0,
            p25: 25,
            p50: 50,
            p75: 75,
            p90: 90,
            p95: 95,
            p99: 99,
            p99p9: 99,
            max: 99,
            count: 99,
        };

        assert_eq!(obs.sample(), expected);
    }

    /// With only a handful of observations the upper percentiles collapse onto the maximum.
    #[test]
    fn test_small_sampleset() {
        let obs = Observations::new("test");

        for value in 500..=504 {
            obs.record(value);
        }

        let expected = Sample {
            dropped: 0,
            wraps: 0,
            p25: 501,
            p50: 502,
            p75: 503,
            p90: 504,
            p95: 504,
            p99: 504,
            p99p9: 504,
            max: 504,
            count: 5,
        };

        assert_eq!(obs.sample(), expected);
    }

    /// Once the window has wrapped, only the values written after the wrap are sampled.
    #[test]
    fn test_overflow_wraps_writes() {
        let obs = Observations::new("test");

        // (value, how many times it is recorded). The first row fills the window exactly, so
        // the later rows wrap around and start overwriting the `1` samples.
        let writes = [
            (1, WINDOW_SIZE),
            (2, WINDOW_SIZE / 2),
            (3, WINDOW_SIZE / 10),
        ];
        for &(value, repeats) in writes.iter() {
            for _ in 0..repeats {
                obs.record(value);
            }
        }

        let expected = Sample {
            dropped: 0,
            wraps: 1,
            p25: 2,
            p50: 2,
            p75: 2,
            p90: 3,
            p95: 3,
            p99: 3,
            p99p9: 3,
            max: 3,
            count: WINDOW_SIZE / 2 + WINDOW_SIZE / 10,
        };

        assert_eq!(obs.sample(), expected);
    }
}