use std::{collections::BTreeMap, time::Duration};
use crate::observability::{ObservabilityError, ObservabilityResult};
/// Wire protocol used by the OTLP span exporter.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OtlpProtocol {
/// OTLP over gRPC; `install_otlp_tracing` builds a tonic-based exporter for this variant.
Grpc,
/// OTLP over HTTP; `install_otlp_tracing` builds an HTTP exporter using `Protocol::HttpBinary`.
HttpProtobuf,
}
/// Settings used to build the OTLP span exporter and tracer provider.
#[derive(Debug, Clone, PartialEq)]
pub struct OtlpTraceConfig {
/// Endpoint handed to the exporter builder; `build_otlp_trace_config` rejects a blank value.
pub endpoint: String,
/// Selects which exporter (gRPC or HTTP) `install_otlp_tracing` builds.
pub protocol: OtlpProtocol,
/// Extra entries configured on the exporter: HTTP headers for `HttpProtobuf`,
/// gRPC metadata for `Grpc`.
pub headers: BTreeMap<String, String>,
/// Key/value pairs turned into resource attributes on the tracer provider.
pub resource: BTreeMap<String, String>,
/// Ratio given to the trace-id ratio sampler; `build_otlp_trace_config` clamps it
/// to `0.0..=1.0`.
pub sampling_ratio: f64,
/// Exporter timeout; also stored in the returned handle and used as its shutdown timeout.
pub timeout: Duration,
}
impl Default for OtlpTraceConfig {
fn default() -> Self {
Self {
endpoint: "http://127.0.0.1:4317".to_string(),
protocol: OtlpProtocol::Grpc,
headers: BTreeMap::new(),
resource: BTreeMap::new(),
sampling_ratio: 1.0,
timeout: Duration::from_secs(5),
}
}
}
/// Handle used to flush and shut down the tracing pipeline.
///
/// Without the `otlp` feature the handle only carries the `installed` flag,
/// and `flush`/`shutdown` are no-ops that return `Ok(())`.
#[derive(Debug, Clone, Default)]
pub struct TraceShutdownHandle {
/// Value reported by `is_installed`.
installed: bool,
/// Tracer provider to flush and shut down; only `from_provider` sets it to `Some`.
#[cfg(feature = "otlp")]
provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
/// Timeout passed to the provider's `shutdown_with_timeout` in `shutdown`.
#[cfg(feature = "otlp")]
timeout: Duration,
}
impl TraceShutdownHandle {
    /// Returns a handle that reports tracing as not installed and owns no provider.
    pub fn disabled() -> Self {
        Self::without_provider(false)
    }

    /// Returns a handle that reports tracing as installed but owns no provider,
    /// so `flush` and `shutdown` succeed without doing any work.
    pub fn installed() -> Self {
        Self::without_provider(true)
    }

    /// Reports the `installed` flag this handle was created with.
    pub fn is_installed(&self) -> bool {
        self.installed
    }

    /// Forces the owned provider, if any, to flush.
    ///
    /// # Errors
    ///
    /// Returns `ObservabilityError::ExporterInstall` carrying the provider's
    /// error message when the flush fails.
    pub fn flush(&self) -> ObservabilityResult<()> {
        #[cfg(feature = "otlp")]
        {
            if let Some(provider) = self.provider.as_ref() {
                provider.force_flush().map_err(Self::exporter_error)?;
            }
        }
        Ok(())
    }

    /// Consumes the handle and shuts down the owned provider, if any, using
    /// the stored timeout.
    ///
    /// # Errors
    ///
    /// Returns `ObservabilityError::ExporterInstall` carrying the provider's
    /// error message when the shutdown fails.
    pub fn shutdown(self) -> ObservabilityResult<()> {
        #[cfg(feature = "otlp")]
        {
            let Self {
                provider, timeout, ..
            } = self;
            if let Some(provider) = provider {
                provider
                    .shutdown_with_timeout(timeout)
                    .map_err(Self::exporter_error)?;
            }
        }
        Ok(())
    }

    /// Wraps a live provider together with the timeout to use at shutdown.
    #[cfg(feature = "otlp")]
    fn from_provider(
        provider: opentelemetry_sdk::trace::SdkTracerProvider,
        timeout: Duration,
    ) -> Self {
        TraceShutdownHandle {
            installed: true,
            provider: Some(provider),
            timeout,
        }
    }

    /// Shared constructor for handles that do not own a provider.
    fn without_provider(installed: bool) -> Self {
        Self {
            installed,
            #[cfg(feature = "otlp")]
            provider: None,
            #[cfg(feature = "otlp")]
            timeout: Duration::from_secs(5),
        }
    }

    /// Converts a provider error into the crate's error type via its message.
    #[cfg(feature = "otlp")]
    fn exporter_error<E: ToString>(error: E) -> ObservabilityError {
        ObservabilityError::ExporterInstall(error.to_string())
    }
}
/// Validates and normalizes an [`OtlpTraceConfig`].
///
/// The endpoint must contain at least one non-whitespace character; it is
/// otherwise returned unchanged. The sampling ratio is forced into
/// `0.0..=1.0`, and a NaN ratio is replaced with `0.0` so the returned value
/// is always a real number in that range. All other fields pass through as-is.
///
/// # Errors
///
/// Returns `ObservabilityError::MissingOtlpEndpoint` when the endpoint is
/// empty or consists only of whitespace.
pub fn build_otlp_trace_config(config: OtlpTraceConfig) -> ObservabilityResult<OtlpTraceConfig> {
    if config.endpoint.trim().is_empty() {
        return Err(ObservabilityError::MissingOtlpEndpoint);
    }

    // `f64::clamp` returns NaN for a NaN input, so NaN has to be handled
    // explicitly; 0.0 is the conservative replacement for a meaningless ratio.
    let sampling_ratio = if config.sampling_ratio.is_nan() {
        0.0
    } else {
        config.sampling_ratio.clamp(0.0, 1.0)
    };

    Ok(OtlpTraceConfig {
        sampling_ratio,
        ..config
    })
}
/// Installs a global `tracing` subscriber that exports spans over OTLP.
///
/// The configuration is first normalized with `build_otlp_trace_config`. An
/// exporter matching `config.protocol` is then built and wrapped in a batching
/// tracer provider with a parent-based trace-id ratio sampler. The subscriber
/// combines an env filter, a fmt layer emitting span-close events, and the
/// OpenTelemetry layer. A `filter` that fails to parse falls back to `"info"`.
///
/// After the subscriber is initialized, the provider and a trace-context
/// propagator are registered globally.
///
/// # Errors
///
/// * `ObservabilityError::MissingOtlpEndpoint` when the endpoint is blank.
/// * `ObservabilityError::ExporterInstall` when the exporter cannot be built.
/// * `ObservabilityError::SubscriberAlreadyInitialized` when initializing the
///   subscriber fails.
#[cfg(feature = "otlp")]
pub fn install_otlp_tracing(
    config: OtlpTraceConfig,
    filter: String,
) -> ObservabilityResult<TraceShutdownHandle> {
    use opentelemetry::trace::TracerProvider as _;
    use opentelemetry::{KeyValue, global};
    use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithHttpConfig};
    use opentelemetry_sdk::{Resource, propagation::TraceContextPropagator, trace as sdktrace};
    use tracing_subscriber::{
        EnvFilter, fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt,
    };

    let settings = build_otlp_trace_config(config)?;

    let span_exporter = match settings.protocol {
        OtlpProtocol::HttpProtobuf => {
            let http_headers = settings.headers.clone().into_iter().collect();
            SpanExporter::builder()
                .with_http()
                .with_endpoint(settings.endpoint.clone())
                .with_protocol(Protocol::HttpBinary)
                .with_timeout(settings.timeout)
                .with_headers(http_headers)
                .build()
                .map_err(|error| ObservabilityError::ExporterInstall(error.to_string()))?
        }
        OtlpProtocol::Grpc => build_grpc_exporter(&settings)?,
    };

    let attributes = settings
        .resource
        .iter()
        .map(|(name, value)| KeyValue::new(name.clone(), value.clone()));
    let resource = Resource::builder_empty()
        .with_attributes(attributes)
        .build();

    // Follow the parent's sampling decision; otherwise sample by trace-id ratio.
    let ratio_sampler = sdktrace::Sampler::TraceIdRatioBased(settings.sampling_ratio);
    let sampler = sdktrace::Sampler::ParentBased(Box::new(ratio_sampler));
    let provider = sdktrace::SdkTracerProvider::builder()
        .with_batch_exporter(span_exporter)
        .with_sampler(sampler)
        .with_resource(resource)
        .build();

    let otel_layer = tracing_opentelemetry::layer().with_tracer(provider.tracer("rs-zero"));
    let env_filter = match EnvFilter::try_new(filter) {
        Ok(parsed) => parsed,
        Err(_) => EnvFilter::new("info"),
    };

    let subscriber = tracing_subscriber::registry()
        .with(env_filter)
        .with(tracing_subscriber::fmt::layer().with_span_events(FmtSpan::CLOSE))
        .with(otel_layer);
    if subscriber.try_init().is_err() {
        return Err(ObservabilityError::SubscriberAlreadyInitialized);
    }

    // Globals are only touched once the subscriber is in place.
    global::set_tracer_provider(provider.clone());
    global::set_text_map_propagator(TraceContextPropagator::new());

    Ok(TraceShutdownHandle::from_provider(
        provider,
        settings.timeout,
    ))
}
/// Builds the tonic-based OTLP span exporter for the gRPC protocol.
///
/// Every entry of `config.headers` is converted into an ASCII gRPC metadata
/// pair and attached to the exporter, together with the endpoint and timeout.
///
/// # Errors
///
/// Returns `ObservabilityError::ExporterInstall` when a header name or value
/// is not valid metadata, or when the exporter itself fails to build.
#[cfg(feature = "otlp")]
fn build_grpc_exporter(
    config: &OtlpTraceConfig,
) -> ObservabilityResult<opentelemetry_otlp::SpanExporter> {
    use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithTonicConfig};
    use tonic::metadata::{Ascii, MetadataKey, MetadataMap, MetadataValue};

    let mut grpc_metadata = MetadataMap::new();
    for (name, raw_value) in config.headers.iter() {
        // Parse the key before the value, so a bad key is reported first.
        let key = name
            .parse::<MetadataKey<Ascii>>()
            .map_err(|error| ObservabilityError::ExporterInstall(error.to_string()))?;
        let value = MetadataValue::<Ascii>::try_from(raw_value.as_str())
            .map_err(|error| ObservabilityError::ExporterInstall(error.to_string()))?;
        grpc_metadata.insert(key, value);
    }

    let built = SpanExporter::builder()
        .with_tonic()
        .with_endpoint(config.endpoint.clone())
        .with_protocol(Protocol::Grpc)
        .with_timeout(config.timeout)
        .with_metadata(grpc_metadata)
        .build();

    built.map_err(|error| ObservabilityError::ExporterInstall(error.to_string()))
}
#[cfg(test)]
mod tests {
    use super::{OtlpTraceConfig, TraceShutdownHandle, build_otlp_trace_config};
    use crate::observability::ObservabilityError;

    /// An empty endpoint is rejected with `MissingOtlpEndpoint`.
    #[test]
    fn otlp_config_requires_endpoint() {
        let without_endpoint = OtlpTraceConfig {
            endpoint: String::new(),
            ..OtlpTraceConfig::default()
        };

        let outcome = build_otlp_trace_config(without_endpoint);

        assert_eq!(
            outcome.expect_err("endpoint"),
            ObservabilityError::MissingOtlpEndpoint
        );
    }

    /// A provider-less handle flushes and shuts down without error.
    #[test]
    fn shutdown_handle_flushes_without_collector() {
        let shutdown_handle = TraceShutdownHandle::installed();
        assert!(shutdown_handle.is_installed());

        let flushed = shutdown_handle.flush();
        flushed.expect("flush");

        let stopped = shutdown_handle.shutdown();
        stopped.expect("shutdown");
    }
}