Skip to main content

rs_zero/observability/
otlp.rs

1use std::{collections::BTreeMap, time::Duration};
2
3use crate::observability::{ObservabilityError, ObservabilityResult};
4
/// Wire protocol used to reach the OTLP collector.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OtlpProtocol {
    /// OTLP carried over gRPC.
    Grpc,
    /// OTLP carried over HTTP with protobuf-encoded request bodies.
    HttpProtobuf,
}
13
14/// OTLP traces exporter configuration.
15#[derive(Debug, Clone, PartialEq)]
16pub struct OtlpTraceConfig {
17    /// Collector endpoint.
18    pub endpoint: String,
19    /// Transport protocol.
20    pub protocol: OtlpProtocol,
21    /// Request headers.
22    pub headers: BTreeMap<String, String>,
23    /// Resource attributes.
24    pub resource: BTreeMap<String, String>,
25    /// Sampling ratio in `[0.0, 1.0]`.
26    pub sampling_ratio: f64,
27    /// Export timeout.
28    pub timeout: Duration,
29}
30
31impl Default for OtlpTraceConfig {
32    fn default() -> Self {
33        Self {
34            endpoint: "http://127.0.0.1:4317".to_string(),
35            protocol: OtlpProtocol::Grpc,
36            headers: BTreeMap::new(),
37            resource: BTreeMap::new(),
38            sampling_ratio: 1.0,
39            timeout: Duration::from_secs(5),
40        }
41    }
42}
43
/// Owns what is needed to flush and tear down an installed tracing exporter.
///
/// Handles created with `disabled()` or `installed()` carry no SDK provider, so
/// flushing or shutting them down does no work and succeeds.
#[derive(Debug, Clone, Default)]
pub struct TraceShutdownHandle {
    /// Whether this handle reports an installed exporter.
    installed: bool,
    /// SDK provider to flush and shut down; `None` when no real pipeline is attached.
    #[cfg(feature = "otlp")]
    provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
    /// Time budget handed to the provider's shutdown.
    ///
    /// NOTE: the derived `Default` leaves this at `Duration::ZERO` (unlike
    /// `disabled()`); it only matters when `provider` is `Some`.
    #[cfg(feature = "otlp")]
    timeout: Duration,
}
53
54impl TraceShutdownHandle {
55    /// Creates a handle representing no external exporter.
56    pub fn disabled() -> Self {
57        Self {
58            installed: false,
59            #[cfg(feature = "otlp")]
60            provider: None,
61            #[cfg(feature = "otlp")]
62            timeout: Duration::from_secs(5),
63        }
64    }
65
66    /// Creates a handle representing an installed exporter.
67    pub fn installed() -> Self {
68        Self {
69            installed: true,
70            #[cfg(feature = "otlp")]
71            provider: None,
72            #[cfg(feature = "otlp")]
73            timeout: Duration::from_secs(5),
74        }
75    }
76
77    /// Returns whether this handle owns an installed exporter.
78    pub fn is_installed(&self) -> bool {
79        self.installed
80    }
81
82    /// Flushes pending spans.
83    pub fn flush(&self) -> ObservabilityResult<()> {
84        #[cfg(feature = "otlp")]
85        if let Some(provider) = &self.provider {
86            provider
87                .force_flush()
88                .map_err(|error| ObservabilityError::ExporterInstall(error.to_string()))?;
89        }
90        Ok(())
91    }
92
93    /// Shuts down the exporter.
94    pub fn shutdown(self) -> ObservabilityResult<()> {
95        #[cfg(feature = "otlp")]
96        if let Some(provider) = self.provider {
97            provider
98                .shutdown_with_timeout(self.timeout)
99                .map_err(|error| ObservabilityError::ExporterInstall(error.to_string()))?;
100        }
101        Ok(())
102    }
103
104    #[cfg(feature = "otlp")]
105    fn from_provider(
106        provider: opentelemetry_sdk::trace::SdkTracerProvider,
107        timeout: Duration,
108    ) -> Self {
109        Self {
110            installed: true,
111            provider: Some(provider),
112            timeout,
113        }
114    }
115}
116
117/// Validates and normalizes OTLP trace config.
118pub fn build_otlp_trace_config(config: OtlpTraceConfig) -> ObservabilityResult<OtlpTraceConfig> {
119    if config.endpoint.trim().is_empty() {
120        return Err(ObservabilityError::MissingOtlpEndpoint);
121    }
122    Ok(OtlpTraceConfig {
123        sampling_ratio: config.sampling_ratio.clamp(0.0, 1.0),
124        ..config
125    })
126}
127
/// Installs a real OTLP tracing pipeline.
///
/// The config is validated with [`build_otlp_trace_config`]. A span exporter is
/// then built for the configured protocol and wrapped in a batching SDK tracer
/// provider. The provider uses a parent-based, trace-id-ratio sampler and the
/// configured resource attributes.
///
/// A global `tracing` subscriber is registered, composed of:
/// - an `EnvFilter` parsed from `filter`, falling back to `"info"` when `filter`
///   does not parse;
/// - a formatting layer that also logs span close events;
/// - a `tracing-opentelemetry` layer backed by a tracer named `"rs-zero"`.
///
/// Once the subscriber is registered, the provider and a `TraceContextPropagator`
/// are installed as the OpenTelemetry globals.
///
/// # Errors
///
/// - `MissingOtlpEndpoint` when the config has no endpoint;
/// - `ExporterInstall` when the exporter (or its gRPC metadata) cannot be built;
/// - `SubscriberAlreadyInitialized` when a global subscriber is already set.
#[cfg(feature = "otlp")]
pub fn install_otlp_tracing(
    config: OtlpTraceConfig,
    filter: String,
) -> ObservabilityResult<TraceShutdownHandle> {
    use opentelemetry::trace::TracerProvider as _;
    use opentelemetry::{KeyValue, global};
    use opentelemetry_sdk::{Resource, propagation::TraceContextPropagator, trace as sdktrace};
    use tracing_subscriber::{
        EnvFilter, fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt,
    };

    let config = build_otlp_trace_config(config)?;
    let exporter = match config.protocol {
        OtlpProtocol::Grpc => build_grpc_exporter(&config),
        OtlpProtocol::HttpProtobuf => build_http_exporter(&config),
    }?;

    let attributes = config
        .resource
        .iter()
        .map(|(key, value)| KeyValue::new(key.clone(), value.clone()));
    let resource = Resource::builder_empty().with_attributes(attributes).build();

    // Root spans are sampled by ratio; child spans follow their parent's decision.
    let ratio_sampler = sdktrace::Sampler::TraceIdRatioBased(config.sampling_ratio);
    let provider = sdktrace::SdkTracerProvider::builder()
        .with_batch_exporter(exporter)
        .with_sampler(sdktrace::Sampler::ParentBased(Box::new(ratio_sampler)))
        .with_resource(resource)
        .build();

    // Best effort: an unparsable filter degrades to "info" instead of failing install.
    let env_filter = EnvFilter::try_new(filter).unwrap_or_else(|_| EnvFilter::new("info"));
    let fmt_layer = tracing_subscriber::fmt::layer().with_span_events(FmtSpan::CLOSE);
    let otel_layer = tracing_opentelemetry::layer().with_tracer(provider.tracer("rs-zero"));

    tracing_subscriber::registry()
        .with(env_filter)
        .with(fmt_layer)
        .with(otel_layer)
        .try_init()
        .map_err(|_| ObservabilityError::SubscriberAlreadyInitialized)?;

    // Globals are only touched once the subscriber was accepted.
    global::set_tracer_provider(provider.clone());
    global::set_text_map_propagator(TraceContextPropagator::new());

    Ok(TraceShutdownHandle::from_provider(provider, config.timeout))
}

/// Builds an HTTP/protobuf span exporter from `config`'s endpoint, timeout and headers.
#[cfg(feature = "otlp")]
fn build_http_exporter(
    config: &OtlpTraceConfig,
) -> ObservabilityResult<opentelemetry_otlp::SpanExporter> {
    use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithHttpConfig};

    SpanExporter::builder()
        .with_http()
        .with_endpoint(config.endpoint.clone())
        .with_protocol(Protocol::HttpBinary)
        .with_timeout(config.timeout)
        .with_headers(config.headers.clone().into_iter().collect())
        .build()
        .map_err(|error| ObservabilityError::ExporterInstall(error.to_string()))
}
185
/// Builds a gRPC (tonic) span exporter from `config`.
///
/// Every entry of `config.headers` is converted into ASCII gRPC metadata and sent
/// along with the configured endpoint and timeout.
///
/// # Errors
///
/// Returns [`ObservabilityError::ExporterInstall`] when a header name or value is
/// not valid ASCII metadata, or when the exporter fails to build. Header errors
/// name the offending header but deliberately omit its value, because header
/// values frequently carry credentials.
#[cfg(feature = "otlp")]
fn build_grpc_exporter(
    config: &OtlpTraceConfig,
) -> ObservabilityResult<opentelemetry_otlp::SpanExporter> {
    use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithTonicConfig};
    use tonic::metadata::{Ascii, MetadataKey, MetadataMap, MetadataValue};

    let mut metadata = MetadataMap::with_capacity(config.headers.len());
    for (key, value) in &config.headers {
        let metadata_key = key.parse::<MetadataKey<Ascii>>().map_err(|error| {
            ObservabilityError::ExporterInstall(format!(
                "invalid OTLP header name `{key}`: {error}"
            ))
        })?;
        let metadata_value: MetadataValue<Ascii> = MetadataValue::try_from(value.as_str())
            .map_err(|error| {
                ObservabilityError::ExporterInstall(format!(
                    "invalid value for OTLP header `{key}`: {error}"
                ))
            })?;
        metadata.insert(metadata_key, metadata_value);
    }

    SpanExporter::builder()
        .with_tonic()
        .with_endpoint(config.endpoint.clone())
        .with_protocol(Protocol::Grpc)
        .with_timeout(config.timeout)
        .with_metadata(metadata)
        .build()
        .map_err(|error| ObservabilityError::ExporterInstall(error.to_string()))
}
212
#[cfg(test)]
mod tests {
    use super::{OtlpTraceConfig, TraceShutdownHandle, build_otlp_trace_config};
    use crate::observability::ObservabilityError;

    #[test]
    fn otlp_config_requires_endpoint() {
        let error = build_otlp_trace_config(OtlpTraceConfig {
            endpoint: String::new(),
            ..OtlpTraceConfig::default()
        })
        .expect_err("endpoint");
        assert_eq!(error, ObservabilityError::MissingOtlpEndpoint);
    }

    #[test]
    fn otlp_config_rejects_whitespace_only_endpoint() {
        let error = build_otlp_trace_config(OtlpTraceConfig {
            endpoint: "  \t ".to_string(),
            ..OtlpTraceConfig::default()
        })
        .expect_err("whitespace-only endpoint");
        assert_eq!(error, ObservabilityError::MissingOtlpEndpoint);
    }

    #[test]
    fn otlp_config_clamps_sampling_ratio() {
        for (input, expected) in [(-0.5, 0.0), (0.0, 0.0), (0.25, 0.25), (1.0, 1.0), (3.0, 1.0)] {
            let config = build_otlp_trace_config(OtlpTraceConfig {
                sampling_ratio: input,
                ..OtlpTraceConfig::default()
            })
            .expect("valid config");
            assert_eq!(config.sampling_ratio, expected, "input {input}");
        }
    }

    #[test]
    fn disabled_handle_is_not_installed_and_noops() {
        let handle = TraceShutdownHandle::disabled();
        assert!(!handle.is_installed());
        handle.flush().expect("flush");
        handle.shutdown().expect("shutdown");
    }

    #[test]
    fn shutdown_handle_flushes_without_collector() {
        let handle = TraceShutdownHandle::installed();
        assert!(handle.is_installed());
        handle.flush().expect("flush");
        handle.shutdown().expect("shutdown");
    }
}