harn_vm/observability/
otel.rs1use std::collections::BTreeMap;
2#[cfg(feature = "otel")]
3use std::collections::HashMap;
4
5#[cfg(feature = "otel")]
6use sha2::{Digest, Sha256};
7use tracing::level_filters::LevelFilter;
8#[cfg(feature = "otel")]
9use tracing_subscriber::filter::filter_fn;
10use tracing_subscriber::layer::SubscriberExt;
11#[cfg(feature = "otel")]
12use tracing_subscriber::Layer as _;
13
14use crate::TraceId;
15
/// Header key under which an explicit OTel parent span id is exchanged.
/// NOTE(review): the producer/consumer of this key is outside this file — confirm the format.
pub const OTEL_PARENT_SPAN_ID_HEADER: &str = "otel_parent_span_id";
/// W3C Trace Context `traceparent` header key.
pub const OTEL_TRACEPARENT_HEADER: &str = "traceparent";
/// W3C Trace Context `tracestate` header key.
pub const OTEL_TRACESTATE_HEADER: &str = "tracestate";

/// Becomes set once this module has installed the global `tracing` subscriber; later
/// installation requests see it and skip re-installing.
static OBSERVABILITY_INIT: std::sync::OnceLock<()> = std::sync::OnceLock::<()>::new();
21
22pub struct ObservabilityGuard {
23 #[cfg(feature = "otel")]
24 tracer_provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
25}
26
27impl ObservabilityGuard {
28 pub fn install_orchestrator_subscriber_from_env() -> Result<Self, String> {
29 if OBSERVABILITY_INIT.get().is_some() {
30 return Ok(Self {
31 #[cfg(feature = "otel")]
32 tracer_provider: None,
33 });
34 }
35
36 #[cfg(feature = "otel")]
37 {
38 if let Some(provider) = build_tracer_provider_from_env()? {
39 use opentelemetry::trace::TracerProvider as _;
40
41 let tracer = provider.tracer("harn.orchestrator");
42 let telemetry = tracing_opentelemetry::layer()
43 .with_tracer(tracer)
44 .with_filter(filter_fn(|metadata| {
45 metadata.is_span() && metadata.target().starts_with("harn")
46 }));
47 let subscriber = tracing_subscriber::registry()
48 .with(LevelFilter::INFO)
49 .with(fmt_layer())
50 .with(telemetry);
51 tracing::subscriber::set_global_default(subscriber).map_err(|error| {
52 format!("failed to install global tracing subscriber: {error}")
53 })?;
54 let _ = OBSERVABILITY_INIT.set(());
55 return Ok(Self {
56 tracer_provider: Some(provider),
57 });
58 }
59 }
60
61 #[cfg(not(feature = "otel"))]
62 if std::env::var("HARN_OTEL_ENDPOINT")
63 .ok()
64 .filter(|value| !value.trim().is_empty())
65 .is_some()
66 {
67 return Err(
68 "HARN_OTEL_ENDPOINT is set, but this build was compiled without the `otel` feature"
69 .to_string(),
70 );
71 }
72
73 let subscriber = tracing_subscriber::registry()
74 .with(LevelFilter::INFO)
75 .with(fmt_layer());
76 tracing::subscriber::set_global_default(subscriber)
77 .map_err(|error| format!("failed to install global tracing subscriber: {error}"))?;
78 let _ = OBSERVABILITY_INIT.set(());
79 Ok(Self {
80 #[cfg(feature = "otel")]
81 tracer_provider: None,
82 })
83 }
84
85 #[cfg_attr(not(feature = "otel"), allow(unused_mut))]
86 pub fn shutdown(mut self) -> Result<(), String> {
87 #[cfg(feature = "otel")]
88 if let Some(provider) = self.tracer_provider.take() {
89 provider
90 .force_flush()
91 .map_err(|error| format!("failed to flush OTel spans: {error}"))?;
92 provider
93 .shutdown()
94 .map_err(|error| format!("failed to shut down OTel tracer provider: {error}"))?;
95 }
96 Ok(())
97 }
98}
99
100impl Drop for ObservabilityGuard {
101 fn drop(&mut self) {
102 #[cfg(feature = "otel")]
107 if let Some(provider) = self.tracer_provider.take() {
108 let _ = provider.force_flush();
109 let _ = provider.shutdown();
110 }
111 }
112}
113
114#[cfg(feature = "otel")]
115pub fn set_span_parent(
116 span: &tracing::Span,
117 trace_id: &TraceId,
118 parent_span_id: Option<&str>,
119) -> Result<(), String> {
120 use opentelemetry::trace::TraceContextExt as _;
121 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
122
123 let context = opentelemetry::Context::current()
124 .with_remote_span_context(span_context(trace_id, parent_span_id));
125 span.set_parent(context)
126 .map_err(|error| format!("failed to attach OTel parent context: {error}"))
127}
128
129#[cfg(not(feature = "otel"))]
130pub fn set_span_parent(
131 _span: &tracing::Span,
132 _trace_id: &TraceId,
133 _parent_span_id: Option<&str>,
134) -> Result<(), String> {
135 Ok(())
136}
137
138#[cfg(feature = "otel")]
139pub fn current_span_id_hex(span: &tracing::Span) -> Option<String> {
140 use opentelemetry::trace::TraceContextExt as _;
141 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
142
143 let context = span.context();
144 let binding = context.span();
145 let span_context = binding.span_context();
146 span_context
147 .is_valid()
148 .then(|| span_context.span_id().to_string())
149}
150
151#[cfg(not(feature = "otel"))]
152pub fn current_span_id_hex(_span: &tracing::Span) -> Option<String> {
153 None
154}
155
156#[cfg(feature = "otel")]
157pub fn inject_current_context_headers(
158 span: &tracing::Span,
159 headers: &mut BTreeMap<String, String>,
160) -> Result<(), String> {
161 use opentelemetry::propagation::{Injector, TextMapPropagator as _};
162 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
163
164 struct HeaderInjector<'a>(&'a mut BTreeMap<String, String>);
165
166 impl Injector for HeaderInjector<'_> {
167 fn set(&mut self, key: &str, value: String) {
168 self.0.insert(key.to_string(), value);
169 }
170 }
171
172 let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
173 propagator.inject_context(&span.context(), &mut HeaderInjector(headers));
174 Ok(())
175}
176
177#[cfg(not(feature = "otel"))]
178pub fn inject_current_context_headers(
179 _span: &tracing::Span,
180 _headers: &mut BTreeMap<String, String>,
181) -> Result<(), String> {
182 Ok(())
183}
184
185#[cfg(feature = "otel")]
186pub fn set_span_parent_from_headers(
187 span: &tracing::Span,
188 headers: &BTreeMap<String, String>,
189 trace_id: &TraceId,
190 fallback_parent_span_id: Option<&str>,
191) -> Result<(), String> {
192 use opentelemetry::propagation::{Extractor, TextMapPropagator as _};
193 use opentelemetry::trace::TraceContextExt as _;
194 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
195
196 struct HeaderExtractor<'a>(&'a BTreeMap<String, String>);
197
198 impl Extractor for HeaderExtractor<'_> {
199 fn get(&self, key: &str) -> Option<&str> {
200 self.0.get(key).map(String::as_str)
201 }
202
203 fn keys(&self) -> Vec<&str> {
204 self.0.keys().map(String::as_str).collect()
205 }
206 }
207
208 let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
209 let context = propagator.extract(&HeaderExtractor(headers));
210 let binding = context.span();
211 let span_context = binding.span_context();
212 if span_context.is_valid() {
213 return span
214 .set_parent(context)
215 .map_err(|error| format!("failed to attach OTel parent context: {error}"));
216 }
217 set_span_parent(span, trace_id, fallback_parent_span_id)
218}
219
220#[cfg(not(feature = "otel"))]
221pub fn set_span_parent_from_headers(
222 _span: &tracing::Span,
223 _headers: &BTreeMap<String, String>,
224 _trace_id: &TraceId,
225 _fallback_parent_span_id: Option<&str>,
226) -> Result<(), String> {
227 Ok(())
228}
229
230fn fmt_layer<S>() -> impl tracing_subscriber::Layer<S> + Send + Sync
231where
232 S: tracing::Subscriber,
233 for<'span> S: tracing_subscriber::registry::LookupSpan<'span>,
234{
235 tracing_subscriber::fmt::layer()
236 .with_target(false)
237 .with_ansi(std::io::IsTerminal::is_terminal(&std::io::stderr()))
238 .with_writer(std::io::stderr)
239 .compact()
240}
241
/// Builds the OTLP/HTTP-JSON tracer provider described by the environment.
///
/// Environment variables:
/// - `HARN_OTEL_ENDPOINT` — OTLP base or traces endpoint. When unset or blank, this returns
///   `Ok(None)`.
/// - `HARN_OTEL_SERVICE_NAME` — resource service name, defaulting to `harn-orchestrator`.
/// - `HARN_OTEL_HEADERS` — extra export headers, parsed by `parse_headers`.
///
/// On success the provider is also registered as the global OTel tracer provider. Spans
/// are exported through a batch processor on the Tokio runtime.
/// NOTE(review): this presumably must be called from within a Tokio runtime — confirm.
///
/// # Errors
///
/// Returns an error if the HTTP client or the span exporter cannot be built.
#[cfg(feature = "otel")]
fn build_tracer_provider_from_env(
) -> Result<Option<opentelemetry_sdk::trace::SdkTracerProvider>, String> {
    use opentelemetry::global;
    use opentelemetry_otlp::{Protocol, WithExportConfig as _, WithHttpConfig as _};
    use opentelemetry_sdk::runtime;
    use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
    use opentelemetry_sdk::Resource;

    // Trimmed env var value; unset, non-UTF-8 and blank values all count as absent.
    let non_blank_env = |name: &str| -> Option<String> {
        let value = std::env::var(name).ok()?;
        let trimmed = value.trim();
        (!trimmed.is_empty()).then(|| trimmed.to_string())
    };

    let Some(raw_endpoint) = non_blank_env("HARN_OTEL_ENDPOINT") else {
        return Ok(None);
    };
    let service_name = non_blank_env("HARN_OTEL_SERVICE_NAME")
        .unwrap_or_else(|| "harn-orchestrator".to_string());
    let headers = parse_headers(&std::env::var("HARN_OTEL_HEADERS").unwrap_or_default());

    let http_client = reqwest::Client::builder()
        .build()
        .map_err(|error| format!("failed to build OTLP HTTP client: {error}"))?;
    let exporter = opentelemetry_otlp::SpanExporter::builder()
        .with_http()
        .with_http_client(http_client)
        .with_protocol(Protocol::HttpJson)
        .with_endpoint(normalize_otlp_traces_endpoint(&raw_endpoint))
        .with_headers(headers)
        .build()
        .map_err(|error| format!("failed to build OTel span exporter: {error}"))?;

    let resource = Resource::builder().with_service_name(service_name).build();
    let batch_processor = BatchSpanProcessor::builder(exporter, runtime::Tokio).build();
    let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
        .with_resource(resource)
        .with_span_processor(batch_processor)
        .build();
    global::set_tracer_provider(provider.clone());
    Ok(Some(provider))
}
288
/// Synthesizes a remote, sampled OTel span context for a harn trace.
///
/// The trace id comes from `otel_trace_id`. The span id is `parent_span_id` when it parses
/// as a non-zero hex span id; otherwise it is derived by hashing the OTel trace id's string
/// form.
#[cfg(feature = "otel")]
fn span_context(
    trace_id: &TraceId,
    parent_span_id: Option<&str>,
) -> opentelemetry::trace::SpanContext {
    use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceState};

    let otel_trace = otel_trace_id(trace_id);
    let explicit_parent = parent_span_id
        .and_then(|hex| SpanId::from_hex(hex).ok())
        .filter(|id| *id != SpanId::INVALID);
    let parent = match explicit_parent {
        Some(id) => id,
        None => hashed_span_id(otel_trace.to_string().as_bytes()),
    };

    // Flags: sampled, and `is_remote = true`.
    SpanContext::new(otel_trace, parent, TraceFlags::SAMPLED, true, TraceState::default())
}
310
/// Maps a harn [`TraceId`] onto an OTel trace id.
///
/// First an optional `trace_` prefix and any `-` separators are removed. If what remains
/// is exactly 32 hex digits (e.g. a UUID) and not all zeros, it is used verbatim. Anything
/// else is hashed with `hashed_trace_id`, so the mapping stays deterministic.
#[cfg(feature = "otel")]
fn otel_trace_id(trace_id: &TraceId) -> opentelemetry::trace::TraceId {
    use opentelemetry::trace::TraceId as OtelTraceId;

    let normalized = trace_id
        .0
        .strip_prefix("trace_")
        .unwrap_or(trace_id.0.as_str())
        .replace('-', "");
    // `from_hex` is a plain base-16 integer parse: it also accepts shorter strings (and a
    // leading `+`). Without this width check, distinct ids such as `trace_ab` and
    // `trace_00ab` would collide on the same low-entropy OTel trace id.
    let is_full_width_hex =
        normalized.len() == 32 && normalized.bytes().all(|byte| byte.is_ascii_hexdigit());
    if is_full_width_hex {
        if let Ok(parsed) = OtelTraceId::from_hex(&normalized) {
            if parsed != OtelTraceId::INVALID {
                return parsed;
            }
        }
    }
    hashed_trace_id(trace_id.0.as_bytes())
}
327
/// Derives a deterministic OTel trace id from the first 16 bytes of SHA-256(`input`).
///
/// The all-zero trace id is invalid in OTel. Mirroring `hashed_span_id`, that (astronomically
/// unlikely) digest is nudged to a non-zero value so the result is always usable.
#[cfg(feature = "otel")]
fn hashed_trace_id(input: &[u8]) -> opentelemetry::trace::TraceId {
    let digest = Sha256::digest(input);
    let mut bytes = [0_u8; 16];
    bytes.copy_from_slice(&digest[..16]);
    if bytes.iter().all(|byte| *byte == 0) {
        bytes[15] = 1;
    }
    opentelemetry::trace::TraceId::from_bytes(bytes)
}
335
/// Derives a deterministic, always-valid OTel span id from the first 8 bytes of
/// SHA-256(`input`).
#[cfg(feature = "otel")]
fn hashed_span_id(input: &[u8]) -> opentelemetry::trace::SpanId {
    let digest = Sha256::digest(input);
    let mut id = [0_u8; 8];
    id.copy_from_slice(&digest[..8]);
    if id == [0_u8; 8] {
        // All zeros is the invalid span id; force a valid one instead.
        id[7] = 1;
    }
    opentelemetry::trace::SpanId::from_bytes(id)
}
346
/// Points an OTLP endpoint at the traces path.
///
/// Trailing slashes are dropped, and `/v1/traces` is appended unless the endpoint already
/// ends with it.
#[cfg(feature = "otel")]
fn normalize_otlp_traces_endpoint(endpoint: &str) -> String {
    const TRACES_PATH: &str = "/v1/traces";

    let base = endpoint.trim_end_matches('/');
    let mut normalized = String::from(base);
    if !base.ends_with(TRACES_PATH) {
        normalized.push_str(TRACES_PATH);
    }
    normalized
}
356
/// Parses an OTLP export header list (the `HARN_OTEL_HEADERS` format) into a name → value map.
///
/// Entries are separated by `,`, `;` or newlines. Each entry is `name=value` or
/// `name: value`, split at whichever of `=` / `:` appears first.
///
/// Names and values are trimmed. Entries with no separator, an empty name, or an empty value
/// are skipped. When a name repeats, the later entry wins.
#[cfg(feature = "otel")]
fn parse_headers(raw: &str) -> HashMap<String, String> {
    raw.split([',', '\n', ';'])
        .map(str::trim)
        .filter(|segment| !segment.is_empty())
        .filter_map(|segment| {
            // Split at the *first* `=` or `:`. Header names contain neither, but values may
            // (e.g. `Authorization: Basic Zm9vOmJhcg==`). Preferring `=` anywhere in the
            // entry would cut such a colon-form entry inside its value.
            let (name, value) = segment.split_once(['=', ':'])?;
            let (name, value) = (name.trim(), value.trim());
            if name.is_empty() || value.is_empty() {
                return None;
            }
            Some((name.to_string(), value.to_string()))
        })
        .collect()
}
375
#[cfg(all(test, feature = "otel"))]
mod tests {
    use super::*;

    #[test]
    fn normalizes_trace_endpoint_suffix() {
        assert_eq!(
            normalize_otlp_traces_endpoint("http://127.0.0.1:4318"),
            "http://127.0.0.1:4318/v1/traces"
        );
        assert_eq!(
            normalize_otlp_traces_endpoint("http://127.0.0.1:4318/v1/traces"),
            "http://127.0.0.1:4318/v1/traces"
        );
    }

    #[test]
    fn normalizes_trailing_slashes() {
        assert_eq!(
            normalize_otlp_traces_endpoint("http://127.0.0.1:4318//"),
            "http://127.0.0.1:4318/v1/traces"
        );
        assert_eq!(
            normalize_otlp_traces_endpoint("http://127.0.0.1:4318/v1/traces/"),
            "http://127.0.0.1:4318/v1/traces"
        );
    }

    #[test]
    fn parses_header_lists() {
        let headers = parse_headers("authorization=Bearer token,x-tenant-id=tenant-123;trace=true");
        assert_eq!(
            headers.get("authorization"),
            Some(&"Bearer token".to_string())
        );
        assert_eq!(headers.get("x-tenant-id"), Some(&"tenant-123".to_string()));
        assert_eq!(headers.get("trace"), Some(&"true".to_string()));
    }

    #[test]
    fn parses_colon_headers_and_skips_malformed_entries() {
        let headers = parse_headers("x-api-key: abc\nnovalue, =orphan; empty=\n");
        assert_eq!(headers.get("x-api-key"), Some(&"abc".to_string()));
        assert_eq!(headers.len(), 1);
    }

    #[test]
    fn hashed_ids_are_deterministic_and_valid() {
        use opentelemetry::trace::{SpanId, TraceId as OtelTraceId};

        assert_eq!(hashed_trace_id(b"harn"), hashed_trace_id(b"harn"));
        assert_ne!(hashed_trace_id(b"harn"), OtelTraceId::INVALID);
        assert_eq!(hashed_span_id(b"harn"), hashed_span_id(b"harn"));
        assert_ne!(hashed_span_id(b"harn"), SpanId::INVALID);
    }
}