1#![forbid(unsafe_code)]
36
37use std::time::Duration;
38
39use opentelemetry::KeyValue;
40use opentelemetry::trace::TracerProvider as _;
41use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig};
42use opentelemetry_sdk::Resource;
43use opentelemetry_sdk::propagation::TraceContextPropagator;
44use opentelemetry_sdk::trace::{Sampler, SdkTracerProvider};
45use tracing_subscriber::layer::SubscriberExt as _;
46use tracing_subscriber::util::SubscriberInitExt as _;
47use tracing_subscriber::{EnvFilter, Layer as _};
48
49#[derive(Debug, thiserror::Error)]
53pub enum TelemetryError {
54 #[error("failed to build OTLP span exporter: {0}")]
55 Exporter(#[from] opentelemetry_otlp::ExporterBuildError),
56}
57
/// Transport used to ship spans to the OTLP collector.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OtlpProtocol {
    /// OTLP over gRPC.
    Grpc,
    /// OTLP over HTTP with binary protobuf payloads.
    HttpProtobuf,
}

impl OtlpProtocol {
    /// Interprets a protocol name, as found in `OTEL_EXPORTER_OTLP_PROTOCOL`-style variables.
    ///
    /// Matching ignores surrounding whitespace and ASCII case. `http/protobuf`, `http`,
    /// `httpbinary` and `http-protobuf` select [`OtlpProtocol::HttpProtobuf`]; every other
    /// input, including unknown names, selects [`OtlpProtocol::Grpc`].
    #[must_use]
    pub fn parse(s: &str) -> Self {
        const HTTP_ALIASES: [&str; 4] = ["http/protobuf", "http", "httpbinary", "http-protobuf"];

        let normalized = s.trim().to_ascii_lowercase();
        if HTTP_ALIASES.contains(&normalized.as_str()) {
            Self::HttpProtobuf
        } else {
            Self::Grpc
        }
    }

    /// Conventional local collector URL for this protocol: port 4317 for gRPC, 4318 for HTTP.
    #[must_use]
    pub fn default_endpoint(self) -> &'static str {
        match self {
            Self::HttpProtobuf => "http://localhost:4318",
            Self::Grpc => "http://localhost:4317",
        }
    }
}
86
87#[derive(Debug, Clone)]
90pub struct OtlpConfig {
91 pub endpoint: String,
92 pub protocol: OtlpProtocol,
93 pub sample_ratio: f64,
96 pub service_name: String,
97 pub service_version: String,
98 pub service_instance_id: String,
99 pub timeout: Duration,
100}
101
/// Reports whether an environment-variable value spells an affirmative flag.
///
/// `1`, `true`, `yes` and `on` count as true, ignoring surrounding whitespace and ASCII
/// case; anything else, including the empty string, is false.
fn env_truthy(v: &str) -> bool {
    const AFFIRMATIVE: [&str; 4] = ["1", "true", "yes", "on"];

    let normalized = v.trim().to_ascii_lowercase();
    AFFIRMATIVE.contains(&normalized.as_str())
}
109
110impl OtlpConfig {
111 #[must_use]
120 pub fn from_env(
121 get: impl Fn(&str) -> Option<String>,
122 service_instance_id: &str,
123 service_version: &str,
124 default_service_name: &str,
125 ) -> Option<Self> {
126 if get("OTEL_SDK_DISABLED").as_deref().is_some_and(env_truthy) {
127 return None;
128 }
129
130 let endpoint_override = get("CRABKA_OTLP_ENDPOINT")
131 .or_else(|| get("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"))
132 .or_else(|| get("OTEL_EXPORTER_OTLP_ENDPOINT"))
133 .map(|s| s.trim().to_owned())
134 .filter(|s| !s.is_empty());
135
136 let explicitly_enabled = get("CRABKA_OTLP_ENABLED")
137 .as_deref()
138 .is_some_and(env_truthy);
139
140 if endpoint_override.is_none() && !explicitly_enabled {
142 return None;
143 }
144
145 let protocol = get("CRABKA_OTLP_PROTOCOL")
146 .or_else(|| get("OTEL_EXPORTER_OTLP_PROTOCOL"))
147 .map_or(OtlpProtocol::Grpc, |s| OtlpProtocol::parse(&s));
148
149 let endpoint = endpoint_override.unwrap_or_else(|| protocol.default_endpoint().to_owned());
150
151 let sample_ratio = get("CRABKA_OTLP_SAMPLE_RATIO")
152 .or_else(|| get("OTEL_TRACES_SAMPLER_ARG"))
153 .and_then(|s| s.trim().parse::<f64>().ok())
154 .map_or(1.0, |r| r.clamp(0.0, 1.0));
155
156 let service_name = get("OTEL_SERVICE_NAME")
157 .map(|s| s.trim().to_owned())
158 .filter(|s| !s.is_empty())
159 .unwrap_or_else(|| default_service_name.to_owned());
160
161 let timeout = get("CRABKA_OTLP_TIMEOUT_SECS")
162 .or_else(|| get("OTEL_EXPORTER_OTLP_TIMEOUT_SECS"))
163 .and_then(|s| s.trim().parse::<u64>().ok())
164 .map_or(Duration::from_secs(10), Duration::from_secs);
165
166 Some(Self {
167 endpoint,
168 protocol,
169 sample_ratio,
170 service_name,
171 service_version: service_version.to_owned(),
172 service_instance_id: service_instance_id.to_owned(),
173 timeout,
174 })
175 }
176
177 fn build_exporter(&self) -> Result<SpanExporter, TelemetryError> {
178 let builder = SpanExporter::builder();
179 let exporter = match self.protocol {
180 OtlpProtocol::Grpc => builder
181 .with_tonic()
182 .with_endpoint(self.endpoint.clone())
183 .with_timeout(self.timeout)
184 .build()?,
185 OtlpProtocol::HttpProtobuf => builder
186 .with_http()
187 .with_protocol(Protocol::HttpBinary)
188 .with_endpoint(self.endpoint.clone())
189 .with_timeout(self.timeout)
190 .build()?,
191 };
192 Ok(exporter)
193 }
194
195 fn resource(&self) -> Resource {
196 Resource::builder()
197 .with_service_name(self.service_name.clone())
198 .with_attributes([
199 KeyValue::new("service.version", self.service_version.clone()),
200 KeyValue::new("service.instance.id", self.service_instance_id.clone()),
201 ])
202 .build()
203 }
204
205 fn sampler(&self) -> Sampler {
206 Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(self.sample_ratio)))
207 }
208}
209
210fn otel_filter(default: &str, get: impl Fn(&str) -> Option<String>) -> EnvFilter {
213 get("CRABKA_OTLP_FILTER")
214 .and_then(|s| EnvFilter::try_new(s).ok())
215 .unwrap_or_else(|| EnvFilter::new(default))
216}
217
218#[must_use = "hold the guard for the process lifetime and call shutdown() before exit"]
223pub struct TelemetryGuard {
224 provider: Option<SdkTracerProvider>,
225}
226
227impl TelemetryGuard {
228 pub fn shutdown(self) {
230 if let Some(provider) = self.provider
231 && let Err(e) = provider.shutdown()
232 {
233 tracing::warn!(error = %e, "OTLP tracer provider shutdown error");
234 }
235 }
236}
237
238pub fn init(
249 otlp: Option<OtlpConfig>,
250 fmt_default_filter: &str,
251 otel_default_filter: &str,
252 tracer_name: &str,
253) -> Result<TelemetryGuard, TelemetryError> {
254 let fmt_filter =
255 EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(fmt_default_filter));
256 let fmt_layer = tracing_subscriber::fmt::layer().with_filter(fmt_filter);
257
258 let Some(cfg) = otlp else {
259 tracing_subscriber::registry().with(fmt_layer).init();
260 return Ok(TelemetryGuard { provider: None });
261 };
262
263 let exporter = cfg.build_exporter()?;
264 let provider = SdkTracerProvider::builder()
265 .with_batch_exporter(exporter)
266 .with_resource(cfg.resource())
267 .with_sampler(cfg.sampler())
268 .build();
269 let tracer = provider.tracer(tracer_name.to_owned());
270
271 opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
272 opentelemetry::global::set_tracer_provider(provider.clone());
273
274 let otel_layer = tracing_opentelemetry::layer()
275 .with_tracer(tracer)
276 .with_location(false)
277 .with_filter(otel_filter(otel_default_filter, |k| std::env::var(k).ok()));
278
279 tracing_subscriber::registry()
280 .with(fmt_layer)
281 .with(otel_layer)
282 .init();
283
284 tracing::info!(
285 endpoint = %cfg.endpoint,
286 protocol = ?cfg.protocol,
287 sample_ratio = cfg.sample_ratio,
288 "OTLP distributed tracing enabled"
289 );
290
291 Ok(TelemetryGuard {
292 provider: Some(provider),
293 })
294}
295
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// Service version passed to every `from_env` call below.
    const VERSION: &str = "0.1.1";
    /// Default service name passed to every `from_env` call below.
    const DEFAULT_NAME: &str = "crabka-broker";

    /// Stand-in for the process environment.
    ///
    /// The returned closure resolves keys against a fixed `(key, value)` list, and the
    /// first match wins.
    fn env_from<'a>(pairs: &'a [(&'a str, &'a str)]) -> impl Fn(&str) -> Option<String> + 'a {
        move |wanted: &str| {
            pairs
                .iter()
                .find_map(|&(key, value)| (key == wanted).then(|| value.to_owned()))
        }
    }

    /// Runs `OtlpConfig::from_env` over `pairs` with the shared version and default name.
    fn resolve(pairs: &[(&str, &str)], instance_id: &str) -> Option<OtlpConfig> {
        OtlpConfig::from_env(env_from(pairs), instance_id, VERSION, DEFAULT_NAME)
    }

    #[test]
    fn disabled_when_no_env() {
        assert!(resolve(&[], "1").is_none());
    }

    #[test]
    fn enabled_by_crabka_endpoint() {
        let cfg = resolve(&[("CRABKA_OTLP_ENDPOINT", "http://collector:4317")], "7")
            .expect("enabled");
        assert!(cfg.endpoint == "http://collector:4317");
        assert!(cfg.protocol == OtlpProtocol::Grpc);
        assert!((cfg.sample_ratio - 1.0).abs() < f64::EPSILON);
        assert!(cfg.service_name == "crabka-broker");
        assert!(cfg.service_instance_id == "7");
        assert!(cfg.service_version == "0.1.1");
    }

    #[test]
    fn enabled_flag_uses_protocol_default_endpoint() {
        let env = [
            ("CRABKA_OTLP_ENABLED", "true"),
            ("CRABKA_OTLP_PROTOCOL", "http/protobuf"),
        ];
        let cfg = resolve(&env, "1").expect("enabled");
        assert!(cfg.protocol == OtlpProtocol::HttpProtobuf);
        assert!(cfg.endpoint == "http://localhost:4318");
    }

    #[test]
    fn grpc_is_the_default_protocol() {
        let cfg = resolve(&[("CRABKA_OTLP_ENABLED", "1")], "1").expect("enabled");
        assert!(cfg.protocol == OtlpProtocol::Grpc);
        assert!(cfg.endpoint == "http://localhost:4317");
    }

    #[test]
    fn sdk_disabled_overrides_endpoint() {
        let env = [
            ("CRABKA_OTLP_ENDPOINT", "http://collector:4317"),
            ("OTEL_SDK_DISABLED", "true"),
        ];
        assert!(resolve(&env, "1").is_none());
    }

    #[test]
    fn endpoint_precedence_and_standard_vars() {
        // The generic OTEL endpoint alone is enough to enable export.
        let generic_only = [("OTEL_EXPORTER_OTLP_ENDPOINT", "http://otel:4317")];
        let cfg = resolve(&generic_only, "1").expect("enabled");
        assert!(cfg.endpoint == "http://otel:4317");

        // The traces-specific variable beats the generic one.
        let traces_and_generic = [
            ("OTEL_EXPORTER_OTLP_ENDPOINT", "http://generic:4317"),
            ("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", "http://traces:4317"),
        ];
        let cfg = resolve(&traces_and_generic, "1").expect("enabled");
        assert!(cfg.endpoint == "http://traces:4317");

        // The crabka-specific variable beats both.
        let crabka_and_traces = [
            ("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", "http://traces:4317"),
            ("CRABKA_OTLP_ENDPOINT", "http://crabka:4317"),
        ];
        let cfg = resolve(&crabka_and_traces, "1").expect("enabled");
        assert!(cfg.endpoint == "http://crabka:4317");
    }

    #[test]
    fn sample_ratio_parsed_and_clamped() {
        let quarter = [
            ("CRABKA_OTLP_ENDPOINT", "http://c:4317"),
            ("CRABKA_OTLP_SAMPLE_RATIO", "0.25"),
        ];
        let cfg = resolve(&quarter, "1").expect("enabled");
        assert!((cfg.sample_ratio - 0.25).abs() < f64::EPSILON);

        let too_large = [
            ("CRABKA_OTLP_ENABLED", "true"),
            ("CRABKA_OTLP_SAMPLE_RATIO", "9.0"),
        ];
        let cfg = resolve(&too_large, "1").expect("enabled");
        assert!((cfg.sample_ratio - 1.0).abs() < f64::EPSILON);
    }

    #[test]
    fn service_name_and_timeout_overrides() {
        let env = [
            ("CRABKA_OTLP_ENDPOINT", "http://c:4317"),
            ("OTEL_SERVICE_NAME", "my-kafka"),
            ("CRABKA_OTLP_TIMEOUT_SECS", "3"),
        ];
        let cfg = resolve(&env, "9").expect("enabled");
        assert!(cfg.service_name == "my-kafka");
        assert!(cfg.timeout == Duration::from_secs(3));
    }

    #[test]
    fn protocol_parse_variants() {
        let cases = [
            ("grpc", OtlpProtocol::Grpc),
            ("http/protobuf", OtlpProtocol::HttpProtobuf),
            ("HTTP", OtlpProtocol::HttpProtobuf),
            ("nonsense", OtlpProtocol::Grpc),
        ];
        for (input, expected) in cases {
            assert!(OtlpProtocol::parse(input) == expected);
        }
    }
}