1#[cfg(feature = "otel")]
7use anyhow::Context;
8#[cfg(feature = "otel")]
9use opentelemetry::{KeyValue, global, trace::TracerProvider as _};
10
11#[cfg(feature = "otel")]
12use opentelemetry_otlp::{Protocol, WithExportConfig};
13#[cfg(feature = "otel")]
15use opentelemetry_otlp::{WithHttpConfig, WithTonicConfig};
16
17#[cfg(feature = "otel")]
18use opentelemetry_sdk::{
19 Resource,
20 propagation::TraceContextPropagator,
21 trace::{Sampler, SdkTracerProvider},
22};
23
24use super::config::MetricsConfig;
25#[cfg(feature = "otel")]
26use super::config::TracingConfig;
27#[cfg(feature = "otel")]
28use crate::telemetry::config::ExporterKind;
29#[cfg(feature = "otel")]
30use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
31
#[cfg(feature = "otel")]
/// Builds the OpenTelemetry `Resource` that describes this service.
///
/// The resource starts empty (`Resource::builder_empty`). It always carries
/// `service.name`, taken from `cfg.service_name` or `"hyperspot"` when unset.
/// Every key/value pair from `cfg.resource` (if any) is appended after it.
pub(crate) fn build_resource(cfg: &TracingConfig) -> Resource {
    let name = cfg
        .service_name
        .clone()
        .unwrap_or_else(|| "hyperspot".to_owned());

    // `Option::iter().flatten()` yields nothing when no extra resource map is configured.
    let extra = cfg
        .resource
        .iter()
        .flatten()
        .map(|(key, value)| KeyValue::new(key.clone(), value.clone()));

    let attributes: Vec<KeyValue> = std::iter::once(KeyValue::new("service.name", name))
        .chain(extra)
        .collect();

    Resource::builder_empty().with_attributes(attributes).build()
}
48
#[cfg(feature = "otel")]
/// Translates the configured sampler into an OpenTelemetry SDK `Sampler`.
///
/// * `AlwaysOff` / `AlwaysOn` map directly to their SDK counterparts.
/// * `ParentBasedAlwaysOn` and a missing sampler both yield
///   `ParentBased(AlwaysOn)`.
/// * `ParentBasedRatio` yields `ParentBased(TraceIdRatioBased(ratio))`, where
///   `ratio` falls back to `0.1` when not configured.
fn build_sampler(cfg: &TracingConfig) -> Sampler {
    use crate::telemetry::config::Sampler as SamplerCfg;

    match cfg.sampler.as_ref() {
        Some(SamplerCfg::AlwaysOff { .. }) => Sampler::AlwaysOff,
        Some(SamplerCfg::AlwaysOn { .. }) => Sampler::AlwaysOn,
        Some(SamplerCfg::ParentBasedRatio { ratio }) => {
            let root = Sampler::TraceIdRatioBased(ratio.unwrap_or(0.1));
            Sampler::ParentBased(Box::new(root))
        }
        Some(SamplerCfg::ParentBasedAlwaysOn { .. }) | None => {
            Sampler::ParentBased(Box::new(Sampler::AlwaysOn))
        }
    }
}
65
#[cfg(feature = "otel")]
/// Resolves the exporter transport, endpoint and export timeout from `cfg`.
///
/// * Without an exporter section the transport defaults to OTLP/gRPC.
/// * Without a configured endpoint, a localhost default matching the
///   transport is used: `http://127.0.0.1:4318` for OTLP/HTTP and
///   `http://127.0.0.1:4317` for everything else (OTLP/gRPC). These are the
///   standard OTLP ports for the two transports.
/// * `timeout_ms`, when set, is converted into a `Duration`.
fn extract_exporter_config(
    cfg: &TracingConfig,
) -> (ExporterKind, String, Option<std::time::Duration>) {
    const DEFAULT_GRPC_ENDPOINT: &str = "http://127.0.0.1:4317";
    const DEFAULT_HTTP_ENDPOINT: &str = "http://127.0.0.1:4318";

    let exporter = cfg.exporter.as_ref();
    let kind = exporter.map_or(ExporterKind::OtlpGrpc, |e| e.kind);

    // Mirror the transport dispatch used when building the exporter: only
    // `OtlpHttp` goes over HTTP. Defaulting an HTTP exporter to the gRPC port
    // would point it at a port where a standard collector does not serve OTLP/HTTP.
    let default_endpoint = if matches!(kind, ExporterKind::OtlpHttp) {
        DEFAULT_HTTP_ENDPOINT
    } else {
        DEFAULT_GRPC_ENDPOINT
    };

    let endpoint = exporter
        .and_then(|e| e.endpoint.clone())
        .unwrap_or_else(|| default_endpoint.to_owned());

    let timeout = exporter
        .and_then(|e| e.timeout_ms)
        .map(std::time::Duration::from_millis);

    (kind, endpoint, timeout)
}
91
#[cfg(feature = "otel")]
/// Builds an OTLP/HTTP span exporter that sends binary protobuf payloads.
///
/// `endpoint` is handed to the exporter builder unchanged. When `timeout` is
/// present, it is applied as the export timeout. Headers are collected from the
/// exporter config and `OTEL_EXPORTER_OTLP_HEADERS` via
/// `build_headers_from_cfg_and_env`, and are only set when at least one exists.
///
/// # Errors
///
/// Returns an error if the underlying OTLP exporter builder fails.
fn build_http_exporter(
    cfg: &TracingConfig,
    endpoint: String,
    timeout: Option<std::time::Duration>,
) -> anyhow::Result<opentelemetry_otlp::SpanExporter> {
    let mut b = opentelemetry_otlp::SpanExporter::builder()
        .with_http()
        .with_protocol(Protocol::HttpBinary)
        .with_endpoint(endpoint);
    if let Some(t) = timeout {
        b = b.with_timeout(t);
    }
    if let Some(hmap) = build_headers_from_cfg_and_env(cfg.exporter.as_ref()) {
        b = b.with_headers(hmap);
    }
    b.build().context("build OTLP HTTP exporter")
}
112
#[cfg(feature = "otel")]
/// Builds an OTLP/gRPC (tonic) span exporter.
///
/// `endpoint` is handed to the exporter builder unchanged. `timeout`, when
/// present, becomes the export timeout. gRPC metadata is collected from the
/// exporter config and `OTEL_EXPORTER_OTLP_HEADERS` via
/// `build_metadata_from_cfg_and_env`, and is only attached when non-empty.
///
/// # Errors
///
/// Returns an error if the underlying OTLP exporter builder fails.
fn build_grpc_exporter(
    cfg: &TracingConfig,
    endpoint: String,
    timeout: Option<std::time::Duration>,
) -> anyhow::Result<opentelemetry_otlp::SpanExporter> {
    let builder = opentelemetry_otlp::SpanExporter::builder()
        .with_tonic()
        .with_endpoint(endpoint);

    let builder = match timeout {
        Some(limit) => builder.with_timeout(limit),
        None => builder,
    };

    let builder = match build_metadata_from_cfg_and_env(cfg.exporter.as_ref()) {
        Some(metadata) => builder.with_metadata(metadata),
        None => builder,
    };

    builder.build().context("build OTLP gRPC exporter")
}
131
#[cfg(feature = "otel")]
/// Builds the OpenTelemetry tracing pipeline.
///
/// Returns a `tracing-opentelemetry` layer intended to be attached to a
/// `tracing_subscriber::Registry`.
///
/// Spans are exported through a batch processor using the OTLP transport
/// selected by `cfg.exporter`, with the sampler and resource derived from
/// `cfg`. On success, the W3C TraceContext propagator and the new
/// `SdkTracerProvider` are installed as the process-wide globals.
///
/// # Errors
///
/// Returns an error if tracing is disabled in `cfg` or if the OTLP exporter
/// cannot be built. In both cases no global propagator or tracer provider is
/// installed.
pub fn init_tracing(
    cfg: &TracingConfig,
) -> anyhow::Result<
    tracing_opentelemetry::OpenTelemetryLayer<
        tracing_subscriber::Registry,
        opentelemetry_sdk::trace::Tracer,
    >,
> {
    if !cfg.enabled {
        return Err(anyhow::anyhow!("tracing is disabled"));
    }

    let service_name = cfg.service_name.as_deref().unwrap_or("hyperspot");
    tracing::info!("Building OpenTelemetry layer for service: {}", service_name);

    let resource = build_resource(cfg);
    let sampler = build_sampler(cfg);
    let (kind, endpoint, timeout) = extract_exporter_config(cfg);

    tracing::info!(kind = ?kind, %endpoint, "OTLP exporter config");

    // Do all fallible work before touching globals, so that a failed exporter
    // build leaves the process-wide OpenTelemetry state unchanged.
    let exporter = if matches!(kind, ExporterKind::OtlpHttp) {
        build_http_exporter(cfg, endpoint, timeout)
    } else {
        build_grpc_exporter(cfg, endpoint, timeout)
    }?;

    let provider = SdkTracerProvider::builder()
        .with_batch_exporter(exporter)
        .with_sampler(sampler)
        .with_resource(resource)
        .build();

    global::set_text_map_propagator(TraceContextPropagator::new());
    global::set_tracer_provider(provider.clone());

    let tracer = provider.tracer("hyperspot");
    let otel_layer = tracing_opentelemetry::OpenTelemetryLayer::new(tracer);

    tracing::info!("OpenTelemetry layer created successfully");
    Ok(otel_layer)
}
187
#[cfg(feature = "otel")]
/// Collects HTTP headers for the OTLP/HTTP exporter.
///
/// Headers from the exporter's `headers` config are added first. Then
/// comma-separated `key=value` pairs from `OTEL_EXPORTER_OTLP_HEADERS` are
/// added, with whitespace trimmed and entries lacking `=` ignored. Environment
/// values overwrite configured ones on key clashes.
///
/// Returns `None` when no header was collected.
pub(crate) fn build_headers_from_cfg_and_env(
    exporter: Option<&crate::telemetry::config::Exporter>,
) -> Option<std::collections::HashMap<String, String>> {
    let mut headers = std::collections::HashMap::new();

    if let Some(configured) = exporter.and_then(|exp| exp.headers.as_ref()) {
        headers.extend(configured.iter().map(|(k, v)| (k.clone(), v.clone())));
    }

    if let Ok(raw) = std::env::var("OTEL_EXPORTER_OTLP_HEADERS") {
        let parsed = raw
            .split(',')
            .map(str::trim)
            .filter(|entry| !entry.is_empty())
            .filter_map(|entry| entry.split_once('='))
            .map(|(k, v)| (k.trim().to_owned(), v.trim().to_owned()));
        headers.extend(parsed);
    }

    (!headers.is_empty()).then_some(headers)
}
215
#[cfg(feature = "otel")]
/// Inserts every `(name, value)` pair from `source` into the gRPC metadata `md`.
///
/// A pair is skipped, with a warning, when its name is not a valid metadata key
/// or its value is not a valid metadata value. The name is checked first.
/// `context` is attached to the warning to identify where the pair came from
/// (for example `"config"` or `"env"`).
pub(crate) fn extend_metadata_from_source<'a, I>(
    md: &mut MetadataMap,
    source: I,
    context: &'static str,
) where
    I: Iterator<Item = (&'a str, &'a str)>,
{
    for (name, raw_value) in source {
        let Ok(key) = MetadataKey::from_bytes(name.as_bytes()) else {
            tracing::warn!(header = %name, context, "Skipping invalid gRPC metadata header name");
            continue;
        };
        let Ok(value) = MetadataValue::try_from(raw_value) else {
            tracing::warn!(header = %name, context, "Skipping invalid gRPC metadata value");
            continue;
        };
        md.insert(key, value);
    }
}
240
#[cfg(feature = "otel")]
/// Builds gRPC metadata for the OTLP/gRPC exporter.
///
/// Entries from the exporter's `headers` config are inserted first. Then
/// comma-separated `key=value` pairs from `OTEL_EXPORTER_OTLP_HEADERS` are
/// inserted, with whitespace trimmed and entries lacking `=` ignored. Both
/// passes go through `extend_metadata_from_source`, which skips invalid names
/// or values with a warning tagged `"config"` or `"env"`.
///
/// Returns `None` when no valid entry ended up in the map.
pub(crate) fn build_metadata_from_cfg_and_env(
    exporter: Option<&crate::telemetry::config::Exporter>,
) -> Option<MetadataMap> {
    let mut metadata = MetadataMap::new();

    if let Some(configured) = exporter.and_then(|exp| exp.headers.as_ref()) {
        let pairs = configured.iter().map(|(k, v)| (k.as_str(), v.as_str()));
        extend_metadata_from_source(&mut metadata, pairs, "config");
    }

    if let Ok(raw) = std::env::var("OTEL_EXPORTER_OTLP_HEADERS") {
        let pairs = raw
            .split(',')
            .map(str::trim)
            .filter(|entry| !entry.is_empty())
            .filter_map(|entry| entry.split_once('='))
            .map(|(k, v)| (k.trim(), v.trim()));
        extend_metadata_from_source(&mut metadata, pairs, "env");
    }

    (!metadata.is_empty()).then_some(metadata)
}
270
271#[cfg(not(feature = "otel"))]
274pub fn init_tracing(_cfg: &serde_json::Value) -> Option<()> {
275 tracing::info!("Tracing configuration provided but runtime feature is disabled");
276 None
277}
278
279#[cfg(feature = "otel")]
286pub fn shutdown_tracing() {
287 tracing::info!("Tracing shutdown: no-op (keep a provider handle to call `shutdown()`).");
288}
289
290#[cfg(not(feature = "otel"))]
291pub fn shutdown_tracing() {
292 tracing::info!("Tracing shutdown (no-op)");
293}
294
295#[cfg(feature = "otel")]
300pub fn shutdown_metrics() {
301 tracing::info!("Metrics shutdown: no-op (keep a provider handle to call `shutdown()`).");
302}
303
304#[cfg(not(feature = "otel"))]
305pub fn shutdown_metrics() {
306 tracing::info!("Metrics shutdown (no-op)");
307}
308
#[cfg(feature = "otel")]
/// Installs `provider` as the global OpenTelemetry meter provider.
///
/// # Errors
///
/// Returns an error when metrics are disabled in `cfg`; in that case
/// `provider` is dropped without being installed.
// The by-value `provider` parameter is part of the public signature: callers
// hand over ownership, which is then moved into the global registry.
#[allow(clippy::needless_pass_by_value)]
pub fn init_metrics(
    cfg: &MetricsConfig,
    provider: opentelemetry_sdk::metrics::SdkMeterProvider,
) -> anyhow::Result<()> {
    anyhow::ensure!(cfg.enabled, "metrics is disabled");

    global::set_meter_provider(provider);
    tracing::info!("OpenTelemetry metrics initialized successfully");
    Ok(())
}
333
334#[cfg(not(feature = "otel"))]
339pub fn init_metrics(_cfg: &MetricsConfig, _provider: ()) -> anyhow::Result<()> {
340 tracing::info!("Metrics configuration provided but runtime feature is disabled");
341 Err(anyhow::anyhow!("otel feature is disabled"))
342}
343
#[cfg(feature = "otel")]
/// Exports a single test span to verify that the configured OTLP endpoint is reachable.
///
/// The resource, transport, endpoint, timeout and headers/metadata are
/// resolved with the same helpers `init_tracing` uses (`build_resource`,
/// `extract_exporter_config`, `build_http_exporter` / `build_grpc_exporter`).
/// The probe therefore exercises the configuration the real pipeline will run
/// with.
///
/// A private `SdkTracerProvider` with a simple span processor is created,
/// flushed and shut down before returning. No global OpenTelemetry state is
/// set.
///
/// # Errors
///
/// Returns an error if the exporter cannot be built or if the provider fails
/// to shut down. A failed `force_flush` is only logged as a warning.
pub fn otel_connectivity_probe(cfg: &super::config::TracingConfig) -> anyhow::Result<()> {
    use opentelemetry::trace::{Span, Tracer as _};

    let resource = build_resource(cfg);
    let (kind, endpoint, timeout) = extract_exporter_config(cfg);

    let exporter = if matches!(kind, ExporterKind::OtlpHttp) {
        build_http_exporter(cfg, endpoint, timeout)
    } else {
        build_grpc_exporter(cfg, endpoint, timeout)
    }?;

    let provider = SdkTracerProvider::builder()
        .with_simple_exporter(exporter)
        .with_resource(resource)
        .build();

    let tracer = provider.tracer("connectivity_probe");
    let mut span = tracer.start("otel_connectivity_probe");
    span.end();

    if let Err(e) = provider.force_flush() {
        tracing::warn!(error = %e, "force_flush failed during OTLP connectivity probe");
    }

    provider
        .shutdown()
        .map_err(|e| anyhow::anyhow!("shutdown failed: {e}"))?;

    tracing::info!(kind = ?kind, "OTLP connectivity probe exported a test span");
    Ok(())
}
422
423#[cfg(not(feature = "otel"))]
428pub fn otel_connectivity_probe(_cfg: &serde_json::Value) -> anyhow::Result<()> {
429 tracing::info!("OTLP connectivity probe skipped (otel feature disabled)");
430 Ok(())
431}
432
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;
    use crate::telemetry::config::{Exporter, ExporterKind, Sampler, TracingConfig};
    use std::collections::HashMap;

    /// True when `OTEL_EXPORTER_OTLP_HEADERS` is absent from the environment.
    ///
    /// In that case the header/metadata builders see only configured headers,
    /// so their output can be asserted exactly.
    #[cfg(feature = "otel")]
    fn otlp_headers_env_unset() -> bool {
        std::env::var_os("OTEL_EXPORTER_OTLP_HEADERS").is_none()
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_init_tracing_disabled() {
        let cfg = TracingConfig {
            enabled: false,
            ..Default::default()
        };

        let result = init_tracing(&cfg);
        assert!(result.is_err());
    }

    #[tokio::test]
    #[cfg(feature = "otel")]
    async fn test_init_tracing_enabled() {
        let cfg = TracingConfig {
            enabled: true,
            service_name: Some("test-service".to_owned()),
            ..Default::default()
        };

        let result = init_tracing(&cfg);
        assert!(result.is_ok());
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_init_tracing_with_resource_attributes() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let _guard = rt.enter();

        let mut resource_map = HashMap::new();
        resource_map.insert("service.version".to_owned(), "1.0.0".to_owned());
        resource_map.insert("deployment.environment".to_owned(), "test".to_owned());

        let cfg = TracingConfig {
            enabled: true,
            service_name: Some("test-service".to_owned()),
            resource: Some(resource_map),
            ..Default::default()
        };

        let result = init_tracing(&cfg);
        assert!(result.is_ok());
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_init_tracing_with_always_on_sampler() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let _guard = rt.enter();

        let cfg = TracingConfig {
            enabled: true,
            service_name: Some("test-service".to_owned()),
            sampler: Some(Sampler::AlwaysOn {}),
            ..Default::default()
        };

        let result = init_tracing(&cfg);
        assert!(result.is_ok());
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_init_tracing_with_always_off_sampler() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let _guard = rt.enter();

        let cfg = TracingConfig {
            enabled: true,
            service_name: Some("test-service".to_owned()),
            sampler: Some(Sampler::AlwaysOff {}),
            ..Default::default()
        };

        let result = init_tracing(&cfg);
        assert!(result.is_ok());
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_init_tracing_with_ratio_sampler() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let _guard = rt.enter();

        let cfg = TracingConfig {
            enabled: true,
            service_name: Some("test-service".to_owned()),
            sampler: Some(Sampler::ParentBasedRatio { ratio: Some(0.5) }),
            ..Default::default()
        };

        let result = init_tracing(&cfg);
        assert!(result.is_ok());
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_init_tracing_with_http_exporter() {
        // NOTE(review): unlike the other tests this runtime is created but not
        // entered — confirm whether that is intentional for the HTTP client.
        let _rt = tokio::runtime::Runtime::new().unwrap();

        let cfg = TracingConfig {
            enabled: true,
            service_name: Some("test-service".to_owned()),
            exporter: Some(Exporter {
                kind: ExporterKind::OtlpHttp,
                endpoint: Some("http://localhost:4318".to_owned()),
                headers: None,
                timeout_ms: Some(5000),
            }),
            ..Default::default()
        };

        let result = init_tracing(&cfg);
        assert!(result.is_ok());
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_init_tracing_with_grpc_exporter() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let _guard = rt.enter();

        let cfg = TracingConfig {
            enabled: true,
            service_name: Some("test-service".to_owned()),
            exporter: Some(Exporter {
                kind: ExporterKind::OtlpGrpc,
                endpoint: Some("http://localhost:4317".to_owned()),
                headers: None,
                timeout_ms: Some(5000),
            }),
            ..Default::default()
        };

        let result = init_tracing(&cfg);
        assert!(result.is_ok());
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_build_headers_from_cfg_empty() {
        let cfg = TracingConfig {
            enabled: true,
            exporter: None,
            ..Default::default()
        };

        let result = build_headers_from_cfg_and_env(cfg.exporter.as_ref());
        // With nothing configured, the only possible source of headers is the
        // environment; pin the outcome whenever that variable is not set.
        if otlp_headers_env_unset() {
            assert!(result.is_none());
        }
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_build_headers_from_cfg_with_headers() {
        let mut headers = HashMap::new();
        headers.insert("authorization".to_owned(), "Bearer token".to_owned());

        let cfg = TracingConfig {
            enabled: true,
            exporter: Some(Exporter {
                kind: ExporterKind::OtlpHttp,
                endpoint: Some("http://localhost:4318".to_owned()),
                headers: Some(headers.clone()),
                timeout_ms: None,
            }),
            ..Default::default()
        };

        let result = build_headers_from_cfg_and_env(cfg.exporter.as_ref());
        assert!(result.is_some());
        let result_headers = result.unwrap();
        assert_eq!(
            result_headers.get("authorization"),
            Some(&"Bearer token".to_owned())
        );
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_build_metadata_from_cfg_empty() {
        let cfg = TracingConfig {
            enabled: true,
            exporter: None,
            ..Default::default()
        };

        let result = build_metadata_from_cfg_and_env(cfg.exporter.as_ref());
        // With nothing configured, the only possible source of metadata is the
        // environment; pin the outcome whenever that variable is not set.
        if otlp_headers_env_unset() {
            assert!(result.is_none());
        }
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_build_metadata_from_cfg_with_headers() {
        let mut headers = HashMap::new();
        headers.insert("authorization".to_owned(), "Bearer token".to_owned());

        let cfg = TracingConfig {
            enabled: true,
            exporter: Some(Exporter {
                kind: ExporterKind::OtlpGrpc,
                endpoint: Some("http://localhost:4317".to_owned()),
                headers: Some(headers.clone()),
                timeout_ms: None,
            }),
            ..Default::default()
        };

        let result = build_metadata_from_cfg_and_env(cfg.exporter.as_ref());
        assert!(result.is_some());
        let metadata = result.unwrap();
        assert!(!metadata.is_empty());
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_build_metadata_multiple_headers() {
        let mut headers = HashMap::new();
        headers.insert("authorization".to_owned(), "Bearer token".to_owned());
        headers.insert("x-custom-header".to_owned(), "custom-value".to_owned());

        let cfg = TracingConfig {
            enabled: true,
            exporter: Some(Exporter {
                kind: ExporterKind::OtlpGrpc,
                endpoint: Some("http://localhost:4317".to_owned()),
                headers: Some(headers.clone()),
                timeout_ms: None,
            }),
            ..Default::default()
        };

        let result = build_metadata_from_cfg_and_env(cfg.exporter.as_ref());
        assert!(result.is_some());
        let metadata = result.unwrap();
        assert_eq!(metadata.len(), 2);
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_build_metadata_invalid_header_name_skipped() {
        let mut headers = HashMap::new();
        headers.insert("valid-header".to_owned(), "value1".to_owned());
        headers.insert("invalid header with spaces".to_owned(), "value2".to_owned());

        let cfg = TracingConfig {
            enabled: true,
            exporter: Some(Exporter {
                kind: ExporterKind::OtlpGrpc,
                endpoint: Some("http://localhost:4317".to_owned()),
                headers: Some(headers.clone()),
                timeout_ms: None,
            }),
            ..Default::default()
        };

        let result = build_metadata_from_cfg_and_env(cfg.exporter.as_ref());
        assert!(result.is_some());
        let metadata = result.unwrap();
        assert_eq!(metadata.len(), 1);
    }

    #[test]
    fn test_shutdown_tracing_does_not_panic() {
        shutdown_tracing();
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_init_metrics_disabled() {
        use crate::telemetry::config::MetricsConfig;
        let cfg = MetricsConfig {
            enabled: false,
            exporter: None,
            ..Default::default()
        };
        let provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder().build();
        let result = init_metrics(&cfg, provider);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("disabled"));
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_init_metrics_enabled() {
        use crate::telemetry::config::MetricsConfig;
        let cfg = MetricsConfig {
            enabled: true,
            exporter: None,
            ..Default::default()
        };
        let provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder().build();
        let result = init_metrics(&cfg, provider);
        assert!(result.is_ok());
    }
}