use std::collections::HashMap;
use std::sync::LazyLock;
use opentelemetry::InstrumentationScope;
use opentelemetry::propagation::TextMapCompositePropagator;
use opentelemetry::trace::TracerProvider;
use parking_lot::Mutex;
use prometheus::Registry;
use tokio::task::block_in_place;
use tokio::task::spawn_blocking;
use tracing_subscriber::Layer;
use crate::metrics::aggregation::MeterProviderType;
use crate::metrics::filter::FilterMeterProvider;
use crate::metrics::meter_provider_internal;
use crate::plugins::telemetry::GLOBAL_TRACER_NAME;
use crate::plugins::telemetry::reload::otel::LayeredTracer;
use crate::plugins::telemetry::reload::otel::OPENTELEMETRY_TRACER_HANDLE;
use crate::plugins::telemetry::reload::otel::reload_fmt;
/// Staging area for a telemetry reload.
///
/// Callers stage new components with the `with_*` / `add_*` methods and then call
/// [`Activation::commit`] to install them. Anything still held when the value is dropped is
/// disposed of by the `Drop` implementation.
pub(crate) struct Activation {
/// Tracer provider staged by `with_tracer_provider`; taken and installed by `reload_tracing`.
new_trace_provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
/// Propagator staged by `with_tracer_propagator`; taken and installed by `reload_trace_propagation`.
new_trace_propagator: Option<TextMapCompositePropagator>,
/// Meter providers staged by `add_meter_providers`, keyed by provider type.
/// `reload_metrics` replaces each entry with the value handed back by the global meter provider.
new_meter_providers: HashMap<MeterProviderType, FilterMeterProvider>,
/// Prometheus registry: seeded from `REGISTRY` in `new`, overridable through
/// `with_prometheus_registry`, and written back to `REGISTRY` by `commit`.
prometheus_registry: Option<Registry>,
/// Logging layer staged by `with_logging`; taken and passed to `reload_fmt` on commit.
new_logging_fmt_layer: Option<Box<dyn Layer<LayeredTracer> + Send + Sync>>,
/// Test-only record of which staging methods have been called on this activation.
#[cfg(test)]
test_instrumentation: TestInstrumentation,
}
/// Test-only bookkeeping describing which staging methods were invoked on an [`Activation`].
///
/// Exposed to tests through `Activation::test_instrumentation`.
#[cfg(test)]
#[derive(Default, Debug, Clone)]
pub(crate) struct TestInstrumentation {
/// Set to `true` by `Activation::with_tracer_provider`.
pub(crate) tracer_provider_set: bool,
/// Set to `true` by `Activation::with_tracer_propagator`.
pub(crate) tracer_propagator_set: bool,
/// Every provider type that was passed to `Activation::add_meter_providers`.
pub(crate) meter_providers_added: std::collections::HashSet<MeterProviderType>,
/// Set to `true` by `Activation::with_prometheus_registry`, even when it is given `None`.
pub(crate) prometheus_registry_set: bool,
/// Set to `true` by `Activation::with_logging`.
pub(crate) logging_layer_set: bool,
}
static REGISTRY: LazyLock<Mutex<Option<Registry>>> = LazyLock::new(Default::default);
impl Activation {
    /// Creates an activation with nothing staged.
    ///
    /// The Prometheus registry is seeded with a clone of whatever is currently stored in the
    /// shared `REGISTRY` slot, so a registry recorded by an earlier `commit` carries over unless
    /// it is replaced through [`Activation::with_prometheus_registry`].
    pub(crate) fn new() -> Self {
        // Snapshot the shared slot first; the lock guard is a temporary that is released as soon
        // as this statement ends.
        let prometheus_registry = REGISTRY.lock().clone();

        Self {
            new_trace_provider: None,
            new_trace_propagator: None,
            new_meter_providers: HashMap::new(),
            prometheus_registry,
            new_logging_fmt_layer: None,
            #[cfg(test)]
            test_instrumentation: Default::default(),
        }
    }

    /// Stages `logging_layer` to be handed to `reload_fmt` on commit, replacing any layer
    /// staged earlier.
    pub(crate) fn with_logging(
        &mut self,
        logging_layer: Box<dyn Layer<LayeredTracer> + Send + Sync>,
    ) {
        #[cfg(test)]
        {
            self.test_instrumentation.logging_layer_set = true;
        }
        self.new_logging_fmt_layer = Some(logging_layer);
    }

    /// Stages `tracer_propagator` to become the global text-map propagator on commit,
    /// replacing any propagator staged earlier.
    pub(crate) fn with_tracer_propagator(&mut self, tracer_propagator: TextMapCompositePropagator) {
        #[cfg(test)]
        {
            self.test_instrumentation.tracer_propagator_set = true;
        }
        self.new_trace_propagator = Some(tracer_propagator);
    }

    /// Stages every `(type, provider)` pair from `meter_providers`.
    ///
    /// A provider staged earlier under the same type is overwritten (and dropped right away).
    pub(crate) fn add_meter_providers(
        &mut self,
        meter_providers: impl IntoIterator<Item = (MeterProviderType, FilterMeterProvider)>,
    ) {
        meter_providers.into_iter().for_each(|(kind, provider)| {
            #[cfg(test)]
            {
                self.test_instrumentation.meter_providers_added.insert(kind);
            }
            self.new_meter_providers.insert(kind, provider);
        });
    }

    /// Stages `tracer_provider` to be installed on commit, replacing any provider staged
    /// earlier.
    pub(crate) fn with_tracer_provider(
        &mut self,
        tracer_provider: opentelemetry_sdk::trace::SdkTracerProvider,
    ) {
        #[cfg(test)]
        {
            self.test_instrumentation.tracer_provider_set = true;
        }
        self.new_trace_provider = Some(tracer_provider);
    }

    /// Overrides the Prometheus registry carried by this activation.
    ///
    /// Passing `None` clears it, which means `commit` will also clear the shared slot.
    pub(crate) fn with_prometheus_registry(&mut self, prometheus_registry: Option<Registry>) {
        #[cfg(test)]
        {
            self.test_instrumentation.prometheus_registry_set = true;
        }
        self.prometheus_registry = prometheus_registry;
    }

    /// Returns a clone of the Prometheus registry currently carried by this activation.
    pub(crate) fn prometheus_registry(&self) -> Option<Registry> {
        self.prometheus_registry.clone()
    }

    /// Test-only view of which staging methods have been called so far.
    #[cfg(test)]
    pub(crate) fn test_instrumentation(&self) -> &TestInstrumentation {
        &self.test_instrumentation
    }
}
impl Activation {
    /// Installs everything staged on this activation.
    ///
    /// Steps run in a fixed order: tracer provider, trace propagator, meter providers, logging
    /// layer, and finally the shared `REGISTRY` slot. The activation is consumed, so its `Drop`
    /// implementation runs when this function returns.
    pub(crate) fn commit(mut self) {
        self.reload_tracing();
        self.reload_trace_propagation();
        self.reload_metrics();
        self.reload_logging();

        // `Activation` implements `Drop`, so the field cannot be moved out of `self`; store a clone.
        let committed_registry = self.prometheus_registry.clone();
        *REGISTRY.lock() = committed_registry;
    }

    /// Installs the staged tracer provider, if there is one and the tracer handle is available.
    ///
    /// NOTE(review): `block_in_place` is documented to panic on a current-thread Tokio runtime —
    /// confirm `commit` is only reached from a multi-threaded runtime.
    fn reload_tracing(&mut self) {
        // The handle is checked first on purpose: when it is unset the staged provider is left
        // untouched on `self`.
        let Some(hot_tracer) = OPENTELEMETRY_TRACER_HANDLE.get() else {
            return;
        };
        let Some(tracer_provider) = self.new_trace_provider.take() else {
            return;
        };

        let scope = InstrumentationScope::builder(GLOBAL_TRACER_NAME)
            .with_version(env!("CARGO_PKG_VERSION"))
            .build();
        // Point the reloadable tracer at the new provider before publishing the provider globally.
        hot_tracer.reload(tracer_provider.tracer_with_scope(scope));
        block_in_place(move || opentelemetry::global::set_tracer_provider(tracer_provider));
    }

    /// Hands each staged meter provider to the global meter provider.
    ///
    /// Whatever `set` returns is stored back under the same key, so it is still held by `self`
    /// afterwards and is disposed of by `Drop`.
    pub(crate) fn reload_metrics(&mut self) {
        let global_meter_provider = meter_provider_internal();
        let staged = std::mem::take(&mut self.new_meter_providers);
        for (kind, provider) in staged {
            // Presumably the provider that was installed for `kind` before this call — confirm
            // against `set`'s definition.
            let handed_back = global_meter_provider.set(kind, provider);
            self.new_meter_providers.insert(kind, handed_back);
        }
    }

    /// Passes the staged logging layer, if any, to `reload_fmt`.
    fn reload_logging(&mut self) {
        let Some(fmt_layer) = self.new_logging_fmt_layer.take() else {
            return;
        };
        reload_fmt(fmt_layer);
    }

    /// Makes the staged propagator, if any, the global text-map propagator.
    fn reload_trace_propagation(&mut self) {
        let Some(propagator) = self.new_trace_propagator.take() else {
            return;
        };
        opentelemetry::global::set_text_map_propagator(propagator);
    }
}
/// Disposes of any providers still held by the activation on Tokio's blocking pool.
///
/// After `commit` these are the values handed back by the global meter provider; without a
/// commit they are the providers that were staged but never installed.
///
/// NOTE(review): presumably provider destructors can block (e.g. flushing exporters), hence the
/// off-loading — confirm. `spawn_blocking` also needs an active Tokio runtime at the drop site.
impl Drop for Activation {
    fn drop(&mut self) {
        let leftover_meter_providers = std::mem::take(&mut self.new_meter_providers);
        leftover_meter_providers
            .into_values()
            .for_each(|meter_provider| {
                spawn_blocking(move || drop(meter_provider));
            });

        let Some(tracer_provider) = self.new_trace_provider.take() else {
            return;
        };
        spawn_blocking(move || drop(tracer_provider));
    }
}