use std::sync::OnceLock;
use std::time::Duration;
use opentelemetry::{KeyValue, trace::TracerProvider};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::{Sampler, TracerProvider as SdkTracerProvider};
use opentelemetry_sdk::{Resource, runtime};
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use crate::{build_filter, build_fmt_layer};
/// Process-wide slot for the SDK tracer provider.
///
/// Written at most once by `init_with_otel` and read by `shutdown_otel`.
static TRACER_PROVIDER: OnceLock<SdkTracerProvider> = OnceLock::new();
/// Settings consumed by `init_with_otel` when wiring up OTLP trace export.
#[derive(Debug, Clone)]
pub struct OtelConfig {
    /// Service name attached to the exported resource and used as the tracer name.
    pub service_name: String,
    /// Endpoint URL handed to the OTLP span exporter.
    pub otlp_endpoint: String,
    /// Fraction of traces to sample: `1.0` samples everything, values at or
    /// below `0.0` sample nothing.
    pub sampling_ratio: f64,
    /// Export batch size tuning value (presumably a span count; confirm with the consumer).
    pub batch_size: usize,
    /// Span queue capacity tuning value (presumably a span count; confirm with the consumer).
    pub max_queue_size: usize,
    /// Exporter timeout, in milliseconds.
    pub export_timeout_ms: u64,
    /// Connection timeout, in milliseconds.
    pub connect_timeout_ms: u64,
}

impl Default for OtelConfig {
    /// Builds a configuration that targets a collector on `localhost:4317`
    /// and samples every trace.
    fn default() -> Self {
        let service_name = String::from("mx-service");
        let otlp_endpoint = String::from("http://localhost:4317");

        Self {
            service_name,
            otlp_endpoint,
            sampling_ratio: 1.0,
            batch_size: 512,
            max_queue_size: 2048,
            export_timeout_ms: 30_000,
            connect_timeout_ms: 10_000,
        }
    }
}
/// Failures that can occur while setting up OpenTelemetry tracing.
#[derive(Debug)]
pub enum OtelError {
    /// The OTLP span exporter could not be built; carries the underlying message.
    ExporterCreation(String),
    /// The tracer could not be installed; carries the underlying message.
    TracerInstallation(String),
    /// Tracing has already been set up for this process.
    AlreadyInitialized,
}

impl std::fmt::Display for OtelError {
    /// Renders a human-readable message, appending the wrapped detail where present.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::AlreadyInitialized => f.write_str("Tracing already initialized"),
            Self::ExporterCreation(reason) => {
                f.write_fmt(format_args!("Failed to create OTLP exporter: {reason}"))
            }
            Self::TracerInstallation(reason) => {
                f.write_fmt(format_args!("Failed to install tracer: {reason}"))
            }
        }
    }
}

impl std::error::Error for OtelError {}

/// Result alias whose error type is [`OtelError`].
pub type Result<T> = std::result::Result<T, OtelError>;
/// Maps a sampling ratio onto the cheapest equivalent [`Sampler`].
///
/// * NaN and values at or below `0.0` yield [`Sampler::AlwaysOff`]. A NaN
///   carries no usable ratio, so sampling is disabled rather than forwarding
///   it to the ratio sampler.
/// * Values at or above `1.0`, or within `f64::EPSILON` below it, yield
///   [`Sampler::AlwaysOn`].
/// * Everything else yields [`Sampler::TraceIdRatioBased`] with the ratio
///   passed through unchanged.
pub(crate) fn select_sampler(sampling_ratio: f64) -> Sampler {
    // NaN fails every ordered comparison, so it must be tested explicitly;
    // otherwise it would fall through to the ratio-based branch.
    if sampling_ratio.is_nan() || sampling_ratio <= 0.0 {
        Sampler::AlwaysOff
    } else if sampling_ratio >= 1.0 || (1.0 - sampling_ratio) < f64::EPSILON {
        // Out-of-range ratios (> 1.0) are clamped to "sample everything"
        // instead of being handed to the ratio sampler unvalidated.
        Sampler::AlwaysOn
    } else {
        Sampler::TraceIdRatioBased(sampling_ratio)
    }
}
pub fn init_with_otel(
config: &OtelConfig,
filter: Option<&str>,
default_filter: &str,
) -> Result<()> {
if crate::TRACING.get().is_some() {
return Err(OtelError::AlreadyInitialized);
}
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
let resource = Resource::new(vec![KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
config.service_name.clone(),
)]);
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(&config.otlp_endpoint)
.with_timeout(Duration::from_millis(config.export_timeout_ms))
.build()
.map_err(|e| OtelError::ExporterCreation(e.to_string()))?;
let sampler = select_sampler(config.sampling_ratio);
let tracer_provider = SdkTracerProvider::builder()
.with_batch_exporter(exporter, runtime::Tokio)
.with_sampler(sampler)
.with_resource(resource)
.build();
TRACER_PROVIDER
.set(tracer_provider.clone())
.map_err(|_| OtelError::AlreadyInitialized)?;
let tracer = tracer_provider.tracer(config.service_name.clone());
let env_filter = build_filter(filter, default_filter);
let fmt_layer = build_fmt_layer();
let otel_layer = OpenTelemetryLayer::new(tracer);
crate::TRACING.get_or_init(|| {
tracing_subscriber::registry()
.with(env_filter)
.with(fmt_layer)
.with(otel_layer)
.init();
});
tracing::info!(
service_name = %config.service_name,
otlp_endpoint = %config.otlp_endpoint,
sampling_ratio = config.sampling_ratio,
"OpenTelemetry tracing initialized"
);
Ok(())
}
/// Shuts down the tracer provider stored by `init_with_otel`, if there is one.
///
/// The outcome is only logged: a failure produces a warning, success an info
/// event. Does nothing when no provider has been stored.
pub fn shutdown_otel() {
    let Some(provider) = TRACER_PROVIDER.get() else {
        return;
    };

    match provider.shutdown() {
        Ok(_) => tracing::info!("OpenTelemetry tracer provider shutdown complete"),
        Err(e) => {
            tracing::warn!(error = %e, "Failed to shutdown OpenTelemetry tracer provider");
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every default field is pinned, including both timeouts.
    #[test]
    fn test_default_config() {
        let config = OtelConfig::default();
        assert_eq!(config.service_name, "mx-service");
        assert_eq!(config.otlp_endpoint, "http://localhost:4317");
        assert!((config.sampling_ratio - 1.0).abs() < f64::EPSILON);
        assert_eq!(config.batch_size, 512);
        assert_eq!(config.max_queue_size, 2048);
        assert_eq!(config.export_timeout_ms, 30_000);
        assert_eq!(config.connect_timeout_ms, 10_000);
    }

    /// Each variant renders its fixed prefix plus any wrapped detail.
    #[test]
    fn test_otel_error_display() {
        let err = OtelError::ExporterCreation("connection refused".to_string());
        assert!(err.to_string().contains("OTLP exporter"));
        assert!(err.to_string().contains("connection refused"));

        let err = OtelError::TracerInstallation("subscriber already set".to_string());
        assert!(err.to_string().contains("install tracer"));
        assert!(err.to_string().contains("subscriber already set"));

        let err = OtelError::AlreadyInitialized;
        assert!(err.to_string().contains("already initialized"));
    }

    /// Feeds ratios taken from a config through `select_sampler`, so the test
    /// exercises the selection logic rather than the literals it just wrote.
    #[test]
    fn test_sampler_selection() {
        let config = OtelConfig {
            sampling_ratio: 1.0,
            ..Default::default()
        };
        assert!(matches!(
            select_sampler(config.sampling_ratio),
            Sampler::AlwaysOn
        ));

        let config = OtelConfig {
            sampling_ratio: 0.5,
            ..Default::default()
        };
        assert!(matches!(
            select_sampler(config.sampling_ratio),
            Sampler::TraceIdRatioBased(r) if (r - 0.5).abs() < f64::EPSILON
        ));
    }

    #[test]
    fn test_select_sampler_always_on() {
        let sampler = select_sampler(1.0);
        assert!(matches!(sampler, Sampler::AlwaysOn));
    }

    /// A ratio within `f64::EPSILON` below 1.0 is treated as exactly 1.0.
    #[test]
    fn test_select_sampler_always_on_near_one() {
        let sampler = select_sampler(1.0 - f64::EPSILON / 2.0);
        assert!(matches!(sampler, Sampler::AlwaysOn));
    }

    #[test]
    fn test_select_sampler_always_off_zero() {
        let sampler = select_sampler(0.0);
        assert!(matches!(sampler, Sampler::AlwaysOff));
    }

    #[test]
    fn test_select_sampler_always_off_negative() {
        for ratio in [-0.5, -1.0] {
            let sampler = select_sampler(ratio);
            assert!(
                matches!(sampler, Sampler::AlwaysOff),
                "Expected AlwaysOff for {ratio}, got {sampler:?}"
            );
        }
    }

    #[test]
    fn test_select_sampler_ratio_based() {
        for ratio in [0.001, 0.1, 0.25, 0.5, 0.75, 0.9, 0.999] {
            let sampler = select_sampler(ratio);
            assert!(
                matches!(sampler, Sampler::TraceIdRatioBased(r) if (r - ratio).abs() < f64::EPSILON),
                "Expected TraceIdRatioBased({ratio}), got {sampler:?}"
            );
        }
    }

    #[test]
    fn test_select_sampler_edge_near_zero() {
        let sampler = select_sampler(0.0001);
        assert!(matches!(sampler, Sampler::TraceIdRatioBased(_)));
    }

    /// Two epsilons below 1.0 is outside the tolerance and stays ratio-based.
    #[test]
    fn test_select_sampler_edge_near_one() {
        let ratio = 1.0 - f64::EPSILON * 2.0;
        let sampler = select_sampler(ratio);
        assert!(matches!(sampler, Sampler::TraceIdRatioBased(_)));
    }
}