#[cfg(feature = "otel")]
use anyhow::Context;
#[cfg(feature = "otel")]
use opentelemetry::{KeyValue, global, trace::TracerProvider as _};
#[cfg(feature = "otel")]
use opentelemetry_otlp::{Protocol, WithExportConfig};
#[cfg(feature = "otel")]
use opentelemetry_otlp::{WithHttpConfig, WithTonicConfig};
#[cfg(feature = "otel")]
use opentelemetry_sdk::{
Resource,
propagation::TraceContextPropagator,
trace::{Sampler, SdkTracerProvider},
};
use super::config::MetricsConfig;
#[cfg(feature = "otel")]
use super::config::TracingConfig;
#[cfg(feature = "otel")]
use crate::telemetry::config::ExporterKind;
#[cfg(feature = "otel")]
use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
#[cfg(feature = "otel")]
pub(crate) fn build_resource(cfg: &TracingConfig) -> Resource {
let service_name = cfg.service_name.as_deref().unwrap_or("hyperspot");
let mut attrs = vec![KeyValue::new("service.name", service_name.to_owned())];
if let Some(resource_map) = &cfg.resource {
for (k, v) in resource_map {
attrs.push(KeyValue::new(k.clone(), v.clone()));
}
}
Resource::builder_empty().with_attributes(attrs).build()
}
#[cfg(feature = "otel")]
fn build_sampler(cfg: &TracingConfig) -> Sampler {
match cfg.sampler.as_ref() {
Some(crate::telemetry::config::Sampler::AlwaysOff { .. }) => Sampler::AlwaysOff,
Some(crate::telemetry::config::Sampler::AlwaysOn { .. }) => Sampler::AlwaysOn,
Some(crate::telemetry::config::Sampler::ParentBasedAlwaysOn { .. }) => {
Sampler::ParentBased(Box::new(Sampler::AlwaysOn))
}
Some(crate::telemetry::config::Sampler::ParentBasedRatio { ratio }) => {
let ratio = ratio.unwrap_or(0.1);
Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(ratio)))
}
None => Sampler::ParentBased(Box::new(Sampler::AlwaysOn)),
}
}
#[cfg(feature = "otel")]
fn extract_exporter_config(
cfg: &TracingConfig,
) -> (ExporterKind, String, Option<std::time::Duration>) {
let (kind, endpoint) = cfg.exporter.as_ref().map_or_else(
|| (ExporterKind::OtlpGrpc, "http://127.0.0.1:4317".into()),
|e| {
(
e.kind,
e.endpoint
.clone()
.unwrap_or_else(|| "http://127.0.0.1:4317".into()),
)
},
);
let timeout = cfg
.exporter
.as_ref()
.and_then(|e| e.timeout_ms)
.map(std::time::Duration::from_millis);
(kind, endpoint, timeout)
}
#[cfg(feature = "otel")]
fn build_http_exporter(
cfg: &TracingConfig,
endpoint: String,
timeout: Option<std::time::Duration>,
) -> anyhow::Result<opentelemetry_otlp::SpanExporter> {
let mut b = opentelemetry_otlp::SpanExporter::builder()
.with_http()
.with_protocol(Protocol::HttpBinary)
.with_endpoint(endpoint);
if let Some(t) = timeout {
b = b.with_timeout(t);
}
if let Some(hmap) = build_headers_from_cfg_and_env(cfg.exporter.as_ref()) {
b = b.with_headers(hmap);
}
#[allow(clippy::expect_used)]
b.build().context("build OTLP HTTP exporter")
}
#[cfg(feature = "otel")]
fn build_grpc_exporter(
cfg: &TracingConfig,
endpoint: String,
timeout: Option<std::time::Duration>,
) -> anyhow::Result<opentelemetry_otlp::SpanExporter> {
let mut b = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(endpoint);
if let Some(t) = timeout {
b = b.with_timeout(t);
}
if let Some(md) = build_metadata_from_cfg_and_env(cfg.exporter.as_ref()) {
b = b.with_metadata(md);
}
b.build().context("build OTLP gRPC exporter")
}
#[cfg(feature = "otel")]
pub fn init_tracing(
cfg: &TracingConfig,
) -> anyhow::Result<
tracing_opentelemetry::OpenTelemetryLayer<
tracing_subscriber::Registry,
opentelemetry_sdk::trace::Tracer,
>,
> {
if !cfg.enabled {
return Err(anyhow::anyhow!("tracing is disabled"));
}
global::set_text_map_propagator(TraceContextPropagator::new());
let service_name = cfg.service_name.as_deref().unwrap_or("hyperspot");
tracing::info!("Building OpenTelemetry layer for service: {}", service_name);
let resource = build_resource(cfg);
let sampler = build_sampler(cfg);
let (kind, endpoint, timeout) = extract_exporter_config(cfg);
tracing::info!(kind = ?kind, %endpoint, "OTLP exporter config");
let exporter = if matches!(kind, ExporterKind::OtlpHttp) {
build_http_exporter(cfg, endpoint, timeout)
} else {
build_grpc_exporter(cfg, endpoint, timeout)
}?;
let provider = SdkTracerProvider::builder()
.with_batch_exporter(exporter)
.with_sampler(sampler)
.with_resource(resource)
.build();
global::set_tracer_provider(provider.clone());
let tracer = provider.tracer("hyperspot");
let otel_layer = tracing_opentelemetry::OpenTelemetryLayer::new(tracer);
tracing::info!("OpenTelemetry layer created successfully");
Ok(otel_layer)
}
#[cfg(feature = "otel")]
pub(crate) fn build_headers_from_cfg_and_env(
exporter: Option<&crate::telemetry::config::Exporter>,
) -> Option<std::collections::HashMap<String, String>> {
use std::collections::HashMap;
let mut out: HashMap<String, String> = HashMap::new();
if let Some(exp) = exporter
&& let Some(hdrs) = &exp.headers
{
for (k, v) in hdrs {
out.insert(k.clone(), v.clone());
}
}
if let Ok(env_hdrs) = std::env::var("OTEL_EXPORTER_OTLP_HEADERS") {
for part in env_hdrs.split(',').map(str::trim).filter(|s| !s.is_empty()) {
if let Some((k, v)) = part.split_once('=') {
out.insert(k.trim().to_owned(), v.trim().to_owned());
}
}
}
if out.is_empty() { None } else { Some(out) }
}
#[cfg(feature = "otel")]
pub(crate) fn extend_metadata_from_source<'a, I>(
md: &mut MetadataMap,
source: I,
context: &'static str,
) where
I: Iterator<Item = (&'a str, &'a str)>,
{
for (k, v) in source {
match MetadataKey::from_bytes(k.as_bytes()) {
Ok(key) => match MetadataValue::try_from(v) {
Ok(val) => {
md.insert(key, val);
}
Err(_) => {
tracing::warn!(header = %k, context, "Skipping invalid gRPC metadata value");
}
},
Err(_) => {
tracing::warn!(header = %k, context, "Skipping invalid gRPC metadata header name");
}
}
}
}
#[cfg(feature = "otel")]
pub(crate) fn build_metadata_from_cfg_and_env(
exporter: Option<&crate::telemetry::config::Exporter>,
) -> Option<MetadataMap> {
let mut md = MetadataMap::new();
if let Some(exp) = exporter
&& let Some(hdrs) = &exp.headers
{
let iter = hdrs.iter().map(|(k, v)| (k.as_str(), v.as_str()));
extend_metadata_from_source(&mut md, iter, "config");
}
if let Ok(env_hdrs) = std::env::var("OTEL_EXPORTER_OTLP_HEADERS") {
let iter = env_hdrs.split(',').filter_map(|part| {
let part = part.trim();
if part.is_empty() {
None
} else {
part.split_once('=').map(|(k, v)| (k.trim(), v.trim()))
}
});
extend_metadata_from_source(&mut md, iter, "env");
}
if md.is_empty() { None } else { Some(md) }
}
#[cfg(not(feature = "otel"))]
pub fn init_tracing(_cfg: &serde_json::Value) -> Option<()> {
tracing::info!("Tracing configuration provided but runtime feature is disabled");
None
}
#[cfg(feature = "otel")]
pub fn shutdown_tracing() {
tracing::info!("Tracing shutdown: no-op (keep a provider handle to call `shutdown()`).");
}
#[cfg(not(feature = "otel"))]
pub fn shutdown_tracing() {
tracing::info!("Tracing shutdown (no-op)");
}
#[cfg(feature = "otel")]
pub fn shutdown_metrics() {
tracing::info!("Metrics shutdown: no-op (keep a provider handle to call `shutdown()`).");
}
#[cfg(not(feature = "otel"))]
pub fn shutdown_metrics() {
tracing::info!("Metrics shutdown (no-op)");
}
#[cfg(feature = "otel")]
#[allow(clippy::needless_pass_by_value)]
pub fn init_metrics(
cfg: &MetricsConfig,
provider: opentelemetry_sdk::metrics::SdkMeterProvider,
) -> anyhow::Result<()> {
if !cfg.enabled {
return Err(anyhow::anyhow!("metrics is disabled"));
}
global::set_meter_provider(provider);
tracing::info!("OpenTelemetry metrics initialized successfully");
Ok(())
}
#[cfg(not(feature = "otel"))]
pub fn init_metrics(_cfg: &MetricsConfig, _provider: ()) -> anyhow::Result<()> {
tracing::info!("Metrics configuration provided but runtime feature is disabled");
Err(anyhow::anyhow!("otel feature is disabled"))
}
#[cfg(feature = "otel")]
pub fn otel_connectivity_probe(cfg: &super::config::TracingConfig) -> anyhow::Result<()> {
use opentelemetry::trace::{Span, Tracer as _};
let service_name = cfg
.service_name
.clone()
.unwrap_or_else(|| "hyperspot".into());
let (kind, endpoint) = cfg.exporter.as_ref().map_or_else(
|| (ExporterKind::OtlpGrpc, "http://127.0.0.1:4317".into()),
|e| {
(
e.kind,
e.endpoint
.clone()
.unwrap_or_else(|| "http://127.0.0.1:4317".into()),
)
},
);
let resource = Resource::builder_empty()
.with_attributes([KeyValue::new("service.name", service_name)])
.build();
let exporter = if matches!(kind, ExporterKind::OtlpHttp) {
let mut b = opentelemetry_otlp::SpanExporter::builder()
.with_http()
.with_protocol(Protocol::HttpBinary)
.with_endpoint(endpoint);
if let Some(h) = build_headers_from_cfg_and_env(cfg.exporter.as_ref()) {
b = b.with_headers(h);
}
b.build()
.map_err(|e| anyhow::anyhow!("otlp http exporter build failed: {e}"))?
} else {
let mut b = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(endpoint);
if let Some(md) = build_metadata_from_cfg_and_env(cfg.exporter.as_ref()) {
b = b.with_metadata(md);
}
b.build()
.map_err(|e| anyhow::anyhow!("otlp grpc exporter build failed: {e}"))?
};
let provider = SdkTracerProvider::builder()
.with_simple_exporter(exporter)
.with_resource(resource)
.build();
let tracer = provider.tracer("connectivity_probe");
let mut span = tracer.start("otel_connectivity_probe");
span.end();
if let Err(e) = provider.force_flush() {
tracing::warn!(error = %e, "force_flush failed during OTLP connectivity probe");
}
provider
.shutdown()
.map_err(|e| anyhow::anyhow!("shutdown failed: {e}"))?;
tracing::info!(kind = ?kind, "OTLP connectivity probe exported a test span");
Ok(())
}
#[cfg(not(feature = "otel"))]
pub fn otel_connectivity_probe(_cfg: &serde_json::Value) -> anyhow::Result<()> {
tracing::info!("OTLP connectivity probe skipped (otel feature disabled)");
Ok(())
}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use super::*;
use crate::telemetry::config::{Exporter, ExporterKind, Sampler, TracingConfig};
use std::collections::HashMap;
#[test]
#[cfg(feature = "otel")]
fn test_init_tracing_disabled() {
let cfg = TracingConfig {
enabled: false,
..Default::default()
};
let result = init_tracing(&cfg);
assert!(result.is_err());
}
#[tokio::test]
#[cfg(feature = "otel")]
async fn test_init_tracing_enabled() {
let cfg = TracingConfig {
enabled: true,
service_name: Some("test-service".to_owned()),
..Default::default()
};
let result = init_tracing(&cfg);
assert!(result.is_ok());
}
#[test]
#[cfg(feature = "otel")]
fn test_init_tracing_with_resource_attributes() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let mut resource_map = HashMap::new();
resource_map.insert("service.version".to_owned(), "1.0.0".to_owned());
resource_map.insert("deployment.environment".to_owned(), "test".to_owned());
let cfg = TracingConfig {
enabled: true,
service_name: Some("test-service".to_owned()),
resource: Some(resource_map),
..Default::default()
};
let result = init_tracing(&cfg);
assert!(result.is_ok());
}
#[test]
#[cfg(feature = "otel")]
fn test_init_tracing_with_always_on_sampler() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let cfg = TracingConfig {
enabled: true,
service_name: Some("test-service".to_owned()),
sampler: Some(Sampler::AlwaysOn {}),
..Default::default()
};
let result = init_tracing(&cfg);
assert!(result.is_ok());
}
#[test]
#[cfg(feature = "otel")]
fn test_init_tracing_with_always_off_sampler() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let cfg = TracingConfig {
enabled: true,
service_name: Some("test-service".to_owned()),
sampler: Some(Sampler::AlwaysOff {}),
..Default::default()
};
let result = init_tracing(&cfg);
assert!(result.is_ok());
}
#[test]
#[cfg(feature = "otel")]
fn test_init_tracing_with_ratio_sampler() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let cfg = TracingConfig {
enabled: true,
service_name: Some("test-service".to_owned()),
sampler: Some(Sampler::ParentBasedRatio { ratio: Some(0.5) }),
..Default::default()
};
let result = init_tracing(&cfg);
assert!(result.is_ok());
}
#[test]
#[cfg(feature = "otel")]
fn test_init_tracing_with_http_exporter() {
let _rt = tokio::runtime::Runtime::new().unwrap();
let cfg = TracingConfig {
enabled: true,
service_name: Some("test-service".to_owned()),
exporter: Some(Exporter {
kind: ExporterKind::OtlpHttp,
endpoint: Some("http://localhost:4318".to_owned()),
headers: None,
timeout_ms: Some(5000),
}),
..Default::default()
};
let result = init_tracing(&cfg);
assert!(result.is_ok());
}
#[test]
#[cfg(feature = "otel")]
fn test_init_tracing_with_grpc_exporter() {
let rt = tokio::runtime::Runtime::new().unwrap();
let _guard = rt.enter();
let cfg = TracingConfig {
enabled: true,
service_name: Some("test-service".to_owned()),
exporter: Some(Exporter {
kind: ExporterKind::OtlpGrpc,
endpoint: Some("http://localhost:4317".to_owned()),
headers: None,
timeout_ms: Some(5000),
}),
..Default::default()
};
let result = init_tracing(&cfg);
assert!(result.is_ok());
}
#[test]
#[cfg(feature = "otel")]
fn test_build_headers_from_cfg_empty() {
let cfg = TracingConfig {
enabled: true,
..Default::default()
};
let result = build_headers_from_cfg_and_env(cfg.exporter.as_ref());
assert!(result.is_none() || result.is_some());
}
#[test]
#[cfg(feature = "otel")]
fn test_build_headers_from_cfg_with_headers() {
let mut headers = HashMap::new();
headers.insert("authorization".to_owned(), "Bearer token".to_owned());
let cfg = TracingConfig {
enabled: true,
exporter: Some(Exporter {
kind: ExporterKind::OtlpHttp,
endpoint: Some("http://localhost:4318".to_owned()),
headers: Some(headers.clone()),
timeout_ms: None,
}),
..Default::default()
};
let result = build_headers_from_cfg_and_env(cfg.exporter.as_ref());
assert!(result.is_some());
let result_headers = result.unwrap();
assert_eq!(
result_headers.get("authorization"),
Some(&"Bearer token".to_owned())
);
}
#[test]
#[cfg(feature = "otel")]
fn test_build_metadata_from_cfg_empty() {
let cfg = TracingConfig {
enabled: true,
..Default::default()
};
let result = build_metadata_from_cfg_and_env(cfg.exporter.as_ref());
assert!(result.is_none() || result.is_some());
}
#[test]
#[cfg(feature = "otel")]
fn test_build_metadata_from_cfg_with_headers() {
let mut headers = HashMap::new();
headers.insert("authorization".to_owned(), "Bearer token".to_owned());
let cfg = TracingConfig {
enabled: true,
exporter: Some(Exporter {
kind: ExporterKind::OtlpGrpc,
endpoint: Some("http://localhost:4317".to_owned()),
headers: Some(headers.clone()),
timeout_ms: None,
}),
..Default::default()
};
let result = build_metadata_from_cfg_and_env(cfg.exporter.as_ref());
assert!(result.is_some());
let metadata = result.unwrap();
assert!(!metadata.is_empty());
}
#[test]
#[cfg(feature = "otel")]
fn test_build_metadata_multiple_headers() {
let mut headers = HashMap::new();
headers.insert("authorization".to_owned(), "Bearer token".to_owned());
headers.insert("x-custom-header".to_owned(), "custom-value".to_owned());
let cfg = TracingConfig {
enabled: true,
exporter: Some(Exporter {
kind: ExporterKind::OtlpGrpc,
endpoint: Some("http://localhost:4317".to_owned()),
headers: Some(headers.clone()),
timeout_ms: None,
}),
..Default::default()
};
let result = build_metadata_from_cfg_and_env(cfg.exporter.as_ref());
assert!(result.is_some());
let metadata = result.unwrap();
assert_eq!(metadata.len(), 2);
}
#[test]
#[cfg(feature = "otel")]
fn test_build_metadata_invalid_header_name_skipped() {
let mut headers = HashMap::new();
headers.insert("valid-header".to_owned(), "value1".to_owned());
headers.insert("invalid header with spaces".to_owned(), "value2".to_owned());
let cfg = TracingConfig {
enabled: true,
exporter: Some(Exporter {
kind: ExporterKind::OtlpGrpc,
endpoint: Some("http://localhost:4317".to_owned()),
headers: Some(headers.clone()),
timeout_ms: None,
}),
..Default::default()
};
let result = build_metadata_from_cfg_and_env(cfg.exporter.as_ref());
assert!(result.is_some());
let metadata = result.unwrap();
assert_eq!(metadata.len(), 1);
}
#[test]
fn test_shutdown_tracing_does_not_panic() {
shutdown_tracing();
}
#[test]
#[cfg(feature = "otel")]
fn test_init_metrics_disabled() {
use crate::telemetry::config::MetricsConfig;
let cfg = MetricsConfig {
enabled: false,
exporter: None,
..Default::default()
};
let provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder().build();
let result = init_metrics(&cfg, provider);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("disabled"));
}
#[test]
#[cfg(feature = "otel")]
fn test_init_metrics_enabled() {
use crate::telemetry::config::MetricsConfig;
let cfg = MetricsConfig {
enabled: true,
exporter: None,
..Default::default()
};
let provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder().build();
let result = init_metrics(&cfg, provider);
assert!(result.is_ok());
}
}