Skip to main content

crabka_telemetry/
lib.rs

1//! Generic OTLP distributed-tracing pipeline for Crabka services.
2//!
3//! The consuming service always installs a `tracing_subscriber` `fmt` layer
4//! (stdout, gated by the usual `RUST_LOG` `EnvFilter`). When OTLP export is
5//! configured via the environment, a second `tracing-opentelemetry` layer is
6//! attached that converts `tracing` spans into OpenTelemetry spans and
7//! batch-exports them over OTLP to a collector (gRPC `:4317` or
8//! HTTP/protobuf `:4318`).
9//!
10//! ## Enabling
11//!
12//! OTLP is **off by default** — a service with no OTLP environment behaves
13//! byte-for-byte as before. It turns on when any endpoint is set
14//! (`CRABKA_OTLP_ENDPOINT`, `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`,
15//! `OTEL_EXPORTER_OTLP_ENDPOINT`) or `CRABKA_OTLP_ENABLED=true`, and is
16//! force-disabled by `OTEL_SDK_DISABLED=true`.
17//!
18//! ## Resolve OTLP settings without touching the environment
19//!
20//! ```rust
21//! use crabka_telemetry::{OtlpConfig, OtlpProtocol};
22//!
23//! let cfg = OtlpConfig::from_env(
24//!     |key| (key == "CRABKA_OTLP_ENABLED").then(|| "true".to_string()),
25//!     "broker-1",
26//!     "0.1.0",
27//!     "crabka-broker",
28//! )
29//! .unwrap();
30//!
31//! assert_eq!(cfg.protocol, OtlpProtocol::Grpc);
32//! assert_eq!(cfg.endpoint, "http://localhost:4317");
33//! ```
34
35#![forbid(unsafe_code)]
36
37use std::time::Duration;
38
39use opentelemetry::KeyValue;
40use opentelemetry::trace::TracerProvider as _;
41use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig};
42use opentelemetry_sdk::Resource;
43use opentelemetry_sdk::propagation::TraceContextPropagator;
44use opentelemetry_sdk::trace::{Sampler, SdkTracerProvider};
45use tracing_subscriber::layer::SubscriberExt as _;
46use tracing_subscriber::util::SubscriberInitExt as _;
47use tracing_subscriber::{EnvFilter, Layer as _};
48
/// Errors building the OTLP pipeline. Carries the underlying exporter
/// build failure so a misconfigured endpoint surfaces a clear message
/// rather than a silent no-export.
#[derive(Debug, thiserror::Error)]
pub enum TelemetryError {
    /// The OTLP span exporter could not be constructed. Wraps the
    /// `opentelemetry_otlp` build error; the `#[from]` conversion lets
    /// exporter construction use `?` directly.
    #[error("failed to build OTLP span exporter: {0}")]
    Exporter(#[from] opentelemetry_otlp::ExporterBuildError),
}
57
/// OTLP transport. Mirrors the `OTEL_EXPORTER_OTLP_PROTOCOL` spec values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OtlpProtocol {
    /// gRPC (OTLP/gRPC), default collector port `4317`.
    Grpc,
    /// HTTP with protobuf payloads (OTLP/HTTP), default collector port `4318`.
    HttpProtobuf,
}

impl OtlpProtocol {
    /// Interpret an `OTEL_EXPORTER_OTLP_PROTOCOL`-style string.
    ///
    /// Surrounding whitespace and ASCII case are ignored. Any of the
    /// recognised HTTP spellings selects [`OtlpProtocol::HttpProtobuf`];
    /// every other input (including the empty string) selects
    /// [`OtlpProtocol::Grpc`].
    #[must_use]
    pub fn parse(s: &str) -> Self {
        // Accepted spellings for the HTTP/protobuf transport, all lowercase.
        const HTTP_SPELLINGS: [&str; 4] = ["http/protobuf", "http", "httpbinary", "http-protobuf"];

        let wanted = s.trim();
        let is_http = HTTP_SPELLINGS
            .iter()
            .any(|spelling| wanted.eq_ignore_ascii_case(spelling));

        if is_http {
            Self::HttpProtobuf
        } else {
            Self::Grpc
        }
    }

    /// Endpoint used when OTLP is enabled without an explicit endpoint:
    /// `localhost` on the transport's default collector port.
    #[must_use]
    pub fn default_endpoint(self) -> &'static str {
        let url = match self {
            Self::Grpc => "http://localhost:4317",
            Self::HttpProtobuf => "http://localhost:4318",
        };
        url
    }
}
86
/// Resolved OTLP configuration. Built by [`OtlpConfig::from_env`]; `None`
/// from that constructor means OTLP is disabled and no exporter is built.
#[derive(Debug, Clone)]
pub struct OtlpConfig {
    /// Collector endpoint handed to the exporter builder. Either the
    /// endpoint taken from the environment or the protocol's default.
    pub endpoint: String,
    /// Transport used by the span exporter (gRPC or HTTP/protobuf).
    pub protocol: OtlpProtocol,
    /// Head sampling ratio in `[0.0, 1.0]`, wrapped in a parent-based
    /// sampler so child spans honour an upstream sampling decision.
    pub sample_ratio: f64,
    /// Service name attached to the exported resource; `OTEL_SERVICE_NAME`
    /// when set to a non-blank value, otherwise the caller's default.
    pub service_name: String,
    /// Exported as the `service.version` resource attribute.
    pub service_version: String,
    /// Exported as the `service.instance.id` resource attribute.
    pub service_instance_id: String,
    /// Timeout passed to the exporter builder; `from_env` defaults it to
    /// 10 seconds.
    pub timeout: Duration,
}
101
/// Decide whether an `*_ENABLED` / `*_DISABLED` style env value means "yes".
///
/// The value is trimmed and compared without regard to ASCII case against
/// `1`, `true`, `yes` and `on`; anything else (including blank) is false.
fn env_truthy(v: &str) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];

    let candidate = v.trim();
    TRUTHY
        .iter()
        .any(|accepted| candidate.eq_ignore_ascii_case(accepted))
}
109
110impl OtlpConfig {
111    /// Resolve OTLP config from the environment. `get` is the env lookup
112    /// (injected so this is a pure, testable function);
113    /// `service_instance_id` is the service instance id (e.g. broker node id),
114    /// `service_version` the crate version, and `default_service_name` is the
115    /// fallback used when `OTEL_SERVICE_NAME` is not set.
116    ///
117    /// Returns `None` when OTLP is disabled — either nothing turned it on
118    /// or `OTEL_SDK_DISABLED` turned it off.
119    #[must_use]
120    pub fn from_env(
121        get: impl Fn(&str) -> Option<String>,
122        service_instance_id: &str,
123        service_version: &str,
124        default_service_name: &str,
125    ) -> Option<Self> {
126        if get("OTEL_SDK_DISABLED").as_deref().is_some_and(env_truthy) {
127            return None;
128        }
129
130        let endpoint_override = get("CRABKA_OTLP_ENDPOINT")
131            .or_else(|| get("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"))
132            .or_else(|| get("OTEL_EXPORTER_OTLP_ENDPOINT"))
133            .map(|s| s.trim().to_owned())
134            .filter(|s| !s.is_empty());
135
136        let explicitly_enabled = get("CRABKA_OTLP_ENABLED")
137            .as_deref()
138            .is_some_and(env_truthy);
139
140        // Off unless something opts in.
141        if endpoint_override.is_none() && !explicitly_enabled {
142            return None;
143        }
144
145        let protocol = get("CRABKA_OTLP_PROTOCOL")
146            .or_else(|| get("OTEL_EXPORTER_OTLP_PROTOCOL"))
147            .map_or(OtlpProtocol::Grpc, |s| OtlpProtocol::parse(&s));
148
149        let endpoint = endpoint_override.unwrap_or_else(|| protocol.default_endpoint().to_owned());
150
151        let sample_ratio = get("CRABKA_OTLP_SAMPLE_RATIO")
152            .or_else(|| get("OTEL_TRACES_SAMPLER_ARG"))
153            .and_then(|s| s.trim().parse::<f64>().ok())
154            .map_or(1.0, |r| r.clamp(0.0, 1.0));
155
156        let service_name = get("OTEL_SERVICE_NAME")
157            .map(|s| s.trim().to_owned())
158            .filter(|s| !s.is_empty())
159            .unwrap_or_else(|| default_service_name.to_owned());
160
161        let timeout = get("CRABKA_OTLP_TIMEOUT_SECS")
162            .or_else(|| get("OTEL_EXPORTER_OTLP_TIMEOUT_SECS"))
163            .and_then(|s| s.trim().parse::<u64>().ok())
164            .map_or(Duration::from_secs(10), Duration::from_secs);
165
166        Some(Self {
167            endpoint,
168            protocol,
169            sample_ratio,
170            service_name,
171            service_version: service_version.to_owned(),
172            service_instance_id: service_instance_id.to_owned(),
173            timeout,
174        })
175    }
176
177    fn build_exporter(&self) -> Result<SpanExporter, TelemetryError> {
178        let builder = SpanExporter::builder();
179        let exporter = match self.protocol {
180            OtlpProtocol::Grpc => builder
181                .with_tonic()
182                .with_endpoint(self.endpoint.clone())
183                .with_timeout(self.timeout)
184                .build()?,
185            OtlpProtocol::HttpProtobuf => builder
186                .with_http()
187                .with_protocol(Protocol::HttpBinary)
188                .with_endpoint(self.endpoint.clone())
189                .with_timeout(self.timeout)
190                .build()?,
191        };
192        Ok(exporter)
193    }
194
195    fn resource(&self) -> Resource {
196        Resource::builder()
197            .with_service_name(self.service_name.clone())
198            .with_attributes([
199                KeyValue::new("service.version", self.service_version.clone()),
200                KeyValue::new("service.instance.id", self.service_instance_id.clone()),
201            ])
202            .build()
203    }
204
205    fn sampler(&self) -> Sampler {
206        Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(self.sample_ratio)))
207    }
208}
209
210/// Per-layer filter for the OTLP layer. Overridable with `CRABKA_OTLP_FILTER`
211/// for operators who want finer control; falls back to `default`.
212fn otel_filter(default: &str, get: impl Fn(&str) -> Option<String>) -> EnvFilter {
213    get("CRABKA_OTLP_FILTER")
214        .and_then(|s| EnvFilter::try_new(s).ok())
215        .unwrap_or_else(|| EnvFilter::new(default))
216}
217
/// Owns the OTLP `SdkTracerProvider` so spans are flushed on shutdown.
/// `provider` is `None` when OTLP export is disabled.
///
/// Call [`TelemetryGuard::shutdown`] explicitly before exit so the final
/// batch is delivered before the process ends.
///
/// NOTE(review): [`init`] also registers a clone of the provider through
/// `opentelemetry::global::set_tracer_provider`, so this guard does not
/// hold the last clone. Dropping the guard alone therefore presumably does
/// not shut the provider down or flush pending spans — confirm against the
/// SDK, and do not rely on drop for delivery.
#[must_use = "hold the guard for the process lifetime and call shutdown() before exit"]
pub struct TelemetryGuard {
    provider: Option<SdkTracerProvider>,
}
226
227impl TelemetryGuard {
228    /// Flush and shut down the OTLP exporter. No-op when OTLP is disabled.
229    pub fn shutdown(self) {
230        if let Some(provider) = self.provider
231            && let Err(e) = provider.shutdown()
232        {
233            tracing::warn!(error = %e, "OTLP tracer provider shutdown error");
234        }
235    }
236}
237
238/// Install the global `tracing` subscriber: a stdout `fmt` layer plus,
239/// when `otlp` is `Some`, a batch OTLP export layer.
240///
241/// - `fmt_default_filter`: the `fmt` layer's filter when `RUST_LOG` is unset.
242/// - `otel_default_filter`: the OTLP layer's filter when `CRABKA_OTLP_FILTER`
243///   is unset.
244/// - `tracer_name`: the name passed to `TracerProvider::tracer(...)`.
245///
246/// Must be called exactly once, from within the tokio runtime (the
247/// gRPC exporter captures the current runtime handle).
248pub fn init(
249    otlp: Option<OtlpConfig>,
250    fmt_default_filter: &str,
251    otel_default_filter: &str,
252    tracer_name: &str,
253) -> Result<TelemetryGuard, TelemetryError> {
254    let fmt_filter =
255        EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(fmt_default_filter));
256    let fmt_layer = tracing_subscriber::fmt::layer().with_filter(fmt_filter);
257
258    let Some(cfg) = otlp else {
259        tracing_subscriber::registry().with(fmt_layer).init();
260        return Ok(TelemetryGuard { provider: None });
261    };
262
263    let exporter = cfg.build_exporter()?;
264    let provider = SdkTracerProvider::builder()
265        .with_batch_exporter(exporter)
266        .with_resource(cfg.resource())
267        .with_sampler(cfg.sampler())
268        .build();
269    let tracer = provider.tracer(tracer_name.to_owned());
270
271    opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
272    opentelemetry::global::set_tracer_provider(provider.clone());
273
274    let otel_layer = tracing_opentelemetry::layer()
275        .with_tracer(tracer)
276        .with_location(false)
277        .with_filter(otel_filter(otel_default_filter, |k| std::env::var(k).ok()));
278
279    tracing_subscriber::registry()
280        .with(fmt_layer)
281        .with(otel_layer)
282        .init();
283
284    tracing::info!(
285        endpoint = %cfg.endpoint,
286        protocol = ?cfg.protocol,
287        sample_ratio = cfg.sample_ratio,
288        "OTLP distributed tracing enabled"
289    );
290
291    Ok(TelemetryGuard {
292        provider: Some(provider),
293    })
294}
295
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// Crate version handed to every `from_env` call in these tests.
    const VERSION: &str = "0.1.1";
    /// Fallback service name handed to every `from_env` call in these tests.
    const DEFAULT_NAME: &str = "crabka-broker";

    /// Env lookup backed by a fixed list of `(key, value)` pairs; the first
    /// pair whose key matches wins.
    fn env_from<'a>(pairs: &'a [(&'a str, &'a str)]) -> impl Fn(&str) -> Option<String> + 'a {
        move |wanted: &str| {
            pairs
                .iter()
                .find_map(|&(key, value)| (key == wanted).then(|| value.to_owned()))
        }
    }

    /// Run `OtlpConfig::from_env` against `pairs` for the given instance id.
    fn resolve(pairs: &[(&str, &str)], instance_id: &str) -> Option<OtlpConfig> {
        OtlpConfig::from_env(env_from(pairs), instance_id, VERSION, DEFAULT_NAME)
    }

    #[test]
    fn disabled_when_no_env() {
        assert!(resolve(&[], "1").is_none());
    }

    #[test]
    fn enabled_by_crabka_endpoint() {
        let cfg =
            resolve(&[("CRABKA_OTLP_ENDPOINT", "http://collector:4317")], "7").expect("enabled");
        assert!(cfg.endpoint == "http://collector:4317");
        assert!(cfg.protocol == OtlpProtocol::Grpc);
        assert!((cfg.sample_ratio - 1.0).abs() < f64::EPSILON);
        assert!(cfg.service_name == "crabka-broker");
        assert!(cfg.service_instance_id == "7");
        assert!(cfg.service_version == "0.1.1");
    }

    #[test]
    fn enabled_flag_uses_protocol_default_endpoint() {
        let env = [
            ("CRABKA_OTLP_ENABLED", "true"),
            ("CRABKA_OTLP_PROTOCOL", "http/protobuf"),
        ];
        let cfg = resolve(&env, "1").expect("enabled");
        assert!(cfg.protocol == OtlpProtocol::HttpProtobuf);
        assert!(cfg.endpoint == "http://localhost:4318");
    }

    #[test]
    fn grpc_is_the_default_protocol() {
        let cfg = resolve(&[("CRABKA_OTLP_ENABLED", "1")], "1").expect("enabled");
        assert!(cfg.protocol == OtlpProtocol::Grpc);
        assert!(cfg.endpoint == "http://localhost:4317");
    }

    #[test]
    fn sdk_disabled_overrides_endpoint() {
        let env = [
            ("CRABKA_OTLP_ENDPOINT", "http://collector:4317"),
            ("OTEL_SDK_DISABLED", "true"),
        ];
        assert!(resolve(&env, "1").is_none());
    }

    #[test]
    fn endpoint_precedence_and_standard_vars() {
        // Standard OTLP env (no CRABKA_ override) still enables export.
        let generic_only = [("OTEL_EXPORTER_OTLP_ENDPOINT", "http://otel:4317")];
        let cfg = resolve(&generic_only, "1").expect("enabled");
        assert!(cfg.endpoint == "http://otel:4317");

        // Traces-specific endpoint wins over the generic one.
        let generic_and_traces = [
            ("OTEL_EXPORTER_OTLP_ENDPOINT", "http://generic:4317"),
            ("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", "http://traces:4317"),
        ];
        let cfg = resolve(&generic_and_traces, "1").expect("enabled");
        assert!(cfg.endpoint == "http://traces:4317");

        // CRABKA override wins over everything.
        let traces_and_crabka = [
            ("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", "http://traces:4317"),
            ("CRABKA_OTLP_ENDPOINT", "http://crabka:4317"),
        ];
        let cfg = resolve(&traces_and_crabka, "1").expect("enabled");
        assert!(cfg.endpoint == "http://crabka:4317");
    }

    #[test]
    fn sample_ratio_parsed_and_clamped() {
        let in_range = [
            ("CRABKA_OTLP_ENDPOINT", "http://c:4317"),
            ("CRABKA_OTLP_SAMPLE_RATIO", "0.25"),
        ];
        let cfg = resolve(&in_range, "1").expect("enabled");
        assert!((cfg.sample_ratio - 0.25).abs() < f64::EPSILON);

        // Out-of-range clamps to [0,1].
        let too_large = [
            ("CRABKA_OTLP_ENABLED", "true"),
            ("CRABKA_OTLP_SAMPLE_RATIO", "9.0"),
        ];
        let cfg = resolve(&too_large, "1").expect("enabled");
        assert!((cfg.sample_ratio - 1.0).abs() < f64::EPSILON);
    }

    #[test]
    fn service_name_and_timeout_overrides() {
        let env = [
            ("CRABKA_OTLP_ENDPOINT", "http://c:4317"),
            ("OTEL_SERVICE_NAME", "my-kafka"),
            ("CRABKA_OTLP_TIMEOUT_SECS", "3"),
        ];
        let cfg = resolve(&env, "9").expect("enabled");
        assert!(cfg.service_name == "my-kafka");
        assert!(cfg.timeout == Duration::from_secs(3));
    }

    #[test]
    fn protocol_parse_variants() {
        assert!(OtlpProtocol::parse("grpc") == OtlpProtocol::Grpc);
        assert!(OtlpProtocol::parse("http/protobuf") == OtlpProtocol::HttpProtobuf);
        assert!(OtlpProtocol::parse("HTTP") == OtlpProtocol::HttpProtobuf);
        assert!(OtlpProtocol::parse("nonsense") == OtlpProtocol::Grpc);
    }
}