use opentelemetry::KeyValue;
use opentelemetry::{global, trace::TracerProvider};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::{LogExporter, Protocol, SpanExporter, WithExportConfig};
use opentelemetry_sdk::logs::SdkLoggerProvider;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::resource::Resource;
use opentelemetry_sdk::trace::{self, RandomIdGenerator, Sampler, SdkTracerProvider};
use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};
use std::env;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, fmt, registry};
/// Settings consumed by [`init_telemetry`] to configure logging and OTLP export.
#[derive(Debug, Clone)]
pub struct TelemetryConfig {
    /// Service name; attached to exported telemetry under the `SERVICE_NAME` resource key.
    pub service_name: String,
    /// Service version; attached to exported telemetry under the `SERVICE_VERSION` resource key.
    pub service_version: String,
    /// Base URL of the OTLP/HTTP collector (`/v1/traces` and `/v1/logs` are appended to it).
    /// `None` means only console logging is installed.
    pub otlp_endpoint: Option<String>,
    /// Filter directive string handed to `EnvFilter::new`.
    pub log_level: String,
}

impl TelemetryConfig {
    /// Creates a configuration with no OTLP endpoint and an `"info"` log level.
    pub fn new(service_name: impl Into<String>, service_version: impl Into<String>) -> Self {
        Self {
            service_name: service_name.into(),
            service_version: service_version.into(),
            otlp_endpoint: None,
            log_level: "info".to_string(),
        }
    }

    /// Builds a configuration from process environment variables.
    ///
    /// | Variable          | Behaviour                                                         |
    /// |-------------------|-------------------------------------------------------------------|
    /// | `SERVICE_NAME`    | required                                                          |
    /// | `SERVICE_VERSION` | falls back to `CARGO_PKG_VERSION` (read at run time), then `"unknown"` |
    /// | `OTLP_ENDPOINT`   | optional; an empty or whitespace-only value is treated as unset   |
    /// | `RUST_LOG`        | falls back to `"info"`                                            |
    ///
    /// # Panics
    ///
    /// Panics if `SERVICE_NAME` is not set (or is not valid Unicode).
    pub fn from_env() -> Self {
        let service_name = env::var("SERVICE_NAME").expect("SERVICE_NAME must be set");
        let service_version = env::var("SERVICE_VERSION")
            .or_else(|_| env::var("CARGO_PKG_VERSION"))
            .unwrap_or_else(|_| "unknown".to_string());
        // A blank endpoint would otherwise become `Some("")` and yield exporter URLs
        // such as "/v1/traces"; treat it the same as an unset variable.
        let otlp_endpoint = env::var("OTLP_ENDPOINT")
            .ok()
            .filter(|endpoint| !endpoint.trim().is_empty());
        let log_level = env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string());

        Self {
            service_name,
            service_version,
            otlp_endpoint,
            log_level,
        }
    }

    /// Returns the configuration with the OTLP collector base URL set to `endpoint`.
    pub fn with_otlp_endpoint(mut self, endpoint: impl Into<String>) -> Self {
        self.otlp_endpoint = Some(endpoint.into());
        self
    }
}
pub fn init_telemetry(
config: &TelemetryConfig,
) -> Result<Option<SdkTracerProvider>, Box<dyn std::error::Error>> {
let env_filter = EnvFilter::new(&config.log_level);
if config.otlp_endpoint.is_none() {
let subscriber = registry().with(env_filter).with(fmt::layer());
SubscriberInitExt::try_init(subscriber)?;
return Ok(None);
}
let otlp_endpoint = config.otlp_endpoint.clone().unwrap();
let resource = Resource::builder()
.with_attribute(KeyValue::new(SERVICE_NAME, config.service_name.to_string()))
.with_attribute(KeyValue::new(
SERVICE_VERSION,
config.service_version.to_string(),
))
.build();
let trace_exporter = SpanExporter::builder()
.with_http()
.with_protocol(Protocol::HttpJson)
.with_endpoint(format!("{}/v1/traces", otlp_endpoint.trim_end_matches('/')))
.build()?;
let trace_processor = trace::BatchSpanProcessor::new(
trace_exporter,
trace::BatchConfigBuilder::default().build(),
);
let tracer_provider = trace::SdkTracerProvider::builder()
.with_resource(resource.clone())
.with_sampler(Sampler::AlwaysOn)
.with_span_processor(trace_processor)
.with_id_generator(RandomIdGenerator::default())
.build();
global::set_text_map_propagator(TraceContextPropagator::new());
global::set_tracer_provider(tracer_provider.clone());
let tracer = tracer_provider.tracer(format!("{}-tracer", config.service_name));
let otel_layer = OpenTelemetryLayer::new(tracer);
let log_exporter = LogExporter::builder()
.with_http()
.with_endpoint(format!("{}/v1/logs", otlp_endpoint.trim_end_matches('/')))
.build()?;
let log_provider = SdkLoggerProvider::builder()
.with_resource(resource)
.with_batch_exporter(log_exporter)
.build();
let log_layer = OpenTelemetryTracingBridge::new(&log_provider);
let subscriber = registry()
.with(env_filter)
.with(fmt::layer())
.with(otel_layer)
.with(log_layer);
SubscriberInitExt::try_init(subscriber)?;
tracing::info!(
service = %config.service_name,
version = %config.service_version,
"Telemetry initialized"
);
Ok(Some(tracer_provider))
}
pub async fn shutdown_telemetry(tracer_provider: Option<SdkTracerProvider>) {
if let Some(provider) = tracer_provider {
let _ = provider.force_flush();
let _ = provider.shutdown();
}
}