opentelemetry_spanprocessor_any/metrics/
registry.rs1use crate::{
3 metrics::{
4 sdk_api::{AsyncInstrumentCore, MeterCore, SyncInstrumentCore},
5 Meter, MeterProvider,
6 },
7 metrics::{AsyncRunner, Descriptor, Measurement, MetricsError, Result},
8 Context, KeyValue,
9};
10use std::collections::HashMap;
11use std::sync::{Arc, Mutex};
12
13pub fn meter_provider(core: Arc<dyn MeterCore + Send + Sync>) -> RegistryMeterProvider {
15 RegistryMeterProvider(Arc::new(UniqueInstrumentMeterCore::wrap(core)))
16}
17
18#[derive(Debug, Clone)]
20pub struct RegistryMeterProvider(Arc<dyn MeterCore + Send + Sync>);
21
22impl MeterProvider for RegistryMeterProvider {
23 fn meter(&self, name: &'static str, version: Option<&'static str>) -> Meter {
24 Meter::new(name, version, self.0.clone())
25 }
26}
27
28#[derive(Debug)]
29struct UniqueInstrumentMeterCore {
30 inner: Arc<dyn MeterCore + Send + Sync>,
31 sync_state: Mutex<HashMap<UniqueInstrumentKey, UniqueSyncInstrument>>,
32 async_state: Mutex<HashMap<UniqueInstrumentKey, UniqueAsyncInstrument>>,
33}
34
35impl UniqueInstrumentMeterCore {
36 fn wrap(inner: Arc<dyn MeterCore + Send + Sync>) -> Self {
37 UniqueInstrumentMeterCore {
38 inner,
39 sync_state: Mutex::new(HashMap::default()),
40 async_state: Mutex::new(HashMap::default()),
41 }
42 }
43}
44
45impl MeterCore for UniqueInstrumentMeterCore {
46 fn record_batch_with_context(
47 &self,
48 cx: &Context,
49 attributes: &[KeyValue],
50 measurements: Vec<Measurement>,
51 ) {
52 self.inner
53 .record_batch_with_context(cx, attributes, measurements)
54 }
55
56 fn new_sync_instrument(&self, descriptor: Descriptor) -> Result<UniqueSyncInstrument> {
57 self.sync_state
58 .lock()
59 .map_err(Into::into)
60 .and_then(|mut state| {
61 let key = UniqueInstrumentKey::from(&descriptor);
62 check_sync_uniqueness(&state, &descriptor, &key).and_then(|instrument| {
63 match instrument {
64 Some(instrument) => Ok(instrument),
65 None => {
66 let instrument = self.inner.new_sync_instrument(descriptor)?;
67 state.insert(key, instrument.clone());
68
69 Ok(instrument)
70 }
71 }
72 })
73 })
74 }
75
76 fn new_async_instrument(
77 &self,
78 descriptor: Descriptor,
79 runner: Option<AsyncRunner>,
80 ) -> super::Result<UniqueAsyncInstrument> {
81 self.async_state
82 .lock()
83 .map_err(Into::into)
84 .and_then(|mut state| {
85 let key = UniqueInstrumentKey::from(&descriptor);
86 check_async_uniqueness(&state, &descriptor, &key).and_then(|instrument| {
87 match instrument {
88 Some(instrument) => Ok(instrument),
89 None => {
90 let instrument = self.inner.new_async_instrument(descriptor, runner)?;
91 state.insert(key, instrument.clone());
92
93 Ok(instrument)
94 }
95 }
96 })
97 })
98 }
99
100 fn new_batch_observer(&self, runner: AsyncRunner) -> Result<()> {
101 self.inner.new_batch_observer(runner)
102 }
103}
104
105fn check_sync_uniqueness(
106 instruments: &HashMap<UniqueInstrumentKey, UniqueSyncInstrument>,
107 desc: &Descriptor,
108 key: &UniqueInstrumentKey,
109) -> Result<Option<UniqueSyncInstrument>> {
110 if let Some(instrument) = instruments.get(key) {
111 if is_equal(instrument.descriptor(), desc) {
112 Ok(Some(instrument.clone()))
113 } else {
114 Err(MetricsError::MetricKindMismatch(format!(
115 "metric was {} ({}), registered as a {:?} {:?}",
116 desc.name(),
117 desc.instrumentation_name(),
118 desc.number_kind(),
119 desc.instrument_kind()
120 )))
121 }
122 } else {
123 Ok(None)
124 }
125}
126
127fn check_async_uniqueness(
128 instruments: &HashMap<UniqueInstrumentKey, UniqueAsyncInstrument>,
129 desc: &Descriptor,
130 key: &UniqueInstrumentKey,
131) -> Result<Option<UniqueAsyncInstrument>> {
132 if let Some(instrument) = instruments.get(key) {
133 if is_equal(instrument.descriptor(), desc) {
134 Ok(Some(instrument.clone()))
135 } else {
136 Err(MetricsError::MetricKindMismatch(format!(
137 "metric was {} ({}), registered as a {:?} {:?}",
138 desc.name(),
139 desc.instrumentation_name(),
140 desc.number_kind(),
141 desc.instrument_kind()
142 )))
143 }
144 } else {
145 Ok(None)
146 }
147}
148
149fn is_equal(a: &Descriptor, b: &Descriptor) -> bool {
150 a.instrument_kind() == b.instrument_kind() && a.number_kind() == b.number_kind()
151}
152
/// Map key under which created instruments are stored: the pair of instrument
/// name and instrumentation name.
#[derive(Debug, Hash, PartialEq, Eq)]
struct UniqueInstrumentKey {
    /// Taken from the descriptor's `name()`.
    instrument_name: String,
    /// Taken from the descriptor's `instrumentation_name()`.
    instrumentation_name: String,
}
158
159impl From<&Descriptor> for UniqueInstrumentKey {
160 fn from(desc: &Descriptor) -> Self {
161 UniqueInstrumentKey {
162 instrument_name: desc.name().to_string(),
163 instrumentation_name: desc.instrumentation_name().to_string(),
164 }
165 }
166}
167
168type UniqueSyncInstrument = Arc<dyn SyncInstrumentCore>;
169type UniqueAsyncInstrument = Arc<dyn AsyncInstrumentCore>;