1#[cfg(feature = "otel")]
7use anyhow::Context;
8#[cfg(feature = "otel")]
9use opentelemetry::{KeyValue, global, trace::TracerProvider as _};
10
11#[cfg(feature = "otel")]
12use opentelemetry_otlp::{Protocol, WithExportConfig};
13#[cfg(feature = "otel")]
15use opentelemetry_otlp::{WithHttpConfig, WithTonicConfig};
16
17#[cfg(feature = "otel")]
18use opentelemetry_sdk::{
19 Resource,
20 propagation::TraceContextPropagator,
21 trace::{Sampler, SdkTracerProvider},
22};
23
24#[cfg(feature = "otel")]
25use super::config::TracingConfig;
26#[cfg(feature = "otel")]
27use crate::telemetry::config::ExporterKind;
28#[cfg(feature = "otel")]
29use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
30
#[cfg(feature = "otel")]
/// Builds the OpenTelemetry [`Resource`] describing this service.
///
/// The first attribute is always `service.name`, taken from
/// `cfg.service_name` and falling back to `"hyperspot"` when unset. Every
/// entry of the optional `cfg.resource` map is appended after it as an
/// additional attribute.
fn build_resource(cfg: &TracingConfig) -> Resource {
    let name = cfg
        .service_name
        .as_deref()
        .unwrap_or("hyperspot")
        .to_owned();

    // `service.name` goes first, followed by the user-supplied attributes.
    let attributes: Vec<KeyValue> = std::iter::once(KeyValue::new("service.name", name))
        .chain(
            cfg.resource
                .iter()
                .flatten()
                .map(|(key, value)| KeyValue::new(key.clone(), value.clone())),
        )
        .collect();

    Resource::builder_empty()
        .with_attributes(attributes)
        .build()
}
47
#[cfg(feature = "otel")]
/// Translates the configured sampler into an SDK [`Sampler`].
///
/// When no sampler is configured, a parent-based sampler wrapping
/// `AlwaysOn` is used. `ParentBasedRatio` without an explicit ratio falls
/// back to `0.1`.
fn build_sampler(cfg: &TracingConfig) -> Sampler {
    use crate::telemetry::config::Sampler as Configured;

    // Shared by the "nothing configured" and `ParentBasedAlwaysOn` cases.
    let parent_based_always_on = || Sampler::ParentBased(Box::new(Sampler::AlwaysOn));

    let Some(choice) = cfg.sampler.as_ref() else {
        return parent_based_always_on();
    };

    match choice {
        Configured::AlwaysOff { .. } => Sampler::AlwaysOff,
        Configured::AlwaysOn { .. } => Sampler::AlwaysOn,
        Configured::ParentBasedAlwaysOn { .. } => parent_based_always_on(),
        Configured::ParentBasedRatio { ratio } => {
            let probability = ratio.unwrap_or(0.1);
            Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(probability)))
        }
    }
}
64
#[cfg(feature = "otel")]
/// Resolves the exporter kind, endpoint and optional timeout from `cfg`.
///
/// * The kind defaults to [`ExporterKind::OtlpGrpc`] when no exporter section
///   is configured.
/// * An explicitly configured endpoint is returned unchanged.
/// * When no endpoint is configured, the default depends on the kind:
///   OTLP/HTTP uses `http://127.0.0.1:4318/v1/traces`, everything else uses
///   the gRPC default `http://127.0.0.1:4317`.
/// * The timeout is `Some` only when `timeout_ms` is configured.
fn extract_exporter_config(
    cfg: &TracingConfig,
) -> (ExporterKind, String, Option<std::time::Duration>) {
    const DEFAULT_GRPC_ENDPOINT: &str = "http://127.0.0.1:4317";
    // The HTTP exporter uses a programmatically supplied endpoint as-is, so
    // the default has to carry the traces path and the OTLP/HTTP port.
    const DEFAULT_HTTP_ENDPOINT: &str = "http://127.0.0.1:4318/v1/traces";

    let exporter = cfg.exporter.as_ref();
    let kind = exporter.map_or(ExporterKind::OtlpGrpc, |e| e.kind);

    let endpoint = exporter
        .and_then(|e| e.endpoint.clone())
        .unwrap_or_else(|| {
            let fallback = if matches!(kind, ExporterKind::OtlpHttp) {
                DEFAULT_HTTP_ENDPOINT
            } else {
                DEFAULT_GRPC_ENDPOINT
            };
            fallback.to_owned()
        });

    let timeout = exporter
        .and_then(|e| e.timeout_ms)
        .map(std::time::Duration::from_millis);

    (kind, endpoint, timeout)
}
90
#[cfg(feature = "otel")]
/// Builds an OTLP span exporter that speaks binary protobuf over HTTP.
///
/// `endpoint` is handed to the exporter builder unchanged. `timeout` is
/// applied only when present. Headers come from
/// `build_headers_from_cfg_and_env` and are attached only when that helper
/// returns at least one entry.
///
/// # Errors
/// Returns an error (with context `"build OTLP HTTP exporter"`) when the
/// underlying builder fails.
fn build_http_exporter(
    cfg: &TracingConfig,
    endpoint: String,
    timeout: Option<std::time::Duration>,
) -> anyhow::Result<opentelemetry_otlp::SpanExporter> {
    let mut builder = opentelemetry_otlp::SpanExporter::builder()
        .with_http()
        .with_protocol(Protocol::HttpBinary)
        .with_endpoint(endpoint);

    if let Some(limit) = timeout {
        builder = builder.with_timeout(limit);
    }
    if let Some(headers) = build_headers_from_cfg_and_env(cfg) {
        builder = builder.with_headers(headers);
    }

    builder.build().context("build OTLP HTTP exporter")
}
111
#[cfg(feature = "otel")]
/// Builds an OTLP span exporter that uses gRPC (tonic) transport.
///
/// `timeout` is applied only when present. Metadata comes from
/// `build_metadata_from_cfg_and_env` and is attached only when that helper
/// returns a non-empty map.
///
/// # Errors
/// Returns an error (with context `"build OTLP gRPC exporter"`) when the
/// underlying builder fails.
fn build_grpc_exporter(
    cfg: &TracingConfig,
    endpoint: String,
    timeout: Option<std::time::Duration>,
) -> anyhow::Result<opentelemetry_otlp::SpanExporter> {
    let builder = opentelemetry_otlp::SpanExporter::builder()
        .with_tonic()
        .with_endpoint(endpoint);

    let builder = match timeout {
        Some(limit) => builder.with_timeout(limit),
        None => builder,
    };

    let builder = match build_metadata_from_cfg_and_env(cfg) {
        Some(metadata) => builder.with_metadata(metadata),
        None => builder,
    };

    builder.build().context("build OTLP gRPC exporter")
}
130
#[cfg(feature = "otel")]
/// Creates the `tracing-opentelemetry` layer described by `cfg`.
///
/// Besides building the layer this function has global side effects: it
/// installs a `TraceContextPropagator` as the global text-map propagator and
/// registers the newly built tracer provider as the global tracer provider.
/// Spans are exported through a batch exporter, over HTTP when the configured
/// kind is `ExporterKind::OtlpHttp` and over gRPC otherwise.
///
/// # Errors
/// Returns an error when `cfg.enabled` is `false` or when the OTLP exporter
/// cannot be built.
pub fn init_tracing(
    cfg: &TracingConfig,
) -> anyhow::Result<
    tracing_opentelemetry::OpenTelemetryLayer<
        tracing_subscriber::Registry,
        opentelemetry_sdk::trace::Tracer,
    >,
> {
    if !cfg.enabled {
        return Err(anyhow::anyhow!("tracing is disabled"));
    }

    // Context propagation is process-wide, so it is set up before anything else.
    global::set_text_map_propagator(TraceContextPropagator::new());

    tracing::info!(
        "Building OpenTelemetry layer for service: {}",
        cfg.service_name.as_deref().unwrap_or("hyperspot")
    );

    let (kind, endpoint, timeout) = extract_exporter_config(cfg);
    tracing::info!(kind = ?kind, %endpoint, "OTLP exporter config");

    let over_http = matches!(kind, ExporterKind::OtlpHttp);
    let exporter = if over_http {
        build_http_exporter(cfg, endpoint, timeout)?
    } else {
        build_grpc_exporter(cfg, endpoint, timeout)?
    };

    let provider = SdkTracerProvider::builder()
        .with_batch_exporter(exporter)
        .with_sampler(build_sampler(cfg))
        .with_resource(build_resource(cfg))
        .build();

    // The global registry receives a clone; the local handle creates the tracer.
    global::set_tracer_provider(provider.clone());

    let layer = tracing_opentelemetry::OpenTelemetryLayer::new(provider.tracer("hyperspot"));

    tracing::info!("OpenTelemetry layer created successfully");
    Ok(layer)
}
186
#[cfg(feature = "otel")]
/// Collects HTTP headers for the OTLP exporter from config and environment.
///
/// Headers from `cfg.exporter.headers` are inserted first. Entries parsed
/// from the `OTEL_EXPORTER_OTLP_HEADERS` environment variable (a
/// comma-separated list of `key=value` pairs) are inserted afterwards and
/// therefore win on duplicate keys. Environment entries without `=` are
/// ignored; keys and values are trimmed.
///
/// Returns `None` when no header was collected.
fn build_headers_from_cfg_and_env(
    cfg: &TracingConfig,
) -> Option<std::collections::HashMap<String, String>> {
    let mut merged = std::collections::HashMap::<String, String>::new();

    let configured = cfg.exporter.as_ref().and_then(|exp| exp.headers.as_ref());
    if let Some(hdrs) = configured {
        merged.extend(hdrs.iter().map(|(k, v)| (k.clone(), v.clone())));
    }

    if let Ok(raw) = std::env::var("OTEL_EXPORTER_OTLP_HEADERS") {
        let pairs = raw
            .split(',')
            .map(str::trim)
            .filter(|entry| !entry.is_empty())
            .filter_map(|entry| entry.split_once('='))
            .map(|(k, v)| (k.trim().to_owned(), v.trim().to_owned()));
        merged.extend(pairs);
    }

    (!merged.is_empty()).then_some(merged)
}
214
#[cfg(feature = "otel")]
/// Inserts every valid `(name, value)` pair from `source` into `md`.
///
/// Pairs whose name or value cannot be converted into gRPC metadata are
/// skipped with a warning; `context` is attached to that warning so the
/// origin of the offending entry can be identified.
fn extend_metadata_from_source<'a, I>(md: &mut MetadataMap, source: I, context: &'static str)
where
    I: Iterator<Item = (&'a str, &'a str)>,
{
    for (name, raw_value) in source {
        let Ok(key) = MetadataKey::from_bytes(name.as_bytes()) else {
            tracing::warn!(header = %name, context, "Skipping invalid gRPC metadata header name");
            continue;
        };

        match MetadataValue::try_from(raw_value) {
            Ok(val) => {
                md.insert(key, val);
            }
            Err(_) => {
                tracing::warn!(header = %name, context, "Skipping invalid gRPC metadata value");
            }
        }
    }
}
236
#[cfg(feature = "otel")]
/// Collects gRPC metadata for the OTLP exporter from config and environment.
///
/// Headers from `cfg.exporter.headers` are applied first, then the
/// `key=value` pairs found in the comma-separated `OTEL_EXPORTER_OTLP_HEADERS`
/// environment variable. Invalid names or values are skipped by
/// `extend_metadata_from_source`.
///
/// Returns `None` when the resulting metadata map is empty.
fn build_metadata_from_cfg_and_env(cfg: &TracingConfig) -> Option<MetadataMap> {
    let mut metadata = MetadataMap::new();

    let configured = cfg.exporter.as_ref().and_then(|exp| exp.headers.as_ref());
    if let Some(hdrs) = configured {
        extend_metadata_from_source(
            &mut metadata,
            hdrs.iter().map(|(k, v)| (k.as_str(), v.as_str())),
            "config",
        );
    }

    if let Ok(raw) = std::env::var("OTEL_EXPORTER_OTLP_HEADERS") {
        let pairs = raw
            .split(',')
            .map(str::trim)
            .filter(|entry| !entry.is_empty())
            .filter_map(|entry| entry.split_once('='))
            .map(|(k, v)| (k.trim(), v.trim()));
        extend_metadata_from_source(&mut metadata, pairs, "env");
    }

    (!metadata.is_empty()).then_some(metadata)
}
264
265#[cfg(not(feature = "otel"))]
268pub fn init_tracing(_cfg: &serde_json::Value) -> Option<()> {
269 tracing::info!("Tracing configuration provided but runtime feature is disabled");
270 None
271}
272
#[cfg(feature = "otel")]
/// Shutdown hook for tracing when the `otel` feature is enabled.
///
/// This function only logs an informational message; it does not flush or
/// shut down any tracer provider.
pub fn shutdown_tracing() {
    tracing::info!("Tracing shutdown: no-op (keep a provider handle to call `shutdown()`).");
}
283
284#[cfg(not(feature = "otel"))]
285pub fn shutdown_tracing() {
286 tracing::info!("Tracing shutdown (no-op)");
287}
288
#[cfg(feature = "otel")]
/// Exports a single test span through a throw-away OTLP pipeline.
///
/// The probe builds its own tracer provider with a simple (non-batching)
/// exporter, emits one span named `otel_connectivity_probe`, flushes, and
/// shuts the provider down again. It does not touch the global tracer
/// provider.
///
/// The exporter kind, endpoint and timeout are resolved by
/// `extract_exporter_config`, and the exporter is built by
/// `build_http_exporter` / `build_grpc_exporter`, so the probe honours the
/// configured `timeout_ms` and headers. The probe resource carries only the
/// `service.name` attribute.
///
/// # Errors
/// Returns an error when the exporter cannot be built or when shutting down
/// the probe provider fails. A failing `force_flush` is only logged as a
/// warning.
pub fn otel_connectivity_probe(cfg: &super::config::TracingConfig) -> anyhow::Result<()> {
    use opentelemetry::trace::{Span, Tracer as _};

    let service_name = cfg
        .service_name
        .clone()
        .unwrap_or_else(|| "hyperspot".into());

    let (kind, endpoint, timeout) = extract_exporter_config(cfg);

    let resource = Resource::builder_empty()
        .with_attributes([KeyValue::new("service.name", service_name)])
        .build();

    // `{e:#}` keeps the whole error chain in the message, so the established
    // message prefixes still carry the underlying cause.
    let exporter = if matches!(kind, ExporterKind::OtlpHttp) {
        build_http_exporter(cfg, endpoint, timeout)
            .map_err(|e| anyhow::anyhow!("otlp http exporter build failed: {e:#}"))?
    } else {
        build_grpc_exporter(cfg, endpoint, timeout)
            .map_err(|e| anyhow::anyhow!("otlp grpc exporter build failed: {e:#}"))?
    };

    let provider = SdkTracerProvider::builder()
        .with_simple_exporter(exporter)
        .with_resource(resource)
        .build();

    let tracer = provider.tracer("connectivity_probe");
    let mut span = tracer.start("otel_connectivity_probe");
    span.end();

    // A flush failure is reported but does not fail the probe.
    if let Err(e) = provider.force_flush() {
        tracing::warn!(error = %e, "force_flush failed during OTLP connectivity probe");
    }

    provider
        .shutdown()
        .map_err(|e| anyhow::anyhow!("shutdown failed: {e}"))?;

    tracing::info!(kind = ?kind, "OTLP connectivity probe exported a test span");
    Ok(())
}
367
368#[cfg(not(feature = "otel"))]
373pub fn otel_connectivity_probe(_cfg: &serde_json::Value) -> anyhow::Result<()> {
374 tracing::info!("OTLP connectivity probe skipped (otel feature disabled)");
375 Ok(())
376}
377
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
/// Unit tests for tracing initialisation and header/metadata assembly.
///
/// Tests that build a gRPC exporter enter a Tokio runtime first. Tests about
/// header assembly read `OTEL_EXPORTER_OTLP_HEADERS` indirectly through the
/// functions under test, so their expectations hold for an environment in
/// which that variable is not set.
mod tests {
    use super::*;
    use crate::telemetry::config::{Exporter, ExporterKind, Sampler, TracingConfig};
    use std::collections::HashMap;

    #[test]
    #[cfg(feature = "otel")]
    fn test_init_tracing_disabled() {
        let cfg = TracingConfig {
            enabled: false,
            ..Default::default()
        };

        let result = init_tracing(&cfg);
        assert!(result.is_err());
    }

    #[tokio::test]
    #[cfg(feature = "otel")]
    async fn test_init_tracing_enabled() {
        let cfg = TracingConfig {
            enabled: true,
            service_name: Some("test-service".to_owned()),
            ..Default::default()
        };

        let result = init_tracing(&cfg);
        assert!(result.is_ok());
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_init_tracing_with_resource_attributes() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let _guard = rt.enter();

        let mut resource_map = HashMap::new();
        resource_map.insert("service.version".to_owned(), "1.0.0".to_owned());
        resource_map.insert("deployment.environment".to_owned(), "test".to_owned());

        let cfg = TracingConfig {
            enabled: true,
            service_name: Some("test-service".to_owned()),
            resource: Some(resource_map),
            ..Default::default()
        };

        let result = init_tracing(&cfg);
        assert!(result.is_ok());
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_init_tracing_with_always_on_sampler() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let _guard = rt.enter();

        let cfg = TracingConfig {
            enabled: true,
            service_name: Some("test-service".to_owned()),
            sampler: Some(Sampler::AlwaysOn {}),
            ..Default::default()
        };

        let result = init_tracing(&cfg);
        assert!(result.is_ok());
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_init_tracing_with_always_off_sampler() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let _guard = rt.enter();

        let cfg = TracingConfig {
            enabled: true,
            service_name: Some("test-service".to_owned()),
            sampler: Some(Sampler::AlwaysOff {}),
            ..Default::default()
        };

        let result = init_tracing(&cfg);
        assert!(result.is_ok());
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_init_tracing_with_ratio_sampler() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let _guard = rt.enter();

        let cfg = TracingConfig {
            enabled: true,
            service_name: Some("test-service".to_owned()),
            sampler: Some(Sampler::ParentBasedRatio { ratio: Some(0.5) }),
            ..Default::default()
        };

        let result = init_tracing(&cfg);
        assert!(result.is_ok());
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_init_tracing_with_http_exporter() {
        let _rt = tokio::runtime::Runtime::new().unwrap();

        let cfg = TracingConfig {
            enabled: true,
            service_name: Some("test-service".to_owned()),
            exporter: Some(Exporter {
                kind: ExporterKind::OtlpHttp,
                endpoint: Some("http://localhost:4318".to_owned()),
                headers: None,
                timeout_ms: Some(5000),
            }),
            ..Default::default()
        };

        let result = init_tracing(&cfg);
        assert!(result.is_ok());
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_init_tracing_with_grpc_exporter() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let _guard = rt.enter();

        let cfg = TracingConfig {
            enabled: true,
            service_name: Some("test-service".to_owned()),
            exporter: Some(Exporter {
                kind: ExporterKind::OtlpGrpc,
                endpoint: Some("http://localhost:4317".to_owned()),
                headers: None,
                timeout_ms: Some(5000),
            }),
            ..Default::default()
        };

        let result = init_tracing(&cfg);
        assert!(result.is_ok());
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_build_headers_from_cfg_empty() {
        let cfg = TracingConfig {
            enabled: true,
            ..Default::default()
        };

        let result = build_headers_from_cfg_and_env(&cfg);
        // Without configured headers the environment is the only possible
        // source, so the result must be empty whenever the variable is unset.
        if std::env::var_os("OTEL_EXPORTER_OTLP_HEADERS").is_none() {
            assert!(result.is_none());
        }
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_build_headers_from_cfg_with_headers() {
        let mut headers = HashMap::new();
        headers.insert("authorization".to_owned(), "Bearer token".to_owned());

        let cfg = TracingConfig {
            enabled: true,
            exporter: Some(Exporter {
                kind: ExporterKind::OtlpHttp,
                endpoint: Some("http://localhost:4318".to_owned()),
                headers: Some(headers),
                timeout_ms: None,
            }),
            ..Default::default()
        };

        let result = build_headers_from_cfg_and_env(&cfg);
        assert!(result.is_some());
        let result_headers = result.unwrap();
        assert_eq!(
            result_headers.get("authorization"),
            Some(&"Bearer token".to_owned())
        );
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_build_metadata_from_cfg_empty() {
        let cfg = TracingConfig {
            enabled: true,
            ..Default::default()
        };

        let result = build_metadata_from_cfg_and_env(&cfg);
        // Same reasoning as for the HTTP headers: nothing configured and
        // nothing in the environment means no metadata at all.
        if std::env::var_os("OTEL_EXPORTER_OTLP_HEADERS").is_none() {
            assert!(result.is_none());
        }
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_build_metadata_from_cfg_with_headers() {
        let mut headers = HashMap::new();
        headers.insert("authorization".to_owned(), "Bearer token".to_owned());

        let cfg = TracingConfig {
            enabled: true,
            exporter: Some(Exporter {
                kind: ExporterKind::OtlpGrpc,
                endpoint: Some("http://localhost:4317".to_owned()),
                headers: Some(headers),
                timeout_ms: None,
            }),
            ..Default::default()
        };

        let result = build_metadata_from_cfg_and_env(&cfg);
        assert!(result.is_some());
        let metadata = result.unwrap();
        assert!(!metadata.is_empty());
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_build_metadata_multiple_headers() {
        let mut headers = HashMap::new();
        headers.insert("authorization".to_owned(), "Bearer token".to_owned());
        headers.insert("x-custom-header".to_owned(), "custom-value".to_owned());

        let cfg = TracingConfig {
            enabled: true,
            exporter: Some(Exporter {
                kind: ExporterKind::OtlpGrpc,
                endpoint: Some("http://localhost:4317".to_owned()),
                headers: Some(headers),
                timeout_ms: None,
            }),
            ..Default::default()
        };

        let result = build_metadata_from_cfg_and_env(&cfg);
        assert!(result.is_some());
        let metadata = result.unwrap();
        assert_eq!(metadata.len(), 2);
    }

    #[test]
    #[cfg(feature = "otel")]
    fn test_build_metadata_invalid_header_name_skipped() {
        let mut headers = HashMap::new();
        headers.insert("valid-header".to_owned(), "value1".to_owned());
        headers.insert("invalid header with spaces".to_owned(), "value2".to_owned());

        let cfg = TracingConfig {
            enabled: true,
            exporter: Some(Exporter {
                kind: ExporterKind::OtlpGrpc,
                endpoint: Some("http://localhost:4317".to_owned()),
                headers: Some(headers),
                timeout_ms: None,
            }),
            ..Default::default()
        };

        let result = build_metadata_from_cfg_and_env(&cfg);
        assert!(result.is_some());
        let metadata = result.unwrap();
        // Only the header with a valid name survives.
        assert_eq!(metadata.len(), 1);
    }

    #[test]
    fn test_shutdown_tracing_does_not_panic() {
        shutdown_tracing();
    }
}