Skip to main content

harn_vm/observability/
otel.rs

1use std::collections::BTreeMap;
2#[cfg(feature = "otel")]
3use std::collections::HashMap;
4use std::fs::{self, OpenOptions};
5use std::io::{self, Write};
6use std::path::PathBuf;
7use std::sync::{Arc, Mutex};
8
9#[cfg(feature = "otel")]
10use sha2::{Digest, Sha256};
11#[cfg(feature = "otel")]
12use tracing_subscriber::filter::filter_fn;
13use tracing_subscriber::fmt::MakeWriter;
14use tracing_subscriber::layer::SubscriberExt;
15#[cfg(feature = "otel")]
16use tracing_subscriber::Layer as _;
17use tracing_subscriber::{filter::LevelFilter, EnvFilter};
18
19use crate::TraceId;
20
/// W3C trace-context header name for the parent trace/span ids.
pub const OTEL_TRACEPARENT_HEADER: &str = "traceparent";
/// W3C trace-context header name for vendor trace state.
pub const OTEL_TRACESTATE_HEADER: &str = "tracestate";
/// Header name presumably used by Harn to carry an explicit OTel parent span id
/// (not referenced in this file — confirm against callers).
pub const OTEL_PARENT_SPAN_ID_HEADER: &str = "otel_parent_span_id";

/// Marks that the global tracing subscriber has been installed by this module;
/// once set, later install calls return early without installing anything.
static OBSERVABILITY_INIT: std::sync::OnceLock<()> = std::sync::OnceLock::new();
26
/// Output encoding for orchestrator logs.
///
/// `Json` output is written to stdout; `Text` and `Pretty` output to stderr.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum LogFormat {
    /// `tracing_subscriber` compact formatter, with the event target omitted.
    Text,
    /// `tracing_subscriber` pretty formatter.
    Pretty,
    /// `tracing_subscriber` JSON formatter with event fields flattened.
    Json,
}
33
34#[derive(Clone, Debug)]
35pub struct OrchestratorObservabilityConfig {
36    pub log_format: LogFormat,
37    pub state_dir: Option<PathBuf>,
38}
39
40pub struct ObservabilityGuard {
41    #[cfg(feature = "otel")]
42    tracer_provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
43}
44
45impl ObservabilityGuard {
46    pub fn install_orchestrator_subscriber_from_env() -> Result<Self, String> {
47        Self::install_orchestrator_subscriber(OrchestratorObservabilityConfig {
48            log_format: LogFormat::Text,
49            state_dir: None,
50        })
51    }
52
53    pub fn install_orchestrator_subscriber(
54        config: OrchestratorObservabilityConfig,
55    ) -> Result<Self, String> {
56        if OBSERVABILITY_INIT.get().is_some() {
57            return Ok(Self {
58                #[cfg(feature = "otel")]
59                tracer_provider: None,
60            });
61        }
62
63        #[cfg(feature = "otel")]
64        {
65            if let Some(provider) = build_tracer_provider_from_env()? {
66                use opentelemetry::trace::TracerProvider as _;
67
68                let writer = log_writer(&config)?;
69                match config.log_format {
70                    LogFormat::Json => {
71                        let tracer = provider.tracer("harn.orchestrator");
72                        let telemetry = tracing_opentelemetry::layer()
73                            .with_tracer(tracer)
74                            .with_filter(filter_fn(|metadata| {
75                                metadata.is_span() && metadata.target().starts_with("harn")
76                            }));
77                        let subscriber = tracing_subscriber::registry()
78                            .with(env_filter())
79                            .with(
80                                tracing_subscriber::fmt::layer()
81                                    .json()
82                                    .flatten_event(true)
83                                    .with_current_span(true)
84                                    .with_writer(writer),
85                            )
86                            .with(telemetry);
87                        tracing::subscriber::set_global_default(subscriber).map_err(|error| {
88                            format!("failed to install global tracing subscriber: {error}")
89                        })?;
90                    }
91                    LogFormat::Pretty => {
92                        let tracer = provider.tracer("harn.orchestrator");
93                        let telemetry = tracing_opentelemetry::layer()
94                            .with_tracer(tracer)
95                            .with_filter(filter_fn(|metadata| {
96                                metadata.is_span() && metadata.target().starts_with("harn")
97                            }));
98                        let subscriber = tracing_subscriber::registry()
99                            .with(env_filter())
100                            .with(
101                                tracing_subscriber::fmt::layer()
102                                    .pretty()
103                                    .with_writer(writer),
104                            )
105                            .with(telemetry);
106                        tracing::subscriber::set_global_default(subscriber).map_err(|error| {
107                            format!("failed to install global tracing subscriber: {error}")
108                        })?;
109                    }
110                    LogFormat::Text => {
111                        let tracer = provider.tracer("harn.orchestrator");
112                        let telemetry = tracing_opentelemetry::layer()
113                            .with_tracer(tracer)
114                            .with_filter(filter_fn(|metadata| {
115                                metadata.is_span() && metadata.target().starts_with("harn")
116                            }));
117                        let subscriber = tracing_subscriber::registry()
118                            .with(env_filter())
119                            .with(
120                                tracing_subscriber::fmt::layer()
121                                    .compact()
122                                    .with_target(false)
123                                    .with_ansi(std::io::IsTerminal::is_terminal(&std::io::stderr()))
124                                    .with_writer(writer),
125                            )
126                            .with(telemetry);
127                        tracing::subscriber::set_global_default(subscriber).map_err(|error| {
128                            format!("failed to install global tracing subscriber: {error}")
129                        })?;
130                    }
131                }
132                let _ = OBSERVABILITY_INIT.set(());
133                return Ok(Self {
134                    tracer_provider: Some(provider),
135                });
136            }
137        }
138
139        #[cfg(not(feature = "otel"))]
140        if std::env::var("HARN_OTEL_ENDPOINT")
141            .ok()
142            .filter(|value| !value.trim().is_empty())
143            .is_some()
144        {
145            return Err(
146                "HARN_OTEL_ENDPOINT is set, but this build was compiled without the `otel` feature"
147                    .to_string(),
148            );
149        }
150
151        let writer = log_writer(&config)?;
152        match config.log_format {
153            LogFormat::Json => {
154                let subscriber = tracing_subscriber::registry().with(env_filter()).with(
155                    tracing_subscriber::fmt::layer()
156                        .json()
157                        .flatten_event(true)
158                        .with_current_span(true)
159                        .with_writer(writer),
160                );
161                tracing::subscriber::set_global_default(subscriber).map_err(|error| {
162                    format!("failed to install global tracing subscriber: {error}")
163                })?;
164            }
165            LogFormat::Pretty => {
166                let subscriber = tracing_subscriber::registry().with(env_filter()).with(
167                    tracing_subscriber::fmt::layer()
168                        .pretty()
169                        .with_writer(writer),
170                );
171                tracing::subscriber::set_global_default(subscriber).map_err(|error| {
172                    format!("failed to install global tracing subscriber: {error}")
173                })?;
174            }
175            LogFormat::Text => {
176                let subscriber = tracing_subscriber::registry().with(env_filter()).with(
177                    tracing_subscriber::fmt::layer()
178                        .compact()
179                        .with_target(false)
180                        .with_ansi(std::io::IsTerminal::is_terminal(&std::io::stderr()))
181                        .with_writer(writer),
182                );
183                tracing::subscriber::set_global_default(subscriber).map_err(|error| {
184                    format!("failed to install global tracing subscriber: {error}")
185                })?;
186            }
187        }
188        let _ = OBSERVABILITY_INIT.set(());
189        Ok(Self {
190            #[cfg(feature = "otel")]
191            tracer_provider: None,
192        })
193    }
194
195    #[cfg_attr(not(feature = "otel"), allow(unused_mut))]
196    pub fn shutdown(mut self) -> Result<(), String> {
197        #[cfg(feature = "otel")]
198        if let Some(provider) = self.tracer_provider.take() {
199            provider
200                .force_flush()
201                .map_err(|error| format!("failed to flush OTel spans: {error}"))?;
202            provider
203                .shutdown()
204                .map_err(|error| format!("failed to shut down OTel tracer provider: {error}"))?;
205        }
206        Ok(())
207    }
208}
209
210fn env_filter() -> EnvFilter {
211    EnvFilter::builder()
212        .with_default_directive(LevelFilter::INFO.into())
213        .from_env_lossy()
214}
215
216fn log_writer(config: &OrchestratorObservabilityConfig) -> Result<OrchestratorLogWriter, String> {
217    let file = if let Some(state_dir) = config.state_dir.as_ref() {
218        let log_dir = state_dir.join("logs");
219        fs::create_dir_all(&log_dir).map_err(|error| {
220            format!(
221                "failed to create orchestrator log dir {}: {error}",
222                log_dir.display()
223            )
224        })?;
225        Some(Arc::new(Mutex::new(RotatingFile::open(
226            log_dir.join("orchestrator.log"),
227        )?)))
228    } else {
229        None
230    };
231    Ok(OrchestratorLogWriter {
232        format: config.log_format,
233        file,
234    })
235}
236
237#[derive(Clone)]
238struct OrchestratorLogWriter {
239    format: LogFormat,
240    file: Option<Arc<Mutex<RotatingFile>>>,
241}
242
243impl<'a> MakeWriter<'a> for OrchestratorLogWriter {
244    type Writer = OrchestratorLogLineWriter;
245
246    fn make_writer(&'a self) -> Self::Writer {
247        OrchestratorLogLineWriter {
248            format: self.format,
249            file: self.file.clone(),
250        }
251    }
252}
253
254struct OrchestratorLogLineWriter {
255    format: LogFormat,
256    file: Option<Arc<Mutex<RotatingFile>>>,
257}
258
259impl Write for OrchestratorLogLineWriter {
260    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
261        match self.format {
262            LogFormat::Json => io::stdout().write_all(buf)?,
263            LogFormat::Text | LogFormat::Pretty => io::stderr().write_all(buf)?,
264        }
265        if let Some(file) = self.file.as_ref() {
266            file.lock()
267                .expect("orchestrator log file poisoned")
268                .write_all(buf)?;
269        }
270        Ok(buf.len())
271    }
272
273    fn flush(&mut self) -> io::Result<()> {
274        match self.format {
275            LogFormat::Json => io::stdout().flush()?,
276            LogFormat::Text | LogFormat::Pretty => io::stderr().flush()?,
277        }
278        if let Some(file) = self.file.as_ref() {
279            file.lock()
280                .expect("orchestrator log file poisoned")
281                .flush()?;
282        }
283        Ok(())
284    }
285}
286
/// Append-mode log file with single-generation rotation.
///
/// Before a write that would push a non-empty file past its size limit, the file is
/// renamed to the same path with its extension replaced by `log.1` (for example
/// `orchestrator.log.1`, replacing any older backup), and a fresh file is started.
struct RotatingFile {
    path: PathBuf,
    file: fs::File,
    /// Size of the current file: its length when opened plus every byte written since.
    bytes_written: u64,
    /// Rotation threshold in bytes.
    max_bytes: u64,
}

impl RotatingFile {
    /// Default rotation threshold (10 MiB).
    const MAX_BYTES: u64 = 10 * 1024 * 1024;

    /// Opens `path` for appending (creating it if missing) with the default
    /// [`RotatingFile::MAX_BYTES`] limit.
    ///
    /// # Errors
    /// Returns a message naming the path if the file cannot be opened.
    fn open(path: PathBuf) -> Result<Self, String> {
        Self::open_with_limit(path, Self::MAX_BYTES)
    }

    /// Like [`RotatingFile::open`], but rotates once the file would exceed `max_bytes`.
    ///
    /// # Errors
    /// Returns a message naming the path if the file cannot be opened.
    fn open_with_limit(path: PathBuf, max_bytes: u64) -> Result<Self, String> {
        // Resume from the on-disk size so a restart keeps counting toward the limit.
        let bytes_written = fs::metadata(&path)
            .map(|metadata| metadata.len())
            .unwrap_or(0);
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)
            .map_err(|error| {
                format!(
                    "failed to open orchestrator log {}: {error}",
                    path.display()
                )
            })?;
        Ok(Self {
            path,
            file,
            bytes_written,
            max_bytes,
        })
    }

    /// Rotates the file if writing `next_write_bytes` more would exceed the limit.
    fn rotate_if_needed(&mut self, next_write_bytes: usize) -> io::Result<()> {
        // `usize` -> `u64` is lossless on all supported targets.
        let projected = self.bytes_written.saturating_add(next_write_bytes as u64);
        // Never rotate an empty file: that would only replace the previous backup with
        // an empty one. A single oversized write therefore lands in the current file.
        if self.bytes_written == 0 || projected <= self.max_bytes {
            return Ok(());
        }
        self.file.flush()?;
        let rotated = self.path.with_extension("log.1");
        // Best effort: the backup may not exist yet.
        let _ = fs::remove_file(&rotated);
        if self.path.exists() {
            fs::rename(&self.path, rotated)?;
        }
        self.file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)?;
        self.bytes_written = 0;
        Ok(())
    }
}

impl Write for RotatingFile {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.rotate_if_needed(buf.len())?;
        let written = self.file.write(buf)?;
        self.bytes_written += written as u64;
        Ok(written)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.file.flush()
    }
}
348
349impl Drop for ObservabilityGuard {
350    fn drop(&mut self) {
351        // Best-effort flush + shutdown so span batches are delivered even when
352        // the caller exits via panic or early return without calling
353        // `shutdown()` explicitly. Ignore errors — there's nothing to recover
354        // to during teardown.
355        #[cfg(feature = "otel")]
356        if let Some(provider) = self.tracer_provider.take() {
357            let _ = provider.force_flush();
358            let _ = provider.shutdown();
359        }
360    }
361}
362
363#[cfg(feature = "otel")]
364pub fn set_span_parent(
365    span: &tracing::Span,
366    trace_id: &TraceId,
367    parent_span_id: Option<&str>,
368) -> Result<(), String> {
369    use opentelemetry::trace::TraceContextExt as _;
370    use tracing_opentelemetry::OpenTelemetrySpanExt as _;
371
372    let context = opentelemetry::Context::current()
373        .with_remote_span_context(span_context(trace_id, parent_span_id));
374    span.set_parent(context)
375        .map_err(|error| format!("failed to attach OTel parent context: {error}"))
376}
377
378#[cfg(not(feature = "otel"))]
379pub fn set_span_parent(
380    _span: &tracing::Span,
381    _trace_id: &TraceId,
382    _parent_span_id: Option<&str>,
383) -> Result<(), String> {
384    Ok(())
385}
386
387#[cfg(feature = "otel")]
388pub fn current_span_id_hex(span: &tracing::Span) -> Option<String> {
389    use opentelemetry::trace::TraceContextExt as _;
390    use tracing_opentelemetry::OpenTelemetrySpanExt as _;
391
392    let context = span.context();
393    let binding = context.span();
394    let span_context = binding.span_context();
395    span_context
396        .is_valid()
397        .then(|| span_context.span_id().to_string())
398}
399
400#[cfg(not(feature = "otel"))]
401pub fn current_span_id_hex(_span: &tracing::Span) -> Option<String> {
402    None
403}
404
405#[cfg(feature = "otel")]
406pub fn inject_current_context_headers(
407    span: &tracing::Span,
408    headers: &mut BTreeMap<String, String>,
409) -> Result<(), String> {
410    use opentelemetry::propagation::{Injector, TextMapPropagator as _};
411    use tracing_opentelemetry::OpenTelemetrySpanExt as _;
412
413    struct HeaderInjector<'a>(&'a mut BTreeMap<String, String>);
414
415    impl Injector for HeaderInjector<'_> {
416        fn set(&mut self, key: &str, value: String) {
417            self.0.insert(key.to_string(), value);
418        }
419    }
420
421    let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
422    propagator.inject_context(&span.context(), &mut HeaderInjector(headers));
423    Ok(())
424}
425
426#[cfg(not(feature = "otel"))]
427pub fn inject_current_context_headers(
428    _span: &tracing::Span,
429    _headers: &mut BTreeMap<String, String>,
430) -> Result<(), String> {
431    Ok(())
432}
433
434#[cfg(feature = "otel")]
435pub fn set_span_parent_from_headers(
436    span: &tracing::Span,
437    headers: &BTreeMap<String, String>,
438    trace_id: &TraceId,
439    fallback_parent_span_id: Option<&str>,
440) -> Result<(), String> {
441    use opentelemetry::propagation::{Extractor, TextMapPropagator as _};
442    use opentelemetry::trace::TraceContextExt as _;
443    use tracing_opentelemetry::OpenTelemetrySpanExt as _;
444
445    struct HeaderExtractor<'a>(&'a BTreeMap<String, String>);
446
447    impl Extractor for HeaderExtractor<'_> {
448        fn get(&self, key: &str) -> Option<&str> {
449            self.0.get(key).map(String::as_str)
450        }
451
452        fn keys(&self) -> Vec<&str> {
453            self.0.keys().map(String::as_str).collect()
454        }
455    }
456
457    let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
458    let context = propagator.extract(&HeaderExtractor(headers));
459    let binding = context.span();
460    let span_context = binding.span_context();
461    if span_context.is_valid() {
462        return span
463            .set_parent(context)
464            .map_err(|error| format!("failed to attach OTel parent context: {error}"));
465    }
466    set_span_parent(span, trace_id, fallback_parent_span_id)
467}
468
469#[cfg(not(feature = "otel"))]
470pub fn set_span_parent_from_headers(
471    _span: &tracing::Span,
472    _headers: &BTreeMap<String, String>,
473    _trace_id: &TraceId,
474    _fallback_parent_span_id: Option<&str>,
475) -> Result<(), String> {
476    Ok(())
477}
478
/// Builds an OTLP/HTTP-JSON span exporter pipeline from `HARN_OTEL_*` env vars.
///
/// Environment variables:
/// - `HARN_OTEL_ENDPOINT`: if unset or blank, returns `Ok(None)`. It is normalized to
///   end in `/v1/traces`.
/// - `HARN_OTEL_SERVICE_NAME`: defaults to `harn-orchestrator`.
/// - `HARN_OTEL_HEADERS`: parsed by `parse_headers`.
///
/// Spans are batched on the Tokio runtime. The provider is also registered as the
/// `opentelemetry::global` tracer provider before being returned.
///
/// # Errors
/// Returns an error if the HTTP client or the span exporter cannot be built.
#[cfg(feature = "otel")]
fn build_tracer_provider_from_env(
) -> Result<Option<opentelemetry_sdk::trace::SdkTracerProvider>, String> {
    use opentelemetry::global;
    use opentelemetry_otlp::{Protocol, WithExportConfig as _, WithHttpConfig as _};
    use opentelemetry_sdk::runtime;
    use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
    use opentelemetry_sdk::Resource;

    // Trimmed env value; `None` when unset, not valid UTF-8, or blank.
    let non_blank_env = |name: &str| {
        std::env::var(name)
            .ok()
            .map(|value| value.trim().to_string())
            .filter(|value| !value.is_empty())
    };

    let Some(raw_endpoint) = non_blank_env("HARN_OTEL_ENDPOINT") else {
        return Ok(None);
    };
    let endpoint = normalize_otlp_traces_endpoint(&raw_endpoint);
    let service_name = non_blank_env("HARN_OTEL_SERVICE_NAME")
        .unwrap_or_else(|| "harn-orchestrator".to_string());
    // `parse_headers` trims each entry itself, so the raw value is passed through.
    let headers = parse_headers(&std::env::var("HARN_OTEL_HEADERS").unwrap_or_default());

    let http_client = reqwest::Client::builder()
        .build()
        .map_err(|error| format!("failed to build OTLP HTTP client: {error}"))?;
    let exporter = opentelemetry_otlp::SpanExporter::builder()
        .with_http()
        .with_http_client(http_client)
        .with_protocol(Protocol::HttpJson)
        .with_endpoint(endpoint)
        .with_headers(headers)
        .build()
        .map_err(|error| format!("failed to build OTel span exporter: {error}"))?;

    let span_processor = BatchSpanProcessor::builder(exporter, runtime::Tokio).build();
    let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
        .with_resource(Resource::builder().with_service_name(service_name).build())
        .with_span_processor(span_processor)
        .build();
    global::set_tracer_provider(provider.clone());
    Ok(Some(provider))
}
525
/// Builds a sampled, remote OTel span context for a Harn trace.
///
/// The span id is `parent_span_id` when it parses as non-zero hex; otherwise it is
/// derived deterministically by hashing the hex form of the OTel trace id.
#[cfg(feature = "otel")]
fn span_context(
    trace_id: &TraceId,
    parent_span_id: Option<&str>,
) -> opentelemetry::trace::SpanContext {
    use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceState};

    let otel_trace = otel_trace_id(trace_id);
    let explicit_parent = parent_span_id
        .and_then(|hex| SpanId::from_hex(hex).ok())
        .filter(|candidate| *candidate != SpanId::INVALID);
    let span_id = match explicit_parent {
        Some(id) => id,
        None => hashed_span_id(otel_trace.to_string().as_bytes()),
    };

    let is_remote = true;
    SpanContext::new(
        otel_trace,
        span_id,
        TraceFlags::SAMPLED,
        is_remote,
        TraceState::default(),
    )
}
547
/// Maps a Harn `TraceId` to an OTel trace id.
///
/// An optional `trace_` prefix and all `-` characters are removed, and the remainder is
/// parsed as hex. If parsing fails or yields the all-zero id, a SHA-256-derived id of
/// the original string is used instead.
///
/// NOTE(review): confirm whether `TraceId::from_hex` accepts inputs shorter than 32 hex
/// digits; if so, short hex-looking ids are zero-extended rather than hashed.
#[cfg(feature = "otel")]
fn otel_trace_id(trace_id: &TraceId) -> opentelemetry::trace::TraceId {
    use opentelemetry::trace::TraceId as OtelTraceId;

    let raw = trace_id.0.as_str();
    let hex_candidate = raw.strip_prefix("trace_").unwrap_or(raw).replace('-', "");
    match OtelTraceId::from_hex(&hex_candidate) {
        Ok(parsed) if parsed != OtelTraceId::INVALID => parsed,
        _ => hashed_trace_id(raw.as_bytes()),
    }
}
564
/// Returns the first `N` bytes of the SHA-256 digest of `input`.
///
/// `N` must be at most 32 (the digest length); larger values panic on the slice copy.
#[cfg(feature = "otel")]
fn sha256_prefix<const N: usize>(input: &[u8]) -> [u8; N] {
    let digest = Sha256::digest(input);
    let mut prefix = [0_u8; N];
    prefix.copy_from_slice(&digest[..N]);
    prefix
}

/// Deterministic OTel trace id: the first 16 bytes of SHA-256(`input`).
#[cfg(feature = "otel")]
fn hashed_trace_id(input: &[u8]) -> opentelemetry::trace::TraceId {
    opentelemetry::trace::TraceId::from_bytes(sha256_prefix::<16>(input))
}

/// Deterministic OTel span id: the first 8 bytes of SHA-256(`input`).
///
/// An all-zero result (the invalid span id) is nudged to `...01`.
#[cfg(feature = "otel")]
fn hashed_span_id(input: &[u8]) -> opentelemetry::trace::SpanId {
    let mut bytes = sha256_prefix::<8>(input);
    if bytes == [0_u8; 8] {
        bytes[7] = 1;
    }
    opentelemetry::trace::SpanId::from_bytes(bytes)
}
583
/// Ensures an OTLP endpoint ends with exactly one `/v1/traces` path suffix.
///
/// Trailing slashes are dropped first, so `http://host:4318/` and
/// `http://host:4318/v1/traces/` both become `http://host:4318/v1/traces`.
#[cfg(feature = "otel")]
fn normalize_otlp_traces_endpoint(endpoint: &str) -> String {
    const TRACES_PATH: &str = "/v1/traces";

    let base = endpoint.trim_end_matches('/');
    match base.strip_suffix(TRACES_PATH) {
        Some(_) => base.to_owned(),
        None => format!("{base}{TRACES_PATH}"),
    }
}
593
/// Parses `HARN_OTEL_HEADERS` into exporter headers.
///
/// Entries are separated by `,`, `;` or newlines. Each entry is `name=value` or
/// `name: value`, and is split at whichever of `=` / `:` occurs first. HTTP header
/// names cannot contain either character, so the first occurrence is always the
/// separator, and values may themselves contain `=` or `:` (e.g. base64 padding in
/// `authorization: Basic dXNlcg==`).
///
/// Entries without a separator, or with an empty name or value after trimming, are
/// skipped. If a name repeats, the last value wins.
#[cfg(feature = "otel")]
fn parse_headers(raw: &str) -> HashMap<String, String> {
    raw.split([',', '\n', ';'])
        .map(str::trim)
        .filter(|segment| !segment.is_empty())
        .filter_map(|segment| {
            let (name, value) = segment.split_once(['=', ':'])?;
            let name = name.trim();
            let value = value.trim();
            if name.is_empty() || value.is_empty() {
                return None;
            }
            Some((name.to_string(), value.to_string()))
        })
        .collect()
}
612
#[cfg(all(test, feature = "otel"))]
mod tests {
    use super::*;

    #[test]
    fn normalizes_trace_endpoint_suffix() {
        let expected = "http://127.0.0.1:4318/v1/traces";
        for input in ["http://127.0.0.1:4318", "http://127.0.0.1:4318/v1/traces"] {
            assert_eq!(normalize_otlp_traces_endpoint(input), expected, "input: {input}");
        }
    }

    #[test]
    fn parses_header_lists() {
        let headers = parse_headers("authorization=Bearer token,x-tenant-id=tenant-123;trace=true");
        let lookup = |name: &str| headers.get(name).map(String::as_str);
        assert_eq!(lookup("authorization"), Some("Bearer token"));
        assert_eq!(lookup("x-tenant-id"), Some("tenant-123"));
        assert_eq!(lookup("trace"), Some("true"));
    }
}