1use std::{
5 collections::HashMap,
6 sync::{Arc, Mutex, MutexGuard},
7 time,
8};
9
10use libdd_common::tag::Tag;
11use libdd_ddsketch::DDSketch;
12use serde::{Deserialize, Serialize};
13
14use crate::data::{self, metrics};
15
16fn unix_timestamp_now() -> u64 {
17 time::SystemTime::UNIX_EPOCH
18 .elapsed()
19 .map_or(0, |d| d.as_secs())
20}
21
22#[derive(Debug)]
23struct MetricBucket {
24 aggr: MetricAggr,
25}
26
27#[derive(Debug)]
28enum MetricAggr {
29 Count { count: f64 },
30 Gauge { value: f64 },
31}
32
33impl MetricBucket {
34 fn add_point(&mut self, point: f64) {
35 match &mut self.aggr {
36 MetricAggr::Count { count } => *count += point,
37 MetricAggr::Gauge { value } => *value = point,
38 }
39 }
40
41 fn value(&self) -> f64 {
42 match self.aggr {
43 MetricAggr::Count { count } => count,
44 MetricAggr::Gauge { value } => value,
45 }
46 }
47}
48
49#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
50#[repr(C)]
51pub struct ContextKey(u32, metrics::MetricType);
52
53#[derive(Debug, PartialEq, Eq, Hash)]
54struct BucketKey {
55 context_key: ContextKey,
56 extra_tags: Vec<Tag>,
57}
58
59#[derive(Debug, Default)]
60pub struct MetricBuckets {
61 buckets: HashMap<BucketKey, MetricBucket>,
62 series: HashMap<BucketKey, Vec<(u64, f64)>>,
63 distributions: HashMap<BucketKey, DDSketch>,
64}
65
66#[derive(Debug, Default, Serialize, Deserialize)]
67pub struct MetricBucketStats {
68 pub buckets: u32,
69 pub series: u32,
70 pub series_points: u32,
71 pub distributions: u32,
72 pub distributions_points: u32,
73}
74
75impl MetricBuckets {
76 pub const METRICS_FLUSH_INTERVAL: time::Duration = time::Duration::from_secs(10);
77
78 pub fn flush_aggregates(&mut self) {
79 let timestamp = unix_timestamp_now();
80 for (key, bucket) in self.buckets.drain() {
81 self.series
82 .entry(key)
83 .or_default()
84 .push((timestamp, bucket.value()))
85 }
86 }
87
88 pub fn flush_series(
89 &mut self,
90 ) -> impl Iterator<Item = (ContextKey, Vec<Tag>, Vec<(u64, f64)>)> + '_ {
91 self.series.drain().map(
92 |(
93 BucketKey {
94 context_key,
95 extra_tags,
96 },
97 points,
98 )| (context_key, extra_tags, points),
99 )
100 }
101
102 pub fn flush_distributions(
103 &mut self,
104 ) -> impl Iterator<Item = (ContextKey, Vec<Tag>, DDSketch)> + '_ {
105 self.distributions.drain().map(
106 |(
107 BucketKey {
108 context_key,
109 extra_tags,
110 },
111 points,
112 )| (context_key, extra_tags, points),
113 )
114 }
115
116 pub fn add_point(&mut self, context_key: ContextKey, point: f64, extra_tags: Vec<Tag>) {
117 let bucket_key = BucketKey {
118 context_key,
119 extra_tags,
120 };
121 match context_key.1 {
122 metrics::MetricType::Count => self
123 .buckets
124 .entry(bucket_key)
125 .or_insert_with(|| MetricBucket {
126 aggr: MetricAggr::Count { count: 0.0 },
127 })
128 .add_point(point),
129 metrics::MetricType::Gauge => self
130 .buckets
131 .entry(bucket_key)
132 .or_insert_with(|| MetricBucket {
133 aggr: MetricAggr::Gauge { value: 0.0 },
134 })
135 .add_point(point),
136 metrics::MetricType::Distribution => {
137 let _ = self.distributions.entry(bucket_key).or_default().add(point);
138 }
139 }
140 }
141
142 pub fn stats(&self) -> MetricBucketStats {
143 MetricBucketStats {
144 buckets: self.buckets.len() as u32,
145 series: self.series.len() as u32,
146 series_points: self.series.values().map(|v| v.len() as u32).sum(),
147 distributions: self.distributions.len() as u32,
148 distributions_points: self
149 .distributions
150 .values()
151 .flat_map(|sketch| {
152 sketch
153 .ordered_bins()
154 .into_iter()
155 .map(|(_, weight)| weight as u32)
156 })
157 .sum(),
158 }
159 }
160}
161
162#[derive(Debug, Serialize, Deserialize)]
163pub struct MetricContext {
164 pub namespace: data::metrics::MetricNamespace,
165 pub name: String,
166 pub tags: Vec<Tag>,
167 pub metric_type: data::metrics::MetricType,
168 pub common: bool,
169}
170
171pub struct MetricContextGuard<'a> {
172 guard: MutexGuard<'a, InnerMetricContexts>,
173}
174
175impl MetricContextGuard<'_> {
176 pub fn read(&self, key: ContextKey) -> Option<&MetricContext> {
177 self.guard.store.get(key.0 as usize)
178 }
179
180 pub fn is_empty(&self) -> bool {
181 self.guard.store.is_empty()
182 }
183
184 pub fn len(&self) -> usize {
185 self.guard.store.len()
186 }
187}
188
189#[derive(Debug, Default)]
190struct InnerMetricContexts {
191 store: Vec<MetricContext>,
192}
193
194#[derive(Debug, Clone, Default)]
195pub struct MetricContexts {
196 inner: Arc<Mutex<InnerMetricContexts>>,
197}
198
199impl MetricContexts {
200 pub fn register_metric_context(
201 &self,
202 name: String,
203 tags: Vec<Tag>,
204 metric_type: data::metrics::MetricType,
205 common: bool,
206 namespace: data::metrics::MetricNamespace,
207 ) -> ContextKey {
208 #[allow(clippy::unwrap_used)]
209 let mut contexts = self.inner.lock().unwrap();
210 let key = ContextKey(contexts.store.len() as u32, metric_type);
211 contexts.store.push(MetricContext {
212 name,
213 tags,
214 metric_type,
215 common,
216 namespace,
217 });
218 key
219 }
220
221 pub fn lock(&self) -> MetricContextGuard<'_> {
222 #[allow(clippy::unwrap_used)]
223 MetricContextGuard {
224 guard: self.inner.as_ref().lock().unwrap(),
225 }
226 }
227}
228
229#[cfg(test)]
230mod tests {
231 use libdd_common::tag;
232 use std::fmt::Debug;
233
234 use super::*;
235 use crate::data::metrics::{MetricNamespace, MetricType};
236
237 macro_rules! assert_approx_eq {
239 ($a:expr, $b:expr) => {{
240 let (a, b) = (&$a, &$b);
241 assert!(
242 (*a - *b).abs() < 1.0e-6,
243 "{} is not approximately equal to {}",
244 *a,
245 *b
246 );
247 }};
248 ($a:expr, $b:expr, $precision:expr) => {{
249 let (a, b) = (&$a, &$b);
250 assert!(
251 (*a - *b).abs() < $precision,
252 "{} is not approximately equal to {}",
253 *a,
254 *b
255 );
256 }};
257 }
258
259 fn check_iter<'a, U: 'a + Debug, T: Iterator<Item = &'a U>>(
261 elements: T,
262 assertions: &[&dyn Fn(&U) -> bool],
263 ) {
264 let mut used = vec![false; assertions.len()];
265 for e in elements {
266 let mut found = false;
267 for (i, &a) in assertions.iter().enumerate() {
268 if a(e) {
269 if used[i] {
270 panic!("Assertion {i} has been used multiple times");
271 }
272 found = true;
273 used[i] = true;
274 break;
275 }
276 }
277 if !found {
278 panic!("No assertion found for elem {e:?}")
279 }
280 }
281 }
282
283 #[test]
284 fn test_bucket_flushes() {
285 let mut buckets = MetricBuckets::default();
286 let contexts = MetricContexts::default();
287
288 let context_key_1 = contexts.register_metric_context(
289 "metric1".into(),
290 Vec::new(),
291 MetricType::Gauge,
292 false,
293 MetricNamespace::Tracers,
294 );
295 let context_key_2 = contexts.register_metric_context(
296 "metric2".into(),
297 Vec::new(),
298 MetricType::Gauge,
299 false,
300 MetricNamespace::Tracers,
301 );
302 let extra_tags = vec![tag!("service", "foobar")];
303
304 buckets.add_point(context_key_1, 0.1, Vec::new());
305 buckets.add_point(context_key_1, 0.2, Vec::new());
306 assert_eq!(buckets.buckets.len(), 1);
307
308 buckets.add_point(context_key_2, 0.3, Vec::new());
309 assert_eq!(buckets.buckets.len(), 2);
310
311 buckets.add_point(context_key_2, 0.4, extra_tags.clone());
312 assert_eq!(buckets.buckets.len(), 3);
313
314 buckets.flush_aggregates();
315 assert_eq!(buckets.buckets.len(), 0);
316 assert_eq!(buckets.series.len(), 3);
317
318 buckets.add_point(context_key_1, 0.5, Vec::new());
319 buckets.add_point(context_key_2, 0.6, extra_tags);
320 assert_eq!(buckets.buckets.len(), 2);
321
322 buckets.flush_aggregates();
323 assert_eq!(buckets.buckets.len(), 0);
324 assert_eq!(buckets.series.len(), 3);
325
326 let series: Vec<_> = buckets.flush_series().collect();
327 assert_eq!(buckets.buckets.len(), 0);
328 assert_eq!(buckets.series.len(), 0);
329 assert_eq!(series.len(), 3);
330
331 check_iter(
332 series.iter(),
333 &[
334 &|(c, t, points)| {
335 if !(c == &context_key_1 && t.is_empty()) {
336 return false;
337 }
338 assert_eq!(points.len(), 2);
339 assert_approx_eq!(points[0].1, 0.2);
340 assert_approx_eq!(points[1].1, 0.5);
341 true
342 },
343 &|(c, t, points)| {
344 if !(c == &context_key_2 && t.is_empty()) {
345 return false;
346 }
347 assert_eq!(points.len(), 1);
348 assert_approx_eq!(points[0].1, 0.3);
349 true
350 },
351 &|(c, t, points)| {
352 if !(c == &context_key_2 && !t.is_empty()) {
353 return false;
354 }
355 assert_eq!(points.len(), 2);
356 assert_approx_eq!(points[0].1, 0.4);
357 assert_approx_eq!(points[1].1, 0.6);
358 true
359 },
360 ],
361 );
362 }
363
364 #[test]
365 fn test_distributions() {
366 let mut buckets = MetricBuckets::default();
367 let contexts = MetricContexts::default();
368
369 let context_key_distribution = contexts.register_metric_context(
370 "metric_distribution".into(),
371 Vec::new(),
372 MetricType::Distribution,
373 false,
374 MetricNamespace::Tracers,
375 );
376 let context_key_distribution_2 = contexts.register_metric_context(
377 "metric_distribution_2".into(),
378 Vec::new(),
379 MetricType::Distribution,
380 false,
381 MetricNamespace::Tracers,
382 );
383 let extra_tags = vec![tag!("service", "foo")];
384
385 buckets.add_point(context_key_distribution, 1.0, Vec::new());
387 buckets.add_point(context_key_distribution, 1.0, Vec::new());
388 buckets.add_point(context_key_distribution, 100.0, Vec::new());
389 buckets.add_point(context_key_distribution, 1000.0, Vec::new());
390
391 buckets.add_point(context_key_distribution_2, 2.0, Vec::new());
392 buckets.add_point(context_key_distribution_2, 200.0, Vec::new());
393
394 buckets.add_point(context_key_distribution_2, 3.0, extra_tags.clone());
395 buckets.add_point(context_key_distribution_2, 300.0, extra_tags.clone());
396
397 let distributions: Vec<_> = buckets.flush_distributions().collect();
398
399 check_iter(
400 distributions.iter(),
401 &[
402 &|(c, t, points)| {
403 if !(c == &context_key_distribution && t.is_empty()) {
404 return false;
405 }
406 let bins: Vec<_> = points
407 .ordered_bins()
408 .into_iter()
409 .filter(|(_, w)| *w != 0.0)
410 .collect();
411 assert_eq!(bins.len(), 3);
412 assert_approx_eq!(bins[0].0, 1.0, 1.0e-1);
415 assert_approx_eq!(bins[0].1, 2.0);
416 assert_approx_eq!(bins[1].0, 100.0, 1.0);
417 assert_approx_eq!(bins[1].1, 1.0);
418 assert_approx_eq!(bins[2].0, 1000.0, 10.0);
419 assert_approx_eq!(bins[2].1, 1.0);
420 true
421 },
422 &|(c, t, points)| {
423 if !(c == &context_key_distribution_2 && t.is_empty()) {
424 return false;
425 }
426 let bins: Vec<_> = points
427 .ordered_bins()
428 .into_iter()
429 .filter(|(_, w)| *w != 0.0)
430 .collect();
431 assert_eq!(bins.len(), 2);
432 assert_approx_eq!(bins[0].0, 2.0, 1.0e-1);
433 assert_approx_eq!(bins[0].1, 1.0);
434 assert_approx_eq!(bins[1].0, 200.0, 1.0);
435 assert_approx_eq!(bins[1].1, 1.0);
436 true
437 },
438 &|(c, t, points)| {
439 if !(c == &context_key_distribution_2 && !t.is_empty()) {
440 return false;
441 }
442 let bins: Vec<_> = points
443 .ordered_bins()
444 .into_iter()
445 .filter(|(_, w)| *w != 0.0)
446 .collect();
447 assert_eq!(bins.len(), 2);
448 assert_approx_eq!(bins[0].0, 3.0, 1.0e-1);
449 assert_approx_eq!(bins[0].1, 1.0);
450 assert_approx_eq!(bins[1].0, 300.0, 1.0);
451 assert_approx_eq!(bins[1].1, 1.0);
452 true
453 },
454 ],
455 )
456 }
457
458 #[test]
459 fn test_stats() {
460 let mut buckets = MetricBuckets::default();
461 let contexts = MetricContexts::default();
462
463 let context_key_1 = contexts.register_metric_context(
464 "metric1".into(),
465 Vec::new(),
466 MetricType::Count,
467 false,
468 MetricNamespace::Tracers,
469 );
470
471 let context_key_2 = contexts.register_metric_context(
472 "metric2".into(),
473 Vec::new(),
474 MetricType::Gauge,
475 false,
476 MetricNamespace::Tracers,
477 );
478
479 let context_key_distribution = contexts.register_metric_context(
480 "metric_distribution".into(),
481 Vec::new(),
482 MetricType::Distribution,
483 false,
484 MetricNamespace::Tracers,
485 );
486
487 let context_key_distribution_2 = contexts.register_metric_context(
488 "metric_distribution_2".into(),
489 Vec::new(),
490 MetricType::Distribution,
491 false,
492 MetricNamespace::Tracers,
493 );
494
495 buckets.add_point(context_key_1, 1.0, Vec::new());
497 buckets.add_point(context_key_2, 2.0, Vec::new());
498 buckets.flush_aggregates();
499
500 buckets.add_point(context_key_1, 1.0, Vec::new());
501 buckets.add_point(context_key_2, 2.0, Vec::new());
502 buckets.flush_aggregates();
503
504 buckets.add_point(context_key_1, 1.1, Vec::new());
505 buckets.add_point(context_key_1, 2.1, Vec::new());
506 buckets.flush_aggregates();
507
508 buckets.add_point(context_key_1, 1.0, Vec::new());
510 buckets.add_point(context_key_2, 2.0, Vec::new());
511
512 buckets.add_point(context_key_distribution, 1.0, Vec::new());
514 buckets.add_point(context_key_distribution, 1.1, Vec::new());
515 buckets.add_point(context_key_distribution, 1.2, Vec::new());
516
517 buckets.add_point(context_key_distribution_2, 2.0, Vec::new());
518 buckets.add_point(context_key_distribution_2, 2.1, Vec::new());
519
520 let stats = buckets.stats();
521
522 assert_eq!(stats.buckets, 2);
523 assert_eq!(stats.series, 2);
524 assert_eq!(stats.series_points, 5);
525 assert_eq!(stats.distributions, 2);
526 assert_eq!(stats.distributions_points, 5);
527 }
528}