libdd_telemetry/
metrics.rs

1// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::HashMap,
6    sync::{Arc, Mutex, MutexGuard},
7    time,
8};
9
10use libdd_common::tag::Tag;
11use libdd_ddsketch::DDSketch;
12use serde::{Deserialize, Serialize};
13
14use crate::data::{self, metrics};
15
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reports a time earlier than the epoch.
fn unix_timestamp_now() -> u64 {
    let now = time::SystemTime::now();
    match now.duration_since(time::SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
21
/// Running aggregate for one count or gauge key during a flush interval.
#[derive(Debug)]
struct MetricBucket {
    aggr: MetricAggr,
}

/// How the points added to a [`MetricBucket`] are combined.
#[derive(Debug)]
enum MetricAggr {
    /// Running sum of the added points, starting from the value the bucket was created with.
    Count { count: f64 },
    /// The most recently added point (or the initial value if none was added yet).
    Gauge { value: f64 },
}

impl MetricBucket {
    /// Folds `point` into the aggregate: a count accumulates it, a gauge is overwritten by it.
    fn add_point(&mut self, point: f64) {
        self.aggr = match self.aggr {
            MetricAggr::Count { count } => MetricAggr::Count {
                count: count + point,
            },
            MetricAggr::Gauge { .. } => MetricAggr::Gauge { value: point },
        };
    }

    /// Current aggregated value: the running sum for counts, the latest point for gauges.
    fn value(&self) -> f64 {
        match self.aggr {
            MetricAggr::Count { count: current } | MetricAggr::Gauge { value: current } => current,
        }
    }
}
48
/// Handle to a metric context registered with `MetricContexts::register_metric_context`.
///
/// Field 0 is the position of the context in the registry's store (looked up by
/// `MetricContextGuard::read`). Field 1 is the metric type, which `MetricBuckets::add_point`
/// uses to choose how points are aggregated.
// NOTE(review): `#[repr(C)]` presumably exists because this key crosses an FFI boundary —
// confirm before changing field order or types.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
#[repr(C)]
pub struct ContextKey(u32, metrics::MetricType);

/// Aggregation key: a metric context plus the extra tags attached to individual points.
///
/// Points for the same context but with different `extra_tags` are aggregated separately.
/// The tags are compared as an ordered list, so the same tags in a different order form a
/// different key.
#[derive(Debug, PartialEq, Eq, Hash)]
struct BucketKey {
    context_key: ContextKey,
    extra_tags: Vec<Tag>,
}
58
/// Metric data accumulated since the last flush.
#[derive(Debug, Default)]
pub struct MetricBuckets {
    /// Count and gauge aggregates not yet moved into `series` by `flush_aggregates`.
    buckets: HashMap<BucketKey, MetricBucket>,
    /// `(unix timestamp in seconds, value)` points produced by `flush_aggregates` and
    /// drained by `flush_series`.
    series: HashMap<BucketKey, Vec<(u64, f64)>>,
    /// One sketch per key for distribution metrics, drained by `flush_distributions`.
    distributions: HashMap<BucketKey, DDSketch>,
}

/// Size snapshot of a `MetricBuckets`, as returned by `MetricBuckets::stats`.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct MetricBucketStats {
    /// Number of pending count/gauge buckets.
    pub buckets: u32,
    /// Number of distinct series awaiting a flush.
    pub series: u32,
    /// Total number of points across all series.
    pub series_points: u32,
    /// Number of distinct distribution sketches.
    pub distributions: u32,
    /// Sum of the bin weights of all distribution sketches.
    pub distributions_points: u32,
}
74
75impl MetricBuckets {
76    pub const METRICS_FLUSH_INTERVAL: time::Duration = time::Duration::from_secs(10);
77
78    pub fn flush_aggregates(&mut self) {
79        let timestamp = unix_timestamp_now();
80        for (key, bucket) in self.buckets.drain() {
81            self.series
82                .entry(key)
83                .or_default()
84                .push((timestamp, bucket.value()))
85        }
86    }
87
88    pub fn flush_series(
89        &mut self,
90    ) -> impl Iterator<Item = (ContextKey, Vec<Tag>, Vec<(u64, f64)>)> + '_ {
91        self.series.drain().map(
92            |(
93                BucketKey {
94                    context_key,
95                    extra_tags,
96                },
97                points,
98            )| (context_key, extra_tags, points),
99        )
100    }
101
102    pub fn flush_distributions(
103        &mut self,
104    ) -> impl Iterator<Item = (ContextKey, Vec<Tag>, DDSketch)> + '_ {
105        self.distributions.drain().map(
106            |(
107                BucketKey {
108                    context_key,
109                    extra_tags,
110                },
111                points,
112            )| (context_key, extra_tags, points),
113        )
114    }
115
116    pub fn add_point(&mut self, context_key: ContextKey, point: f64, extra_tags: Vec<Tag>) {
117        let bucket_key = BucketKey {
118            context_key,
119            extra_tags,
120        };
121        match context_key.1 {
122            metrics::MetricType::Count => self
123                .buckets
124                .entry(bucket_key)
125                .or_insert_with(|| MetricBucket {
126                    aggr: MetricAggr::Count { count: 0.0 },
127                })
128                .add_point(point),
129            metrics::MetricType::Gauge => self
130                .buckets
131                .entry(bucket_key)
132                .or_insert_with(|| MetricBucket {
133                    aggr: MetricAggr::Gauge { value: 0.0 },
134                })
135                .add_point(point),
136            metrics::MetricType::Distribution => {
137                let _ = self.distributions.entry(bucket_key).or_default().add(point);
138            }
139        }
140    }
141
142    pub fn stats(&self) -> MetricBucketStats {
143        MetricBucketStats {
144            buckets: self.buckets.len() as u32,
145            series: self.series.len() as u32,
146            series_points: self.series.values().map(|v| v.len() as u32).sum(),
147            distributions: self.distributions.len() as u32,
148            distributions_points: self
149                .distributions
150                .values()
151                .flat_map(|sketch| {
152                    sketch
153                        .ordered_bins()
154                        .into_iter()
155                        .map(|(_, weight)| weight as u32)
156                })
157                .sum(),
158        }
159    }
160}
161
/// Description of a metric registered with `MetricContexts::register_metric_context`.
#[derive(Debug, Serialize, Deserialize)]
pub struct MetricContext {
    pub namespace: data::metrics::MetricNamespace,
    pub name: String,
    /// Tags supplied at registration. Per-point tags are passed separately to
    /// `MetricBuckets::add_point`.
    pub tags: Vec<Tag>,
    /// Also encoded in the `ContextKey` returned at registration.
    pub metric_type: data::metrics::MetricType,
    // NOTE(review): the meaning of `common` is defined by whoever consumes this struct; it is
    // only stored here.
    pub common: bool,
}
170
171pub struct MetricContextGuard<'a> {
172    guard: MutexGuard<'a, InnerMetricContexts>,
173}
174
175impl MetricContextGuard<'_> {
176    pub fn read(&self, key: ContextKey) -> Option<&MetricContext> {
177        self.guard.store.get(key.0 as usize)
178    }
179
180    pub fn is_empty(&self) -> bool {
181        self.guard.store.is_empty()
182    }
183
184    pub fn len(&self) -> usize {
185        self.guard.store.len()
186    }
187}
188
189#[derive(Debug, Default)]
190struct InnerMetricContexts {
191    store: Vec<MetricContext>,
192}
193
194#[derive(Debug, Clone, Default)]
195pub struct MetricContexts {
196    inner: Arc<Mutex<InnerMetricContexts>>,
197}
198
199impl MetricContexts {
200    pub fn register_metric_context(
201        &self,
202        name: String,
203        tags: Vec<Tag>,
204        metric_type: data::metrics::MetricType,
205        common: bool,
206        namespace: data::metrics::MetricNamespace,
207    ) -> ContextKey {
208        #[allow(clippy::unwrap_used)]
209        let mut contexts = self.inner.lock().unwrap();
210        let key = ContextKey(contexts.store.len() as u32, metric_type);
211        contexts.store.push(MetricContext {
212            name,
213            tags,
214            metric_type,
215            common,
216            namespace,
217        });
218        key
219    }
220
221    pub fn lock(&self) -> MetricContextGuard<'_> {
222        #[allow(clippy::unwrap_used)]
223        MetricContextGuard {
224            guard: self.inner.as_ref().lock().unwrap(),
225        }
226    }
227}
228
#[cfg(test)]
mod tests {
    use libdd_common::tag;
    use std::fmt::Debug;

    use super::*;
    use crate::data::metrics::{MetricNamespace, MetricType};

    /// Check if a and b are approximately equal with the given precision or 1.0e-6 by default
    macro_rules! assert_approx_eq {
        ($a:expr, $b:expr) => {{
            let (a, b) = (&$a, &$b);
            assert!(
                (*a - *b).abs() < 1.0e-6,
                "{} is not approximately equal to {}",
                *a,
                *b
            );
        }};
        ($a:expr, $b:expr, $precision:expr) => {{
            let (a, b) = (&$a, &$b);
            assert!(
                (*a - *b).abs() < $precision,
                "{} is not approximately equal to {}",
                *a,
                *b
            );
        }};
    }

    /// Test util used to run assertions against an unordered sequence of elements.
    ///
    /// Each element is offered to the assertions in order, and the first assertion returning
    /// `true` claims it. An assertion returns `false` for elements it does not target and
    /// panics itself if a targeted element has the wrong content.
    ///
    /// The helper panics if:
    /// * an element is claimed by no assertion;
    /// * an assertion claims two elements;
    /// * an assertion claims no element at all. This last check ensures a missing element
    ///   is not silently accepted.
    fn check_iter<'a, U: 'a + Debug, T: Iterator<Item = &'a U>>(
        elements: T,
        assertions: &[&dyn Fn(&U) -> bool],
    ) {
        let mut used = vec![false; assertions.len()];
        for e in elements {
            let mut found = false;
            for (i, &a) in assertions.iter().enumerate() {
                if a(e) {
                    if used[i] {
                        panic!("Assertion {i} has been used multiple times");
                    }
                    found = true;
                    used[i] = true;
                    break;
                }
            }
            if !found {
                panic!("No assertion found for elem {e:?}")
            }
        }
        let unused: Vec<usize> = (0..used.len()).filter(|&i| !used[i]).collect();
        assert!(
            unused.is_empty(),
            "Assertions {unused:?} did not match any element"
        );
    }

    #[test]
    fn test_bucket_flushes() {
        let mut buckets = MetricBuckets::default();
        let contexts = MetricContexts::default();

        let context_key_1 = contexts.register_metric_context(
            "metric1".into(),
            Vec::new(),
            MetricType::Gauge,
            false,
            MetricNamespace::Tracers,
        );
        let context_key_2 = contexts.register_metric_context(
            "metric2".into(),
            Vec::new(),
            MetricType::Gauge,
            false,
            MetricNamespace::Tracers,
        );
        let extra_tags = vec![tag!("service", "foobar")];

        buckets.add_point(context_key_1, 0.1, Vec::new());
        buckets.add_point(context_key_1, 0.2, Vec::new());
        assert_eq!(buckets.buckets.len(), 1);

        buckets.add_point(context_key_2, 0.3, Vec::new());
        assert_eq!(buckets.buckets.len(), 2);

        buckets.add_point(context_key_2, 0.4, extra_tags.clone());
        assert_eq!(buckets.buckets.len(), 3);

        buckets.flush_aggregates();
        assert_eq!(buckets.buckets.len(), 0);
        assert_eq!(buckets.series.len(), 3);

        buckets.add_point(context_key_1, 0.5, Vec::new());
        buckets.add_point(context_key_2, 0.6, extra_tags);
        assert_eq!(buckets.buckets.len(), 2);

        buckets.flush_aggregates();
        assert_eq!(buckets.buckets.len(), 0);
        assert_eq!(buckets.series.len(), 3);

        let series: Vec<_> = buckets.flush_series().collect();
        assert_eq!(buckets.buckets.len(), 0);
        assert_eq!(buckets.series.len(), 0);
        assert_eq!(series.len(), 3);

        check_iter(
            series.iter(),
            &[
                &|(c, t, points)| {
                    if !(c == &context_key_1 && t.is_empty()) {
                        return false;
                    }
                    assert_eq!(points.len(), 2);
                    assert_approx_eq!(points[0].1, 0.2);
                    assert_approx_eq!(points[1].1, 0.5);
                    true
                },
                &|(c, t, points)| {
                    if !(c == &context_key_2 && t.is_empty()) {
                        return false;
                    }
                    assert_eq!(points.len(), 1);
                    assert_approx_eq!(points[0].1, 0.3);
                    true
                },
                &|(c, t, points)| {
                    if !(c == &context_key_2 && !t.is_empty()) {
                        return false;
                    }
                    assert_eq!(points.len(), 2);
                    assert_approx_eq!(points[0].1, 0.4);
                    assert_approx_eq!(points[1].1, 0.6);
                    true
                },
            ],
        );
    }

    #[test]
    fn test_distributions() {
        let mut buckets = MetricBuckets::default();
        let contexts = MetricContexts::default();

        let context_key_distribution = contexts.register_metric_context(
            "metric_distribution".into(),
            Vec::new(),
            MetricType::Distribution,
            false,
            MetricNamespace::Tracers,
        );
        let context_key_distribution_2 = contexts.register_metric_context(
            "metric_distribution_2".into(),
            Vec::new(),
            MetricType::Distribution,
            false,
            MetricNamespace::Tracers,
        );
        let extra_tags = vec![tag!("service", "foo")];

        // Create 3 distributions: 4 points on the first context, and 2 points each on the
        // second context without and with extra tags.
        buckets.add_point(context_key_distribution, 1.0, Vec::new());
        buckets.add_point(context_key_distribution, 1.0, Vec::new());
        buckets.add_point(context_key_distribution, 100.0, Vec::new());
        buckets.add_point(context_key_distribution, 1000.0, Vec::new());

        buckets.add_point(context_key_distribution_2, 2.0, Vec::new());
        buckets.add_point(context_key_distribution_2, 200.0, Vec::new());

        buckets.add_point(context_key_distribution_2, 3.0, extra_tags.clone());
        buckets.add_point(context_key_distribution_2, 300.0, extra_tags);

        let distributions: Vec<_> = buckets.flush_distributions().collect();
        assert_eq!(buckets.distributions.len(), 0);
        assert_eq!(distributions.len(), 3);

        check_iter(
            distributions.iter(),
            &[
                &|(c, t, points)| {
                    if !(c == &context_key_distribution && t.is_empty()) {
                        return false;
                    }
                    let bins: Vec<_> = points
                        .ordered_bins()
                        .into_iter()
                        .filter(|(_, w)| *w != 0.0)
                        .collect();
                    assert_eq!(bins.len(), 3);
                    // The precision is quite low since it is up to the ddsketch implementation to
                    // test the precision
                    assert_approx_eq!(bins[0].0, 1.0, 1.0e-1);
                    assert_approx_eq!(bins[0].1, 2.0);
                    assert_approx_eq!(bins[1].0, 100.0, 1.0);
                    assert_approx_eq!(bins[1].1, 1.0);
                    assert_approx_eq!(bins[2].0, 1000.0, 10.0);
                    assert_approx_eq!(bins[2].1, 1.0);
                    true
                },
                &|(c, t, points)| {
                    if !(c == &context_key_distribution_2 && t.is_empty()) {
                        return false;
                    }
                    let bins: Vec<_> = points
                        .ordered_bins()
                        .into_iter()
                        .filter(|(_, w)| *w != 0.0)
                        .collect();
                    assert_eq!(bins.len(), 2);
                    assert_approx_eq!(bins[0].0, 2.0, 1.0e-1);
                    assert_approx_eq!(bins[0].1, 1.0);
                    assert_approx_eq!(bins[1].0, 200.0, 1.0);
                    assert_approx_eq!(bins[1].1, 1.0);
                    true
                },
                &|(c, t, points)| {
                    if !(c == &context_key_distribution_2 && !t.is_empty()) {
                        return false;
                    }
                    let bins: Vec<_> = points
                        .ordered_bins()
                        .into_iter()
                        .filter(|(_, w)| *w != 0.0)
                        .collect();
                    assert_eq!(bins.len(), 2);
                    assert_approx_eq!(bins[0].0, 3.0, 1.0e-1);
                    assert_approx_eq!(bins[0].1, 1.0);
                    assert_approx_eq!(bins[1].0, 300.0, 1.0);
                    assert_approx_eq!(bins[1].1, 1.0);
                    true
                },
            ],
        )
    }

    #[test]
    fn test_stats() {
        let mut buckets = MetricBuckets::default();
        let contexts = MetricContexts::default();

        let context_key_1 = contexts.register_metric_context(
            "metric1".into(),
            Vec::new(),
            MetricType::Count,
            false,
            MetricNamespace::Tracers,
        );

        let context_key_2 = contexts.register_metric_context(
            "metric2".into(),
            Vec::new(),
            MetricType::Gauge,
            false,
            MetricNamespace::Tracers,
        );

        let context_key_distribution = contexts.register_metric_context(
            "metric_distribution".into(),
            Vec::new(),
            MetricType::Distribution,
            false,
            MetricNamespace::Tracers,
        );

        let context_key_distribution_2 = contexts.register_metric_context(
            "metric_distribution_2".into(),
            Vec::new(),
            MetricType::Distribution,
            false,
            MetricNamespace::Tracers,
        );

        // Create 2 series: 3 points for context_key_1 and 2 points for context_key_2
        buckets.add_point(context_key_1, 1.0, Vec::new());
        buckets.add_point(context_key_2, 2.0, Vec::new());
        buckets.flush_aggregates();

        buckets.add_point(context_key_1, 1.0, Vec::new());
        buckets.add_point(context_key_2, 2.0, Vec::new());
        buckets.flush_aggregates();

        buckets.add_point(context_key_1, 1.1, Vec::new());
        buckets.add_point(context_key_1, 2.1, Vec::new());
        buckets.flush_aggregates();

        // Create 2 buckets
        buckets.add_point(context_key_1, 1.0, Vec::new());
        buckets.add_point(context_key_2, 2.0, Vec::new());

        // Create 2 distributions with 3 and 2 points
        buckets.add_point(context_key_distribution, 1.0, Vec::new());
        buckets.add_point(context_key_distribution, 1.1, Vec::new());
        buckets.add_point(context_key_distribution, 1.2, Vec::new());

        buckets.add_point(context_key_distribution_2, 2.0, Vec::new());
        buckets.add_point(context_key_distribution_2, 2.1, Vec::new());

        let stats = buckets.stats();

        assert_eq!(stats.buckets, 2);
        assert_eq!(stats.series, 2);
        assert_eq!(stats.series_points, 5);
        assert_eq!(stats.distributions, 2);
        assert_eq!(stats.distributions_points, 5);
    }
}