metrics_exporter_prometheus/
distribution.rs

1use std::num::NonZeroU32;
2use std::time::Duration;
3use std::{collections::HashMap, sync::Arc};
4
5use quanta::Instant;
6
7use crate::common::Matcher;
8use crate::native_histogram::{NativeHistogram, NativeHistogramConfig};
9
10use metrics_util::{
11    storage::{Histogram, Summary},
12    Quantile,
13};
14
/// Number of rolling buckets a summary keeps when no bucket count is configured.
///
/// Built with a const `if let` because `NonZeroU32::new` returns an `Option` and the value must
/// be available at compile time.
const DEFAULT_SUMMARY_BUCKET_COUNT: NonZeroU32 = if let Some(count) = NonZeroU32::new(3) {
    count
} else {
    unreachable!()
};
/// Width of each rolling summary bucket when no bucket duration is configured.
const DEFAULT_SUMMARY_BUCKET_DURATION: Duration = Duration::from_millis(20_000);
20
21/// Distribution type.
22#[derive(Clone, Debug)]
23pub enum Distribution {
24    /// A Prometheus histogram.
25    ///
26    /// Exposes "bucketed" values to Prometheus, counting the number of samples
27    /// below a given threshold i.e. 100 requests faster than 20ms, 1000 requests
28    /// faster than 50ms, etc.
29    Histogram(Histogram),
30    /// A Prometheus summary.
31    ///
32    /// Computes and exposes value quantiles directly to Prometheus i.e. 50% of
33    /// requests were faster than 200ms, and 99% of requests were faster than
34    /// 1000ms, etc.
35    Summary(RollingSummary, Arc<Vec<Quantile>>, f64),
36    /// A Prometheus native histogram.
37    ///
38    /// Uses exponential buckets to efficiently represent histogram data without
39    /// requiring predefined bucket boundaries.
40    NativeHistogram(NativeHistogram),
41}
42
43impl Distribution {
44    /// Creates a histogram distribution.
45    ///
46    /// # Panics
47    ///
48    /// Panics if `buckets` is empty.
49    pub fn new_histogram(buckets: &[f64]) -> Distribution {
50        let hist = Histogram::new(buckets).expect("buckets should never be empty");
51        Distribution::Histogram(hist)
52    }
53
54    /// Creates a summary distribution.
55    pub fn new_summary(
56        quantiles: Arc<Vec<Quantile>>,
57        bucket_duration: Duration,
58        bucket_count: NonZeroU32,
59    ) -> Distribution {
60        Distribution::Summary(RollingSummary::new(bucket_count, bucket_duration), quantiles, 0.0)
61    }
62
63    /// Creates a native histogram distribution.
64    pub fn new_native_histogram(config: NativeHistogramConfig) -> Distribution {
65        let hist = NativeHistogram::new(config);
66        Distribution::NativeHistogram(hist)
67    }
68
69    /// Records the given `samples` in the current distribution.
70    pub fn record_samples(&mut self, samples: &[(f64, Instant)]) {
71        match self {
72            Distribution::Histogram(hist) => {
73                hist.record_many(samples.iter().map(|(sample, _ts)| sample));
74            }
75            Distribution::Summary(hist, _, sum) => {
76                for (sample, ts) in samples {
77                    hist.add(*sample, *ts);
78                    *sum += *sample;
79                }
80            }
81            Distribution::NativeHistogram(hist) => {
82                for (sample, _ts) in samples {
83                    hist.observe(*sample);
84                }
85            }
86        }
87    }
88}
89
/// Builds distributions for metric names based on a set of configured overrides.
///
/// The precedence between native histogram overrides, histogram bucket overrides, global
/// histogram buckets and the summary fallback is implemented by `get_distribution`.
#[derive(Debug)]
pub struct DistributionBuilder {
    // Quantiles reported by every summary; held in an `Arc` so each summary built shares one copy.
    quantiles: Arc<Vec<Quantile>>,
    // Global histogram bucket bounds; when set, metrics not matched by any override become
    // histograms instead of summaries.
    buckets: Option<Vec<f64>>,
    // Width of each rolling summary bucket; `None` falls back to the default duration.
    bucket_duration: Option<Duration>,
    // Number of rolling summary buckets; `None` falls back to the default count.
    bucket_count: Option<NonZeroU32>,
    // Per-matcher histogram bucket bounds, sorted by matcher when the builder is created.
    bucket_overrides: Option<Vec<(Matcher, Vec<f64>)>>,
    // Per-matcher native histogram configurations, sorted by matcher when the builder is created.
    // These are consulted before any histogram configuration.
    native_histogram_overrides: Option<Vec<(Matcher, NativeHistogramConfig)>>,
}
100
101impl DistributionBuilder {
102    /// Creates a new instance of `DistributionBuilder`.
103    pub fn new(
104        quantiles: Vec<Quantile>,
105        bucket_duration: Option<Duration>,
106        buckets: Option<Vec<f64>>,
107        bucket_count: Option<NonZeroU32>,
108        bucket_overrides: Option<HashMap<Matcher, Vec<f64>>>,
109        native_histogram_overrides: Option<HashMap<Matcher, NativeHistogramConfig>>,
110    ) -> DistributionBuilder {
111        DistributionBuilder {
112            quantiles: Arc::new(quantiles),
113            bucket_duration,
114            buckets,
115            bucket_count,
116            bucket_overrides: bucket_overrides.map(|entries| {
117                let mut matchers = entries.into_iter().collect::<Vec<_>>();
118                matchers.sort_by(|a, b| a.0.cmp(&b.0));
119                matchers
120            }),
121            native_histogram_overrides: native_histogram_overrides.map(|entries| {
122                let mut matchers = entries.into_iter().collect::<Vec<_>>();
123                matchers.sort_by(|a, b| a.0.cmp(&b.0));
124                matchers
125            }),
126        }
127    }
128
129    /// Returns a distribution for the given metric key.
130    pub fn get_distribution(&self, name: &str) -> Distribution {
131        // Check for native histogram overrides first (highest priority)
132        if let Some(ref overrides) = self.native_histogram_overrides {
133            for (matcher, config) in overrides {
134                if matcher.matches(name) {
135                    return Distribution::new_native_histogram(config.clone());
136                }
137            }
138        }
139
140        // Check for histogram bucket overrides
141        if let Some(ref overrides) = self.bucket_overrides {
142            for (matcher, buckets) in overrides {
143                if matcher.matches(name) {
144                    return Distribution::new_histogram(buckets);
145                }
146            }
147        }
148
149        // Check for global histogram buckets
150        if let Some(ref buckets) = self.buckets {
151            return Distribution::new_histogram(buckets);
152        }
153
154        // Default to summary
155        let b_duration = self.bucket_duration.map_or(DEFAULT_SUMMARY_BUCKET_DURATION, |d| d);
156        let b_count = self.bucket_count.map_or(DEFAULT_SUMMARY_BUCKET_COUNT, |c| c);
157
158        Distribution::new_summary(self.quantiles.clone(), b_duration, b_count)
159    }
160
161    /// Returns the distribution type for the given metric key.
162    pub fn get_distribution_type(&self, name: &str) -> &'static str {
163        // Check for native histogram overrides first (highest priority)
164        if let Some(ref overrides) = self.native_histogram_overrides {
165            for (matcher, _) in overrides {
166                if matcher.matches(name) {
167                    return "native_histogram";
168                }
169            }
170        }
171
172        // Check for regular histogram buckets
173        if self.buckets.is_some() {
174            return "histogram";
175        }
176
177        if let Some(ref overrides) = self.bucket_overrides {
178            for (matcher, _) in overrides {
179                if matcher.matches(name) {
180                    return "histogram";
181                }
182            }
183        }
184
185        "summary"
186    }
187}
188
/// One time slice of a [`RollingSummary`].
///
/// Holds the values added with a timestamp in `[begin, begin + bucket_duration)`, where
/// `bucket_duration` is the owning summary's bucket width.
#[derive(Clone, Debug)]
struct Bucket {
    // Instant at which this bucket's time slice starts (inclusive).
    begin: Instant,
    // Values recorded in this slice; merged with other live buckets by `RollingSummary::snapshot`.
    summary: Summary,
}
194
/// A `RollingSummary` manages a list of [Summary] so that old results can be expired.
///
/// Samples are grouped into fixed-width time buckets; only buckets that began within
/// `max_bucket_duration` of the query instant contribute to a snapshot.
#[derive(Clone, Debug)]
pub struct RollingSummary {
    // Buckets are ordered with the latest buckets first.  Bucket start times are aligned to
    // multiples of `bucket_duration` from the start of the bucket created when the list was last
    // empty.  There may be gaps in the bucket list (time slices in which no sample arrived).
    buckets: Vec<Bucket>,
    // Maximum number of buckets to track.
    max_buckets: usize,
    // Width of the time slice covered by each bucket.
    bucket_duration: Duration,
    // Retention window: a bucket whose start is not newer than `now - max_bucket_duration` is
    // expired.  Set in `new` to `bucket_duration * max_buckets`.
    max_bucket_duration: Duration,
    // Total samples since creation of this summary.  This is separate from the Summary since it is
    // never reset, and it also counts samples that were too old to be stored in any bucket.
    count: usize,
}
212
213impl Default for RollingSummary {
214    fn default() -> Self {
215        RollingSummary::new(DEFAULT_SUMMARY_BUCKET_COUNT, DEFAULT_SUMMARY_BUCKET_DURATION)
216    }
217}
218
219impl RollingSummary {
220    /// Create a new `RollingSummary` with the given number of `buckets` and `bucket-duration`.
221    ///
222    /// The summary will store quantiles over `buckets * bucket_duration` seconds.
223    pub fn new(buckets: std::num::NonZeroU32, bucket_duration: Duration) -> RollingSummary {
224        assert!(!bucket_duration.is_zero());
225        let max_bucket_duration = bucket_duration * buckets.get();
226        let max_buckets = buckets.get() as usize;
227
228        RollingSummary {
229            buckets: Vec::with_capacity(max_buckets),
230            max_buckets,
231            bucket_duration,
232            max_bucket_duration,
233            count: 0,
234        }
235    }
236
237    /// Add a sample `value` to the `RollingSummary` at the time `now`.
238    ///
239    /// Any values that expire at the `value_ts` are removed from the `RollingSummary`.
240    pub fn add(&mut self, value: f64, now: Instant) {
241        // The count is incremented even if this value is too old to be saved in any bucket.
242        self.count += 1;
243
244        // If we can find a bucket that this value belongs in, then we can just add it in and be
245        // done.
246        for bucket in &mut self.buckets {
247            let end = bucket.begin + self.bucket_duration;
248
249            // If this value belongs in a future bucket...
250            if now > bucket.begin + self.bucket_duration {
251                break;
252            }
253
254            if now >= bucket.begin && now < end {
255                bucket.summary.add(value);
256                return;
257            }
258        }
259
260        // Remove any expired buckets.
261        if let Some(cutoff) = now.checked_sub(self.max_bucket_duration) {
262            self.buckets.retain(|b| b.begin > cutoff);
263        }
264
265        if self.buckets.is_empty() {
266            let mut summary = Summary::with_defaults();
267            summary.add(value);
268            self.buckets.push(Bucket { begin: now, summary });
269            return;
270        }
271
272        // Take the first bucket time as a reference.  Other buckets will be created at an offset
273        // of this time.  We know this time is close to the value_ts, if it were much older the
274        // bucket would have been removed.
275        let reftime = self.buckets[0].begin;
276
277        let mut summary = Summary::with_defaults();
278        summary.add(value);
279
280        // If the value is newer than the first bucket then count upwards to the new bucket time.
281        let mut begin;
282        if now > reftime {
283            begin = reftime + self.bucket_duration;
284            let mut end = begin + self.bucket_duration;
285            while now < begin || now >= end {
286                begin += self.bucket_duration;
287                end += self.bucket_duration;
288            }
289
290            self.buckets.truncate(self.max_buckets - 1);
291            self.buckets.insert(0, Bucket { begin, summary });
292        }
293    }
294
295    /// Return a merged Summary of all items that are valid at `now`.
296    ///
297    /// # Warning
298    ///
299    /// The snapshot `Summary::count()` contains the total number of values considered in the
300    /// Snapshot, which is not the full count of the `RollingSummary`.  Use `RollingSummary::count()`
301    /// instead.
302    pub fn snapshot(&self, now: Instant) -> Summary {
303        let cutoff = now.checked_sub(self.max_bucket_duration);
304        let mut acc = Summary::with_defaults();
305        self.buckets
306            .iter()
307            .filter(|b| if let Some(cutoff) = cutoff { b.begin > cutoff } else { true })
308            .map(|b| &b.summary)
309            .fold(&mut acc, |acc, item| {
310                acc.merge(item).expect("merge can only fail if summary config inconsistent");
311                acc
312            });
313        acc
314    }
315
316    /// Whether or not this summary is empty.
317    pub fn is_empty(&self) -> bool {
318        self.count() == 0
319    }
320
321    /// Gets the totoal number of samples this summary has seen so far.
322    pub fn count(&self) -> usize {
323        self.count
324    }
325
326    #[cfg(test)]
327    fn buckets(&self) -> &Vec<Bucket> {
328        &self.buckets
329    }
330}
331
#[cfg(test)]
mod tests {
    use super::*;

    use quanta::Clock;

    /// Moves the mock clock well away from zero so retention-window subtraction never underflows.
    const ONE_HOUR: Duration = Duration::from_secs(3600);
    /// Bucket width used by `RollingSummary::default()`.
    const STEP: Duration = Duration::from_secs(20);

    #[test]
    fn new_rolling_summary() {
        let rolling = RollingSummary::default();

        assert!(rolling.is_empty());
        assert_eq!(rolling.count(), 0);
        assert_eq!(rolling.buckets().len(), 0);
    }

    #[test]
    fn empty_snapshot() {
        let (clock, _mock) = Clock::mock();
        let snap = RollingSummary::default().snapshot(clock.now());

        assert_eq!(snap.count(), 0);
        // Exact float comparison is intended: an empty summary reports the infinities verbatim.
        #[allow(clippy::float_cmp)]
        {
            assert_eq!(snap.min(), f64::INFINITY);
            assert_eq!(snap.max(), f64::NEG_INFINITY);
        }
        assert_eq!(snap.quantile(0.5), None);
    }

    #[test]
    fn snapshot() {
        let (clock, mock) = Clock::mock();
        mock.increment(ONE_HOUR);

        let mut rolling = RollingSummary::default();
        rolling.add(42.0, clock.now());
        for _ in 0..2 {
            mock.increment(STEP);
            rolling.add(42.0, clock.now());
        }

        let snap = rolling.snapshot(clock.now());

        // Every sample is exactly 42.0, so exact comparison is intended here.
        #[allow(clippy::float_cmp)]
        {
            assert_eq!(snap.min(), 42.0);
            assert_eq!(snap.max(), 42.0);
        }
        // 42 +/- (42 * 0.0001)
        let median = snap.quantile(0.5);
        assert!(Some(41.9958) < median);
        assert!(Some(42.0042) > median);
    }

    #[test]
    fn add_first_value() {
        let (clock, mock) = Clock::mock();
        mock.increment(ONE_HOUR);

        let mut rolling = RollingSummary::default();
        rolling.add(42.0, clock.now());

        assert!(!rolling.is_empty());
        assert_eq!(rolling.count(), 1);
        assert_eq!(rolling.buckets().len(), 1);
    }

    #[test]
    fn add_new_head() {
        let (clock, mock) = Clock::mock();
        mock.increment(ONE_HOUR);

        let mut rolling = RollingSummary::default();
        rolling.add(42.0, clock.now());
        mock.increment(STEP);
        rolling.add(42.0, clock.now());

        assert_eq!(rolling.buckets().len(), 2);
    }

    #[test]
    fn truncate_old_buckets() {
        let (clock, mock) = Clock::mock();
        mock.increment(ONE_HOUR);

        let mut rolling = RollingSummary::default();
        rolling.add(42.0, clock.now());

        // Three more steps would create a fourth bucket; the default keeps only three.
        (0..3).for_each(|_| {
            mock.increment(STEP);
            rolling.add(42.0, clock.now());
        });

        assert_eq!(rolling.buckets().len(), 3);
    }

    #[test]
    fn add_value_ts_before_first_bucket() {
        let (clock, mock) = Clock::mock();
        mock.increment(Duration::from_secs(4));

        let width = Duration::from_secs(5);
        let count = NonZeroU32::new(2).unwrap();

        let mut rolling = RollingSummary::new(count, width);
        assert_eq!(rolling.buckets().len(), 0);
        assert_eq!(rolling.count(), 0);

        // A single value creates the first bucket.
        rolling.add(42.0, clock.now());

        assert!(!rolling.is_empty());
        assert_eq!(rolling.count(), 1);
        assert_eq!(rolling.buckets().len(), 1);

        // The first bucket now spans begin=4/width=5, so a value at now=3 must bump the count
        // without being stored in any bucket.
        mock.decrement(Duration::from_secs(1));
        rolling.add(43.0, clock.now());

        assert!(!rolling.is_empty());
        assert_eq!(rolling.count(), 2);
        assert_eq!(rolling.buckets().len(), 1);
    }
}
458}