opentelemetry_spanprocessor_any/metrics/
async_instrument.rs

1//! Async metrics
2use crate::{
3    global,
4    metrics::{sdk_api, MetricsError, Number},
5    KeyValue,
6};
7use std::fmt;
8use std::marker;
9use std::sync::Arc;
10
11/// Observation is used for reporting an asynchronous batch of metric values.
12/// Instances of this type should be created by asynchronous instruments (e.g.,
13/// [ValueObserver::observation]).
14///
15/// [ValueObserver::observation]: crate::metrics::ValueObserver::observation()
16#[derive(Debug)]
17pub struct Observation {
18    number: Number,
19    instrument: Arc<dyn sdk_api::AsyncInstrumentCore>,
20}
21
22impl Observation {
23    /// Create a new observation for an instrument
24    pub(crate) fn new(number: Number, instrument: Arc<dyn sdk_api::AsyncInstrumentCore>) -> Self {
25        Observation { number, instrument }
26    }
27
28    /// The value of this observation
29    pub fn number(&self) -> &Number {
30        &self.number
31    }
32    /// The instrument used to record this observation
33    pub fn instrument(&self) -> &Arc<dyn sdk_api::AsyncInstrumentCore> {
34        &self.instrument
35    }
36}
37
38/// A type of callback that `f64` observers run.
39type F64ObserverCallback = Box<dyn Fn(ObserverResult<f64>) + Send + Sync>;
40
41/// A type of callback that `u64` observers run.
42type U64ObserverCallback = Box<dyn Fn(ObserverResult<u64>) + Send + Sync>;
43
44/// A type of callback that `u64` observers run.
45type I64ObserverCallback = Box<dyn Fn(ObserverResult<i64>) + Send + Sync>;
46
47/// A callback argument for use with any Observer instrument that will be
48/// reported as a batch of observations.
49type BatchObserverCallback = Box<dyn Fn(BatchObserverResult) + Send + Sync>;
50
51/// Data passed to an observer callback to capture observations for one
52/// asynchronous metric instrument.
53pub struct ObserverResult<T> {
54    instrument: Arc<dyn sdk_api::AsyncInstrumentCore>,
55    f: fn(&[KeyValue], &[Observation]),
56    _marker: marker::PhantomData<T>,
57}
58
59impl<T> ObserverResult<T>
60where
61    T: Into<Number>,
62{
63    /// New observer result for a given metric instrument
64    fn new(
65        instrument: Arc<dyn sdk_api::AsyncInstrumentCore>,
66        f: fn(&[KeyValue], &[Observation]),
67    ) -> Self {
68        ObserverResult {
69            instrument,
70            f,
71            _marker: marker::PhantomData,
72        }
73    }
74
75    /// Observe captures a single value from the associated instrument callback,
76    /// with the given attributes.
77    pub fn observe(&self, value: T, attributes: &[KeyValue]) {
78        (self.f)(
79            attributes,
80            &[Observation {
81                number: value.into(),
82                instrument: self.instrument.clone(),
83            }],
84        )
85    }
86}
87
88impl<T> fmt::Debug for ObserverResult<T> {
89    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90        f.debug_struct("ObserverResult")
91            .field("instrument", &self.instrument)
92            .field("f", &"fn(&[KeyValue], &[Observation])")
93            .finish()
94    }
95}
96
97/// Passed to a batch observer callback to capture observations for multiple
98/// asynchronous instruments.
99pub struct BatchObserverResult {
100    f: fn(&[KeyValue], &[Observation]),
101}
102
103impl fmt::Debug for BatchObserverResult {
104    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105        f.debug_struct("BatchObserverResult")
106            .field("f", &"fn(&[KeyValue], &[Observation])")
107            .finish()
108    }
109}
110
111impl BatchObserverResult {
112    /// New observer result for a given metric instrument
113    fn new(f: fn(&[KeyValue], &[Observation])) -> Self {
114        BatchObserverResult { f }
115    }
116
117    /// Captures multiple observations from the associated batch instrument
118    /// callback, with the given attributes.
119    pub fn observe(&self, attributes: &[KeyValue], observations: &[Observation]) {
120        (self.f)(attributes, observations)
121    }
122}
123
124/// Called when collecting async instruments
125pub enum AsyncRunner {
126    /// Callback for `f64` observed values
127    F64(F64ObserverCallback),
128    /// Callback for `i64` observed values
129    I64(I64ObserverCallback),
130    /// Callback for `u64` observed values
131    U64(U64ObserverCallback),
132    /// Callback for batch observed values
133    Batch(BatchObserverCallback),
134}
135
136impl AsyncRunner {
137    /// Run accepts a single instrument and function for capturing observations
138    /// of that instrument. Each call to the function receives one captured
139    /// observation. (The function accepts multiple observations so the same
140    /// implementation can be used for batch runners.)
141    pub fn run(
142        &self,
143        instrument: &Option<Arc<dyn sdk_api::AsyncInstrumentCore>>,
144        f: fn(&[KeyValue], &[Observation]),
145    ) {
146        match (instrument, self) {
147            (Some(i), AsyncRunner::F64(run)) => run(ObserverResult::new(i.clone(), f)),
148            (Some(i), AsyncRunner::I64(run)) => run(ObserverResult::new(i.clone(), f)),
149            (Some(i), AsyncRunner::U64(run)) => run(ObserverResult::new(i.clone(), f)),
150            (None, AsyncRunner::Batch(run)) => run(BatchObserverResult::new(f)),
151            _ => global::handle_error(MetricsError::Other(
152                "Invalid async runner / instrument pair".into(),
153            )),
154        }
155    }
156}
157
158impl fmt::Debug for AsyncRunner {
159    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
160        match self {
161            AsyncRunner::F64(_) => f
162                .debug_struct("AsyncRunner::F64")
163                .field("closure", &"Fn(ObserverResult)")
164                .finish(),
165            AsyncRunner::I64(_) => f
166                .debug_struct("AsyncRunner::I64")
167                .field("closure", &"Fn(ObserverResult)")
168                .finish(),
169            AsyncRunner::U64(_) => f
170                .debug_struct("AsyncRunner::U64")
171                .field("closure", &"Fn(ObserverResult)")
172                .finish(),
173            AsyncRunner::Batch(_) => f
174                .debug_struct("AsyncRunner::Batch")
175                .field("closure", &"Fn(BatchObserverResult)")
176                .finish(),
177        }
178    }
179}