opentelemetry_spanprocessor_any/sdk/metrics/aggregators/
array.rs

1use crate::metrics::{AtomicNumber, Descriptor, MetricsError, Number, NumberKind, Result};
2use crate::sdk::{
3    export::metrics::{Count, Points},
4    metrics::Aggregator,
5};
6use std::any::Any;
7use std::mem;
8use std::sync::{Arc, Mutex};
9
10/// Create a new default `ArrayAggregator`
11pub fn array() -> ArrayAggregator {
12    ArrayAggregator::default()
13}
14
15/// An aggregator which stores metrics in an array.
16#[derive(Debug, Default)]
17pub struct ArrayAggregator {
18    inner: Mutex<Inner>,
19}
20
21impl Count for ArrayAggregator {
22    fn count(&self) -> Result<u64> {
23        self.inner
24            .lock()
25            .map_err(Into::into)
26            .map(|inner| inner.points.as_ref().map_or(0, |p| p.len() as u64))
27    }
28}
29
30impl Points for ArrayAggregator {
31    fn points(&self) -> Result<Vec<Number>> {
32        self.inner
33            .lock()
34            .map_err(Into::into)
35            .map(|inner| inner.points.as_ref().map_or_else(Vec::new, |p| p.0.clone()))
36    }
37}
38
39impl Aggregator for ArrayAggregator {
40    fn update(&self, number: &Number, descriptor: &Descriptor) -> Result<()> {
41        self.inner.lock().map_err(Into::into).map(|mut inner| {
42            if let Some(points) = inner.points.as_mut() {
43                points.push(number.clone());
44            } else {
45                inner.points = Some(PointsData::with_number(number.clone()));
46            }
47            inner.sum.fetch_add(descriptor.number_kind(), number)
48        })
49    }
50
51    fn synchronized_move(
52        &self,
53        other: &Arc<dyn Aggregator + Send + Sync>,
54        descriptor: &Descriptor,
55    ) -> Result<()> {
56        if let Some(other) = other.as_any().downcast_ref::<Self>() {
57            other
58                .inner
59                .lock()
60                .map_err(Into::into)
61                .and_then(|mut other| {
62                    self.inner.lock().map_err(Into::into).map(|mut inner| {
63                        other.points = mem::take(&mut inner.points);
64                        other.sum = mem::replace(
65                            &mut inner.sum,
66                            descriptor.number_kind().zero().to_atomic(),
67                        );
68
69                        // TODO: This sort should be done lazily, only when quantiles are
70                        // requested. The SDK specification says you can use this aggregator to
71                        // simply list values in the order they were received as an alternative to
72                        // requesting quantile information.
73                        if let Some(points) = &mut other.points {
74                            points.sort(descriptor.number_kind());
75                        }
76                    })
77                })
78        } else {
79            Err(MetricsError::InconsistentAggregator(format!(
80                "Expected {:?}, got: {:?}",
81                self, other
82            )))
83        }
84    }
85    fn merge(&self, other: &(dyn Aggregator + Send + Sync), desc: &Descriptor) -> Result<()> {
86        if let Some(other) = other.as_any().downcast_ref::<Self>() {
87            self.inner.lock().map_err(Into::into).and_then(|mut inner| {
88                other.inner.lock().map_err(From::from).map(|other_inner| {
89                    // Note: Current assumption is that `o` was checkpointed,
90                    // therefore is already sorted.  See the TODO above, since
91                    // this is an open question.
92                    inner
93                        .sum
94                        .fetch_add(desc.number_kind(), &other_inner.sum.load());
95                    match (inner.points.as_mut(), other_inner.points.as_ref()) {
96                        (Some(points), Some(other_points)) => {
97                            points.combine(desc.number_kind(), other_points)
98                        }
99                        (None, Some(other_points)) => inner.points = Some(other_points.clone()),
100                        _ => (),
101                    }
102                })
103            })
104        } else {
105            Err(MetricsError::InconsistentAggregator(format!(
106                "Expected {:?}, got: {:?}",
107                self, other
108            )))
109        }
110    }
111
112    fn as_any(&self) -> &dyn Any {
113        self
114    }
115}
116
117#[derive(Debug, Default)]
118struct Inner {
119    sum: AtomicNumber,
120    points: Option<PointsData>,
121}
122
123#[derive(Clone, Debug, Default)]
124struct PointsData(Vec<Number>);
125
126impl PointsData {
127    fn with_number(number: Number) -> Self {
128        PointsData(vec![number])
129    }
130
131    fn len(&self) -> usize {
132        self.0.len()
133    }
134
135    fn push(&mut self, number: Number) {
136        self.0.push(number)
137    }
138
139    fn sort(&mut self, kind: &NumberKind) {
140        match kind {
141            NumberKind::I64 => self.0.sort_by_key(|a| a.to_u64(kind)),
142            NumberKind::F64 => self.0.sort_by(|a, b| {
143                a.to_f64(kind)
144                    .partial_cmp(&b.to_f64(kind))
145                    .expect("nan values should be rejected. This is a bug.")
146            }),
147            NumberKind::U64 => self.0.sort_by_key(|a| a.to_u64(kind)),
148        }
149    }
150    fn combine(&mut self, kind: &NumberKind, other: &PointsData) {
151        self.0.append(&mut other.0.clone());
152        self.sort(kind)
153    }
154}