opentelemetry_spanprocessor_any/sdk/metrics/aggregators/
histogram.rs1use crate::metrics::{AtomicNumber, Descriptor, MetricsError, Number, NumberKind, Result};
2use crate::sdk::export::metrics::{Buckets, Count, Histogram, Sum};
3use crate::sdk::metrics::export::metrics::Aggregator;
4use std::mem;
5use std::sync::{Arc, RwLock};
6
7pub fn histogram(_desc: &Descriptor, boundaries: &[f64]) -> HistogramAggregator {
9 let mut sorted_boundaries = boundaries.to_owned();
10 sorted_boundaries.sort_by(|a, b| a.partial_cmp(b).unwrap());
11 let state = State::empty(&sorted_boundaries);
12
13 HistogramAggregator {
14 inner: RwLock::new(Inner {
15 boundaries: sorted_boundaries,
16 state,
17 }),
18 }
19}
20
21#[derive(Debug)]
24pub struct HistogramAggregator {
25 inner: RwLock<Inner>,
26}
27
28#[derive(Debug)]
29struct Inner {
30 boundaries: Vec<f64>,
31 state: State,
32}
33
34#[derive(Debug)]
35struct State {
36 bucket_counts: Vec<f64>,
37 count: AtomicNumber,
38 sum: AtomicNumber,
39}
40
41impl State {
42 fn empty(boundaries: &[f64]) -> Self {
43 State {
44 bucket_counts: vec![0.0; boundaries.len() + 1],
45 count: NumberKind::U64.zero().to_atomic(),
46 sum: NumberKind::U64.zero().to_atomic(),
47 }
48 }
49}
50
51impl Sum for HistogramAggregator {
52 fn sum(&self) -> Result<Number> {
53 self.inner
54 .read()
55 .map_err(From::from)
56 .map(|inner| inner.state.sum.load())
57 }
58}
59
60impl Count for HistogramAggregator {
61 fn count(&self) -> Result<u64> {
62 self.inner
63 .read()
64 .map_err(From::from)
65 .map(|inner| inner.state.count.load().to_u64(&NumberKind::U64))
66 }
67}
68
69impl Histogram for HistogramAggregator {
70 fn histogram(&self) -> Result<Buckets> {
71 self.inner
72 .read()
73 .map_err(From::from)
74 .map(|inner| Buckets::new(inner.boundaries.clone(), inner.state.bucket_counts.clone()))
75 }
76}
77
78impl Aggregator for HistogramAggregator {
79 fn update(&self, number: &Number, descriptor: &Descriptor) -> Result<()> {
80 self.inner.write().map_err(From::from).map(|mut inner| {
81 let kind = descriptor.number_kind();
82 let as_float = number.to_f64(kind);
83
84 let mut bucket_id = inner.boundaries.len();
85 for (idx, boundary) in inner.boundaries.iter().enumerate() {
86 if as_float < *boundary {
87 bucket_id = idx;
88 break;
89 }
90 }
91
92 inner.state.count.fetch_add(&NumberKind::U64, &1u64.into());
93 inner.state.sum.fetch_add(kind, number);
94 inner.state.bucket_counts[bucket_id] += 1.0;
95 })
96 }
97
98 fn synchronized_move(
99 &self,
100 other: &Arc<dyn Aggregator + Send + Sync>,
101 _descriptor: &crate::metrics::Descriptor,
102 ) -> Result<()> {
103 if let Some(other) = other.as_any().downcast_ref::<Self>() {
104 self.inner
105 .write()
106 .map_err(From::from)
107 .and_then(|mut inner| {
108 other.inner.write().map_err(From::from).map(|mut other| {
109 let empty = State::empty(&inner.boundaries);
110 other.state = mem::replace(&mut inner.state, empty)
111 })
112 })
113 } else {
114 Err(MetricsError::InconsistentAggregator(format!(
115 "Expected {:?}, got: {:?}",
116 self, other
117 )))
118 }
119 }
120
121 fn merge(&self, other: &(dyn Aggregator + Send + Sync), desc: &Descriptor) -> Result<()> {
122 if let Some(other) = other.as_any().downcast_ref::<HistogramAggregator>() {
123 self.inner
124 .write()
125 .map_err(From::from)
126 .and_then(|mut inner| {
127 other.inner.read().map_err(From::from).map(|other| {
128 inner
129 .state
130 .sum
131 .fetch_add(desc.number_kind(), &other.state.sum.load());
132 inner
133 .state
134 .count
135 .fetch_add(&NumberKind::U64, &other.state.count.load());
136
137 for idx in 0..inner.state.bucket_counts.len() {
138 inner.state.bucket_counts[idx] += other.state.bucket_counts[idx];
139 }
140 })
141 })
142 } else {
143 Err(MetricsError::InconsistentAggregator(format!(
144 "Expected {:?}, got: {:?}",
145 self, other
146 )))
147 }
148 }
149
150 fn as_any(&self) -> &dyn std::any::Any {
151 self
152 }
153}