use opentelemetry::global;
use opentelemetry_appender_log::OpenTelemetryLogBridge;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::logs::{LoggerProviderBuilder, SdkLoggerProvider};
use opentelemetry_sdk::metrics::{Instrument, MeterProviderBuilder, SdkMeterProvider, Stream};
use opentelemetry_sdk::trace::{
SdkTracerProvider, SpanExporter, SpanProcessor, TracerProviderBuilder,
};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use crate::config::OpenTelemetryConfig;
use crate::{InitError, providers};
use tokio::runtime::RuntimeFlavor;
pub struct TelemetryBuilder {
config: OpenTelemetryConfig,
set_global_tracer: bool,
set_global_meter: bool,
set_global_propagator: bool,
enable_log_bridge: bool,
enable_tracing_bridge: bool,
resource_builder: Option<opentelemetry_sdk::resource::ResourceBuilder>,
tracer_builder: Option<TracerProviderBuilder>,
meter_builder: Option<MeterProviderBuilder>,
logger_builder: Option<LoggerProviderBuilder>,
}
impl TelemetryBuilder {
pub fn new(config: OpenTelemetryConfig) -> Self {
Self {
config,
set_global_tracer: false,
set_global_meter: false,
set_global_propagator: false,
enable_log_bridge: false,
enable_tracing_bridge: false,
resource_builder: None,
tracer_builder: None,
meter_builder: None,
logger_builder: None,
}
}
fn tracer_builder(&mut self) -> TracerProviderBuilder {
self.tracer_builder
.take()
.unwrap_or_else(SdkTracerProvider::builder)
}
fn meter_builder(&mut self) -> MeterProviderBuilder {
self.meter_builder
.take()
.unwrap_or_else(SdkMeterProvider::builder)
}
fn logger_builder(&mut self) -> LoggerProviderBuilder {
self.logger_builder
.take()
.unwrap_or_else(SdkLoggerProvider::builder)
}
pub fn with_global_tracer_provider(mut self) -> Self {
self.set_global_tracer = true;
self
}
pub fn with_global_meter_provider(mut self) -> Self {
self.set_global_meter = true;
self
}
pub fn with_global_propagator(mut self) -> Self {
self.set_global_propagator = true;
self
}
pub fn with_globals(self) -> Self {
self.with_global_tracer_provider()
.with_global_meter_provider()
.with_log_bridge()
.with_tracing_bridge()
.with_global_propagator()
}
pub fn with_log_bridge(mut self) -> Self {
self.enable_log_bridge = true;
self
}
pub fn with_tracing_bridge(mut self) -> Self {
self.enable_tracing_bridge = true;
self
}
pub fn with_resource_builder(
mut self,
builder: opentelemetry_sdk::resource::ResourceBuilder,
) -> Self {
self.resource_builder = Some(builder);
self
}
pub fn with_span_processor<T: SpanProcessor + 'static>(mut self, processor: T) -> Self {
self.tracer_builder = Some(self.tracer_builder().with_span_processor(processor));
self
}
pub fn with_batch_span_exporter<T: SpanExporter + 'static>(mut self, exporter: T) -> Self {
self.tracer_builder = Some(self.tracer_builder().with_batch_exporter(exporter));
self
}
pub fn with_simple_span_exporter<T: SpanExporter + 'static>(mut self, exporter: T) -> Self {
self.tracer_builder = Some(self.tracer_builder().with_simple_exporter(exporter));
self
}
pub fn with_view<F>(mut self, view: F) -> Self
where
F: Fn(&Instrument) -> Option<Stream> + Send + Sync + 'static,
{
self.meter_builder = Some(self.meter_builder().with_view(view));
self
}
pub fn with_periodic_reader<E>(
mut self,
reader: opentelemetry_sdk::metrics::PeriodicReader<E>,
) -> Self
where
E: opentelemetry_sdk::metrics::exporter::PushMetricExporter,
{
self.meter_builder = Some(self.meter_builder().with_reader(reader));
self
}
pub fn with_batch_log_exporter<T>(mut self, exporter: T) -> Self
where
T: opentelemetry_sdk::logs::LogExporter + 'static,
{
self.logger_builder = Some(self.logger_builder().with_batch_exporter(exporter));
self
}
pub fn with_simple_log_exporter<T>(mut self, exporter: T) -> Self
where
T: opentelemetry_sdk::logs::LogExporter + 'static,
{
self.logger_builder = Some(self.logger_builder().with_simple_exporter(exporter));
self
}
pub fn build(self) -> Result<Telemetry, InitError> {
if self.config.disabled == Some(true) {
return Ok(Telemetry {
tracer_provider: None,
meter_provider: None,
logger_provider: None,
});
}
let config_resource: Resource = (&self.config.resource).into();
let resource = match self.resource_builder {
Some(builder) => {
let config_attrs: Vec<_> = config_resource
.iter()
.map(|(k, v)| opentelemetry::KeyValue::new(k.clone(), v.clone()))
.collect();
Resource::builder_empty()
.with_attributes(config_attrs)
.with_attributes(
builder
.build()
.iter()
.map(|(k, v)| opentelemetry::KeyValue::new(k.clone(), v.clone())),
)
.build()
}
None => config_resource,
};
let builder = self
.tracer_builder
.unwrap_or_else(SdkTracerProvider::builder);
let tracer_provider =
providers::traces::build_tracer_provider(&self.config, resource.clone(), builder)?;
let builder = self.meter_builder.unwrap_or_else(SdkMeterProvider::builder);
let meter_provider =
providers::metrics::build_meter_provider(&self.config, resource.clone(), builder)?;
let builder = self
.logger_builder
.unwrap_or_else(SdkLoggerProvider::builder);
let logger_provider =
providers::logs::build_logger_provider(&self.config, resource, builder)?;
#[cfg(not(test))]
{
if self.set_global_tracer {
global::set_tracer_provider(tracer_provider.clone());
}
if self.set_global_meter {
global::set_meter_provider(meter_provider.clone());
}
if self.set_global_propagator {
global::set_text_map_propagator(
opentelemetry::propagation::TextMapCompositePropagator::from(
&self.config.propagator,
),
);
}
if self.enable_log_bridge {
setup_log_bridge(&logger_provider)?;
}
if self.enable_tracing_bridge {
setup_tracing_bridge(&logger_provider)?;
}
}
Ok(Telemetry {
tracer_provider: Some(tracer_provider),
meter_provider: Some(meter_provider),
logger_provider: Some(logger_provider),
})
}
}
fn setup_log_bridge(logger_provider: &SdkLoggerProvider) -> Result<(), InitError> {
let otel_log_bridge = OpenTelemetryLogBridge::new(logger_provider);
let mut builder = env_filter::Builder::new();
builder.filter_level(log::LevelFilter::Info);
builder.filter_module("opentelemetry", log::LevelFilter::Off);
builder.filter_module("opentelemetry_sdk", log::LevelFilter::Off);
if let Ok(rust_log) = std::env::var("RUST_LOG") {
builder.parse(&rust_log);
}
let filter = builder.build();
let max_level = filter.filter();
let logger = env_filter::FilteredLog::new(otel_log_bridge, filter);
log::set_logger(Box::leak(Box::new(logger))).map_err(|e| InitError::Bridge {
bridge: "log".to_string(),
reason: e.to_string(),
})?;
log::set_max_level(max_level);
Ok(())
}
fn setup_tracing_bridge(logger_provider: &SdkLoggerProvider) -> Result<(), InitError> {
let otel_layer =
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(logger_provider);
let filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
.add_directive("opentelemetry=off".parse().unwrap())
.add_directive("opentelemetry_sdk=off".parse().unwrap());
tracing_subscriber::registry()
.with(filter)
.with(otel_layer)
.try_init()
.map_err(|e| InitError::Bridge {
bridge: "tracing".to_string(),
reason: e.to_string(),
})?;
Ok(())
}
pub struct Telemetry {
tracer_provider: Option<SdkTracerProvider>,
meter_provider: Option<SdkMeterProvider>,
logger_provider: Option<SdkLoggerProvider>,
}
impl Telemetry {
pub fn none() -> Self {
Self {
tracer_provider: None,
meter_provider: None,
logger_provider: None,
}
}
pub fn builder(config: OpenTelemetryConfig) -> TelemetryBuilder {
TelemetryBuilder::new(config)
}
pub fn new(config: OpenTelemetryConfig) -> Result<Self, InitError> {
Self::builder(config).with_globals().build()
}
pub fn tracer_provider(&self) -> Option<&SdkTracerProvider> {
self.tracer_provider.as_ref()
}
pub fn meter_provider(&self) -> Option<&SdkMeterProvider> {
self.meter_provider.as_ref()
}
pub fn logger_provider(&self) -> Option<&SdkLoggerProvider> {
self.logger_provider.as_ref()
}
pub fn shutdown(&mut self) {
let tracer_provider = self.tracer_provider.take();
let meter_provider = self.meter_provider.take();
let logger_provider = self.logger_provider.take();
let shutdown = move || {
if let Some(provider) = tracer_provider {
let _ = provider.shutdown();
}
if let Some(provider) = meter_provider {
let _ = provider.shutdown();
}
if let Some(provider) = logger_provider {
let _ = provider.shutdown();
}
};
if let Ok(rt) = tokio::runtime::Handle::try_current()
&& rt.runtime_flavor() == RuntimeFlavor::MultiThread
{
tokio::task::block_in_place(shutdown);
} else {
shutdown();
}
}
}
impl Drop for Telemetry {
fn drop(&mut self) {
self.shutdown();
}
}
#[cfg(test)]
mod tests {
use apollo_configuration::parse_yaml;
use opentelemetry::trace::TracerProvider;
use opentelemetry_sdk::error::OTelSdkResult;
use std::time::Duration;
use super::*;
#[test]
fn none_has_no_providers() {
let telemetry = Telemetry::none();
assert!(telemetry.tracer_provider().is_none());
assert!(telemetry.meter_provider().is_none());
assert!(telemetry.logger_provider().is_none());
}
#[test]
fn build_disabled_config() {
let config: OpenTelemetryConfig = parse_yaml(
indoc::indoc! {"
disabled: true
tracer_provider:
processors:
- batch:
exporter:
console: {}
"},
&Default::default(),
)
.unwrap();
let telemetry = Telemetry::new(config).unwrap();
assert!(telemetry.tracer_provider().is_none());
assert!(telemetry.meter_provider().is_none());
assert!(telemetry.logger_provider().is_none());
}
#[test]
fn builder_does_not_register_globally_by_default() {
let config: OpenTelemetryConfig = parse_yaml("{}", &Default::default()).unwrap();
let builder = TelemetryBuilder::new(config);
assert!(!builder.set_global_tracer);
assert!(!builder.set_global_meter);
assert!(!builder.set_global_propagator);
}
#[test]
fn builder_with_global_propagator_flag() {
let config: OpenTelemetryConfig = parse_yaml("{}", &Default::default()).unwrap();
let builder = Telemetry::builder(config).with_global_propagator();
assert!(builder.set_global_propagator);
let _telemetry = builder.build().unwrap();
}
#[test]
fn builder_with_global_tracer_flag() {
let config: OpenTelemetryConfig = parse_yaml("{}", &Default::default()).unwrap();
let builder = Telemetry::builder(config).with_global_tracer_provider();
assert!(builder.set_global_tracer);
let _telemetry = builder.build().unwrap();
}
#[test]
fn builder_with_global_meter_flag() {
let config: OpenTelemetryConfig = parse_yaml("{}", &Default::default()).unwrap();
let builder = Telemetry::builder(config).with_global_meter_provider();
assert!(builder.set_global_meter);
let _telemetry = builder.build().unwrap();
}
#[test]
fn builder_with_resource_builder() {
use opentelemetry::KeyValue;
use opentelemetry_sdk::Resource;
let config: OpenTelemetryConfig = parse_yaml("{}", &Default::default()).unwrap();
let _telemetry = Telemetry::builder(config)
.with_resource_builder(
Resource::builder()
.with_service_name("test-service")
.with_attributes([KeyValue::new("test.attr", "test-value")]),
)
.build()
.unwrap();
assert!(_telemetry.tracer_provider().is_some());
assert!(_telemetry.meter_provider().is_some());
assert!(_telemetry.logger_provider().is_some());
}
#[test]
fn builder_with_resource_builder_overrides_config() {
use opentelemetry::KeyValue;
use opentelemetry_sdk::Resource;
let config: OpenTelemetryConfig = parse_yaml(
indoc::indoc! {"
resource:
attributes:
- name: from.config
value: config-value
"},
&Default::default(),
)
.unwrap();
let _telemetry = Telemetry::builder(config)
.with_resource_builder(
Resource::builder()
.with_service_name("override-service")
.with_attributes([KeyValue::new("from.builder", "builder-value")]),
)
.build()
.unwrap();
assert!(_telemetry.tracer_provider().is_some());
}
#[test]
fn shutdown_clears_providers() {
let config: OpenTelemetryConfig = parse_yaml("{}", &Default::default()).unwrap();
let mut telemetry = Telemetry::builder(config).build().unwrap();
telemetry.shutdown();
assert!(telemetry.tracer_provider().is_none());
assert!(telemetry.meter_provider().is_none());
assert!(telemetry.logger_provider().is_none());
}
#[test]
fn shutdown_is_idempotent() {
let config: OpenTelemetryConfig = parse_yaml("{}", &Default::default()).unwrap();
let mut telemetry = Telemetry::builder(config).build().unwrap();
telemetry.shutdown();
telemetry.shutdown();
assert!(telemetry.tracer_provider().is_none());
}
#[cfg(feature = "otlp")]
#[test]
fn global_tracer_set_and_shutdown() {
use opentelemetry::trace::Tracer;
let config: OpenTelemetryConfig = parse_yaml(
indoc::indoc! {"
tracer_provider:
processors:
- batch:
exporter:
console: {}
"},
&Default::default(),
)
.unwrap();
let telemetry = Telemetry::builder(config)
.with_global_tracer_provider()
.build()
.unwrap();
let tracer = global::tracer("test");
let _span = tracer.start("test-span");
drop(telemetry);
let tracer = global::tracer("test-after-shutdown");
let _span = tracer.start("noop-span");
}
#[cfg(feature = "otlp")]
#[test]
fn global_meter_set_and_shutdown() {
let config: OpenTelemetryConfig = parse_yaml(
indoc::indoc! {"
meter_provider:
readers:
- periodic:
exporter:
otlp_http:
endpoint: http://localhost:4318
"},
&Default::default(),
)
.unwrap();
let telemetry = Telemetry::builder(config)
.with_global_meter_provider()
.build()
.unwrap();
let meter = global::meter("test");
let counter = meter.u64_counter("test_counter").build();
counter.add(1, &[]);
drop(telemetry);
let meter = global::meter("test-after-shutdown");
let counter = meter.u64_counter("noop_counter").build();
counter.add(1, &[]);
}
#[cfg(feature = "otlp")]
#[test]
fn explicit_shutdown() {
let config: OpenTelemetryConfig = parse_yaml(
indoc::indoc! {"
tracer_provider:
processors:
- batch:
exporter:
otlp_http:
endpoint: http://localhost:4318
"},
&Default::default(),
)
.unwrap();
let mut telemetry = Telemetry::builder(config)
.with_global_tracer_provider()
.build()
.unwrap();
telemetry.shutdown();
assert!(telemetry.tracer_provider().is_none());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn shutdown_in_async_context() {
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
#[derive(Debug)]
struct BlockingExporter;
impl opentelemetry_sdk::trace::SpanExporter for BlockingExporter {
async fn export(
&self,
_batch: Vec<opentelemetry_sdk::trace::SpanData>,
) -> opentelemetry_sdk::error::OTelSdkResult {
Ok(())
}
fn shutdown_with_timeout(&mut self, _timeout: Duration) -> OTelSdkResult {
std::thread::sleep(Duration::from_millis(1000));
Ok(())
}
}
let config: OpenTelemetryConfig = parse_yaml("{}", &Default::default()).unwrap();
let telemetry = Telemetry::builder(config)
.with_simple_span_exporter(BlockingExporter)
.build()
.unwrap();
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
let _timer_task = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(10));
loop {
interval.tick().await;
counter_clone.fetch_add(1, Ordering::SeqCst);
}
});
let drop_task = tokio::spawn(async move {
drop(telemetry);
});
drop_task.await.unwrap();
let final_count = counter.load(Ordering::SeqCst);
assert!(
final_count >= 10,
"Timer should have run during shutdown (got {} ticks)",
final_count
);
}
#[tokio::test(flavor = "current_thread")]
async fn shutdown_in_single_threaded_runtime() {
let config = OpenTelemetryConfig::default();
let mut telemetry = Telemetry::builder(config).build().unwrap();
telemetry.shutdown();
}
#[test]
fn builder_with_simple_span_exporter() {
use opentelemetry::trace::Tracer;
use opentelemetry_sdk::trace::InMemorySpanExporter;
let exporter = InMemorySpanExporter::default();
let exporter_clone = exporter.clone();
let config: OpenTelemetryConfig = parse_yaml("{}", &Default::default()).unwrap();
let telemetry = Telemetry::builder(config)
.with_simple_span_exporter(exporter)
.build()
.unwrap();
let tracer = telemetry.tracer_provider().unwrap().tracer("test");
let _span = tracer.start("test-span");
drop(_span);
let spans = exporter_clone.get_finished_spans().unwrap();
assert_eq!(spans.len(), 1);
assert_eq!(spans[0].name.as_ref(), "test-span");
}
#[test]
fn builder_with_batch_span_exporter() {
use opentelemetry::trace::Tracer;
use opentelemetry_sdk::trace::InMemorySpanExporter;
let exporter = InMemorySpanExporter::default();
let exporter_clone = exporter.clone();
let config: OpenTelemetryConfig = parse_yaml("{}", &Default::default()).unwrap();
let mut telemetry = Telemetry::builder(config)
.with_batch_span_exporter(exporter)
.build()
.unwrap();
let tracer = telemetry.tracer_provider().unwrap().tracer("test");
let _span = tracer.start("batch-test-span");
drop(_span);
telemetry.tracer_provider().unwrap().force_flush().unwrap();
let spans = exporter_clone.get_finished_spans().unwrap();
assert_eq!(spans.len(), 1);
assert_eq!(spans[0].name.as_ref(), "batch-test-span");
telemetry.shutdown();
}
#[test]
fn builder_with_span_processor() {
use opentelemetry::trace::Tracer;
use opentelemetry_sdk::trace::{InMemorySpanExporter, SimpleSpanProcessor};
let exporter = InMemorySpanExporter::default();
let exporter_clone = exporter.clone();
let processor = SimpleSpanProcessor::new(exporter);
let config: OpenTelemetryConfig = parse_yaml("{}", &Default::default()).unwrap();
let telemetry = Telemetry::builder(config)
.with_span_processor(processor)
.build()
.unwrap();
let tracer = telemetry.tracer_provider().unwrap().tracer("test");
let _span = tracer.start("processor-test-span");
drop(_span);
let spans = exporter_clone.get_finished_spans().unwrap();
assert_eq!(spans.len(), 1);
assert_eq!(spans[0].name.as_ref(), "processor-test-span");
}
#[test]
fn builder_with_view() {
use opentelemetry::metrics::MeterProvider;
let config: OpenTelemetryConfig = parse_yaml("{}", &Default::default()).unwrap();
let telemetry = Telemetry::builder(config)
.with_view(|_instrument: &Instrument| None)
.build()
.unwrap();
let meter = telemetry.meter_provider().unwrap().meter("test");
let counter = meter.u64_counter("test_counter").build();
counter.add(1, &[]);
}
#[test]
fn builder_with_periodic_reader() {
use opentelemetry::metrics::MeterProvider;
use opentelemetry_sdk::metrics::{InMemoryMetricExporter, PeriodicReader};
let exporter = InMemoryMetricExporter::default();
let reader = PeriodicReader::builder(exporter.clone()).build();
let config: OpenTelemetryConfig = parse_yaml("{}", &Default::default()).unwrap();
let telemetry = Telemetry::builder(config)
.with_periodic_reader(reader)
.build()
.unwrap();
let meter_provider = telemetry.meter_provider().unwrap();
let meter = meter_provider.meter("test");
let counter = meter.u64_counter("test_counter").build();
counter.add(42, &[]);
meter_provider.force_flush().unwrap();
let metrics = exporter.get_finished_metrics().unwrap();
assert!(!metrics.is_empty(), "expected metrics to be exported");
}
#[test]
fn builder_with_simple_log_exporter() {
use opentelemetry_sdk::logs::InMemoryLogExporter;
let exporter = InMemoryLogExporter::default();
let config: OpenTelemetryConfig = parse_yaml("{}", &Default::default()).unwrap();
let telemetry = Telemetry::builder(config)
.with_simple_log_exporter(exporter)
.build()
.unwrap();
assert!(telemetry.logger_provider().is_some());
}
#[test]
fn builder_with_batch_log_exporter() {
use opentelemetry_sdk::logs::InMemoryLogExporter;
let exporter = InMemoryLogExporter::default();
let config: OpenTelemetryConfig = parse_yaml("{}", &Default::default()).unwrap();
let telemetry = Telemetry::builder(config)
.with_batch_log_exporter(exporter)
.build()
.unwrap();
assert!(telemetry.logger_provider().is_some());
}
}