use std::borrow::Cow;
use std::collections::HashMap;
use std::collections::HashSet;
use std::mem;
use std::sync::Arc;
use std::time::Duration;
use derive_more::From;
use opentelemetry::InstrumentationScope;
use opentelemetry::KeyValue;
use opentelemetry::metrics::AsyncInstrument;
use opentelemetry::metrics::AsyncInstrumentBuilder;
use opentelemetry::metrics::Counter;
use opentelemetry::metrics::Gauge;
use opentelemetry::metrics::Histogram;
use opentelemetry::metrics::HistogramBuilder;
use opentelemetry::metrics::InstrumentBuilder;
use opentelemetry::metrics::InstrumentProvider;
use opentelemetry::metrics::Meter;
use opentelemetry::metrics::MeterProvider;
use opentelemetry::metrics::ObservableCounter;
use opentelemetry::metrics::ObservableGauge;
use opentelemetry::metrics::ObservableUpDownCounter;
use opentelemetry::metrics::SyncInstrument;
use opentelemetry::metrics::UpDownCounter;
use opentelemetry_sdk::error::OTelSdkError;
use opentelemetry_sdk::error::OTelSdkResult;
use parking_lot::Mutex;
use strum::Display;
use strum::EnumCount;
use strum::EnumIter;
use strum::FromRepr;
use strum::IntoEnumIterator;
use super::NoopInstrumentProvider;
use crate::metrics::filter::FilterMeterProvider;
#[derive(
Hash, Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Debug, EnumCount, EnumIter, Display, FromRepr,
)]
#[repr(u8)]
pub(crate) enum MeterProviderType {
Apollo,
ApolloRealtime,
Public,
OtelDefault,
}
#[derive(Clone)]
pub(crate) struct AggregateMeterProvider {
inner: Arc<Mutex<Option<Inner>>>,
}
impl Default for AggregateMeterProvider {
fn default() -> Self {
AggregateMeterProvider {
inner: Arc::new(Mutex::new(Some(Inner::default()))),
}
}
}
pub(crate) struct Inner {
providers: Vec<(FilterMeterProvider, HashMap<MeterId, Meter>)>,
registered_instruments: Vec<InstrumentWrapper>,
observable_registries: Arc<SharedObservableRegistries>,
}
impl Default for Inner {
fn default() -> Self {
Inner {
providers: (0..MeterProviderType::COUNT)
.map(|_| (FilterMeterProvider::noop(), HashMap::new()))
.collect(),
registered_instruments: Vec::new(),
observable_registries: Arc::new(SharedObservableRegistries::new(
MeterProviderType::COUNT,
)),
}
}
}
impl Drop for Inner {
fn drop(&mut self) {
let noop = tracing::subscriber::NoSubscriber::new();
tracing::subscriber::with_default(noop, || {
self.providers.clear();
});
}
}
#[derive(From)]
pub(crate) enum InstrumentWrapper {
U64Counter {
_keep_alive: Arc<Counter<u64>>,
},
F64Counter {
_keep_alive: Arc<Counter<f64>>,
},
I64UpDownCounter {
_keep_alive: Arc<UpDownCounter<i64>>,
},
F64UpDownCounter {
_keep_alive: Arc<UpDownCounter<f64>>,
},
I64Histogram {
_keep_alive: Arc<Histogram<i64>>,
},
U64Histogram {
_keep_alive: Arc<Histogram<u64>>,
},
F64Histogram {
_keep_alive: Arc<Histogram<f64>>,
},
}
#[derive(Eq, PartialEq, Hash, Clone)]
struct MeterId {
name: Cow<'static, str>,
version: Option<Cow<'static, str>>,
schema_url: Option<Cow<'static, str>>,
}
impl AggregateMeterProvider {
pub(crate) fn set(
&self,
meter_provider_type: MeterProviderType,
meter_provider: FilterMeterProvider,
) -> FilterMeterProvider {
let mut guard = self.inner.lock();
let inner = guard
.as_mut()
.expect("cannot use meter provider after shutdown");
inner.invalidate();
inner
.observable_registries
.clear_provider(meter_provider_type);
let mut swap = (meter_provider, HashMap::new());
mem::swap(
&mut inner.providers[meter_provider_type as usize],
&mut swap,
);
drop(guard);
swap.0
}
#[cfg(test)]
pub(crate) fn invalidate(&self) {
if let Some(inner) = self.inner.lock().as_mut() {
inner.invalidate();
}
}
pub(crate) fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
fn tag_error_with_provider_type(err: OTelSdkError, index: usize) -> OTelSdkError {
let OTelSdkError::InternalFailure(message) = &err else {
return err;
};
let Ok(ty) = u8::try_from(index) else {
return err;
};
let Some(ty) = MeterProviderType::from_repr(ty) else {
return err;
};
OTelSdkError::InternalFailure(format!("[{ty}] {message}"))
}
let mut guard = self.inner.lock();
let old = guard.take();
drop(guard);
let Some(inner) = old else { return Ok(()) };
let mut result = Ok(());
for (index, (provider, _)) in inner.providers.iter().enumerate() {
let Err(err) = provider.shutdown_with_timeout(timeout) else {
continue;
};
let err = tag_error_with_provider_type(err, index);
result = match (result, err) {
(
Err(OTelSdkError::InternalFailure(old_error)),
OTelSdkError::InternalFailure(new_error),
) => Err(OTelSdkError::InternalFailure(format!(
"{old_error}\n{new_error}"
))),
(result @ Err(OTelSdkError::InternalFailure(_)), _) => result,
(_, err @ OTelSdkError::InternalFailure(_)) => Err(err),
(result @ Err(_), _) => result,
(Ok(_), err) => Err(err),
};
}
Ok(())
}
pub(crate) fn shutdown(&self) -> OTelSdkResult {
self.shutdown_with_timeout(Duration::from_secs(5))
}
pub(crate) fn create_registered_instrument<T>(
&self,
create_fn: impl Fn(&mut Inner) -> T,
) -> Arc<T>
where
Arc<T>: Into<InstrumentWrapper>,
{
let mut guard = self.inner.lock();
let inner = guard
.as_mut()
.expect("cannot use meter provider after shutdown");
inner.create_registered_instrument(create_fn)
}
#[cfg(test)]
pub(crate) fn registered_instruments(&self) -> usize {
self.inner
.lock()
.as_ref()
.expect("cannot use meter provider after shutdown")
.registered_instruments
.len()
}
}
impl Inner {
pub(crate) fn invalidate(&mut self) {
self.registered_instruments.clear()
}
pub(crate) fn meter(&mut self, name: impl Into<Cow<'static, str>>) -> Meter {
let scope = InstrumentationScope::builder(name).build();
self.meter_with_scope(scope)
}
pub(crate) fn meter_with_scope(&mut self, scope: InstrumentationScope) -> Meter {
let name: Cow<'static, str> = Cow::Owned(scope.name().to_string());
let version: Option<Cow<'static, str>> = scope.version().map(|v| Cow::Owned(v.to_string()));
let schema_url: Option<Cow<'static, str>> =
scope.schema_url().map(|v| Cow::Owned(v.to_string()));
let mut meters = Vec::with_capacity(self.providers.len());
for (provider, existing_meters) in &mut self.providers {
meters.push(
existing_meters
.entry(MeterId {
name: name.clone(),
version: version.clone(),
schema_url: schema_url.clone(),
})
.or_insert_with(|| provider.meter_with_scope(scope.clone()))
.clone(),
);
}
Meter::new(Arc::new(AggregateInstrumentProvider {
meters,
registries: Arc::clone(&self.observable_registries),
}))
}
pub(crate) fn create_registered_instrument<T>(
&mut self,
create_fn: impl Fn(&mut Inner) -> T,
) -> Arc<T>
where
Arc<T>: Into<InstrumentWrapper>,
{
let instrument = Arc::new((create_fn)(self));
self.registered_instruments.push(instrument.clone().into());
instrument
}
}
impl MeterProvider for AggregateMeterProvider {
fn meter_with_scope(&self, scope: InstrumentationScope) -> Meter {
let mut inner = self.inner.lock();
if let Some(inner) = inner.as_mut() {
inner.meter_with_scope(scope)
} else {
Meter::new(Arc::new(NoopInstrumentProvider))
}
}
}
pub(crate) struct AggregateInstrumentProvider {
meters: Vec<Meter>,
registries: Arc<SharedObservableRegistries>,
}
pub(crate) struct AggregateCounter<T> {
delegates: Vec<Counter<T>>,
}
impl<T: Copy> SyncInstrument<T> for AggregateCounter<T> {
fn measure(&self, value: T, attributes: &[KeyValue]) {
for counter in &self.delegates {
counter.add(value, attributes)
}
}
}
pub(crate) struct AggregateHistogram<T> {
delegates: Vec<Histogram<T>>,
}
impl<T: Copy> SyncInstrument<T> for AggregateHistogram<T> {
fn measure(&self, value: T, attributes: &[KeyValue]) {
for histogram in &self.delegates {
histogram.record(value, attributes)
}
}
}
pub(crate) struct AggregateUpDownCounter<T> {
delegates: Vec<UpDownCounter<T>>,
}
impl<T: Copy> SyncInstrument<T> for AggregateUpDownCounter<T> {
fn measure(&self, value: T, attributes: &[KeyValue]) {
for counter in &self.delegates {
counter.add(value, attributes)
}
}
}
pub(crate) struct AggregateGauge<T> {
delegates: Vec<Gauge<T>>,
}
impl<T: Copy> SyncInstrument<T> for AggregateGauge<T> {
fn measure(&self, value: T, attributes: &[KeyValue]) {
for gauge in &self.delegates {
gauge.record(value, attributes)
}
}
}
type ObservableCallback<T> = Arc<dyn Fn(&dyn AsyncInstrument<T>) + Send + Sync>;
struct ObservableCallbackRegistry<T: Send + Sync + 'static> {
callbacks: Mutex<HashMap<String, Vec<ObservableCallback<T>>>>,
registered: Mutex<HashSet<(MeterProviderType, String)>>,
}
impl<T: Send + Sync + 'static> ObservableCallbackRegistry<T> {
fn new() -> Self {
Self {
callbacks: Mutex::new(HashMap::new()),
registered: Mutex::new(HashSet::new()),
}
}
fn register_callback(&self, instrument_name: &str, callback: ObservableCallback<T>) {
let mut callbacks = self.callbacks.lock();
callbacks
.entry(instrument_name.to_string())
.or_default()
.push(callback);
}
fn invoke_callback(&self, instrument_name: &str, observer: &dyn AsyncInstrument<T>) {
let callbacks = self.callbacks.lock();
if let Some(callback_list) = callbacks.get(instrument_name) {
for callback in callback_list {
callback(observer);
}
}
}
fn try_register_for_provider(
&self,
meter_provider_type: MeterProviderType,
instrument_name: String,
) -> bool {
self.registered
.lock()
.insert((meter_provider_type, instrument_name))
}
fn clear_provider_registrations(&self, meter_provider_type: MeterProviderType) {
let mut registered = self.registered.lock();
registered.retain(|(provider_type, _)| *provider_type != meter_provider_type);
}
fn clear_callbacks(&self) {
self.callbacks.lock().clear();
}
}
pub(crate) struct SharedObservableRegistries {
u64_gauge: ObservableCallbackRegistry<u64>,
i64_gauge: ObservableCallbackRegistry<i64>,
f64_gauge: ObservableCallbackRegistry<f64>,
u64_counter: ObservableCallbackRegistry<u64>,
f64_counter: ObservableCallbackRegistry<f64>,
i64_up_down_counter: ObservableCallbackRegistry<i64>,
f64_up_down_counter: ObservableCallbackRegistry<f64>,
}
impl SharedObservableRegistries {
fn new(_num_providers: usize) -> Self {
Self {
u64_gauge: ObservableCallbackRegistry::new(),
i64_gauge: ObservableCallbackRegistry::new(),
f64_gauge: ObservableCallbackRegistry::new(),
u64_counter: ObservableCallbackRegistry::new(),
f64_counter: ObservableCallbackRegistry::new(),
i64_up_down_counter: ObservableCallbackRegistry::new(),
f64_up_down_counter: ObservableCallbackRegistry::new(),
}
}
fn clear_provider(&self, meter_provider_type: MeterProviderType) {
self.u64_gauge
.clear_provider_registrations(meter_provider_type);
self.i64_gauge
.clear_provider_registrations(meter_provider_type);
self.f64_gauge
.clear_provider_registrations(meter_provider_type);
self.u64_counter
.clear_provider_registrations(meter_provider_type);
self.f64_counter
.clear_provider_registrations(meter_provider_type);
self.i64_up_down_counter
.clear_provider_registrations(meter_provider_type);
self.f64_up_down_counter
.clear_provider_registrations(meter_provider_type);
self.u64_gauge.clear_callbacks();
self.i64_gauge.clear_callbacks();
self.f64_gauge.clear_callbacks();
self.u64_counter.clear_callbacks();
self.f64_counter.clear_callbacks();
self.i64_up_down_counter.clear_callbacks();
self.f64_up_down_counter.clear_callbacks();
}
}
macro_rules! aggregate_instrument_fn {
($name:ident, $ty:ty, $wrapper:ident, $implementation:ident) => {
fn $name(&self, builder: InstrumentBuilder<'_, $wrapper<$ty>>) -> $wrapper<$ty> {
let delegates: Vec<$wrapper<$ty>> = self
.meters
.iter()
.map(|meter| {
let mut b = meter.$name(builder.name.clone());
if let Some(description) = &builder.description {
b = b.with_description(description.clone());
}
if let Some(unit) = &builder.unit {
b = b.with_unit(unit.clone());
}
b.build()
})
.collect();
$wrapper::new(Arc::new($implementation { delegates }))
}
};
}
macro_rules! aggregate_histogram_fn {
($name:ident, $ty:ty, $wrapper:ident, $implementation:ident) => {
fn $name(&self, builder: HistogramBuilder<'_, $wrapper<$ty>>) -> $wrapper<$ty> {
let delegates: Vec<$wrapper<$ty>> = self
.meters
.iter()
.map(|meter| {
let mut b = meter.$name(builder.name.clone());
if let Some(description) = &builder.description {
b = b.with_description(description.clone());
}
if let Some(unit) = &builder.unit {
b = b.with_unit(unit.clone());
}
if let Some(boundaries) = &builder.boundaries {
b = b.with_boundaries(boundaries.clone());
}
b.build()
})
.collect();
$wrapper::new(Arc::new($implementation { delegates }))
}
};
}
macro_rules! aggregate_observable_gauge_fn {
($name:ident, $ty:ty, $registry:ident) => {
fn $name(
&self,
builder: AsyncInstrumentBuilder<'_, ObservableGauge<$ty>, $ty>,
) -> ObservableGauge<$ty> {
let gauge_name = builder.name.to_string();
let description = builder.description.as_ref().map(|s| s.to_string());
let unit = builder.unit.as_ref().map(|s| s.to_string());
let shared_callbacks: Vec<ObservableCallback<$ty>> =
builder.callbacks.into_iter().map(Arc::from).collect();
if shared_callbacks.is_empty() {
return ObservableGauge::new();
}
for callback in shared_callbacks {
self.registries
.$registry
.register_callback(&gauge_name, callback);
}
for (meter, meter_provider_type) in self.meters.iter().zip(MeterProviderType::iter()) {
if !self
.registries
.$registry
.try_register_for_provider(meter_provider_type, gauge_name.clone())
{
continue;
}
let mut b = meter.$name(gauge_name.clone());
if let Some(desc) = &description {
b = b.with_description(desc.clone());
}
if let Some(u) = &unit {
b = b.with_unit(u.clone());
}
let registry = Arc::clone(&self.registries);
let name = gauge_name.clone();
b = b.with_callback(move |observer| {
registry.$registry.invoke_callback(&name, observer);
});
let _ = b.build();
}
ObservableGauge::new()
}
};
}
macro_rules! aggregate_observable_counter_fn {
($name:ident, $ty:ty, $wrapper:ident, $registry:ident) => {
fn $name(&self, builder: AsyncInstrumentBuilder<'_, $wrapper<$ty>, $ty>) -> $wrapper<$ty> {
let instrument_name = builder.name.to_string();
let description = builder.description.as_ref().map(|s| s.to_string());
let unit = builder.unit.as_ref().map(|s| s.to_string());
let shared_callbacks: Vec<ObservableCallback<$ty>> =
builder.callbacks.into_iter().map(Arc::from).collect();
if shared_callbacks.is_empty() {
return $wrapper::new();
}
for callback in shared_callbacks {
self.registries
.$registry
.register_callback(&instrument_name, callback);
}
for (meter, meter_provider_type) in self.meters.iter().zip(MeterProviderType::iter()) {
if !self
.registries
.$registry
.try_register_for_provider(meter_provider_type, instrument_name.clone())
{
continue;
}
let mut b = meter.$name(instrument_name.clone());
if let Some(desc) = &description {
b = b.with_description(desc.clone());
}
if let Some(u) = &unit {
b = b.with_unit(u.clone());
}
let registry = Arc::clone(&self.registries);
let name = instrument_name.clone();
b = b.with_callback(move |observer| {
registry.$registry.invoke_callback(&name, observer);
});
let _ = b.build();
}
$wrapper::new()
}
};
}
impl InstrumentProvider for AggregateInstrumentProvider {
aggregate_instrument_fn!(u64_counter, u64, Counter, AggregateCounter);
aggregate_instrument_fn!(f64_counter, f64, Counter, AggregateCounter);
aggregate_observable_counter_fn!(f64_observable_counter, f64, ObservableCounter, f64_counter);
aggregate_observable_counter_fn!(u64_observable_counter, u64, ObservableCounter, u64_counter);
aggregate_histogram_fn!(u64_histogram, u64, Histogram, AggregateHistogram);
aggregate_histogram_fn!(f64_histogram, f64, Histogram, AggregateHistogram);
aggregate_instrument_fn!(
i64_up_down_counter,
i64,
UpDownCounter,
AggregateUpDownCounter
);
aggregate_instrument_fn!(
f64_up_down_counter,
f64,
UpDownCounter,
AggregateUpDownCounter
);
aggregate_instrument_fn!(u64_gauge, u64, Gauge, AggregateGauge);
aggregate_instrument_fn!(i64_gauge, i64, Gauge, AggregateGauge);
aggregate_instrument_fn!(f64_gauge, f64, Gauge, AggregateGauge);
aggregate_observable_counter_fn!(
i64_observable_up_down_counter,
i64,
ObservableUpDownCounter,
i64_up_down_counter
);
aggregate_observable_counter_fn!(
f64_observable_up_down_counter,
f64,
ObservableUpDownCounter,
f64_up_down_counter
);
aggregate_observable_gauge_fn!(f64_observable_gauge, f64, f64_gauge);
aggregate_observable_gauge_fn!(i64_observable_gauge, i64, i64_gauge);
aggregate_observable_gauge_fn!(u64_observable_gauge, u64, u64_gauge);
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use std::sync::Weak;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicI64;
use std::time::Duration;
use opentelemetry::metrics::MeterProvider;
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::metrics::InstrumentKind;
use opentelemetry_sdk::metrics::ManualReader;
use opentelemetry_sdk::metrics::MeterProviderBuilder;
use opentelemetry_sdk::metrics::Pipeline;
use opentelemetry_sdk::metrics::Temporality;
use opentelemetry_sdk::metrics::data::AggregatedMetrics;
use opentelemetry_sdk::metrics::data::MetricData;
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::exporter::PushMetricExporter;
use opentelemetry_sdk::metrics::periodic_reader_with_async_runtime::PeriodicReader;
use opentelemetry_sdk::metrics::reader::MetricReader;
use opentelemetry_sdk::runtime;
use crate::metrics::aggregation::AggregateMeterProvider;
use crate::metrics::aggregation::MeterProviderType;
use crate::metrics::filter::FilterMeterProvider;
#[derive(Clone, Debug)]
struct SharedReader(Arc<ManualReader>);
impl MetricReader for SharedReader {
fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
self.0.register_pipeline(pipeline)
}
fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
self.0.collect(rm)
}
fn force_flush(&self) -> OTelSdkResult {
self.0.force_flush()
}
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
self.0.shutdown_with_timeout(timeout)
}
fn temporality(&self, kind: InstrumentKind) -> Temporality {
self.0.temporality(kind)
}
}
#[test]
fn test_i64_gauge_callback_invocation() {
let reader = SharedReader(Arc::new(ManualReader::builder().build()));
let delegate = MeterProviderBuilder::default()
.with_reader(reader.clone())
.build();
let meter_provider = AggregateMeterProvider::default();
meter_provider.set(
MeterProviderType::Public,
FilterMeterProvider::public(delegate),
);
let meter = meter_provider.meter("test");
let observe_counter = Arc::new(AtomicI64::new(0));
let callback_observe_counter = observe_counter.clone();
let _gauge = meter
.i64_observable_gauge("test")
.with_callback(move |i| {
let count =
callback_observe_counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
i.observe(count + 1, &[])
})
.build();
let mut result = ResourceMetrics::default();
reader
.collect(&mut result)
.expect("metrics must be collected");
reader
.collect(&mut result)
.expect("metrics must be collected");
assert_eq!(get_gauge_value(&result), 2);
assert_eq!(observe_counter.load(std::sync::atomic::Ordering::SeqCst), 2);
}
#[test]
fn test_i64_gauge_multiple_callbacks() {
let reader = SharedReader(Arc::new(ManualReader::builder().build()));
let delegate = MeterProviderBuilder::default()
.with_reader(reader.clone())
.build();
let meter_provider = AggregateMeterProvider::default();
meter_provider.set(
MeterProviderType::Public,
FilterMeterProvider::public(delegate),
);
let meter = meter_provider.meter("test");
let observe_counter = Arc::new(AtomicI64::new(0));
let callback_observe_counter1 = observe_counter.clone();
let _gauge1 = meter
.i64_observable_gauge("test")
.with_callback(move |i| {
let count =
callback_observe_counter1.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
i.observe(count + 1, &[])
})
.build();
let mut result = ResourceMetrics::default();
reader
.collect(&mut result)
.expect("metrics must be collected");
assert_eq!(get_gauge_value(&result), 1);
assert_eq!(observe_counter.load(std::sync::atomic::Ordering::SeqCst), 1);
}
fn get_gauge_value(result: &ResourceMetrics) -> i64 {
let scope_metrics: Vec<_> = result.scope_metrics().collect();
assert_eq!(scope_metrics.len(), 1);
let metrics: Vec<_> = scope_metrics.first().unwrap().metrics().collect();
assert_eq!(metrics.len(), 1);
let metric = metrics.first().unwrap();
if let AggregatedMetrics::I64(MetricData::Gauge(gauge)) = metric.data() {
assert_eq!(gauge.data_points().count(), 1);
gauge.data_points().next().unwrap().value()
} else {
panic!("Expected i64 gauge")
}
}
#[test]
fn test_otel_default_meter_provider() {
let reader = SharedReader(Arc::new(ManualReader::builder().build()));
let delegate = MeterProviderBuilder::default()
.with_reader(reader.clone())
.build();
let meter_provider = AggregateMeterProvider::default();
meter_provider.set(
MeterProviderType::OtelDefault,
FilterMeterProvider::public(delegate),
);
let counter = meter_provider
.meter("test")
.u64_counter("test.counter")
.build();
counter.add(1, &[]);
let mut resource_metrics = ResourceMetrics::default();
reader.collect(&mut resource_metrics).unwrap();
assert_eq!(1, resource_metrics.scope_metrics().count());
}
struct TestExporter {
meter_provider: AggregateMeterProvider,
shutdown: Arc<AtomicBool>,
}
impl PushMetricExporter for TestExporter {
fn export(
&self,
_metrics: &ResourceMetrics,
) -> impl std::future::Future<Output = OTelSdkResult> + Send {
self.count();
std::future::ready(Ok(()))
}
fn force_flush(&self) -> OTelSdkResult {
self.count();
Ok(())
}
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
self.count();
self.shutdown
.store(true, std::sync::atomic::Ordering::SeqCst);
Ok(())
}
fn temporality(&self) -> Temporality {
Temporality::Cumulative
}
}
impl TestExporter {
fn count(&self) {
let counter = self
.meter_provider
.meter("test")
.u64_counter("test.counter")
.build();
counter.add(1, &[]);
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_shutdown_exporter_metrics() {
let meter_provider = AggregateMeterProvider::default();
let shutdown = Arc::new(AtomicBool::new(false));
let periodic_reader = reader(&meter_provider, &shutdown);
let delegate = MeterProviderBuilder::default()
.with_reader(periodic_reader)
.build();
meter_provider.set(
MeterProviderType::OtelDefault,
FilterMeterProvider::public(delegate),
);
tokio::time::sleep(Duration::from_millis(20)).await;
meter_provider.shutdown().unwrap();
tokio::time::sleep(Duration::from_millis(20)).await;
assert!(shutdown.load(std::sync::atomic::Ordering::SeqCst));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_reload_exporter_metrics() {
let meter_provider = AggregateMeterProvider::default();
let shutdown1 = Arc::new(AtomicBool::new(false));
let periodic_reader = reader(&meter_provider, &shutdown1);
let delegate = MeterProviderBuilder::default()
.with_reader(periodic_reader)
.build();
meter_provider.set(
MeterProviderType::OtelDefault,
FilterMeterProvider::public(delegate),
);
tokio::time::sleep(Duration::from_millis(20)).await;
let shutdown2 = Arc::new(AtomicBool::new(false));
let periodic_reader = reader(&meter_provider, &shutdown2);
let delegate = MeterProviderBuilder::default()
.with_reader(periodic_reader)
.build();
meter_provider.set(
MeterProviderType::OtelDefault,
FilterMeterProvider::public(delegate),
);
tokio::time::sleep(Duration::from_millis(20)).await;
assert!(shutdown1.load(std::sync::atomic::Ordering::SeqCst));
assert!(!shutdown2.load(std::sync::atomic::Ordering::SeqCst));
}
fn reader(
meter_provider: &AggregateMeterProvider,
shutdown: &Arc<AtomicBool>,
) -> PeriodicReader<TestExporter> {
PeriodicReader::builder(
TestExporter {
meter_provider: meter_provider.clone(),
shutdown: shutdown.clone(),
},
runtime::Tokio,
)
.with_interval(Duration::from_millis(10))
.build()
}
}