opentelemetry_spanprocessor_any/sdk/metrics/aggregators/
array.rs1use crate::metrics::{AtomicNumber, Descriptor, MetricsError, Number, NumberKind, Result};
2use crate::sdk::{
3 export::metrics::{Count, Points},
4 metrics::Aggregator,
5};
6use std::any::Any;
7use std::mem;
8use std::sync::{Arc, Mutex};
9
10pub fn array() -> ArrayAggregator {
12 ArrayAggregator::default()
13}
14
15#[derive(Debug, Default)]
17pub struct ArrayAggregator {
18 inner: Mutex<Inner>,
19}
20
21impl Count for ArrayAggregator {
22 fn count(&self) -> Result<u64> {
23 self.inner
24 .lock()
25 .map_err(Into::into)
26 .map(|inner| inner.points.as_ref().map_or(0, |p| p.len() as u64))
27 }
28}
29
30impl Points for ArrayAggregator {
31 fn points(&self) -> Result<Vec<Number>> {
32 self.inner
33 .lock()
34 .map_err(Into::into)
35 .map(|inner| inner.points.as_ref().map_or_else(Vec::new, |p| p.0.clone()))
36 }
37}
38
39impl Aggregator for ArrayAggregator {
40 fn update(&self, number: &Number, descriptor: &Descriptor) -> Result<()> {
41 self.inner.lock().map_err(Into::into).map(|mut inner| {
42 if let Some(points) = inner.points.as_mut() {
43 points.push(number.clone());
44 } else {
45 inner.points = Some(PointsData::with_number(number.clone()));
46 }
47 inner.sum.fetch_add(descriptor.number_kind(), number)
48 })
49 }
50
51 fn synchronized_move(
52 &self,
53 other: &Arc<dyn Aggregator + Send + Sync>,
54 descriptor: &Descriptor,
55 ) -> Result<()> {
56 if let Some(other) = other.as_any().downcast_ref::<Self>() {
57 other
58 .inner
59 .lock()
60 .map_err(Into::into)
61 .and_then(|mut other| {
62 self.inner.lock().map_err(Into::into).map(|mut inner| {
63 other.points = mem::take(&mut inner.points);
64 other.sum = mem::replace(
65 &mut inner.sum,
66 descriptor.number_kind().zero().to_atomic(),
67 );
68
69 if let Some(points) = &mut other.points {
74 points.sort(descriptor.number_kind());
75 }
76 })
77 })
78 } else {
79 Err(MetricsError::InconsistentAggregator(format!(
80 "Expected {:?}, got: {:?}",
81 self, other
82 )))
83 }
84 }
85 fn merge(&self, other: &(dyn Aggregator + Send + Sync), desc: &Descriptor) -> Result<()> {
86 if let Some(other) = other.as_any().downcast_ref::<Self>() {
87 self.inner.lock().map_err(Into::into).and_then(|mut inner| {
88 other.inner.lock().map_err(From::from).map(|other_inner| {
89 inner
93 .sum
94 .fetch_add(desc.number_kind(), &other_inner.sum.load());
95 match (inner.points.as_mut(), other_inner.points.as_ref()) {
96 (Some(points), Some(other_points)) => {
97 points.combine(desc.number_kind(), other_points)
98 }
99 (None, Some(other_points)) => inner.points = Some(other_points.clone()),
100 _ => (),
101 }
102 })
103 })
104 } else {
105 Err(MetricsError::InconsistentAggregator(format!(
106 "Expected {:?}, got: {:?}",
107 self, other
108 )))
109 }
110 }
111
112 fn as_any(&self) -> &dyn Any {
113 self
114 }
115}
116
117#[derive(Debug, Default)]
118struct Inner {
119 sum: AtomicNumber,
120 points: Option<PointsData>,
121}
122
123#[derive(Clone, Debug, Default)]
124struct PointsData(Vec<Number>);
125
126impl PointsData {
127 fn with_number(number: Number) -> Self {
128 PointsData(vec![number])
129 }
130
131 fn len(&self) -> usize {
132 self.0.len()
133 }
134
135 fn push(&mut self, number: Number) {
136 self.0.push(number)
137 }
138
139 fn sort(&mut self, kind: &NumberKind) {
140 match kind {
141 NumberKind::I64 => self.0.sort_by_key(|a| a.to_u64(kind)),
142 NumberKind::F64 => self.0.sort_by(|a, b| {
143 a.to_f64(kind)
144 .partial_cmp(&b.to_f64(kind))
145 .expect("nan values should be rejected. This is a bug.")
146 }),
147 NumberKind::U64 => self.0.sort_by_key(|a| a.to_u64(kind)),
148 }
149 }
150 fn combine(&mut self, kind: &NumberKind, other: &PointsData) {
151 self.0.append(&mut other.0.clone());
152 self.sort(kind)
153 }
154}