prometheus_utils/percentile.rs
1use crate::{LabelValues, Labels};
2use num_traits::Zero;
3use parking_lot::Mutex;
4use std::sync::atomic::{AtomicUsize, Ordering};
5
/// Number of observation windows held by a [`Windowing`] ring.
///
/// /!\ Magic number warning /!\
///
/// 4 is arbitrary. The code works with even a single window, but any
/// datapoints recorded while that window is being sampled would be lost. Two
/// windows are enough to keep capturing data while the old window is sampled:
/// cycling simply bumps `current_window` so samples continue to be recorded.
/// Four windows only become additionally meaningful if we ever want to record
/// actual samples of prior sampling windows.
///
/// [`Windowing::new`] builds its ring from an array literal with one entry per
/// window, so changing this value requires updating that literal as well (the
/// compiler rejects a length mismatch).
const SAMPLING_WINDOWS: usize = 4;

/// [`Windowing`] is a mechanism for rotating between different observations.
/// It provides an accessor [`Windowing::current`] for the current
/// observation, and a method [`Windowing::cycle_windows`] which makes the
/// next observation in the ring current, and returns the observation which
/// was current prior to the call.
pub struct Windowing<P> {
    /// Index into `windows` of the window that is currently active.
    current_window: AtomicUsize,
    /// The ring of windows, each held behind its own `Box`.
    windows: [Box<P>; SAMPLING_WINDOWS],
}

impl<P: Default> Windowing<P> {
    /// Constructor. Initializes its owned ring of `P`s using [`Default::default()`].
    pub fn new() -> Self {
        Self {
            current_window: AtomicUsize::new(0),
            // One entry per window; the field type `[Box<P>; SAMPLING_WINDOWS]`
            // makes this literal a compile error if the counts ever diverge.
            windows: [
                Box::new(P::default()),
                Box::new(P::default()),
                Box::new(P::default()),
                Box::new(P::default()),
            ],
        }
    }
}

impl<P> Windowing<P> {
    /// Get the current collection. The underlying `P` is expected to be
    /// cycled on some regular interval.
    ///
    /// Data integrity guarantees are weak. In some circumstances, the
    /// returned `P` window may be for the prior interval, if whatever wants
    /// to write a datapoint races with something replacing the current `P`.
    /// It is even possible (if extremely unlikely) for a value to be written
    /// into an old `P` collection, if the writer races with a reader and
    /// writes after the reader has emptied the collection and released its
    /// lock.
    pub fn current(&self) -> &P {
        &self.windows[self.current_window.load(Ordering::SeqCst)]
    }

    /// Cycle to the next window. Returns the window which was
    /// active before the call.
    ///
    /// The index is read and advanced in one atomic read-modify-write, so
    /// every call performs exactly one advance of the ring. Concurrent callers
    /// can no longer both observe the same old index and lose an advance.
    pub fn cycle_windows(&self) -> &P {
        let len = self.windows.len();
        // The closure always returns `Some`, so `fetch_update` retries its
        // compare-exchange until it succeeds; `Err` cannot actually occur.
        let old_idx = match self.current_window.fetch_update(
            Ordering::SeqCst,
            Ordering::SeqCst,
            |idx| Some((idx + 1) % len),
        ) {
            Ok(idx) | Err(idx) => idx,
        };
        &self.windows[old_idx]
    }
}

impl<P: Default> Default for Windowing<P> {
    /// Equivalent to [`Windowing::new`].
    fn default() -> Self {
        Self::new()
    }
}
63
/// Capacity, in observations, of every `ObservationSet` ring buffer.
///
/// One value is shared by all `ObservationSet`s, so it currently has to be tuned
/// for the busiest stat to avoid losing samples. Pick it together with the window
/// sampling rate: at the current 15 second rate, the busiest `ObservationSet` can
/// handle roughly 4369 (65536 / 15) events per second.
const WINDOW_SIZE: usize = 1 << 16;
69
70struct ObservationSet<T: Ord + Zero + Copy> {
71 idx: usize,
72 wraps: usize,
73 data: Box<[T]>,
74}
75
76impl<T: Ord + Zero + Copy> ObservationSet<T> {
77 pub fn new() -> Self {
78 Self {
79 idx: 0,
80 wraps: 0,
81 // Construct in a manner that doesnt use stack space - Box::new([0; WINDOW_SIZE]) would
82 data: vec![T::zero(); WINDOW_SIZE].into_boxed_slice(),
83 }
84 }
85
86 /// Empty this ring buffer. The underlying data remains unchanged, but will be overwritten by
87 /// at least `idx` entries when asking for the next sample, so it will not be visible in the
88 /// future.
89 fn clear(&mut self) {
90 self.idx = 0;
91 self.wraps = 0;
92 }
93
94 fn wraps(&self) -> usize {
95 self.wraps
96 }
97
98 fn sorted_data(&mut self) -> &[T] {
99 let data = &mut self.data[..self.idx];
100 data.sort_unstable();
101 data
102 }
103
104 fn add(&mut self, observation: T) {
105 self.data[self.idx] = observation;
106
107 self.idx = (self.idx + 1) % WINDOW_SIZE;
108 if self.idx == 0 {
109 // next_idx starts at 0, which means if we just added one and see zero, the index
110 // wrapped.
111 self.wraps = self.wraps.saturating_add(1);
112 }
113 }
114
115 // While not currently used, `size` has a not-exactly-obvious implementation and is left here
116 // in case a curious reader needs it.
117 #[allow(dead_code)]
118 fn size(&self) -> usize {
119 if self.wraps > 0 {
120 // the index wrapped at least once, so the ring buffer is definitely full.
121 self.data.len()
122 } else {
123 // index hasn't wrapped yet, so it counts the number of samples recorded in this buffer.
124 self.idx
125 }
126 }
127}
128
129/// A sample of the state in [`Observations`].
130#[derive(Debug, PartialEq, Eq)]
131pub struct Sample<T: Ord + Zero + Copy> {
132 /// Number of observations dropped due to lock contention
133 pub dropped: usize,
134 /// Number of times the observation window wrapped around
135 pub wraps: usize,
136 /// 25th percentile observation
137 pub p25: T,
138 /// 50th percentile observation
139 pub p50: T,
140 /// 75th percentile observation
141 pub p75: T,
142 /// 90th percentile observation
143 pub p90: T,
144 /// 95th percentile observation
145 pub p95: T,
146 /// 99th percentile observation
147 pub p99: T,
148 /// 99.9th percentile observation
149 pub p99p9: T,
150 /// Maximum observation
151 pub max: T,
152 /// Number of observations
153 pub count: usize,
154}
155
156/// Collect observations, which are sampled as a [`Sample`].
157pub struct Observations<T: Ord + Zero + Copy> {
158 observations: Mutex<ObservationSet<T>>,
159 drops: AtomicUsize,
160 name: &'static str,
161}
162
163impl<T: Ord + Zero + Copy> Observations<T> {
164 /// Constructor. The `name` parameter has no semantic meaning, and is only
165 /// exposed by [`Observations::name()`].
166 pub fn new(name: &'static str) -> Self {
167 Self {
168 observations: Mutex::new(ObservationSet::new()),
169 drops: AtomicUsize::new(0),
170 name,
171 }
172 }
173
174 /// Name associated with the observations, as provided in constructor.
175 pub fn name(&self) -> &'static str {
176 self.name
177 }
178
179 /// Take a sample of the observations. Calculates a [`Sample`] corresponding to the current
180 /// state, and then clears that state.
181 pub fn sample(&self) -> Sample<T> {
182 let mut observations = self.observations.lock();
183 let wraps = observations.wraps();
184 let sorted = observations.sorted_data();
185
186 fn percentile<T: Ord + Zero + Copy>(sorted_ts: &[T], p: f64) -> T {
187 if sorted_ts.len() == 0 {
188 T::zero()
189 } else {
190 let percentile_idx = ((sorted_ts.len() as f64 * p) / 100.0) as usize;
191 sorted_ts[percentile_idx]
192 }
193 }
194 let p25 = percentile(&sorted, 25.0);
195 let p50 = percentile(&sorted, 50.0);
196 let p75 = percentile(&sorted, 75.0);
197 let p90 = percentile(&sorted, 90.0);
198 let p95 = percentile(&sorted, 95.0);
199 let p99 = percentile(&sorted, 99.0);
200 let p99p9 = percentile(&sorted, 99.9);
201 let max = sorted.last().map(|x| *x).unwrap_or_else(|| T::zero());
202 let count = sorted.len();
203 observations.clear();
204 std::mem::drop(observations);
205
206 // now that we've unblocked writing new observations, no more will be dropped, and we can
207 // reset the drop count to 0
208 let dropped = self.drops.swap(0, Ordering::SeqCst);
209 Sample {
210 dropped,
211 wraps,
212 p25,
213 p50,
214 p75,
215 p90,
216 p95,
217 p99,
218 p99p9,
219 max,
220 count,
221 }
222 }
223
224 /// Attempt to record this `T` as part of the collection of observations. "Attempt", because if
225 /// a reader is currently using this `ObservationSet`, the observation is dropped. This
226 /// prevents recording from being a blocking operation
227 pub fn record(&self, observation: T) {
228 if let Some(mut observations) = self.observations.try_lock() {
229 observations.add(observation);
230 } else {
231 // something else is using the data right now, just drop the observation
232 self.drops.fetch_add(1, Ordering::SeqCst);
233 }
234 }
235}
236
// `crate::label_enum!` is defined elsewhere in this crate. From its uses in this file it
// presumably also generates `TimingBucket::as_str()` and `TimingBucket::all_variants()`
// (TODO confirm against the macro definition). Each variant corresponds to one field of
// `Sample`; `Sample::as_bucket_pairs` pairs them up explicitly.
crate::label_enum! {
    /// Labels corresponding to the fields in [`Sample`]
    pub enum TimingBucket {
        /// 25th percentile observation
        P25,
        /// 50th percentile observation
        P50,
        /// 75th percentile observation
        P75,
        /// 90th percentile observation
        P90,
        /// 95th percentile observation
        P95,
        /// 99th percentile observation
        P99,
        /// 99.9th percentile observation
        P99P9,
        /// Maximum observation
        Max,
        /// Number of observations
        Count,
    }
}
260
261impl Labels for TimingBucket {
262 fn label_names() -> Vec<&'static str> {
263 vec!["bucket"]
264 }
265 fn possible_label_values() -> Vec<LabelValues<'static>> {
266 Self::all_variants()
267 .into_iter()
268 .map(|b| vec![b.as_str()])
269 .collect()
270 }
271 fn label_values(&self) -> LabelValues {
272 vec![self.as_str()]
273 }
274}
275
276impl<T: Ord + Zero + Copy + Into<i64>> Sample<T> {
277 /// Returns each member of the struct along with its [`TimingBucket`]
278 /// label. Each percentile is given as an i64.
279 pub fn as_bucket_pairs(&self) -> Vec<(TimingBucket, i64)> {
280 vec![
281 (TimingBucket::P25, self.p25.into()),
282 (TimingBucket::P50, self.p50.into()),
283 (TimingBucket::P75, self.p75.into()),
284 (TimingBucket::P90, self.p90.into()),
285 (TimingBucket::P95, self.p95.into()),
286 (TimingBucket::P99, self.p99.into()),
287 (TimingBucket::P99P9, self.p99p9.into()),
288 (TimingBucket::Max, self.max.into()),
289 (TimingBucket::Count, self.count as i64),
290 ]
291 }
292
293 /// Returns the number of observations dropped due to the observation lock
294 /// being held.
295 pub fn dropped(&self) -> usize {
296 self.dropped
297 }
298
299 /// Returns the number of times the observation count exceeded the available
300 /// window size.
301 pub fn wraps(&self) -> usize {
302 self.wraps
303 }
304}
305
#[cfg(test)]
mod tests {
    use super::{Observations, Sample, WINDOW_SIZE};

    /// The `Sample` expected when nothing was recorded since the previous sample.
    fn empty_sample() -> Sample<usize> {
        Sample {
            dropped: 0,
            wraps: 0,
            p25: 0,
            p50: 0,
            p75: 0,
            p90: 0,
            p95: 0,
            p99: 0,
            p99p9: 0,
            max: 0,
            count: 0,
        }
    }

    #[test]
    fn test_wraps_are_reported() {
        let observations = Observations::new("test");

        // Fill the ring buffer exactly once, then push four more values past its end.
        (0..WINDOW_SIZE)
            .chain(500..504)
            .for_each(|value| observations.record(value));

        // Overflowing is not dropping: `dropped` only counts observations that never reached the
        // ring buffer, while overflowing writes wrap to the start and increment `wraps`.
        let first = observations.sample();
        assert_eq!(first.dropped, 0);
        assert_eq!(first.wraps, 1);

        // Sampling resets the state, so an immediate second sample is all zeroes with no wraps.
        assert_eq!(observations.sample(), empty_sample());
    }

    #[test]
    fn test_percentiles_are_reported() {
        let observations = Observations::new("test");
        for datum in 1..100 {
            observations.record(datum);
        }

        let expected = Sample {
            dropped: 0,
            wraps: 0,
            p25: 25,
            p50: 50,
            p75: 75,
            p90: 90,
            p95: 95,
            p99: 99,
            p99p9: 99,
            max: 99,
            count: 99,
        };
        assert_eq!(observations.sample(), expected);
    }

    #[test]
    fn test_small_sampleset() {
        let observations = Observations::new("test");
        (500..505).for_each(|value| observations.record(value));

        let expected = Sample {
            dropped: 0,
            wraps: 0,
            p25: 501,
            p50: 502,
            p75: 503,
            p90: 504,
            p95: 504,
            p99: 504,
            p99p9: 504,
            max: 504,
            count: 5,
        };
        assert_eq!(observations.sample(), expected);
    }

    #[test]
    fn test_overflow_wraps_writes() {
        let observations = Observations::new("test");

        // The first phase fills the window exactly. Everything after it wraps around and
        // overwrites the `1`s from the start, and only those post-wrap values get sampled.
        let phases = [(1, WINDOW_SIZE), (2, WINDOW_SIZE / 2), (3, WINDOW_SIZE / 10)];
        for &(value, repeats) in phases.iter() {
            for _ in 0..repeats {
                observations.record(value);
            }
        }

        let expected = Sample {
            dropped: 0,
            wraps: 1,
            p25: 2,
            p50: 2,
            p75: 2,
            p90: 3,
            p95: 3,
            p99: 3,
            p99p9: 3,
            max: 3,
            count: WINDOW_SIZE / 2 + WINDOW_SIZE / 10,
        };
        assert_eq!(observations.sample(), expected);
    }
}
461}