opentelemetry_spanprocessor_any/metrics/
registry.rs

1//! Metrics Registry API
2use crate::{
3    metrics::{
4        sdk_api::{AsyncInstrumentCore, MeterCore, SyncInstrumentCore},
5        Meter, MeterProvider,
6    },
7    metrics::{AsyncRunner, Descriptor, Measurement, MetricsError, Result},
8    Context, KeyValue,
9};
10use std::collections::HashMap;
11use std::sync::{Arc, Mutex};
12
13/// Create a new `RegistryMeterProvider` from a `MeterCore`.
14pub fn meter_provider(core: Arc<dyn MeterCore + Send + Sync>) -> RegistryMeterProvider {
15    RegistryMeterProvider(Arc::new(UniqueInstrumentMeterCore::wrap(core)))
16}
17
18/// A standard `MeterProvider` for wrapping a `MeterCore`.
19#[derive(Debug, Clone)]
20pub struct RegistryMeterProvider(Arc<dyn MeterCore + Send + Sync>);
21
22impl MeterProvider for RegistryMeterProvider {
23    fn meter(&self, name: &'static str, version: Option<&'static str>) -> Meter {
24        Meter::new(name, version, self.0.clone())
25    }
26}
27
28#[derive(Debug)]
29struct UniqueInstrumentMeterCore {
30    inner: Arc<dyn MeterCore + Send + Sync>,
31    sync_state: Mutex<HashMap<UniqueInstrumentKey, UniqueSyncInstrument>>,
32    async_state: Mutex<HashMap<UniqueInstrumentKey, UniqueAsyncInstrument>>,
33}
34
35impl UniqueInstrumentMeterCore {
36    fn wrap(inner: Arc<dyn MeterCore + Send + Sync>) -> Self {
37        UniqueInstrumentMeterCore {
38            inner,
39            sync_state: Mutex::new(HashMap::default()),
40            async_state: Mutex::new(HashMap::default()),
41        }
42    }
43}
44
45impl MeterCore for UniqueInstrumentMeterCore {
46    fn record_batch_with_context(
47        &self,
48        cx: &Context,
49        attributes: &[KeyValue],
50        measurements: Vec<Measurement>,
51    ) {
52        self.inner
53            .record_batch_with_context(cx, attributes, measurements)
54    }
55
56    fn new_sync_instrument(&self, descriptor: Descriptor) -> Result<UniqueSyncInstrument> {
57        self.sync_state
58            .lock()
59            .map_err(Into::into)
60            .and_then(|mut state| {
61                let key = UniqueInstrumentKey::from(&descriptor);
62                check_sync_uniqueness(&state, &descriptor, &key).and_then(|instrument| {
63                    match instrument {
64                        Some(instrument) => Ok(instrument),
65                        None => {
66                            let instrument = self.inner.new_sync_instrument(descriptor)?;
67                            state.insert(key, instrument.clone());
68
69                            Ok(instrument)
70                        }
71                    }
72                })
73            })
74    }
75
76    fn new_async_instrument(
77        &self,
78        descriptor: Descriptor,
79        runner: Option<AsyncRunner>,
80    ) -> super::Result<UniqueAsyncInstrument> {
81        self.async_state
82            .lock()
83            .map_err(Into::into)
84            .and_then(|mut state| {
85                let key = UniqueInstrumentKey::from(&descriptor);
86                check_async_uniqueness(&state, &descriptor, &key).and_then(|instrument| {
87                    match instrument {
88                        Some(instrument) => Ok(instrument),
89                        None => {
90                            let instrument = self.inner.new_async_instrument(descriptor, runner)?;
91                            state.insert(key, instrument.clone());
92
93                            Ok(instrument)
94                        }
95                    }
96                })
97            })
98    }
99
100    fn new_batch_observer(&self, runner: AsyncRunner) -> Result<()> {
101        self.inner.new_batch_observer(runner)
102    }
103}
104
105fn check_sync_uniqueness(
106    instruments: &HashMap<UniqueInstrumentKey, UniqueSyncInstrument>,
107    desc: &Descriptor,
108    key: &UniqueInstrumentKey,
109) -> Result<Option<UniqueSyncInstrument>> {
110    if let Some(instrument) = instruments.get(key) {
111        if is_equal(instrument.descriptor(), desc) {
112            Ok(Some(instrument.clone()))
113        } else {
114            Err(MetricsError::MetricKindMismatch(format!(
115                "metric was {} ({}), registered as a {:?} {:?}",
116                desc.name(),
117                desc.instrumentation_name(),
118                desc.number_kind(),
119                desc.instrument_kind()
120            )))
121        }
122    } else {
123        Ok(None)
124    }
125}
126
127fn check_async_uniqueness(
128    instruments: &HashMap<UniqueInstrumentKey, UniqueAsyncInstrument>,
129    desc: &Descriptor,
130    key: &UniqueInstrumentKey,
131) -> Result<Option<UniqueAsyncInstrument>> {
132    if let Some(instrument) = instruments.get(key) {
133        if is_equal(instrument.descriptor(), desc) {
134            Ok(Some(instrument.clone()))
135        } else {
136            Err(MetricsError::MetricKindMismatch(format!(
137                "metric was {} ({}), registered as a {:?} {:?}",
138                desc.name(),
139                desc.instrumentation_name(),
140                desc.number_kind(),
141                desc.instrument_kind()
142            )))
143        }
144    } else {
145        Ok(None)
146    }
147}
148
149fn is_equal(a: &Descriptor, b: &Descriptor) -> bool {
150    a.instrument_kind() == b.instrument_kind() && a.number_kind() == b.number_kind()
151}
152
153#[derive(Debug, PartialEq, Eq, Hash)]
154struct UniqueInstrumentKey {
155    instrument_name: String,
156    instrumentation_name: String,
157}
158
159impl From<&Descriptor> for UniqueInstrumentKey {
160    fn from(desc: &Descriptor) -> Self {
161        UniqueInstrumentKey {
162            instrument_name: desc.name().to_string(),
163            instrumentation_name: desc.instrumentation_name().to_string(),
164        }
165    }
166}
167
168type UniqueSyncInstrument = Arc<dyn SyncInstrumentCore>;
169type UniqueAsyncInstrument = Arc<dyn AsyncInstrumentCore>;