Skip to main content

modkit/telemetry/
init.rs

1//! OpenTelemetry tracing initialization utilities
2//!
3//! This module sets up OpenTelemetry tracing and exports spans via OTLP
4//! (gRPC or HTTP) to collectors such as Jaeger, Uptrace, or the `OTel` Collector.
5
6#[cfg(feature = "otel")]
7use anyhow::Context;
8#[cfg(feature = "otel")]
9use opentelemetry::{KeyValue, global, trace::TracerProvider as _};
10
11#[cfg(feature = "otel")]
12use opentelemetry_otlp::{Protocol, WithExportConfig};
13// Bring extension traits into scope for builder methods like `.with_headers()` and `.with_metadata()`.
14#[cfg(feature = "otel")]
15use opentelemetry_otlp::{WithHttpConfig, WithTonicConfig};
16
17#[cfg(feature = "otel")]
18use opentelemetry_sdk::{
19    Resource,
20    propagation::TraceContextPropagator,
21    trace::{Sampler, SdkTracerProvider},
22};
23
24#[cfg(feature = "otel")]
25use super::config::TracingConfig;
26#[cfg(feature = "otel")]
27use crate::telemetry::config::ExporterKind;
28#[cfg(feature = "otel")]
29use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
30
31// ===== init_tracing (feature = "otel") ========================================
32
33/// Build resource with service name and custom attributes
34#[cfg(feature = "otel")]
35fn build_resource(cfg: &TracingConfig) -> Resource {
36    let service_name = cfg.service_name.as_deref().unwrap_or("hyperspot");
37    let mut attrs = vec![KeyValue::new("service.name", service_name.to_owned())];
38
39    if let Some(resource_map) = &cfg.resource {
40        for (k, v) in resource_map {
41            attrs.push(KeyValue::new(k.clone(), v.clone()));
42        }
43    }
44
45    Resource::builder_empty().with_attributes(attrs).build()
46}
47
48/// Build sampler from configuration
49#[cfg(feature = "otel")]
50fn build_sampler(cfg: &TracingConfig) -> Sampler {
51    match cfg.sampler.as_ref() {
52        Some(crate::telemetry::config::Sampler::AlwaysOff { .. }) => Sampler::AlwaysOff,
53        Some(crate::telemetry::config::Sampler::AlwaysOn { .. }) => Sampler::AlwaysOn,
54        Some(crate::telemetry::config::Sampler::ParentBasedAlwaysOn { .. }) => {
55            Sampler::ParentBased(Box::new(Sampler::AlwaysOn))
56        }
57        Some(crate::telemetry::config::Sampler::ParentBasedRatio { ratio }) => {
58            let ratio = ratio.unwrap_or(0.1);
59            Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(ratio)))
60        }
61        None => Sampler::ParentBased(Box::new(Sampler::AlwaysOn)),
62    }
63}
64
65/// Extract exporter kind and endpoint from configuration
66#[cfg(feature = "otel")]
67fn extract_exporter_config(
68    cfg: &TracingConfig,
69) -> (ExporterKind, String, Option<std::time::Duration>) {
70    let (kind, endpoint) = cfg.exporter.as_ref().map_or_else(
71        || (ExporterKind::OtlpGrpc, "http://127.0.0.1:4317".into()),
72        |e| {
73            (
74                e.kind,
75                e.endpoint
76                    .clone()
77                    .unwrap_or_else(|| "http://127.0.0.1:4317".into()),
78            )
79        },
80    );
81
82    let timeout = cfg
83        .exporter
84        .as_ref()
85        .and_then(|e| e.timeout_ms)
86        .map(std::time::Duration::from_millis);
87
88    (kind, endpoint, timeout)
89}
90
91/// Build HTTP OTLP exporter
92#[cfg(feature = "otel")]
93fn build_http_exporter(
94    cfg: &TracingConfig,
95    endpoint: String,
96    timeout: Option<std::time::Duration>,
97) -> anyhow::Result<opentelemetry_otlp::SpanExporter> {
98    let mut b = opentelemetry_otlp::SpanExporter::builder()
99        .with_http()
100        .with_protocol(Protocol::HttpBinary)
101        .with_endpoint(endpoint);
102    if let Some(t) = timeout {
103        b = b.with_timeout(t);
104    }
105    if let Some(hmap) = build_headers_from_cfg_and_env(cfg) {
106        b = b.with_headers(hmap);
107    }
108    #[allow(clippy::expect_used)]
109    b.build().context("build OTLP HTTP exporter")
110}
111
112/// Build gRPC OTLP exporter
113#[cfg(feature = "otel")]
114fn build_grpc_exporter(
115    cfg: &TracingConfig,
116    endpoint: String,
117    timeout: Option<std::time::Duration>,
118) -> anyhow::Result<opentelemetry_otlp::SpanExporter> {
119    let mut b = opentelemetry_otlp::SpanExporter::builder()
120        .with_tonic()
121        .with_endpoint(endpoint);
122    if let Some(t) = timeout {
123        b = b.with_timeout(t);
124    }
125    if let Some(md) = build_metadata_from_cfg_and_env(cfg) {
126        b = b.with_metadata(md);
127    }
128    b.build().context("build OTLP gRPC exporter")
129}
130
131/// Initialize OpenTelemetry tracing from configuration and return a layer
132/// to be attached to `tracing_subscriber`.
133///
134/// # Errors
135/// Returns an error if the configuration is invalid or if the exporter fails to build.
136#[cfg(feature = "otel")]
137pub fn init_tracing(
138    cfg: &TracingConfig,
139) -> anyhow::Result<
140    tracing_opentelemetry::OpenTelemetryLayer<
141        tracing_subscriber::Registry,
142        opentelemetry_sdk::trace::Tracer,
143    >,
144> {
145    if !cfg.enabled {
146        return Err(anyhow::anyhow!("tracing is disabled"));
147    }
148
149    // Set W3C propagator for trace-context propagation
150    global::set_text_map_propagator(TraceContextPropagator::new());
151
152    let service_name = cfg.service_name.as_deref().unwrap_or("hyperspot");
153    tracing::info!("Building OpenTelemetry layer for service: {}", service_name);
154
155    // Build resource, sampler, and extract exporter config
156    let resource = build_resource(cfg);
157    let sampler = build_sampler(cfg);
158    let (kind, endpoint, timeout) = extract_exporter_config(cfg);
159
160    tracing::info!(kind = ?kind, %endpoint, "OTLP exporter config");
161
162    // Build span exporter based on kind
163    let exporter = if matches!(kind, ExporterKind::OtlpHttp) {
164        build_http_exporter(cfg, endpoint, timeout)
165    } else {
166        build_grpc_exporter(cfg, endpoint, timeout)
167    }?;
168
169    // Build tracer provider with batch processor
170    let provider = SdkTracerProvider::builder()
171        .with_batch_exporter(exporter)
172        .with_sampler(sampler)
173        .with_resource(resource)
174        .build();
175
176    // Make it global
177    global::set_tracer_provider(provider.clone());
178
179    // Create tracer and layer
180    let tracer = provider.tracer("hyperspot");
181    let otel_layer = tracing_opentelemetry::OpenTelemetryLayer::new(tracer);
182
183    tracing::info!("OpenTelemetry layer created successfully");
184    Ok(otel_layer)
185}
186
187#[cfg(feature = "otel")]
188fn build_headers_from_cfg_and_env(
189    cfg: &TracingConfig,
190) -> Option<std::collections::HashMap<String, String>> {
191    use std::collections::HashMap;
192    let mut out: HashMap<String, String> = HashMap::new();
193
194    // From config file
195    if let Some(exp) = &cfg.exporter
196        && let Some(hdrs) = &exp.headers
197    {
198        for (k, v) in hdrs {
199            out.insert(k.clone(), v.clone());
200        }
201    }
202
203    // From ENV OTEL_EXPORTER_OTLP_HEADERS (format: k=v,k2=v2)
204    if let Ok(env_hdrs) = std::env::var("OTEL_EXPORTER_OTLP_HEADERS") {
205        for part in env_hdrs.split(',').map(str::trim).filter(|s| !s.is_empty()) {
206            if let Some((k, v)) = part.split_once('=') {
207                out.insert(k.trim().to_owned(), v.trim().to_owned());
208            }
209        }
210    }
211
212    if out.is_empty() { None } else { Some(out) }
213}
214
215#[cfg(feature = "otel")]
216fn extend_metadata_from_source<'a, I>(md: &mut MetadataMap, source: I, context: &'static str)
217where
218    I: Iterator<Item = (&'a str, &'a str)>,
219{
220    for (k, v) in source {
221        match MetadataKey::from_bytes(k.as_bytes()) {
222            Ok(key) => match MetadataValue::try_from(v) {
223                Ok(val) => {
224                    md.insert(key, val);
225                }
226                Err(_) => {
227                    tracing::warn!(header = %k, context, "Skipping invalid gRPC metadata value");
228                }
229            },
230            Err(_) => {
231                tracing::warn!(header = %k, context, "Skipping invalid gRPC metadata header name");
232            }
233        }
234    }
235}
236
237#[cfg(feature = "otel")]
238fn build_metadata_from_cfg_and_env(cfg: &TracingConfig) -> Option<MetadataMap> {
239    let mut md = MetadataMap::new();
240
241    // From config file
242    if let Some(exp) = &cfg.exporter
243        && let Some(hdrs) = &exp.headers
244    {
245        let iter = hdrs.iter().map(|(k, v)| (k.as_str(), v.as_str()));
246        extend_metadata_from_source(&mut md, iter, "config");
247    }
248
249    // From ENV OTEL_EXPORTER_OTLP_HEADERS (format: k=v,k2=v2)
250    if let Ok(env_hdrs) = std::env::var("OTEL_EXPORTER_OTLP_HEADERS") {
251        let iter = env_hdrs.split(',').filter_map(|part| {
252            let part = part.trim();
253            if part.is_empty() {
254                None
255            } else {
256                part.split_once('=').map(|(k, v)| (k.trim(), v.trim()))
257            }
258        });
259        extend_metadata_from_source(&mut md, iter, "env");
260    }
261
262    if md.is_empty() { None } else { Some(md) }
263}
264
265// ===== init_tracing (feature disabled) ========================================
266
267#[cfg(not(feature = "otel"))]
268pub fn init_tracing(_cfg: &serde_json::Value) -> Option<()> {
269    tracing::info!("Tracing configuration provided but runtime feature is disabled");
270    None
271}
272
273// ===== shutdown_tracing =======================================================
274
275/// Gracefully shut down OpenTelemetry tracing.
276/// In opentelemetry 0.31 there is no global `shutdown_tracer_provider()`.
277/// Keep a handle to `SdkTracerProvider` in your app state and call `shutdown()`
278/// during graceful shutdown. This function remains a no-op for compatibility.
279#[cfg(feature = "otel")]
280pub fn shutdown_tracing() {
281    tracing::info!("Tracing shutdown: no-op (keep a provider handle to call `shutdown()`).");
282}
283
284#[cfg(not(feature = "otel"))]
285pub fn shutdown_tracing() {
286    tracing::info!("Tracing shutdown (no-op)");
287}
288
289// ===== connectivity probe =====================================================
290
291/// Build a tiny, separate OTLP pipeline and export a single span to verify connectivity.
292/// This does *not* depend on `tracing_subscriber`; it uses SDK directly.
293///
294/// # Errors
295/// Returns an error if the OTLP exporter cannot be built or the probe fails.
296#[cfg(feature = "otel")]
297pub fn otel_connectivity_probe(cfg: &super::config::TracingConfig) -> anyhow::Result<()> {
298    use opentelemetry::trace::{Span, Tracer as _};
299
300    let service_name = cfg
301        .service_name
302        .clone()
303        .unwrap_or_else(|| "hyperspot".into());
304
305    let (kind, endpoint) = cfg.exporter.as_ref().map_or_else(
306        || (ExporterKind::OtlpGrpc, "http://127.0.0.1:4317".into()),
307        |e| {
308            (
309                e.kind,
310                e.endpoint
311                    .clone()
312                    .unwrap_or_else(|| "http://127.0.0.1:4317".into()),
313            )
314        },
315    );
316
317    // Resource
318    let resource = Resource::builder_empty()
319        .with_attributes([KeyValue::new("service.name", service_name)])
320        .build();
321
322    // Exporter (type-state branches again)
323    let exporter = if matches!(kind, ExporterKind::OtlpHttp) {
324        let mut b = opentelemetry_otlp::SpanExporter::builder()
325            .with_http()
326            .with_protocol(Protocol::HttpBinary)
327            .with_endpoint(endpoint);
328        if let Some(h) = build_headers_from_cfg_and_env(cfg) {
329            b = b.with_headers(h);
330        }
331        b.build()
332            .map_err(|e| anyhow::anyhow!("otlp http exporter build failed: {e}"))?
333    } else {
334        let mut b = opentelemetry_otlp::SpanExporter::builder()
335            .with_tonic()
336            .with_endpoint(endpoint);
337        if let Some(md) = build_metadata_from_cfg_and_env(cfg) {
338            b = b.with_metadata(md);
339        }
340        b.build()
341            .map_err(|e| anyhow::anyhow!("otlp grpc exporter build failed: {e}"))?
342    };
343
344    // Provider (simple processor is fine for a probe)
345    let provider = SdkTracerProvider::builder()
346        .with_simple_exporter(exporter)
347        .with_resource(resource)
348        .build();
349
350    // Emit a single span
351    let tracer = provider.tracer("connectivity_probe");
352    let mut span = tracer.start("otel_connectivity_probe");
353    span.end();
354
355    // Ensure delivery
356    if let Err(e) = provider.force_flush() {
357        tracing::warn!(error = %e, "force_flush failed during OTLP connectivity probe");
358    }
359
360    provider
361        .shutdown()
362        .map_err(|e| anyhow::anyhow!("shutdown failed: {e}"))?;
363
364    tracing::info!(kind = ?kind, "OTLP connectivity probe exported a test span");
365    Ok(())
366}
367
368/// OTLP connectivity probe (no-op when otel feature is disabled).
369///
370/// # Errors
371/// This function always succeeds when the otel feature is disabled.
372#[cfg(not(feature = "otel"))]
373pub fn otel_connectivity_probe(_cfg: &serde_json::Value) -> anyhow::Result<()> {
374    tracing::info!("OTLP connectivity probe skipped (otel feature disabled)");
375    Ok(())
376}
377
378// ===== tests ==================================================================
379
380#[cfg(test)]
381#[cfg_attr(coverage_nightly, coverage(off))]
382mod tests {
383    use super::*;
384    use crate::telemetry::config::{Exporter, ExporterKind, Sampler, TracingConfig};
385    use std::collections::HashMap;
386
387    #[test]
388    #[cfg(feature = "otel")]
389    fn test_init_tracing_disabled() {
390        let cfg = TracingConfig {
391            enabled: false,
392            ..Default::default()
393        };
394
395        let result = init_tracing(&cfg);
396        assert!(result.is_err());
397    }
398
399    #[tokio::test]
400    #[cfg(feature = "otel")]
401    async fn test_init_tracing_enabled() {
402        let cfg = TracingConfig {
403            enabled: true,
404            service_name: Some("test-service".to_owned()),
405            ..Default::default()
406        };
407
408        let result = init_tracing(&cfg);
409        assert!(result.is_ok());
410    }
411
412    #[test]
413    #[cfg(feature = "otel")]
414    fn test_init_tracing_with_resource_attributes() {
415        let rt = tokio::runtime::Runtime::new().unwrap();
416        let _guard = rt.enter();
417
418        let mut resource_map = HashMap::new();
419        resource_map.insert("service.version".to_owned(), "1.0.0".to_owned());
420        resource_map.insert("deployment.environment".to_owned(), "test".to_owned());
421
422        let cfg = TracingConfig {
423            enabled: true,
424            service_name: Some("test-service".to_owned()),
425            resource: Some(resource_map),
426            ..Default::default()
427        };
428
429        let result = init_tracing(&cfg);
430        assert!(result.is_ok());
431    }
432
433    #[test]
434    #[cfg(feature = "otel")]
435    fn test_init_tracing_with_always_on_sampler() {
436        let rt = tokio::runtime::Runtime::new().unwrap();
437        let _guard = rt.enter();
438
439        let cfg = TracingConfig {
440            enabled: true,
441            service_name: Some("test-service".to_owned()),
442            sampler: Some(Sampler::AlwaysOn {}),
443            ..Default::default()
444        };
445
446        let result = init_tracing(&cfg);
447        assert!(result.is_ok());
448    }
449
450    #[test]
451    #[cfg(feature = "otel")]
452    fn test_init_tracing_with_always_off_sampler() {
453        let rt = tokio::runtime::Runtime::new().unwrap();
454        let _guard = rt.enter();
455
456        let cfg = TracingConfig {
457            enabled: true,
458            service_name: Some("test-service".to_owned()),
459            sampler: Some(Sampler::AlwaysOff {}),
460            ..Default::default()
461        };
462
463        let result = init_tracing(&cfg);
464        assert!(result.is_ok());
465    }
466
467    #[test]
468    #[cfg(feature = "otel")]
469    fn test_init_tracing_with_ratio_sampler() {
470        let rt = tokio::runtime::Runtime::new().unwrap();
471        let _guard = rt.enter();
472
473        let cfg = TracingConfig {
474            enabled: true,
475            service_name: Some("test-service".to_owned()),
476            sampler: Some(Sampler::ParentBasedRatio { ratio: Some(0.5) }),
477            ..Default::default()
478        };
479
480        let result = init_tracing(&cfg);
481        assert!(result.is_ok());
482    }
483
484    #[test]
485    #[cfg(feature = "otel")]
486    fn test_init_tracing_with_http_exporter() {
487        let _rt = tokio::runtime::Runtime::new().unwrap();
488
489        let cfg = TracingConfig {
490            enabled: true,
491            service_name: Some("test-service".to_owned()),
492            exporter: Some(Exporter {
493                kind: ExporterKind::OtlpHttp,
494                endpoint: Some("http://localhost:4318".to_owned()),
495                headers: None,
496                timeout_ms: Some(5000),
497            }),
498            ..Default::default()
499        };
500
501        let result = init_tracing(&cfg);
502        assert!(result.is_ok());
503    }
504
505    #[test]
506    #[cfg(feature = "otel")]
507    fn test_init_tracing_with_grpc_exporter() {
508        let rt = tokio::runtime::Runtime::new().unwrap();
509        let _guard = rt.enter();
510
511        let cfg = TracingConfig {
512            enabled: true,
513            service_name: Some("test-service".to_owned()),
514            exporter: Some(Exporter {
515                kind: ExporterKind::OtlpGrpc,
516                endpoint: Some("http://localhost:4317".to_owned()),
517                headers: None,
518                timeout_ms: Some(5000),
519            }),
520            ..Default::default()
521        };
522
523        let result = init_tracing(&cfg);
524        assert!(result.is_ok());
525    }
526
527    #[test]
528    #[cfg(feature = "otel")]
529    fn test_build_headers_from_cfg_empty() {
530        let cfg = TracingConfig {
531            enabled: true,
532            ..Default::default()
533        };
534
535        let result = build_headers_from_cfg_and_env(&cfg);
536        // Should be None if no headers configured and no env var
537        // (unless OTEL_EXPORTER_OTLP_HEADERS is set, which we can't control in tests)
538        assert!(result.is_none() || result.is_some());
539    }
540
541    #[test]
542    #[cfg(feature = "otel")]
543    fn test_build_headers_from_cfg_with_headers() {
544        let mut headers = HashMap::new();
545        headers.insert("authorization".to_owned(), "Bearer token".to_owned());
546
547        let cfg = TracingConfig {
548            enabled: true,
549            exporter: Some(Exporter {
550                kind: ExporterKind::OtlpHttp,
551                endpoint: Some("http://localhost:4318".to_owned()),
552                headers: Some(headers.clone()),
553                timeout_ms: None,
554            }),
555            ..Default::default()
556        };
557
558        let result = build_headers_from_cfg_and_env(&cfg);
559        assert!(result.is_some());
560        let result_headers = result.unwrap();
561        assert_eq!(
562            result_headers.get("authorization"),
563            Some(&"Bearer token".to_owned())
564        );
565    }
566
567    #[test]
568    #[cfg(feature = "otel")]
569    fn test_build_metadata_from_cfg_empty() {
570        let cfg = TracingConfig {
571            enabled: true,
572            ..Default::default()
573        };
574
575        let result = build_metadata_from_cfg_and_env(&cfg);
576        // Should be None if no headers configured and no env var
577        assert!(result.is_none() || result.is_some());
578    }
579
580    #[test]
581    #[cfg(feature = "otel")]
582    fn test_build_metadata_from_cfg_with_headers() {
583        let mut headers = HashMap::new();
584        headers.insert("authorization".to_owned(), "Bearer token".to_owned());
585
586        let cfg = TracingConfig {
587            enabled: true,
588            exporter: Some(Exporter {
589                kind: ExporterKind::OtlpGrpc,
590                endpoint: Some("http://localhost:4317".to_owned()),
591                headers: Some(headers.clone()),
592                timeout_ms: None,
593            }),
594            ..Default::default()
595        };
596
597        let result = build_metadata_from_cfg_and_env(&cfg);
598        assert!(result.is_some());
599        let metadata = result.unwrap();
600        assert!(!metadata.is_empty());
601    }
602
603    #[test]
604    #[cfg(feature = "otel")]
605    fn test_build_metadata_multiple_headers() {
606        let mut headers = HashMap::new();
607        headers.insert("authorization".to_owned(), "Bearer token".to_owned());
608        headers.insert("x-custom-header".to_owned(), "custom-value".to_owned());
609
610        let cfg = TracingConfig {
611            enabled: true,
612            exporter: Some(Exporter {
613                kind: ExporterKind::OtlpGrpc,
614                endpoint: Some("http://localhost:4317".to_owned()),
615                headers: Some(headers.clone()),
616                timeout_ms: None,
617            }),
618            ..Default::default()
619        };
620
621        let result = build_metadata_from_cfg_and_env(&cfg);
622        assert!(result.is_some());
623        let metadata = result.unwrap();
624        assert_eq!(metadata.len(), 2);
625    }
626
627    #[test]
628    #[cfg(feature = "otel")]
629    fn test_build_metadata_invalid_header_name_skipped() {
630        let mut headers = HashMap::new();
631        headers.insert("valid-header".to_owned(), "value1".to_owned());
632        headers.insert("invalid header with spaces".to_owned(), "value2".to_owned());
633
634        let cfg = TracingConfig {
635            enabled: true,
636            exporter: Some(Exporter {
637                kind: ExporterKind::OtlpGrpc,
638                endpoint: Some("http://localhost:4317".to_owned()),
639                headers: Some(headers.clone()),
640                timeout_ms: None,
641            }),
642            ..Default::default()
643        };
644
645        let result = build_metadata_from_cfg_and_env(&cfg);
646        assert!(result.is_some());
647        let metadata = result.unwrap();
648        // Should only have the valid header
649        assert_eq!(metadata.len(), 1);
650    }
651
652    #[test]
653    fn test_shutdown_tracing_does_not_panic() {
654        // Should not panic regardless of feature state
655        shutdown_tracing();
656    }
657}