opentelemetry_spanprocessor_any/sdk/metrics/aggregators/
sum.rs

1use crate::metrics::{AtomicNumber, Descriptor, MetricsError, Number, Result};
2use crate::sdk::export::metrics::{Aggregator, Subtractor, Sum};
3use std::any::Any;
4use std::sync::Arc;
5
6/// Create a new sum aggregator.
7pub fn sum() -> SumAggregator {
8    SumAggregator::default()
9}
10
11/// An aggregator for counter events.
12#[derive(Debug, Default)]
13pub struct SumAggregator {
14    value: AtomicNumber,
15}
16
17impl Sum for SumAggregator {
18    fn sum(&self) -> Result<Number> {
19        Ok(self.value.load())
20    }
21}
22
23impl Subtractor for SumAggregator {
24    fn subtract(
25        &self,
26        operand: &(dyn Aggregator + Send + Sync),
27        result: &(dyn Aggregator + Send + Sync),
28        descriptor: &Descriptor,
29    ) -> Result<()> {
30        match (
31            operand.as_any().downcast_ref::<Self>(),
32            result.as_any().downcast_ref::<Self>(),
33        ) {
34            (Some(op), Some(res)) => {
35                res.value.store(&self.value.load());
36                res.value
37                    .fetch_add(descriptor.number_kind(), &op.value.load());
38                Ok(())
39            }
40            _ => Err(MetricsError::InconsistentAggregator(format!(
41                "Expected {:?}, got: {:?} and {:?}",
42                self, operand, result
43            ))),
44        }
45    }
46}
47
48impl Aggregator for SumAggregator {
49    fn update(&self, number: &Number, descriptor: &Descriptor) -> Result<()> {
50        self.value.fetch_add(descriptor.number_kind(), number);
51        Ok(())
52    }
53    fn synchronized_move(
54        &self,
55        other: &Arc<dyn Aggregator + Send + Sync>,
56        descriptor: &Descriptor,
57    ) -> Result<()> {
58        if let Some(other) = other.as_any().downcast_ref::<Self>() {
59            let kind = descriptor.number_kind();
60            other.value.store(&self.value.load());
61            self.value.store(&kind.zero());
62            Ok(())
63        } else {
64            Err(MetricsError::InconsistentAggregator(format!(
65                "Expected {:?}, got: {:?}",
66                self, other
67            )))
68        }
69    }
70    fn merge(&self, other: &(dyn Aggregator + Send + Sync), descriptor: &Descriptor) -> Result<()> {
71        if let Some(other_sum) = other.as_any().downcast_ref::<SumAggregator>() {
72            self.value
73                .fetch_add(descriptor.number_kind(), &other_sum.value.load())
74        }
75
76        Ok(())
77    }
78    fn as_any(&self) -> &dyn Any {
79        self
80    }
81}