logo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
//! Metrics Registry API
use crate::metrics::sdk_api::{Descriptor, SyncInstrumentCore};
use core::fmt;
use opentelemetry_api::{
    metrics::{MetricsError, Result},
    Context,
};
use std::sync::{Arc, Mutex};
use std::{any::Any, collections::HashMap};

use super::sdk_api::{AsyncInstrumentCore, InstrumentCore, MeterCore};

/// Create a new [`UniqueInstrumentMeterCore`] that wraps the given meter core.
///
/// `core` may be any [`AnyMeterCore`] implementation that is
/// `Send + Sync + 'static`; it is passed unchanged to the wrapper's private
/// constructor.
pub fn unique_instrument_meter_core<T>(core: T) -> UniqueInstrumentMeterCore
where
    T: AnyMeterCore + Send + Sync + 'static,
{
    UniqueInstrumentMeterCore::wrap(core)
}

/// An extension trait that allows a [`MeterCore`] to be viewed as [`Any`] so
/// that it can be downcast.
pub trait AnyMeterCore: MeterCore {
    /// Returns the current type as [`Any`], e.g. for use with
    /// `downcast_ref`.
    fn as_any(&self) -> &dyn Any;
}

// Blanket implementation: every `'static` `MeterCore` can expose itself as
// `Any` simply by returning a reference to itself.
impl<T> AnyMeterCore for T
where
    T: MeterCore + 'static,
{
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Implements the [`MeterCore`] interface, adding uniqueness checking for
/// instrument descriptors.
pub struct UniqueInstrumentMeterCore {
    // The wrapped meter core; instrument creation and callback registration
    // are forwarded to it.
    inner: Box<dyn AnyMeterCore + Send + Sync>,
    // Instruments created so far, keyed by instrument name and guarded by a
    // mutex so lookups and insertions happen under one lock.
    state: Mutex<HashMap<String, Arc<dyn InstrumentCore + Send + Sync>>>,
}

// Hand-written `Debug`: only the type name is printed, none of the fields.
impl fmt::Debug for UniqueInstrumentMeterCore {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(formatter, "UniqueInstrumentMeterCore")
    }
}

impl UniqueInstrumentMeterCore {
    /// Boxes `inner` and pairs it with an empty instrument registry.
    fn wrap<T>(inner: T) -> Self
    where
        T: AnyMeterCore + Send + Sync + 'static,
    {
        let inner: Box<dyn AnyMeterCore + Send + Sync> = Box::new(inner);
        let state = Mutex::new(HashMap::new());

        Self { inner, state }
    }

    /// Exposes the wrapped meter core as [`Any`] so crate-internal callers
    /// can attempt a downcast.
    pub(crate) fn meter_core(&self) -> &dyn Any {
        self.inner.as_any()
    }
}

impl MeterCore for UniqueInstrumentMeterCore {
    /// Creates a synchronous instrument for `descriptor`, or returns the one
    /// that `check_uniqueness` finds already registered under the same name.
    ///
    /// A newly created instrument is stored in the registry under the
    /// descriptor's name before it is returned.
    ///
    /// # Errors
    ///
    /// Returns an error if the registry lock is poisoned, if
    /// `check_uniqueness` rejects the descriptor, or if the wrapped meter core
    /// fails to create the instrument.
    fn new_sync_instrument(
        &self,
        descriptor: Descriptor,
    ) -> Result<Arc<dyn SyncInstrumentCore + Send + Sync>> {
        let mut state = self.state.lock()?;
        if let Some(existing) = check_uniqueness(&state, &descriptor)? {
            return Ok(existing);
        }

        // Copy only the name before the descriptor is moved into the wrapped
        // core; this avoids cloning the whole descriptor just to read its name
        // afterwards.
        let name: String = descriptor.name().into();
        let instrument = self.inner.new_sync_instrument(descriptor)?;
        state.insert(name, Arc::clone(&instrument).as_dyn_core());

        Ok(instrument)
    }

    /// Creates an asynchronous instrument for `descriptor`, or returns the one
    /// that `check_uniqueness` finds already registered under the same name.
    ///
    /// A newly created instrument is stored in the registry under the name
    /// reported by the instrument's own descriptor before it is returned.
    ///
    /// # Errors
    ///
    /// Returns an error if the registry lock is poisoned, if
    /// `check_uniqueness` rejects the descriptor, or if the wrapped meter core
    /// fails to create the instrument.
    fn new_async_instrument(
        &self,
        descriptor: Descriptor,
    ) -> Result<Arc<dyn AsyncInstrumentCore + Send + Sync>> {
        let mut state = self.state.lock()?;
        if let Some(existing) = check_uniqueness(&state, &descriptor)? {
            return Ok(existing);
        }

        let instrument = self.inner.new_async_instrument(descriptor)?;
        let name: String = instrument.descriptor().name().into();
        state.insert(name, Arc::clone(&instrument).as_dyn_core());

        Ok(instrument)
    }

    /// Forwards callback registration to the wrapped meter core unchanged.
    fn register_callback(&self, f: Box<dyn Fn(&Context) + Send + Sync>) -> Result<()> {
        self.inner.register_callback(f)
    }
}

/// Looks up `descriptor`'s name in `instruments` and checks that any existing
/// registration is compatible with it.
///
/// Returns:
/// - `Ok(None)` when no instrument is registered under that name, or when the
///   registered instrument's `as_any()` value cannot be downcast to `T`;
/// - `Ok(Some(_))` with a clone of the registered instrument when the kinds
///   match and the downcast succeeds.
///
/// # Errors
///
/// Returns [`MetricsError::MetricKindMismatch`] when an instrument with the
/// same name exists but its instrument kind or number kind differs. The
/// message reports the kinds of the instrument that is already registered.
fn check_uniqueness<T: Clone + 'static>(
    instruments: &HashMap<String, Arc<dyn InstrumentCore + Send + Sync>>,
    descriptor: &Descriptor,
) -> Result<Option<T>> {
    let existing = match instruments.get(descriptor.name()) {
        Some(existing) => existing,
        None => return Ok(None),
    };

    let registered = existing.descriptor();
    if !is_equal(registered, descriptor) {
        // "registered as" must describe the instrument already in the
        // registry, not the one being requested.
        return Err(MetricsError::MetricKindMismatch(format!(
            "metric {} registered as a {:?} {:?}",
            descriptor.name(),
            registered.number_kind(),
            registered.instrument_kind()
        )));
    }

    Ok(existing.as_any().downcast_ref::<T>().cloned())
}

/// Two descriptors are considered equal when both their instrument kind and
/// their number kind match; no other descriptor data is compared.
fn is_equal(a: &Descriptor, b: &Descriptor) -> bool {
    if a.instrument_kind() != b.instrument_kind() {
        return false;
    }

    a.number_kind() == b.number_kind()
}