metrics_exporter_prometheus/
distribution.rs1use std::num::NonZeroU32;
2use std::time::Duration;
3use std::{collections::HashMap, sync::Arc};
4
5use quanta::Instant;
6
7use crate::common::Matcher;
8use crate::native_histogram::{NativeHistogram, NativeHistogramConfig};
9
10use metrics_util::{
11 storage::{Histogram, Summary},
12 Quantile,
13};
14
/// Number of buckets a rolling summary uses when the caller does not pick one.
const DEFAULT_SUMMARY_BUCKET_COUNT: NonZeroU32 = if let Some(count) = NonZeroU32::new(3) {
    count
} else {
    // `3` is non-zero, so `NonZeroU32::new` can never yield `None` here.
    unreachable!()
};

/// Width of a single rolling-summary bucket when the caller does not pick one.
const DEFAULT_SUMMARY_BUCKET_DURATION: Duration = Duration::from_secs(20);
20
21#[derive(Clone, Debug)]
23pub enum Distribution {
24 Histogram(Histogram),
30 Summary(RollingSummary, Arc<Vec<Quantile>>, f64),
36 NativeHistogram(NativeHistogram),
41}
42
43impl Distribution {
44 pub fn new_histogram(buckets: &[f64]) -> Distribution {
50 let hist = Histogram::new(buckets).expect("buckets should never be empty");
51 Distribution::Histogram(hist)
52 }
53
54 pub fn new_summary(
56 quantiles: Arc<Vec<Quantile>>,
57 bucket_duration: Duration,
58 bucket_count: NonZeroU32,
59 ) -> Distribution {
60 Distribution::Summary(RollingSummary::new(bucket_count, bucket_duration), quantiles, 0.0)
61 }
62
63 pub fn new_native_histogram(config: NativeHistogramConfig) -> Distribution {
65 let hist = NativeHistogram::new(config);
66 Distribution::NativeHistogram(hist)
67 }
68
69 pub fn record_samples(&mut self, samples: &[(f64, Instant)]) {
71 match self {
72 Distribution::Histogram(hist) => {
73 hist.record_many(samples.iter().map(|(sample, _ts)| sample));
74 }
75 Distribution::Summary(hist, _, sum) => {
76 for (sample, ts) in samples {
77 hist.add(*sample, *ts);
78 *sum += *sample;
79 }
80 }
81 Distribution::NativeHistogram(hist) => {
82 for (sample, _ts) in samples {
83 hist.observe(*sample);
84 }
85 }
86 }
87 }
88}
89
90#[derive(Debug)]
92pub struct DistributionBuilder {
93 quantiles: Arc<Vec<Quantile>>,
94 buckets: Option<Vec<f64>>,
95 bucket_duration: Option<Duration>,
96 bucket_count: Option<NonZeroU32>,
97 bucket_overrides: Option<Vec<(Matcher, Vec<f64>)>>,
98 native_histogram_overrides: Option<Vec<(Matcher, NativeHistogramConfig)>>,
99}
100
101impl DistributionBuilder {
102 pub fn new(
104 quantiles: Vec<Quantile>,
105 bucket_duration: Option<Duration>,
106 buckets: Option<Vec<f64>>,
107 bucket_count: Option<NonZeroU32>,
108 bucket_overrides: Option<HashMap<Matcher, Vec<f64>>>,
109 native_histogram_overrides: Option<HashMap<Matcher, NativeHistogramConfig>>,
110 ) -> DistributionBuilder {
111 DistributionBuilder {
112 quantiles: Arc::new(quantiles),
113 bucket_duration,
114 buckets,
115 bucket_count,
116 bucket_overrides: bucket_overrides.map(|entries| {
117 let mut matchers = entries.into_iter().collect::<Vec<_>>();
118 matchers.sort_by(|a, b| a.0.cmp(&b.0));
119 matchers
120 }),
121 native_histogram_overrides: native_histogram_overrides.map(|entries| {
122 let mut matchers = entries.into_iter().collect::<Vec<_>>();
123 matchers.sort_by(|a, b| a.0.cmp(&b.0));
124 matchers
125 }),
126 }
127 }
128
129 pub fn get_distribution(&self, name: &str) -> Distribution {
131 if let Some(ref overrides) = self.native_histogram_overrides {
133 for (matcher, config) in overrides {
134 if matcher.matches(name) {
135 return Distribution::new_native_histogram(config.clone());
136 }
137 }
138 }
139
140 if let Some(ref overrides) = self.bucket_overrides {
142 for (matcher, buckets) in overrides {
143 if matcher.matches(name) {
144 return Distribution::new_histogram(buckets);
145 }
146 }
147 }
148
149 if let Some(ref buckets) = self.buckets {
151 return Distribution::new_histogram(buckets);
152 }
153
154 let b_duration = self.bucket_duration.map_or(DEFAULT_SUMMARY_BUCKET_DURATION, |d| d);
156 let b_count = self.bucket_count.map_or(DEFAULT_SUMMARY_BUCKET_COUNT, |c| c);
157
158 Distribution::new_summary(self.quantiles.clone(), b_duration, b_count)
159 }
160
161 pub fn get_distribution_type(&self, name: &str) -> &'static str {
163 if let Some(ref overrides) = self.native_histogram_overrides {
165 for (matcher, _) in overrides {
166 if matcher.matches(name) {
167 return "native_histogram";
168 }
169 }
170 }
171
172 if self.buckets.is_some() {
174 return "histogram";
175 }
176
177 if let Some(ref overrides) = self.bucket_overrides {
178 for (matcher, _) in overrides {
179 if matcher.matches(name) {
180 return "histogram";
181 }
182 }
183 }
184
185 "summary"
186 }
187}
188
189#[derive(Clone, Debug)]
190struct Bucket {
191 begin: Instant,
192 summary: Summary,
193}
194
195#[derive(Clone, Debug)]
197pub struct RollingSummary {
198 buckets: Vec<Bucket>,
202 max_buckets: usize,
204 bucket_duration: Duration,
206 max_bucket_duration: Duration,
208 count: usize,
211}
212
213impl Default for RollingSummary {
214 fn default() -> Self {
215 RollingSummary::new(DEFAULT_SUMMARY_BUCKET_COUNT, DEFAULT_SUMMARY_BUCKET_DURATION)
216 }
217}
218
219impl RollingSummary {
220 pub fn new(buckets: std::num::NonZeroU32, bucket_duration: Duration) -> RollingSummary {
224 assert!(!bucket_duration.is_zero());
225 let max_bucket_duration = bucket_duration * buckets.get();
226 let max_buckets = buckets.get() as usize;
227
228 RollingSummary {
229 buckets: Vec::with_capacity(max_buckets),
230 max_buckets,
231 bucket_duration,
232 max_bucket_duration,
233 count: 0,
234 }
235 }
236
237 pub fn add(&mut self, value: f64, now: Instant) {
241 self.count += 1;
243
244 for bucket in &mut self.buckets {
247 let end = bucket.begin + self.bucket_duration;
248
249 if now > bucket.begin + self.bucket_duration {
251 break;
252 }
253
254 if now >= bucket.begin && now < end {
255 bucket.summary.add(value);
256 return;
257 }
258 }
259
260 if let Some(cutoff) = now.checked_sub(self.max_bucket_duration) {
262 self.buckets.retain(|b| b.begin > cutoff);
263 }
264
265 if self.buckets.is_empty() {
266 let mut summary = Summary::with_defaults();
267 summary.add(value);
268 self.buckets.push(Bucket { begin: now, summary });
269 return;
270 }
271
272 let reftime = self.buckets[0].begin;
276
277 let mut summary = Summary::with_defaults();
278 summary.add(value);
279
280 let mut begin;
282 if now > reftime {
283 begin = reftime + self.bucket_duration;
284 let mut end = begin + self.bucket_duration;
285 while now < begin || now >= end {
286 begin += self.bucket_duration;
287 end += self.bucket_duration;
288 }
289
290 self.buckets.truncate(self.max_buckets - 1);
291 self.buckets.insert(0, Bucket { begin, summary });
292 }
293 }
294
295 pub fn snapshot(&self, now: Instant) -> Summary {
303 let cutoff = now.checked_sub(self.max_bucket_duration);
304 let mut acc = Summary::with_defaults();
305 self.buckets
306 .iter()
307 .filter(|b| if let Some(cutoff) = cutoff { b.begin > cutoff } else { true })
308 .map(|b| &b.summary)
309 .fold(&mut acc, |acc, item| {
310 acc.merge(item).expect("merge can only fail if summary config inconsistent");
311 acc
312 });
313 acc
314 }
315
316 pub fn is_empty(&self) -> bool {
318 self.count() == 0
319 }
320
321 pub fn count(&self) -> usize {
323 self.count
324 }
325
326 #[cfg(test)]
327 fn buckets(&self) -> &Vec<Bucket> {
328 &self.buckets
329 }
330}
331
#[cfg(test)]
mod tests {
    use super::*;

    use quanta::Clock;

    /// A default summary starts with no buckets and no recorded values.
    #[test]
    fn new_rolling_summary() {
        let rolling = RollingSummary::default();

        assert_eq!(0, rolling.buckets().len());
        assert_eq!(0, rolling.count());
        assert!(rolling.is_empty());
    }

    /// Snapshotting an empty summary yields an empty `Summary`.
    #[test]
    fn empty_snapshot() {
        let (clock, _mock) = Clock::mock();
        let rolling = RollingSummary::default();
        let snap = rolling.snapshot(clock.now());

        assert_eq!(0, snap.count());
        #[allow(clippy::float_cmp)]
        {
            assert_eq!(f64::INFINITY, snap.min());
            assert_eq!(f64::NEG_INFINITY, snap.max());
        }
        assert_eq!(None, snap.quantile(0.5));
    }

    /// Values spread over three consecutive buckets all show up in the snapshot.
    #[test]
    fn snapshot() {
        let (clock, mock) = Clock::mock();
        mock.increment(Duration::from_secs(3600));

        let mut rolling = RollingSummary::default();
        rolling.add(42.0, clock.now());
        for _ in 0..2 {
            mock.increment(Duration::from_secs(20));
            rolling.add(42.0, clock.now());
        }

        let snap = rolling.snapshot(clock.now());

        #[allow(clippy::float_cmp)]
        {
            assert_eq!(42.0, snap.min());
            assert_eq!(42.0, snap.max());
        }
        // The median is only checked to lie within a narrow band around 42.
        assert!(Some(41.9958) < snap.quantile(0.5));
        assert!(Some(42.0042) > snap.quantile(0.5));
    }

    /// The first value creates the first bucket.
    #[test]
    fn add_first_value() {
        let (clock, mock) = Clock::mock();
        mock.increment(Duration::from_secs(3600));

        let mut rolling = RollingSummary::default();
        rolling.add(42.0, clock.now());

        assert_eq!(1, rolling.buckets().len());
        assert_eq!(1, rolling.count());
        assert!(!rolling.is_empty());
    }

    /// A value one full bucket width later lands in a new bucket.
    #[test]
    fn add_new_head() {
        let (clock, mock) = Clock::mock();
        mock.increment(Duration::from_secs(3600));

        let mut rolling = RollingSummary::default();
        rolling.add(42.0, clock.now());
        mock.increment(Duration::from_secs(20));
        rolling.add(42.0, clock.now());

        assert_eq!(2, rolling.buckets().len());
    }

    /// Four values in four consecutive slices leave only the default three buckets.
    #[test]
    fn truncate_old_buckets() {
        let (clock, mock) = Clock::mock();
        mock.increment(Duration::from_secs(3600));

        let mut rolling = RollingSummary::default();
        rolling.add(42.0, clock.now());

        for _ in 0..3 {
            mock.increment(Duration::from_secs(20));
            rolling.add(42.0, clock.now());
        }

        assert_eq!(3, rolling.buckets().len());
    }

    /// A value stamped before the only bucket begins is counted but adds no bucket.
    #[test]
    fn add_value_ts_before_first_bucket() {
        let (clock, mock) = Clock::mock();
        mock.increment(Duration::from_secs(4));

        let slots = NonZeroU32::new(2).unwrap();
        let slot_width = Duration::from_secs(5);

        let mut rolling = RollingSummary::new(slots, slot_width);
        assert_eq!(0, rolling.buckets().len());
        assert_eq!(0, rolling.count());

        // First value: creates the bucket that begins at the current mock time.
        rolling.add(42.0, clock.now());

        assert_eq!(1, rolling.buckets().len());
        assert_eq!(1, rolling.count());
        assert!(!rolling.is_empty());

        // Step the clock back so the next timestamp precedes that bucket.
        mock.decrement(Duration::from_secs(1));

        rolling.add(43.0, clock.now());

        assert_eq!(1, rolling.buckets().len());
        assert_eq!(2, rolling.count());
        assert!(!rolling.is_empty());
    }
}