Skip to main content

harn_vm/observability/
otel.rs

1use std::collections::BTreeMap;
2#[cfg(feature = "otel")]
3use std::collections::HashMap;
4
5#[cfg(feature = "otel")]
6use sha2::{Digest, Sha256};
7use tracing::level_filters::LevelFilter;
8#[cfg(feature = "otel")]
9use tracing_subscriber::filter::filter_fn;
10use tracing_subscriber::layer::SubscriberExt;
11#[cfg(feature = "otel")]
12use tracing_subscriber::Layer as _;
13
14use crate::TraceId;
15
/// Header key for an explicit parent span id (hex). Not read anywhere in this
/// file; presumably set/consumed by callers that propagate context — verify.
pub const OTEL_PARENT_SPAN_ID_HEADER: &str = "otel_parent_span_id";
/// W3C Trace Context `traceparent` header name.
pub const OTEL_TRACEPARENT_HEADER: &str = "traceparent";
/// W3C Trace Context `tracestate` header name.
pub const OTEL_TRACESTATE_HEADER: &str = "tracestate";

/// Set once `ObservabilityGuard::install_orchestrator_subscriber_from_env` has
/// installed the global subscriber; later calls see it set and return early.
static OBSERVABILITY_INIT: std::sync::OnceLock<()> = std::sync::OnceLock::new();
21
/// Handle returned by `ObservabilityGuard::install_orchestrator_subscriber_from_env`.
///
/// With the `otel` feature it owns the OTLP tracer provider, if one was
/// configured. Call `shutdown()` to flush and stop it and observe errors.
/// Otherwise `Drop` does the same on a best-effort basis.
pub struct ObservabilityGuard {
    /// Provider installed by this guard. `None` when no OTLP endpoint was
    /// configured, or when the subscriber had already been installed earlier.
    #[cfg(feature = "otel")]
    tracer_provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
}
26
27impl ObservabilityGuard {
28    pub fn install_orchestrator_subscriber_from_env() -> Result<Self, String> {
29        if OBSERVABILITY_INIT.get().is_some() {
30            return Ok(Self {
31                #[cfg(feature = "otel")]
32                tracer_provider: None,
33            });
34        }
35
36        #[cfg(feature = "otel")]
37        {
38            if let Some(provider) = build_tracer_provider_from_env()? {
39                use opentelemetry::trace::TracerProvider as _;
40
41                let tracer = provider.tracer("harn.orchestrator");
42                let telemetry = tracing_opentelemetry::layer()
43                    .with_tracer(tracer)
44                    .with_filter(filter_fn(|metadata| {
45                        metadata.is_span() && metadata.target().starts_with("harn")
46                    }));
47                let subscriber = tracing_subscriber::registry()
48                    .with(LevelFilter::INFO)
49                    .with(fmt_layer())
50                    .with(telemetry);
51                tracing::subscriber::set_global_default(subscriber).map_err(|error| {
52                    format!("failed to install global tracing subscriber: {error}")
53                })?;
54                let _ = OBSERVABILITY_INIT.set(());
55                return Ok(Self {
56                    tracer_provider: Some(provider),
57                });
58            }
59        }
60
61        #[cfg(not(feature = "otel"))]
62        if std::env::var("HARN_OTEL_ENDPOINT")
63            .ok()
64            .filter(|value| !value.trim().is_empty())
65            .is_some()
66        {
67            return Err(
68                "HARN_OTEL_ENDPOINT is set, but this build was compiled without the `otel` feature"
69                    .to_string(),
70            );
71        }
72
73        let subscriber = tracing_subscriber::registry()
74            .with(LevelFilter::INFO)
75            .with(fmt_layer());
76        tracing::subscriber::set_global_default(subscriber)
77            .map_err(|error| format!("failed to install global tracing subscriber: {error}"))?;
78        let _ = OBSERVABILITY_INIT.set(());
79        Ok(Self {
80            #[cfg(feature = "otel")]
81            tracer_provider: None,
82        })
83    }
84
85    #[cfg_attr(not(feature = "otel"), allow(unused_mut))]
86    pub fn shutdown(mut self) -> Result<(), String> {
87        #[cfg(feature = "otel")]
88        if let Some(provider) = self.tracer_provider.take() {
89            provider
90                .force_flush()
91                .map_err(|error| format!("failed to flush OTel spans: {error}"))?;
92            provider
93                .shutdown()
94                .map_err(|error| format!("failed to shut down OTel tracer provider: {error}"))?;
95        }
96        Ok(())
97    }
98}
99
100impl Drop for ObservabilityGuard {
101    fn drop(&mut self) {
102        // Best-effort flush + shutdown so span batches are delivered even when
103        // the caller exits via panic or early return without calling
104        // `shutdown()` explicitly. Ignore errors — there's nothing to recover
105        // to during teardown.
106        #[cfg(feature = "otel")]
107        if let Some(provider) = self.tracer_provider.take() {
108            let _ = provider.force_flush();
109            let _ = provider.shutdown();
110        }
111    }
112}
113
/// Makes `span` a child of a remote parent synthesized from `trace_id`.
///
/// `parent_span_id` (hex) is used as the parent span id when it parses to a
/// non-zero id. Otherwise a deterministic id derived from the trace id is used.
#[cfg(feature = "otel")]
pub fn set_span_parent(
    span: &tracing::Span,
    trace_id: &TraceId,
    parent_span_id: Option<&str>,
) -> Result<(), String> {
    use opentelemetry::trace::TraceContextExt as _;
    use tracing_opentelemetry::OpenTelemetrySpanExt as _;

    let remote_parent = span_context(trace_id, parent_span_id);
    let parent_cx = opentelemetry::Context::current().with_remote_span_context(remote_parent);
    span.set_parent(parent_cx)
        .map_err(|err| format!("failed to attach OTel parent context: {err}"))
}
128
129#[cfg(not(feature = "otel"))]
130pub fn set_span_parent(
131    _span: &tracing::Span,
132    _trace_id: &TraceId,
133    _parent_span_id: Option<&str>,
134) -> Result<(), String> {
135    Ok(())
136}
137
/// Returns the OTel span id of `span` as hex, or `None` if its span context is
/// not valid (e.g. the span is not being recorded by the OTel layer).
#[cfg(feature = "otel")]
pub fn current_span_id_hex(span: &tracing::Span) -> Option<String> {
    use opentelemetry::trace::TraceContextExt as _;
    use tracing_opentelemetry::OpenTelemetrySpanExt as _;

    let otel_cx = span.context();
    let span_ref = otel_cx.span();
    let sc = span_ref.span_context();
    if sc.is_valid() {
        Some(sc.span_id().to_string())
    } else {
        None
    }
}
150
151#[cfg(not(feature = "otel"))]
152pub fn current_span_id_hex(_span: &tracing::Span) -> Option<String> {
153    None
154}
155
/// Writes the W3C Trace Context headers for `span` into `headers`.
///
/// Existing entries with the same key are overwritten. Always returns `Ok`.
#[cfg(feature = "otel")]
pub fn inject_current_context_headers(
    span: &tracing::Span,
    headers: &mut BTreeMap<String, String>,
) -> Result<(), String> {
    use opentelemetry::propagation::{Injector, TextMapPropagator as _};
    use opentelemetry_sdk::propagation::TraceContextPropagator;
    use tracing_opentelemetry::OpenTelemetrySpanExt as _;

    /// Adapter letting the propagator write straight into the caller's map.
    struct MapInjector<'m>(&'m mut BTreeMap<String, String>);

    impl Injector for MapInjector<'_> {
        fn set(&mut self, key: &str, value: String) {
            self.0.insert(key.to_owned(), value);
        }
    }

    let span_cx = span.context();
    let mut injector = MapInjector(headers);
    TraceContextPropagator::new().inject_context(&span_cx, &mut injector);
    Ok(())
}
176
177#[cfg(not(feature = "otel"))]
178pub fn inject_current_context_headers(
179    _span: &tracing::Span,
180    _headers: &mut BTreeMap<String, String>,
181) -> Result<(), String> {
182    Ok(())
183}
184
/// Parents `span` on the W3C Trace Context carried in `headers`.
///
/// If the headers yield no valid span context, this falls back to
/// [`set_span_parent`] with `trace_id` and `fallback_parent_span_id`.
/// Header names are matched case-insensitively, as W3C Trace Context requires
/// of receivers, with an exact-case match preferred.
///
/// # Errors
///
/// Returns an error if the parent context cannot be attached to `span`.
#[cfg(feature = "otel")]
pub fn set_span_parent_from_headers(
    span: &tracing::Span,
    headers: &BTreeMap<String, String>,
    trace_id: &TraceId,
    fallback_parent_span_id: Option<&str>,
) -> Result<(), String> {
    use opentelemetry::propagation::{Extractor, TextMapPropagator as _};
    use opentelemetry::trace::TraceContextExt as _;
    use tracing_opentelemetry::OpenTelemetrySpanExt as _;

    /// Read-only view of the header map for the propagator.
    struct HeaderExtractor<'a>(&'a BTreeMap<String, String>);

    impl Extractor for HeaderExtractor<'_> {
        fn get(&self, key: &str) -> Option<&str> {
            // Exact match first: cheap, and senders SHOULD use lowercase. The
            // spec requires accepting any case (e.g. `Traceparent` forwarded
            // from an HTTP request), so fall back to an ASCII-insensitive scan.
            self.0
                .get(key)
                .or_else(|| {
                    self.0
                        .iter()
                        .find(|(name, _)| name.eq_ignore_ascii_case(key))
                        .map(|(_, value)| value)
                })
                .map(String::as_str)
        }

        fn keys(&self) -> Vec<&str> {
            self.0.keys().map(String::as_str).collect()
        }
    }

    let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
    let context = propagator.extract(&HeaderExtractor(headers));
    let binding = context.span();
    let span_context = binding.span_context();
    if span_context.is_valid() {
        return span
            .set_parent(context)
            .map_err(|error| format!("failed to attach OTel parent context: {error}"));
    }
    set_span_parent(span, trace_id, fallback_parent_span_id)
}
219
220#[cfg(not(feature = "otel"))]
221pub fn set_span_parent_from_headers(
222    _span: &tracing::Span,
223    _headers: &BTreeMap<String, String>,
224    _trace_id: &TraceId,
225    _fallback_parent_span_id: Option<&str>,
226) -> Result<(), String> {
227    Ok(())
228}
229
230fn fmt_layer<S>() -> impl tracing_subscriber::Layer<S> + Send + Sync
231where
232    S: tracing::Subscriber,
233    for<'span> S: tracing_subscriber::registry::LookupSpan<'span>,
234{
235    tracing_subscriber::fmt::layer()
236        .with_target(false)
237        .with_ansi(std::io::IsTerminal::is_terminal(&std::io::stderr()))
238        .with_writer(std::io::stderr)
239        .compact()
240}
241
/// Builds an OTLP/HTTP (JSON) tracer provider from the environment.
///
/// Reads:
/// - `HARN_OTEL_ENDPOINT`: required, otherwise `Ok(None)`;
/// - `HARN_OTEL_SERVICE_NAME`: defaults to `harn-orchestrator`;
/// - `HARN_OTEL_HEADERS`: see `parse_headers`.
///
/// On success the provider is also registered as the `opentelemetry::global`
/// tracer provider.
#[cfg(feature = "otel")]
fn build_tracer_provider_from_env(
) -> Result<Option<opentelemetry_sdk::trace::SdkTracerProvider>, String> {
    use opentelemetry::global;
    use opentelemetry_otlp::{Protocol, WithExportConfig as _, WithHttpConfig as _};
    use opentelemetry_sdk::runtime;
    use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
    use opentelemetry_sdk::trace::SdkTracerProvider;
    use opentelemetry_sdk::Resource;

    // Trimmed env lookup; unset and blank values both become `None`.
    let env_non_blank = |name: &str| {
        std::env::var(name)
            .ok()
            .map(|value| value.trim().to_string())
            .filter(|value| !value.is_empty())
    };

    // No endpoint means OTel export is disabled, which is not an error.
    let Some(raw_endpoint) = env_non_blank("HARN_OTEL_ENDPOINT") else {
        return Ok(None);
    };
    let endpoint = normalize_otlp_traces_endpoint(&raw_endpoint);
    let service_name =
        env_non_blank("HARN_OTEL_SERVICE_NAME").unwrap_or_else(|| "harn-orchestrator".to_string());
    let headers = parse_headers(&std::env::var("HARN_OTEL_HEADERS").unwrap_or_default());

    let http_client = reqwest::Client::builder()
        .build()
        .map_err(|error| format!("failed to build OTLP HTTP client: {error}"))?;
    let exporter = opentelemetry_otlp::SpanExporter::builder()
        .with_http()
        .with_http_client(http_client)
        .with_protocol(Protocol::HttpJson)
        .with_endpoint(endpoint)
        .with_headers(headers)
        .build()
        .map_err(|error| format!("failed to build OTel span exporter: {error}"))?;

    // Batches are exported from a task on the Tokio runtime.
    let span_processor = BatchSpanProcessor::builder(exporter, runtime::Tokio).build();
    let resource = Resource::builder().with_service_name(service_name).build();
    let provider = SdkTracerProvider::builder()
        .with_resource(resource)
        .with_span_processor(span_processor)
        .build();

    global::set_tracer_provider(provider.clone());
    Ok(Some(provider))
}
288
/// Synthesizes a sampled, remote `SpanContext` to use as a parent.
///
/// An explicit `parent_span_id` is used when it parses as hex to a non-zero
/// id. Otherwise the parent id is derived by hashing the OTel trace id, so the
/// same trace always maps to the same synthetic parent.
#[cfg(feature = "otel")]
fn span_context(
    trace_id: &TraceId,
    parent_span_id: Option<&str>,
) -> opentelemetry::trace::SpanContext {
    use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceState};

    let otel_trace = otel_trace_id(trace_id);
    let explicit_parent = parent_span_id
        .and_then(|hex| SpanId::from_hex(hex).ok())
        .filter(|candidate| *candidate != SpanId::INVALID);
    let parent = explicit_parent
        .unwrap_or_else(|| hashed_span_id(otel_trace.to_string().as_bytes()));

    let is_remote = true;
    SpanContext::new(
        otel_trace,
        parent,
        TraceFlags::SAMPLED,
        is_remote,
        TraceState::default(),
    )
}
310
/// Maps a harn `TraceId` onto an OTel trace id.
///
/// A `trace_` prefix and any `-` characters are stripped, and the rest is
/// parsed as hex. If that does not yield a valid non-zero id, the full
/// original id string is hashed instead.
#[cfg(feature = "otel")]
fn otel_trace_id(trace_id: &TraceId) -> opentelemetry::trace::TraceId {
    use opentelemetry::trace::TraceId as OtelTraceId;

    let raw = trace_id.0.as_str();
    let hex_digits = raw.strip_prefix("trace_").unwrap_or(raw).replace('-', "");
    match OtelTraceId::from_hex(&hex_digits) {
        Ok(parsed) if parsed != OtelTraceId::INVALID => parsed,
        _ => hashed_trace_id(raw.as_bytes()),
    }
}
327
/// Derives a deterministic OTel trace id from the first 16 bytes of
/// SHA-256(`input`).
///
/// All-zero ids are invalid in OTel/W3C. In that case the last byte is forced
/// to 1, mirroring the guard in `hashed_span_id`.
#[cfg(feature = "otel")]
fn hashed_trace_id(input: &[u8]) -> opentelemetry::trace::TraceId {
    let digest = Sha256::digest(input);
    let mut bytes = [0_u8; 16];
    bytes.copy_from_slice(&digest[..16]);
    if bytes.iter().all(|byte| *byte == 0) {
        bytes[15] = 1;
    }
    opentelemetry::trace::TraceId::from_bytes(bytes)
}
335
/// Derives a deterministic OTel span id from the first 8 bytes of
/// SHA-256(`input`). An all-zero result (the invalid id) is bumped to end in 1.
#[cfg(feature = "otel")]
fn hashed_span_id(input: &[u8]) -> opentelemetry::trace::SpanId {
    const ZERO_ID: [u8; 8] = [0_u8; 8];

    let digest = Sha256::digest(input);
    let mut id = ZERO_ID;
    id.copy_from_slice(&digest[..8]);
    if id == ZERO_ID {
        id[7] = 1;
    }
    opentelemetry::trace::SpanId::from_bytes(id)
}
346
/// Ensures the OTLP endpoint ends with exactly one `/v1/traces` path segment.
///
/// Trailing slashes are removed first. If the path already ends in
/// `/v1/traces` it is kept as-is; otherwise the suffix is appended.
#[cfg(feature = "otel")]
fn normalize_otlp_traces_endpoint(endpoint: &str) -> String {
    const TRACES_PATH: &str = "/v1/traces";

    let base = endpoint.trim_end_matches('/');
    if !base.ends_with(TRACES_PATH) {
        return format!("{base}{TRACES_PATH}");
    }
    base.to_owned()
}
356
/// Parses `HARN_OTEL_HEADERS` into header name/value pairs.
///
/// Entries are separated by `,`, `;` or newlines. Each entry is either
/// `name=value` or `name: value`. Whitespace around names and values is
/// trimmed. Entries with no separator, an empty name, or an empty value are
/// skipped, and later duplicates overwrite earlier ones.
///
/// Because `,` and `;` always split entries, header values cannot contain them.
#[cfg(feature = "otel")]
fn parse_headers(raw: &str) -> HashMap<String, String> {
    raw.split([',', '\n', ';'])
        .map(str::trim)
        .filter(|segment| !segment.is_empty())
        .filter_map(|segment| {
            // Split at whichever of `=` / `:` comes first. Header names cannot
            // contain either character, so the first one is always the real
            // separator. Trying `=` first would mis-split `name: value`
            // entries whose value contains `=`, e.g. base64 padding in
            // `authorization: Basic dXNlcjpwYXNz==`.
            let (name, value) = segment.split_once(['=', ':'])?;
            let name = name.trim();
            let value = value.trim();
            if name.is_empty() || value.is_empty() {
                return None;
            }
            Some((name.to_string(), value.to_string()))
        })
        .collect()
}
375
#[cfg(all(test, feature = "otel"))]
mod tests {
    use super::*;

    #[test]
    fn normalizes_trace_endpoint_suffix() {
        assert_eq!(
            normalize_otlp_traces_endpoint("http://127.0.0.1:4318"),
            "http://127.0.0.1:4318/v1/traces"
        );
        assert_eq!(
            normalize_otlp_traces_endpoint("http://127.0.0.1:4318/v1/traces"),
            "http://127.0.0.1:4318/v1/traces"
        );
    }

    #[test]
    fn normalizes_trailing_slashes() {
        assert_eq!(
            normalize_otlp_traces_endpoint("http://127.0.0.1:4318/"),
            "http://127.0.0.1:4318/v1/traces"
        );
        assert_eq!(
            normalize_otlp_traces_endpoint("http://127.0.0.1:4318/v1/traces/"),
            "http://127.0.0.1:4318/v1/traces"
        );
    }

    #[test]
    fn parses_header_lists() {
        let headers = parse_headers("authorization=Bearer token,x-tenant-id=tenant-123;trace=true");
        assert_eq!(
            headers.get("authorization"),
            Some(&"Bearer token".to_string())
        );
        assert_eq!(headers.get("x-tenant-id"), Some(&"tenant-123".to_string()));
        assert_eq!(headers.get("trace"), Some(&"true".to_string()));
    }

    #[test]
    fn parses_colon_and_newline_separated_headers() {
        let headers = parse_headers("x-tenant-id: tenant-123\nx-extra: a:b");
        assert_eq!(headers.get("x-tenant-id"), Some(&"tenant-123".to_string()));
        assert_eq!(headers.get("x-extra"), Some(&"a:b".to_string()));
        assert_eq!(headers.len(), 2);
    }

    #[test]
    fn skips_malformed_header_segments() {
        let headers = parse_headers("novalue, =empty-name, empty-value=, ok=1,,");
        assert_eq!(headers.len(), 1);
        assert_eq!(headers.get("ok"), Some(&"1".to_string()));
    }

    #[test]
    fn hashed_ids_are_deterministic_and_valid() {
        let span_id = hashed_span_id(b"trace");
        assert_eq!(span_id, hashed_span_id(b"trace"));
        assert_ne!(span_id, opentelemetry::trace::SpanId::INVALID);

        let trace_id = hashed_trace_id(b"trace");
        assert_eq!(trace_id, hashed_trace_id(b"trace"));
        assert_ne!(trace_id, opentelemetry::trace::TraceId::INVALID);
    }
}