opentelemetry_spanprocessor_any/metrics/
async_instrument.rs1use crate::{
3 global,
4 metrics::{sdk_api, MetricsError, Number},
5 KeyValue,
6};
7use std::fmt;
8use std::marker;
9use std::sync::Arc;
10
11#[derive(Debug)]
17pub struct Observation {
18 number: Number,
19 instrument: Arc<dyn sdk_api::AsyncInstrumentCore>,
20}
21
22impl Observation {
23 pub(crate) fn new(number: Number, instrument: Arc<dyn sdk_api::AsyncInstrumentCore>) -> Self {
25 Observation { number, instrument }
26 }
27
28 pub fn number(&self) -> &Number {
30 &self.number
31 }
32 pub fn instrument(&self) -> &Arc<dyn sdk_api::AsyncInstrumentCore> {
34 &self.instrument
35 }
36}
37
38type F64ObserverCallback = Box<dyn Fn(ObserverResult<f64>) + Send + Sync>;
40
41type U64ObserverCallback = Box<dyn Fn(ObserverResult<u64>) + Send + Sync>;
43
44type I64ObserverCallback = Box<dyn Fn(ObserverResult<i64>) + Send + Sync>;
46
47type BatchObserverCallback = Box<dyn Fn(BatchObserverResult) + Send + Sync>;
50
51pub struct ObserverResult<T> {
54 instrument: Arc<dyn sdk_api::AsyncInstrumentCore>,
55 f: fn(&[KeyValue], &[Observation]),
56 _marker: marker::PhantomData<T>,
57}
58
59impl<T> ObserverResult<T>
60where
61 T: Into<Number>,
62{
63 fn new(
65 instrument: Arc<dyn sdk_api::AsyncInstrumentCore>,
66 f: fn(&[KeyValue], &[Observation]),
67 ) -> Self {
68 ObserverResult {
69 instrument,
70 f,
71 _marker: marker::PhantomData,
72 }
73 }
74
75 pub fn observe(&self, value: T, attributes: &[KeyValue]) {
78 (self.f)(
79 attributes,
80 &[Observation {
81 number: value.into(),
82 instrument: self.instrument.clone(),
83 }],
84 )
85 }
86}
87
88impl<T> fmt::Debug for ObserverResult<T> {
89 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90 f.debug_struct("ObserverResult")
91 .field("instrument", &self.instrument)
92 .field("f", &"fn(&[KeyValue], &[Observation])")
93 .finish()
94 }
95}
96
97pub struct BatchObserverResult {
100 f: fn(&[KeyValue], &[Observation]),
101}
102
103impl fmt::Debug for BatchObserverResult {
104 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105 f.debug_struct("BatchObserverResult")
106 .field("f", &"fn(&[KeyValue], &[Observation])")
107 .finish()
108 }
109}
110
111impl BatchObserverResult {
112 fn new(f: fn(&[KeyValue], &[Observation])) -> Self {
114 BatchObserverResult { f }
115 }
116
117 pub fn observe(&self, attributes: &[KeyValue], observations: &[Observation]) {
120 (self.f)(attributes, observations)
121 }
122}
123
124pub enum AsyncRunner {
126 F64(F64ObserverCallback),
128 I64(I64ObserverCallback),
130 U64(U64ObserverCallback),
132 Batch(BatchObserverCallback),
134}
135
136impl AsyncRunner {
137 pub fn run(
142 &self,
143 instrument: &Option<Arc<dyn sdk_api::AsyncInstrumentCore>>,
144 f: fn(&[KeyValue], &[Observation]),
145 ) {
146 match (instrument, self) {
147 (Some(i), AsyncRunner::F64(run)) => run(ObserverResult::new(i.clone(), f)),
148 (Some(i), AsyncRunner::I64(run)) => run(ObserverResult::new(i.clone(), f)),
149 (Some(i), AsyncRunner::U64(run)) => run(ObserverResult::new(i.clone(), f)),
150 (None, AsyncRunner::Batch(run)) => run(BatchObserverResult::new(f)),
151 _ => global::handle_error(MetricsError::Other(
152 "Invalid async runner / instrument pair".into(),
153 )),
154 }
155 }
156}
157
158impl fmt::Debug for AsyncRunner {
159 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
160 match self {
161 AsyncRunner::F64(_) => f
162 .debug_struct("AsyncRunner::F64")
163 .field("closure", &"Fn(ObserverResult)")
164 .finish(),
165 AsyncRunner::I64(_) => f
166 .debug_struct("AsyncRunner::I64")
167 .field("closure", &"Fn(ObserverResult)")
168 .finish(),
169 AsyncRunner::U64(_) => f
170 .debug_struct("AsyncRunner::U64")
171 .field("closure", &"Fn(ObserverResult)")
172 .finish(),
173 AsyncRunner::Batch(_) => f
174 .debug_struct("AsyncRunner::Batch")
175 .field("closure", &"Fn(BatchObserverResult)")
176 .finish(),
177 }
178 }
179}