// opentelemetry_spanprocessor_any/sdk/metrics/aggregators/histogram.rs

1use crate::metrics::{AtomicNumber, Descriptor, MetricsError, Number, NumberKind, Result};
2use crate::sdk::export::metrics::{Buckets, Count, Histogram, Sum};
3use crate::sdk::metrics::export::metrics::Aggregator;
4use std::mem;
5use std::sync::{Arc, RwLock};
6
7/// Create a new histogram for the given descriptor with the given boundaries
8pub fn histogram(_desc: &Descriptor, boundaries: &[f64]) -> HistogramAggregator {
9    let mut sorted_boundaries = boundaries.to_owned();
10    sorted_boundaries.sort_by(|a, b| a.partial_cmp(b).unwrap());
11    let state = State::empty(&sorted_boundaries);
12
13    HistogramAggregator {
14        inner: RwLock::new(Inner {
15            boundaries: sorted_boundaries,
16            state,
17        }),
18    }
19}
20
21/// This aggregator observes events and counts them in pre-determined buckets. It
22/// also calculates the sum and count of all events.
23#[derive(Debug)]
24pub struct HistogramAggregator {
25    inner: RwLock<Inner>,
26}
27
28#[derive(Debug)]
29struct Inner {
30    boundaries: Vec<f64>,
31    state: State,
32}
33
34#[derive(Debug)]
35struct State {
36    bucket_counts: Vec<f64>,
37    count: AtomicNumber,
38    sum: AtomicNumber,
39}
40
41impl State {
42    fn empty(boundaries: &[f64]) -> Self {
43        State {
44            bucket_counts: vec![0.0; boundaries.len() + 1],
45            count: NumberKind::U64.zero().to_atomic(),
46            sum: NumberKind::U64.zero().to_atomic(),
47        }
48    }
49}
50
51impl Sum for HistogramAggregator {
52    fn sum(&self) -> Result<Number> {
53        self.inner
54            .read()
55            .map_err(From::from)
56            .map(|inner| inner.state.sum.load())
57    }
58}
59
60impl Count for HistogramAggregator {
61    fn count(&self) -> Result<u64> {
62        self.inner
63            .read()
64            .map_err(From::from)
65            .map(|inner| inner.state.count.load().to_u64(&NumberKind::U64))
66    }
67}
68
69impl Histogram for HistogramAggregator {
70    fn histogram(&self) -> Result<Buckets> {
71        self.inner
72            .read()
73            .map_err(From::from)
74            .map(|inner| Buckets::new(inner.boundaries.clone(), inner.state.bucket_counts.clone()))
75    }
76}
77
78impl Aggregator for HistogramAggregator {
79    fn update(&self, number: &Number, descriptor: &Descriptor) -> Result<()> {
80        self.inner.write().map_err(From::from).map(|mut inner| {
81            let kind = descriptor.number_kind();
82            let as_float = number.to_f64(kind);
83
84            let mut bucket_id = inner.boundaries.len();
85            for (idx, boundary) in inner.boundaries.iter().enumerate() {
86                if as_float < *boundary {
87                    bucket_id = idx;
88                    break;
89                }
90            }
91
92            inner.state.count.fetch_add(&NumberKind::U64, &1u64.into());
93            inner.state.sum.fetch_add(kind, number);
94            inner.state.bucket_counts[bucket_id] += 1.0;
95        })
96    }
97
98    fn synchronized_move(
99        &self,
100        other: &Arc<dyn Aggregator + Send + Sync>,
101        _descriptor: &crate::metrics::Descriptor,
102    ) -> Result<()> {
103        if let Some(other) = other.as_any().downcast_ref::<Self>() {
104            self.inner
105                .write()
106                .map_err(From::from)
107                .and_then(|mut inner| {
108                    other.inner.write().map_err(From::from).map(|mut other| {
109                        let empty = State::empty(&inner.boundaries);
110                        other.state = mem::replace(&mut inner.state, empty)
111                    })
112                })
113        } else {
114            Err(MetricsError::InconsistentAggregator(format!(
115                "Expected {:?}, got: {:?}",
116                self, other
117            )))
118        }
119    }
120
121    fn merge(&self, other: &(dyn Aggregator + Send + Sync), desc: &Descriptor) -> Result<()> {
122        if let Some(other) = other.as_any().downcast_ref::<HistogramAggregator>() {
123            self.inner
124                .write()
125                .map_err(From::from)
126                .and_then(|mut inner| {
127                    other.inner.read().map_err(From::from).map(|other| {
128                        inner
129                            .state
130                            .sum
131                            .fetch_add(desc.number_kind(), &other.state.sum.load());
132                        inner
133                            .state
134                            .count
135                            .fetch_add(&NumberKind::U64, &other.state.count.load());
136
137                        for idx in 0..inner.state.bucket_counts.len() {
138                            inner.state.bucket_counts[idx] += other.state.bucket_counts[idx];
139                        }
140                    })
141                })
142        } else {
143            Err(MetricsError::InconsistentAggregator(format!(
144                "Expected {:?}, got: {:?}",
145                self, other
146            )))
147        }
148    }
149
150    fn as_any(&self) -> &dyn std::any::Any {
151        self
152    }
153}