#![cfg_attr(all(doc, not(doctest)), feature(doc_auto_cfg))]
use std::{collections::BTreeMap as Map, str::FromStr as _};
use config::{OtelConfig, OtelUrl, StdoutLogsConfig};
use opentelemetry::{KeyValue, trace::TracerProvider as _};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::{ExporterBuildError, LogExporter, SpanExporter, WithExportConfig as _};
use opentelemetry_resource_detectors::{K8sResourceDetector, ProcessResourceDetector};
use opentelemetry_sdk::{
Resource,
logs::SdkLoggerProvider,
metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
propagation::TraceContextPropagator,
trace::{RandomIdGenerator, SdkTracerProvider},
};
use opentelemetry_semantic_conventions::resource::SERVICE_VERSION;
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::{
EnvFilter, Layer, layer::SubscriberExt as _, util::SubscriberInitExt as _,
};
#[cfg(feature = "axum")]
pub mod axum;
pub mod config;
pub mod reexport;
#[cfg(feature = "reqwest-middleware")]
pub mod reqwest_middleware;
fn mk_resource(
service_name: &'static str,
version: &'static str,
resource_metadata: Map<String, String>,
) -> Resource {
Resource::builder()
.with_attributes(
resource_metadata.into_iter().map(|(key, value)| KeyValue::new(key, value)),
)
.with_detector(Box::new(K8sResourceDetector {}))
.with_detector(Box::new(ProcessResourceDetector {}))
.with_attribute(KeyValue::new(SERVICE_VERSION, version))
.with_service_name(service_name)
.build()
}
fn init_traces(
endpoint: OtelUrl,
resource: Resource,
) -> Result<SdkTracerProvider, ExporterBuildError> {
let exporter = SpanExporter::builder().with_tonic().with_endpoint(endpoint.url).build()?;
let tracer_provider = SdkTracerProvider::builder()
.with_id_generator(RandomIdGenerator::default())
.with_resource(resource)
.with_batch_exporter(exporter)
.build();
opentelemetry::global::set_tracer_provider(tracer_provider.clone());
Ok(tracer_provider)
}
fn init_metrics(
endpoint: OtelUrl,
resource: Resource,
) -> Result<SdkMeterProvider, ExporterBuildError> {
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_endpoint(endpoint.url)
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
.build()?;
let reader = PeriodicReader::builder(exporter).build();
let meter_provider =
MeterProviderBuilder::default().with_resource(resource).with_reader(reader).build();
opentelemetry::global::set_meter_provider(meter_provider.clone());
Ok(meter_provider)
}
fn init_logs(
endpoint: OtelUrl,
resource: Resource,
) -> Result<SdkLoggerProvider, ExporterBuildError> {
let exporter = LogExporter::builder().with_tonic().with_endpoint(endpoint.url).build()?;
Ok(SdkLoggerProvider::builder().with_resource(resource).with_batch_exporter(exporter).build())
}
#[macro_export]
macro_rules! init_otel {
($config:expr) => {
$crate::init_otel(
$config,
env!("CARGO_CRATE_NAME"),
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_VERSION"),
)
};
}
#[must_use = "The return is a guard for the providers and it need to be kept to properly shutdown them"]
pub fn init_otel(
config: &OtelConfig,
main_crate: &'static str,
service_name: &'static str,
pkg_version: &'static str,
) -> Result<ProvidersGuard, OtelInitError> {
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::default());
let stdout_layer = config
.stdout
.as_ref()
.or(Some(&StdoutLogsConfig::default()))
.and_then(|stdout| stdout.enabled.then_some(stdout))
.map(|logger_config| {
let filter_fmt = EnvFilter::from_str(&logger_config.get_filter(main_crate))?;
let stdout_layer = tracing_subscriber::fmt::layer().with_thread_names(true);
Ok::<_, OtelInitError>(
if logger_config.json_output {
Box::new(stdout_layer.json()) as Box<dyn Layer<_> + Send + Sync>
} else {
Box::new(stdout_layer)
}
.with_filter(filter_fmt),
)
})
.transpose()?;
let exporter_with_resource = config.exporter.as_ref().map(|exporter| {
(exporter, mk_resource(service_name, pkg_version, exporter.resource_metadata.clone()))
});
let (logger_provider, logs_layer) = exporter_with_resource
.as_ref()
.and_then(|(exporter, resource)| {
exporter.logs.as_ref().and_then(|c| c.enabled.then_some(c)).map(|logger_config| {
let filter_otel = EnvFilter::from_str(&logger_config.get_filter(main_crate))?;
let logger_provider = init_logs(exporter.endpoint.clone(), resource.clone())?;
let logs_layer =
OpenTelemetryTracingBridge::new(&logger_provider).with_filter(filter_otel);
Ok::<_, OtelInitError>((Some(logger_provider), Some(logs_layer)))
})
})
.transpose()?
.unwrap_or((None, None));
let (tracer_provider, tracer_layer) = exporter_with_resource
.as_ref()
.and_then(|(exporter, resource)| {
exporter.traces.as_ref().and_then(|c| c.enabled.then_some(c)).map(|tracer_config| {
let trace_filter = EnvFilter::from_str(&tracer_config.get_filter(main_crate))?;
let tracer_provider = init_traces(exporter.endpoint.clone(), resource.clone())?;
let tracer = tracer_provider.tracer(service_name);
let tracer_layer = OpenTelemetryLayer::new(tracer).with_filter(trace_filter);
Ok::<_, OtelInitError>((Some(tracer_provider), Some(tracer_layer)))
})
})
.transpose()?
.unwrap_or((None, None));
let (meter_provider, meter_layer) = exporter_with_resource
.as_ref()
.and_then(|(exporter, resource)| {
exporter.metrics.as_ref().and_then(|c| c.enabled.then_some(c)).map(|meter_config| {
let metrics_filter = EnvFilter::from_str(&meter_config.get_filter(main_crate))?;
let meter_provider = init_metrics(exporter.endpoint.clone(), resource.clone())?;
let meter_layer =
MetricsLayer::new(meter_provider.clone()).with_filter(metrics_filter);
Ok::<_, OtelInitError>((Some(meter_provider), Some(meter_layer)))
})
})
.transpose()?
.unwrap_or((None, None));
let subscriber = tracing_subscriber::registry()
.with(logs_layer)
.with(stdout_layer)
.with(meter_layer)
.with(tracer_layer);
#[cfg(feature = "tracing-error")]
let subscriber = subscriber.with(tracing_error::ErrorLayer::default());
subscriber.init();
Ok(ProvidersGuard { logger_provider, tracer_provider, meter_provider })
}
#[derive(Debug)]
pub struct ProvidersGuard {
logger_provider: Option<SdkLoggerProvider>,
tracer_provider: Option<SdkTracerProvider>,
meter_provider: Option<SdkMeterProvider>,
}
impl Drop for ProvidersGuard {
fn drop(&mut self) {
#[cfg(not(test))]
{
if let Some(logger_provider) = self.logger_provider.as_ref() {
let _ = logger_provider.shutdown().inspect_err(|err| {
tracing::error!("Could not shutdown LoggerProvider: {err}");
});
}
if let Some(tracer_provider) = self.tracer_provider.as_ref() {
let _ = tracer_provider.shutdown().inspect_err(|err| {
tracing::error!("Could not shutdown TracerProvider: {err}");
});
}
if let Some(meter_provider) = self.meter_provider.as_ref() {
let _ = meter_provider.shutdown().inspect_err(|err| {
tracing::error!("Could not shutdown MeterProvider: {err}");
});
}
}
}
}
#[allow(missing_docs)]
#[derive(Debug, thiserror::Error)]
pub enum OtelInitError {
#[error("Error building the exporter: {0}")]
BuildExporterError(#[from] ExporterBuildError),
#[error("Parsing EnvFilter directives error: {0}")]
EnvFilterError(#[from] tracing_subscriber::filter::ParseError),
}
#[cfg(test)]
mod tests {
#![allow(clippy::expect_used)]
use super::{
config::{ExporterConfig, OtelConfig, ProviderConfig},
init_otel,
};
use crate::config::StdoutLogsConfig;
#[tokio::test]
async fn test_tracer_provider_enabled() {
let config = OtelConfig {
stdout: None,
exporter: Some(ExporterConfig {
traces: Some(ProviderConfig { enabled: true, ..Default::default() }),
..Default::default()
}),
};
let guard = init_otel!(&config).expect("Error initializing Otel");
assert!(guard.tracer_provider.is_some());
}
#[tokio::test]
async fn test_tracer_provider_disabled() {
let config_enabled_false = OtelConfig {
stdout: None,
exporter: Some(ExporterConfig {
traces: Some(ProviderConfig { enabled: false, ..Default::default() }),
..Default::default()
}),
};
let guard = init_otel!(&config_enabled_false).expect("Error initializing Otel");
assert!(guard.tracer_provider.is_none());
}
#[tokio::test]
async fn test_meter_provider_disabled() {
let config_enabled_false = OtelConfig {
stdout: None,
exporter: Some(ExporterConfig {
metrics: Some(ProviderConfig { enabled: false, ..Default::default() }),
..Default::default()
}),
};
let guard = init_otel!(&config_enabled_false).expect("Error initializing Otel");
assert!(guard.meter_provider.is_none());
}
#[tokio::test]
async fn test_logger_provider_enabled() {
let config = OtelConfig {
stdout: None,
exporter: Some(ExporterConfig {
logs: Some(ProviderConfig { enabled: true, ..Default::default() }),
..Default::default()
}),
};
let guard = init_otel!(&config).expect("Error initializing Otel");
assert!(guard.logger_provider.is_some());
}
#[tokio::test]
async fn test_logger_provider_disabled() {
let config_enabled_false = OtelConfig {
stdout: None,
exporter: Some(ExporterConfig {
logs: Some(ProviderConfig { enabled: false, ..Default::default() }),
..Default::default()
}),
};
let guard = init_otel!(&config_enabled_false).expect("Error initializing Otel");
assert!(guard.logger_provider.is_none());
}
#[tokio::test]
async fn test_exporter_config_none() {
let config_none = OtelConfig {
stdout: Some(StdoutLogsConfig { enabled: true, ..Default::default() }),
exporter: Some(ExporterConfig::default()),
};
let guard = init_otel!(&config_none).expect("Error initializing Otel");
assert!(guard.meter_provider.is_none());
assert!(guard.tracer_provider.is_none());
assert!(guard.logger_provider.is_none());
}
}