rs_zero/observability/
otlp.rs1use std::{collections::BTreeMap, time::Duration};
2
3use crate::observability::{ObservabilityError, ObservabilityResult};
4
5#[derive(Debug, Clone, PartialEq, Eq)]
7pub enum OtlpProtocol {
8 Grpc,
10 HttpProtobuf,
12}
13
14#[derive(Debug, Clone, PartialEq)]
16pub struct OtlpTraceConfig {
17 pub endpoint: String,
19 pub protocol: OtlpProtocol,
21 pub headers: BTreeMap<String, String>,
23 pub resource: BTreeMap<String, String>,
25 pub sampling_ratio: f64,
27 pub timeout: Duration,
29}
30
31impl Default for OtlpTraceConfig {
32 fn default() -> Self {
33 Self {
34 endpoint: "http://127.0.0.1:4317".to_string(),
35 protocol: OtlpProtocol::Grpc,
36 headers: BTreeMap::new(),
37 resource: BTreeMap::new(),
38 sampling_ratio: 1.0,
39 timeout: Duration::from_secs(5),
40 }
41 }
42}
43
44#[derive(Debug, Clone, Default)]
46pub struct TraceShutdownHandle {
47 installed: bool,
48 #[cfg(feature = "otlp")]
49 provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
50 #[cfg(feature = "otlp")]
51 timeout: Duration,
52}
53
54impl TraceShutdownHandle {
55 pub fn disabled() -> Self {
57 Self {
58 installed: false,
59 #[cfg(feature = "otlp")]
60 provider: None,
61 #[cfg(feature = "otlp")]
62 timeout: Duration::from_secs(5),
63 }
64 }
65
66 pub fn installed() -> Self {
68 Self {
69 installed: true,
70 #[cfg(feature = "otlp")]
71 provider: None,
72 #[cfg(feature = "otlp")]
73 timeout: Duration::from_secs(5),
74 }
75 }
76
77 pub fn is_installed(&self) -> bool {
79 self.installed
80 }
81
82 pub fn flush(&self) -> ObservabilityResult<()> {
84 #[cfg(feature = "otlp")]
85 if let Some(provider) = &self.provider {
86 provider
87 .force_flush()
88 .map_err(|error| ObservabilityError::ExporterInstall(error.to_string()))?;
89 }
90 Ok(())
91 }
92
93 pub fn shutdown(self) -> ObservabilityResult<()> {
95 #[cfg(feature = "otlp")]
96 if let Some(provider) = self.provider {
97 provider
98 .shutdown_with_timeout(self.timeout)
99 .map_err(|error| ObservabilityError::ExporterInstall(error.to_string()))?;
100 }
101 Ok(())
102 }
103
104 #[cfg(feature = "otlp")]
105 fn from_provider(
106 provider: opentelemetry_sdk::trace::SdkTracerProvider,
107 timeout: Duration,
108 ) -> Self {
109 Self {
110 installed: true,
111 provider: Some(provider),
112 timeout,
113 }
114 }
115}
116
117pub fn build_otlp_trace_config(config: OtlpTraceConfig) -> ObservabilityResult<OtlpTraceConfig> {
119 if config.endpoint.trim().is_empty() {
120 return Err(ObservabilityError::MissingOtlpEndpoint);
121 }
122 Ok(OtlpTraceConfig {
123 sampling_ratio: config.sampling_ratio.clamp(0.0, 1.0),
124 ..config
125 })
126}
127
128#[cfg(feature = "otlp")]
130pub fn install_otlp_tracing(
131 config: OtlpTraceConfig,
132 filter: String,
133) -> ObservabilityResult<TraceShutdownHandle> {
134 use opentelemetry::trace::TracerProvider as _;
135 use opentelemetry::{KeyValue, global};
136 use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithHttpConfig};
137 use opentelemetry_sdk::{Resource, propagation::TraceContextPropagator, trace as sdktrace};
138 use tracing_subscriber::{
139 EnvFilter, fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt,
140 };
141
142 let config = build_otlp_trace_config(config)?;
143 let exporter = match config.protocol {
144 OtlpProtocol::Grpc => build_grpc_exporter(&config)?,
145 OtlpProtocol::HttpProtobuf => SpanExporter::builder()
146 .with_http()
147 .with_endpoint(config.endpoint.clone())
148 .with_protocol(Protocol::HttpBinary)
149 .with_timeout(config.timeout)
150 .with_headers(config.headers.clone().into_iter().collect())
151 .build()
152 .map_err(|error| ObservabilityError::ExporterInstall(error.to_string()))?,
153 };
154
155 let resource = Resource::builder_empty()
156 .with_attributes(
157 config
158 .resource
159 .iter()
160 .map(|(key, value)| KeyValue::new(key.clone(), value.clone())),
161 )
162 .build();
163 let provider = sdktrace::SdkTracerProvider::builder()
164 .with_batch_exporter(exporter)
165 .with_sampler(sdktrace::Sampler::ParentBased(Box::new(
166 sdktrace::Sampler::TraceIdRatioBased(config.sampling_ratio),
167 )))
168 .with_resource(resource)
169 .build();
170 let tracer = provider.tracer("rs-zero");
171 let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
172 let env_filter = EnvFilter::try_new(filter).unwrap_or_else(|_| EnvFilter::new("info"));
173
174 tracing_subscriber::registry()
175 .with(env_filter)
176 .with(tracing_subscriber::fmt::layer().with_span_events(FmtSpan::CLOSE))
177 .with(otel_layer)
178 .try_init()
179 .map_err(|_| ObservabilityError::SubscriberAlreadyInitialized)?;
180 global::set_tracer_provider(provider.clone());
181 global::set_text_map_propagator(TraceContextPropagator::new());
182
183 Ok(TraceShutdownHandle::from_provider(provider, config.timeout))
184}
185
186#[cfg(feature = "otlp")]
187fn build_grpc_exporter(
188 config: &OtlpTraceConfig,
189) -> ObservabilityResult<opentelemetry_otlp::SpanExporter> {
190 use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithTonicConfig};
191 use tonic::metadata::{Ascii, MetadataKey, MetadataMap, MetadataValue};
192
193 let mut metadata = MetadataMap::new();
194 for (key, value) in &config.headers {
195 metadata.insert(
196 key.parse::<MetadataKey<Ascii>>()
197 .map_err(|error| ObservabilityError::ExporterInstall(error.to_string()))?,
198 MetadataValue::try_from(value.as_str())
199 .map_err(|error| ObservabilityError::ExporterInstall(error.to_string()))?,
200 );
201 }
202
203 SpanExporter::builder()
204 .with_tonic()
205 .with_endpoint(config.endpoint.clone())
206 .with_protocol(Protocol::Grpc)
207 .with_timeout(config.timeout)
208 .with_metadata(metadata)
209 .build()
210 .map_err(|error| ObservabilityError::ExporterInstall(error.to_string()))
211}
212
213#[cfg(test)]
214mod tests {
215 use super::{OtlpTraceConfig, TraceShutdownHandle, build_otlp_trace_config};
216 use crate::observability::ObservabilityError;
217
218 #[test]
219 fn otlp_config_requires_endpoint() {
220 let error = build_otlp_trace_config(OtlpTraceConfig {
221 endpoint: String::new(),
222 ..OtlpTraceConfig::default()
223 })
224 .expect_err("endpoint");
225 assert_eq!(error, ObservabilityError::MissingOtlpEndpoint);
226 }
227
228 #[test]
229 fn shutdown_handle_flushes_without_collector() {
230 let handle = TraceShutdownHandle::installed();
231 assert!(handle.is_installed());
232 handle.flush().expect("flush");
233 handle.shutdown().expect("shutdown");
234 }
235}