Skip to main content

harn_vm/observability/
otel.rs

1use std::collections::{BTreeMap, HashMap};
2use std::fs::{self, OpenOptions};
3use std::io::{self, Write};
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex};
6
7#[cfg(feature = "otel")]
8use sha2::{Digest, Sha256};
9#[cfg(feature = "otel")]
10use tracing_subscriber::filter::filter_fn;
11use tracing_subscriber::fmt::MakeWriter;
12use tracing_subscriber::layer::SubscriberExt;
13#[cfg(feature = "otel")]
14use tracing_subscriber::Layer as _;
15use tracing_subscriber::{filter::LevelFilter, EnvFilter};
16
17use crate::TraceId;
18
/// Header key `otel_parent_span_id`; presumably carries a parent span id as
/// hex next to a Harn trace id — confirm against callers.
pub const OTEL_PARENT_SPAN_ID_HEADER: &str = "otel_parent_span_id";
/// W3C trace-context `traceparent` header name.
pub const OTEL_TRACEPARENT_HEADER: &str = "traceparent";
/// W3C trace-context `tracestate` header name.
pub const OTEL_TRACESTATE_HEADER: &str = "tracestate";
/// Span handle accepted by the linking helpers (alias for `tracing::Span`).
pub type SpanRef = tracing::Span;
/// Hex-encoded OTel span id, as parsed by `set_span_link`.
pub type SpanId = String;

/// Set after a global subscriber has been installed, so later install calls
/// become no-ops.
static OBSERVABILITY_INIT: std::sync::OnceLock<()> = std::sync::OnceLock::new();
26
/// Output format of the orchestrator's console (and optional file) logs.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LogFormat {
    /// Compact single-line text written to stderr.
    Text,
    /// `tracing_subscriber`'s pretty formatter, written to stderr.
    Pretty,
    /// JSON records written to stdout.
    Json,
}
33
/// Settings for `ObservabilityGuard::install_orchestrator_subscriber`.
#[derive(Clone, Debug)]
pub struct OrchestratorObservabilityConfig {
    /// Format of the console output (also used for the log file).
    pub log_format: LogFormat,
    /// When set, logs are also appended to `<state_dir>/logs/orchestrator.log`,
    /// which rotates at 10 MiB.
    pub state_dir: Option<PathBuf>,
}
39
/// Guard returned by the subscriber installers.
///
/// When it owns an OTel tracer provider, `shutdown()` or dropping the guard
/// flushes and shuts that provider down.
pub struct ObservabilityGuard {
    /// Provider built from `HARN_OTEL_*`; `None` when OTel export is disabled
    /// or the subscriber had already been installed.
    #[cfg(feature = "otel")]
    tracer_provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
}
44
45impl ObservabilityGuard {
46    pub fn install_orchestrator_subscriber_from_env() -> Result<Self, String> {
47        Self::install_orchestrator_subscriber(OrchestratorObservabilityConfig {
48            log_format: LogFormat::Text,
49            state_dir: None,
50        })
51    }
52
53    pub fn install_orchestrator_subscriber(
54        config: OrchestratorObservabilityConfig,
55    ) -> Result<Self, String> {
56        if OBSERVABILITY_INIT.get().is_some() {
57            return Ok(Self {
58                #[cfg(feature = "otel")]
59                tracer_provider: None,
60            });
61        }
62
63        #[cfg(feature = "otel")]
64        {
65            if let Some(provider) = build_tracer_provider_from_env()? {
66                use opentelemetry::trace::TracerProvider as _;
67
68                let writer = log_writer(&config)?;
69                match config.log_format {
70                    LogFormat::Json => {
71                        let tracer = provider.tracer("harn.orchestrator");
72                        let telemetry = tracing_opentelemetry::layer()
73                            .with_tracer(tracer)
74                            .with_filter(filter_fn(|metadata| {
75                                metadata.is_span() && metadata.target().starts_with("harn")
76                            }));
77                        let subscriber = tracing_subscriber::registry()
78                            .with(env_filter())
79                            .with(
80                                tracing_subscriber::fmt::layer()
81                                    .json()
82                                    .flatten_event(true)
83                                    .with_current_span(true)
84                                    .with_writer(writer),
85                            )
86                            .with(telemetry);
87                        tracing::subscriber::set_global_default(subscriber).map_err(|error| {
88                            format!("failed to install global tracing subscriber: {error}")
89                        })?;
90                    }
91                    LogFormat::Pretty => {
92                        let tracer = provider.tracer("harn.orchestrator");
93                        let telemetry = tracing_opentelemetry::layer()
94                            .with_tracer(tracer)
95                            .with_filter(filter_fn(|metadata| {
96                                metadata.is_span() && metadata.target().starts_with("harn")
97                            }));
98                        let subscriber = tracing_subscriber::registry()
99                            .with(env_filter())
100                            .with(
101                                tracing_subscriber::fmt::layer()
102                                    .pretty()
103                                    .with_writer(writer),
104                            )
105                            .with(telemetry);
106                        tracing::subscriber::set_global_default(subscriber).map_err(|error| {
107                            format!("failed to install global tracing subscriber: {error}")
108                        })?;
109                    }
110                    LogFormat::Text => {
111                        let tracer = provider.tracer("harn.orchestrator");
112                        let telemetry = tracing_opentelemetry::layer()
113                            .with_tracer(tracer)
114                            .with_filter(filter_fn(|metadata| {
115                                metadata.is_span() && metadata.target().starts_with("harn")
116                            }));
117                        let subscriber = tracing_subscriber::registry()
118                            .with(env_filter())
119                            .with(
120                                tracing_subscriber::fmt::layer()
121                                    .compact()
122                                    .with_target(false)
123                                    .with_ansi(std::io::IsTerminal::is_terminal(&std::io::stderr()))
124                                    .with_writer(writer),
125                            )
126                            .with(telemetry);
127                        tracing::subscriber::set_global_default(subscriber).map_err(|error| {
128                            format!("failed to install global tracing subscriber: {error}")
129                        })?;
130                    }
131                }
132                let _ = OBSERVABILITY_INIT.set(());
133                return Ok(Self {
134                    tracer_provider: Some(provider),
135                });
136            }
137        }
138
139        #[cfg(not(feature = "otel"))]
140        if std::env::var("HARN_OTEL_ENDPOINT")
141            .ok()
142            .is_some_and(|value| !value.trim().is_empty())
143        {
144            return Err(
145                "HARN_OTEL_ENDPOINT is set, but this build was compiled without the `otel` feature"
146                    .to_string(),
147            );
148        }
149
150        let writer = log_writer(&config)?;
151        match config.log_format {
152            LogFormat::Json => {
153                let subscriber = tracing_subscriber::registry().with(env_filter()).with(
154                    tracing_subscriber::fmt::layer()
155                        .json()
156                        .flatten_event(true)
157                        .with_current_span(true)
158                        .with_writer(writer),
159                );
160                tracing::subscriber::set_global_default(subscriber).map_err(|error| {
161                    format!("failed to install global tracing subscriber: {error}")
162                })?;
163            }
164            LogFormat::Pretty => {
165                let subscriber = tracing_subscriber::registry().with(env_filter()).with(
166                    tracing_subscriber::fmt::layer()
167                        .pretty()
168                        .with_writer(writer),
169                );
170                tracing::subscriber::set_global_default(subscriber).map_err(|error| {
171                    format!("failed to install global tracing subscriber: {error}")
172                })?;
173            }
174            LogFormat::Text => {
175                let subscriber = tracing_subscriber::registry().with(env_filter()).with(
176                    tracing_subscriber::fmt::layer()
177                        .compact()
178                        .with_target(false)
179                        .with_ansi(std::io::IsTerminal::is_terminal(&std::io::stderr()))
180                        .with_writer(writer),
181                );
182                tracing::subscriber::set_global_default(subscriber).map_err(|error| {
183                    format!("failed to install global tracing subscriber: {error}")
184                })?;
185            }
186        }
187        let _ = OBSERVABILITY_INIT.set(());
188        Ok(Self {
189            #[cfg(feature = "otel")]
190            tracer_provider: None,
191        })
192    }
193
194    #[cfg_attr(not(feature = "otel"), allow(unused_mut))]
195    pub fn shutdown(mut self) -> Result<(), String> {
196        #[cfg(feature = "otel")]
197        if let Some(provider) = self.tracer_provider.take() {
198            provider
199                .force_flush()
200                .map_err(|error| format!("failed to flush OTel spans: {error}"))?;
201            provider
202                .shutdown()
203                .map_err(|error| format!("failed to shut down OTel tracer provider: {error}"))?;
204        }
205        Ok(())
206    }
207}
208
209fn env_filter() -> EnvFilter {
210    EnvFilter::builder()
211        .with_default_directive(LevelFilter::INFO.into())
212        .from_env_lossy()
213}
214
215fn log_writer(config: &OrchestratorObservabilityConfig) -> Result<OrchestratorLogWriter, String> {
216    let file = if let Some(state_dir) = config.state_dir.as_ref() {
217        let log_dir = state_dir.join("logs");
218        fs::create_dir_all(&log_dir).map_err(|error| {
219            format!(
220                "failed to create orchestrator log dir {}: {error}",
221                log_dir.display()
222            )
223        })?;
224        Some(Arc::new(Mutex::new(RotatingFile::open(
225            log_dir.join("orchestrator.log"),
226        )?)))
227    } else {
228        None
229    };
230    Ok(OrchestratorLogWriter {
231        format: config.log_format,
232        file,
233    })
234}
235
/// `MakeWriter` for the fmt layer.
///
/// Routes output to stdout or stderr by format and optionally tees it into a
/// shared rotating log file.
#[derive(Clone)]
struct OrchestratorLogWriter {
    /// Selects the console stream: stdout for JSON, stderr otherwise.
    format: LogFormat,
    /// Shared rotating log file, present when a state dir was configured.
    file: Option<Arc<Mutex<RotatingFile>>>,
}
241
242impl<'a> MakeWriter<'a> for OrchestratorLogWriter {
243    type Writer = OrchestratorLogLineWriter;
244
245    fn make_writer(&'a self) -> Self::Writer {
246        OrchestratorLogLineWriter {
247            format: self.format,
248            file: self.file.clone(),
249        }
250    }
251}
252
/// Per-event writer produced by `OrchestratorLogWriter`.
struct OrchestratorLogLineWriter {
    /// Selects the console stream: stdout for JSON, stderr otherwise.
    format: LogFormat,
    /// Shared rotating log file that also receives every write, if configured.
    file: Option<Arc<Mutex<RotatingFile>>>,
}
257
258impl Write for OrchestratorLogLineWriter {
259    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
260        match self.format {
261            LogFormat::Json => io::stdout().write_all(buf)?,
262            LogFormat::Text | LogFormat::Pretty => io::stderr().write_all(buf)?,
263        }
264        if let Some(file) = self.file.as_ref() {
265            file.lock()
266                .expect("orchestrator log file poisoned")
267                .write_all(buf)?;
268        }
269        Ok(buf.len())
270    }
271
272    fn flush(&mut self) -> io::Result<()> {
273        match self.format {
274            LogFormat::Json => io::stdout().flush()?,
275            LogFormat::Text | LogFormat::Pretty => io::stderr().flush()?,
276        }
277        if let Some(file) = self.file.as_ref() {
278            file.lock()
279                .expect("orchestrator log file poisoned")
280                .flush()?;
281        }
282        Ok(())
283    }
284}
285
/// Append-mode log file that rotates to a single backup once it would grow
/// past [`RotatingFile::MAX_BYTES`].
///
/// The backup path is `path.with_extension("log.1")`; for `orchestrator.log`
/// that is `orchestrator.log.1`. Each rotation replaces the previous backup.
struct RotatingFile {
    /// Path of the active log file.
    path: PathBuf,
    /// Append-mode handle to the active log file.
    file: fs::File,
    /// Bytes in the active file. Seeded from the on-disk size at open time,
    /// then advanced by each successful write.
    bytes_written: u64,
}

impl RotatingFile {
    /// Size cap (10 MiB) for the active file before it is rotated.
    const MAX_BYTES: u64 = 10 * 1024 * 1024;

    /// Opens `path` in append mode, creating it if needed.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message if the file cannot be opened.
    fn open(path: PathBuf) -> Result<Self, String> {
        // A missing or unreadable file is treated as empty.
        let bytes_written = fs::metadata(&path)
            .map(|metadata| metadata.len())
            .unwrap_or(0);
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)
            .map_err(|error| {
                format!(
                    "failed to open orchestrator log {}: {error}",
                    path.display()
                )
            })?;
        Ok(Self {
            path,
            file,
            bytes_written,
        })
    }

    /// Rotates the active file if appending `next_write_bytes` more bytes
    /// would take it past [`Self::MAX_BYTES`].
    ///
    /// An empty file is never rotated. Rotating it would only overwrite the
    /// existing backup with an empty file and destroy the previous logs. An
    /// oversized first write therefore lands in the fresh file, and the next
    /// write triggers the rotation.
    fn rotate_if_needed(&mut self, next_write_bytes: usize) -> io::Result<()> {
        let projected = self.bytes_written.saturating_add(next_write_bytes as u64);
        if self.bytes_written == 0 || projected <= Self::MAX_BYTES {
            return Ok(());
        }
        self.file.flush()?;
        let rotated = self.path.with_extension("log.1");
        // Drop any previous backup; a missing backup is not an error.
        let _ = fs::remove_file(&rotated);
        if self.path.exists() {
            fs::rename(&self.path, rotated)?;
        }
        self.file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)?;
        self.bytes_written = 0;
        Ok(())
    }
}
334
335impl Write for RotatingFile {
336    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
337        self.rotate_if_needed(buf.len())?;
338        let written = self.file.write(buf)?;
339        self.bytes_written += written as u64;
340        Ok(written)
341    }
342
343    fn flush(&mut self) -> io::Result<()> {
344        self.file.flush()
345    }
346}
347
348impl Drop for ObservabilityGuard {
349    fn drop(&mut self) {
350        // Best-effort flush + shutdown so span batches are delivered even when
351        // the caller exits via panic or early return without calling
352        // `shutdown()` explicitly. Ignore errors — there's nothing to recover
353        // to during teardown.
354        #[cfg(feature = "otel")]
355        if let Some(provider) = self.tracer_provider.take() {
356            let _ = provider.force_flush();
357            let _ = provider.shutdown();
358        }
359    }
360}
361
362#[cfg(feature = "otel")]
363pub fn set_span_parent(
364    span: &tracing::Span,
365    trace_id: &TraceId,
366    parent_span_id: Option<&str>,
367) -> Result<(), String> {
368    use opentelemetry::trace::TraceContextExt as _;
369    use tracing_opentelemetry::OpenTelemetrySpanExt as _;
370
371    let context = opentelemetry::Context::current()
372        .with_remote_span_context(span_context(trace_id, parent_span_id));
373    span.set_parent(context)
374        .map_err(|error| format!("failed to attach OTel parent context: {error}"))
375}
376
377#[cfg(not(feature = "otel"))]
378pub fn set_span_parent(
379    _span: &tracing::Span,
380    _trace_id: &TraceId,
381    _parent_span_id: Option<&str>,
382) -> Result<(), String> {
383    Ok(())
384}
385
386#[cfg(feature = "otel")]
387pub fn set_span_link(
388    span: &SpanRef,
389    linked_trace_id: &TraceId,
390    linked_span_id: &SpanId,
391    attributes: Option<HashMap<String, String>>,
392) -> Result<(), String> {
393    use tracing_opentelemetry::OpenTelemetrySpanExt as _;
394
395    let attributes = attributes
396        .unwrap_or_default()
397        .into_iter()
398        .map(|(key, value)| opentelemetry::KeyValue::new(key, value))
399        .collect();
400    span.add_link_with_attributes(
401        span_context(linked_trace_id, Some(linked_span_id.as_str())),
402        attributes,
403    );
404    Ok(())
405}
406
407#[cfg(not(feature = "otel"))]
408pub fn set_span_link(
409    _span: &SpanRef,
410    _linked_trace_id: &TraceId,
411    _linked_span_id: &SpanId,
412    _attributes: Option<HashMap<String, String>>,
413) -> Result<(), String> {
414    Ok(())
415}
416
417#[cfg(feature = "otel")]
418pub fn current_span_id_hex(span: &tracing::Span) -> Option<String> {
419    use opentelemetry::trace::TraceContextExt as _;
420    use tracing_opentelemetry::OpenTelemetrySpanExt as _;
421
422    let context = span.context();
423    let binding = context.span();
424    let span_context = binding.span_context();
425    span_context
426        .is_valid()
427        .then(|| span_context.span_id().to_string())
428}
429
430#[cfg(not(feature = "otel"))]
431pub fn current_span_id_hex(_span: &tracing::Span) -> Option<String> {
432    None
433}
434
435#[cfg(feature = "otel")]
436pub fn current_span_context_hex(span: &tracing::Span) -> Option<(String, String)> {
437    use opentelemetry::trace::TraceContextExt as _;
438    use tracing_opentelemetry::OpenTelemetrySpanExt as _;
439
440    let context = span.context();
441    let binding = context.span();
442    let span_context = binding.span_context();
443    span_context.is_valid().then(|| {
444        (
445            span_context.trace_id().to_string(),
446            span_context.span_id().to_string(),
447        )
448    })
449}
450
451#[cfg(not(feature = "otel"))]
452pub fn current_span_context_hex(_span: &tracing::Span) -> Option<(String, String)> {
453    None
454}
455
456#[cfg(feature = "otel")]
457pub fn inject_current_context_headers(
458    span: &tracing::Span,
459    headers: &mut BTreeMap<String, String>,
460) -> Result<(), String> {
461    use opentelemetry::propagation::{Injector, TextMapPropagator as _};
462    use tracing_opentelemetry::OpenTelemetrySpanExt as _;
463
464    struct HeaderInjector<'a>(&'a mut BTreeMap<String, String>);
465
466    impl Injector for HeaderInjector<'_> {
467        fn set(&mut self, key: &str, value: String) {
468            self.0.insert(key.to_string(), value);
469        }
470    }
471
472    let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
473    propagator.inject_context(&span.context(), &mut HeaderInjector(headers));
474    Ok(())
475}
476
477#[cfg(not(feature = "otel"))]
478pub fn inject_current_context_headers(
479    _span: &tracing::Span,
480    _headers: &mut BTreeMap<String, String>,
481) -> Result<(), String> {
482    Ok(())
483}
484
485#[cfg(feature = "otel")]
486pub fn set_span_parent_from_headers(
487    span: &tracing::Span,
488    headers: &BTreeMap<String, String>,
489    trace_id: &TraceId,
490    fallback_parent_span_id: Option<&str>,
491) -> Result<(), String> {
492    use opentelemetry::propagation::{Extractor, TextMapPropagator as _};
493    use opentelemetry::trace::TraceContextExt as _;
494    use tracing_opentelemetry::OpenTelemetrySpanExt as _;
495
496    struct HeaderExtractor<'a>(&'a BTreeMap<String, String>);
497
498    impl Extractor for HeaderExtractor<'_> {
499        fn get(&self, key: &str) -> Option<&str> {
500            self.0.get(key).map(String::as_str)
501        }
502
503        fn keys(&self) -> Vec<&str> {
504            self.0.keys().map(String::as_str).collect()
505        }
506    }
507
508    let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
509    let context = propagator.extract(&HeaderExtractor(headers));
510    let binding = context.span();
511    let span_context = binding.span_context();
512    if span_context.is_valid() {
513        return span
514            .set_parent(context)
515            .map_err(|error| format!("failed to attach OTel parent context: {error}"));
516    }
517    set_span_parent(span, trace_id, fallback_parent_span_id)
518}
519
520#[cfg(not(feature = "otel"))]
521pub fn set_span_parent_from_headers(
522    _span: &tracing::Span,
523    _headers: &BTreeMap<String, String>,
524    _trace_id: &TraceId,
525    _fallback_parent_span_id: Option<&str>,
526) -> Result<(), String> {
527    Ok(())
528}
529
/// Builds an OTLP (HTTP/JSON) tracer provider from the `HARN_OTEL_*`
/// environment variables and registers it as the global OTel tracer provider.
///
/// Returns `Ok(None)` when `HARN_OTEL_ENDPOINT` is unset, blank or not valid
/// UTF-8. Except for the headers list, values are trimmed and blank values
/// count as unset:
///
/// - `HARN_OTEL_ENDPOINT`: collector URL; `/v1/traces` is appended if missing.
/// - `HARN_OTEL_SERVICE_NAME`: resource service name, default `harn-orchestrator`.
/// - `HARN_OTEL_HEADERS`: extra export headers, see `parse_headers`.
/// - `HARN_OTEL_SPAN_PROCESSOR`: `batch` (default, Tokio runtime) or `simple`.
/// - `HARN_OTEL_SAMPLE_RATIO`: number in `[0, 1]`. Below `1`, a parent-based
///   trace-id-ratio sampler is installed.
///
/// # Errors
///
/// Returns an error for an invalid span processor kind or sample ratio, or
/// when the HTTP client or span exporter cannot be built.
#[cfg(feature = "otel")]
fn build_tracer_provider_from_env(
) -> Result<Option<opentelemetry_sdk::trace::SdkTracerProvider>, String> {
    use opentelemetry::global;
    use opentelemetry_otlp::{Protocol, WithExportConfig as _, WithHttpConfig as _};
    use opentelemetry_sdk::runtime;
    use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
    use opentelemetry_sdk::trace::{Sampler, SimpleSpanProcessor};
    use opentelemetry_sdk::Resource;

    // Trimmed value of `name`; unset, non-UTF-8 and blank values are `None`.
    let non_blank_env = |name: &str| -> Option<String> {
        std::env::var(name)
            .ok()
            .map(|value| value.trim().to_string())
            .filter(|value| !value.is_empty())
    };

    let Some(raw_endpoint) = non_blank_env("HARN_OTEL_ENDPOINT") else {
        return Ok(None);
    };
    let endpoint = normalize_otlp_traces_endpoint(&raw_endpoint);
    let service_name = non_blank_env("HARN_OTEL_SERVICE_NAME")
        .unwrap_or_else(|| "harn-orchestrator".to_string());
    let headers = parse_headers(&std::env::var("HARN_OTEL_HEADERS").unwrap_or_default());
    let processor_kind =
        parse_span_processor_kind(non_blank_env("HARN_OTEL_SPAN_PROCESSOR").as_deref())?;
    let sample_ratio = parse_sample_ratio(non_blank_env("HARN_OTEL_SAMPLE_RATIO").as_deref())?;

    let http_client = reqwest::Client::builder()
        .build()
        .map_err(|error| format!("failed to build OTLP HTTP client: {error}"))?;
    let exporter = opentelemetry_otlp::SpanExporter::builder()
        .with_http()
        .with_http_client(http_client)
        .with_protocol(Protocol::HttpJson)
        .with_endpoint(endpoint)
        .with_headers(headers)
        .build()
        .map_err(|error| format!("failed to build OTel span exporter: {error}"))?;

    let resource = Resource::builder().with_service_name(service_name).build();
    let mut builder =
        opentelemetry_sdk::trace::SdkTracerProvider::builder().with_resource(resource);
    if sample_ratio < 1.0 {
        let ratio_sampler = Sampler::TraceIdRatioBased(sample_ratio);
        builder = builder.with_sampler(Sampler::ParentBased(Box::new(ratio_sampler)));
    }
    let builder = match processor_kind {
        SpanProcessorKind::Batch => {
            let batch = BatchSpanProcessor::builder(exporter, runtime::Tokio).build();
            builder.with_span_processor(batch)
        }
        // Simple processor exports each span synchronously when it ends. This
        // gives test harnesses a deterministic "process exit ⇒ all spans
        // exported" guarantee without a force_flush race or scheduled-delay
        // wall-clock dependency. Production paths must keep using batch.
        SpanProcessorKind::Simple => {
            builder.with_span_processor(SimpleSpanProcessor::new(exporter))
        }
    };

    let provider = builder.build();
    global::set_tracer_provider(provider.clone());
    Ok(Some(provider))
}
605
606#[cfg(feature = "otel")]
/// Parses `HARN_OTEL_SAMPLE_RATIO`.
///
/// `None` means "keep every trace" (`1.0`). Otherwise the value must parse as
/// a finite `f64` in `[0, 1]`.
///
/// # Errors
///
/// Returns a message naming the offending value when it does not parse or is
/// out of range.
fn parse_sample_ratio(value: Option<&str>) -> Result<f64, String> {
    match value {
        None => Ok(1.0),
        Some(value) => {
            let ratio: f64 = value.parse().map_err(|error| {
                format!(
                    "invalid HARN_OTEL_SAMPLE_RATIO value {value:?}; expected a number in [0, 1]: {error}"
                )
            })?;
            let in_range = ratio.is_finite() && (0.0..=1.0).contains(&ratio);
            if !in_range {
                return Err(format!(
                    "invalid HARN_OTEL_SAMPLE_RATIO value {value:?}; expected a number in [0, 1]"
                ));
            }
            Ok(ratio)
        }
    }
}
624
/// Span processor selected by `HARN_OTEL_SPAN_PROCESSOR`.
#[cfg(feature = "otel")]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum SpanProcessorKind {
    /// Batched export on the Tokio runtime; the default.
    Batch,
    /// Synchronous export as each span ends; intended for test harnesses.
    Simple,
}
631
/// Parses `HARN_OTEL_SPAN_PROCESSOR` (ASCII case-insensitive).
///
/// `None` selects [`SpanProcessorKind::Batch`].
///
/// # Errors
///
/// Returns a message quoting the lowercased value when it is neither `batch`
/// nor `simple`.
#[cfg(feature = "otel")]
fn parse_span_processor_kind(value: Option<&str>) -> Result<SpanProcessorKind, String> {
    let Some(raw) = value else {
        return Ok(SpanProcessorKind::Batch);
    };
    if raw.eq_ignore_ascii_case("batch") {
        Ok(SpanProcessorKind::Batch)
    } else if raw.eq_ignore_ascii_case("simple") {
        Ok(SpanProcessorKind::Simple)
    } else {
        let other = raw.to_ascii_lowercase();
        Err(format!(
            "unsupported HARN_OTEL_SPAN_PROCESSOR value {other:?}; expected 'batch' or 'simple'"
        ))
    }
}
645
/// Builds a sampled, remote OTel span context for a Harn trace.
///
/// The trace id comes from `otel_trace_id`. The span id is `parent_span_id`
/// when it parses as a valid hex span id; otherwise it is hashed from the
/// normalized trace id.
#[cfg(feature = "otel")]
fn span_context(
    trace_id: &TraceId,
    parent_span_id: Option<&str>,
) -> opentelemetry::trace::SpanContext {
    use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceState};

    let otel_trace = otel_trace_id(trace_id);
    let parsed_parent = parent_span_id.and_then(|hex| SpanId::from_hex(hex).ok());
    let span_id = match parsed_parent {
        Some(id) if id != SpanId::INVALID => id,
        _ => hashed_span_id(otel_trace.to_string().as_bytes()),
    };

    let is_remote = true;
    SpanContext::new(
        otel_trace,
        span_id,
        TraceFlags::SAMPLED,
        is_remote,
        TraceState::default(),
    )
}
667
/// Maps a Harn trace id onto an OTel trace id.
///
/// A leading `trace_` and all dashes are removed. If what remains parses as
/// a valid (non-zero) hex trace id, it is used directly. Otherwise the
/// original id string is hashed.
#[cfg(feature = "otel")]
fn otel_trace_id(trace_id: &TraceId) -> opentelemetry::trace::TraceId {
    use opentelemetry::trace::TraceId as OtelTraceId;

    let raw = trace_id.0.as_str();
    let hex: String = raw
        .strip_prefix("trace_")
        .unwrap_or(raw)
        .chars()
        .filter(|ch| *ch != '-')
        .collect();
    match OtelTraceId::from_hex(&hex) {
        Ok(parsed) if parsed != OtelTraceId::INVALID => parsed,
        _ => hashed_trace_id(raw.as_bytes()),
    }
}
684
/// Derives a deterministic OTel trace id from the first 16 bytes of the
/// SHA-256 digest of `input`.
///
/// An all-zero result would be `TraceId::INVALID`, so it is nudged to a valid
/// id, mirroring the guard in `hashed_span_id`.
#[cfg(feature = "otel")]
fn hashed_trace_id(input: &[u8]) -> opentelemetry::trace::TraceId {
    let digest = Sha256::digest(input);
    let mut bytes = [0_u8; 16];
    bytes.copy_from_slice(&digest[..16]);
    if bytes.iter().all(|byte| *byte == 0) {
        bytes[15] = 1;
    }
    opentelemetry::trace::TraceId::from_bytes(bytes)
}
692
/// Derives a deterministic OTel span id from the first 8 bytes of the
/// SHA-256 digest of `input`.
#[cfg(feature = "otel")]
fn hashed_span_id(input: &[u8]) -> opentelemetry::trace::SpanId {
    let digest = Sha256::digest(input);
    let mut id_bytes = [0_u8; 8];
    for (slot, byte) in id_bytes.iter_mut().zip(digest.iter()) {
        *slot = *byte;
    }
    // The all-zero span id is `SpanId::INVALID`; force the low byte so the
    // derived id is always usable.
    if id_bytes == [0_u8; 8] {
        id_bytes[7] = 1;
    }
    opentelemetry::trace::SpanId::from_bytes(id_bytes)
}
703
704#[cfg(feature = "otel")]
/// Ensures an OTLP endpoint points at the traces path.
///
/// Trailing slashes are removed, and `/v1/traces` is appended unless it is
/// already the suffix.
fn normalize_otlp_traces_endpoint(endpoint: &str) -> String {
    const TRACES_PATH: &str = "/v1/traces";

    let base = endpoint.trim_end_matches('/');
    match base.strip_suffix(TRACES_PATH) {
        Some(_) => base.to_owned(),
        None => [base, TRACES_PATH].concat(),
    }
}
713
714#[cfg(feature = "otel")]
/// Parses `HARN_OTEL_HEADERS` into a header map.
///
/// Entries are separated by `,`, `;` or newlines. Each entry is `name=value`
/// or `name: value` and is split at its *first* `=` or `:`. Valid header names
/// contain neither character, so values may contain both (e.g. base64 padding
/// in `x-api-key: abc==`). Entries with an empty name or value are skipped.
/// When a name repeats, the last entry wins.
fn parse_headers(raw: &str) -> HashMap<String, String> {
    raw.split([',', '\n', ';'])
        .map(str::trim)
        .filter(|segment| !segment.is_empty())
        .filter_map(|segment| {
            let (name, value) = segment.split_once(['=', ':'])?;
            let (name, value) = (name.trim(), value.trim());
            (!name.is_empty() && !value.is_empty())
                .then(|| (name.to_string(), value.to_string()))
        })
        .collect()
}
732
// Unit tests for the OTel helpers; only built when the `otel` feature is on.
#[cfg(all(test, feature = "otel"))]
mod tests {
    use super::*;
    use opentelemetry::trace::TracerProvider as _;
    use opentelemetry_sdk::error::OTelSdkResult;
    use opentelemetry_sdk::trace::{SdkTracerProvider, SpanData, SpanExporter, Tracer};
    use std::sync::{Arc, Mutex};

    /// In-memory exporter that collects every exported span for assertions.
    #[derive(Clone, Default, Debug)]
    struct TestExporter(Arc<Mutex<Vec<SpanData>>>);

    impl SpanExporter for TestExporter {
        async fn export(&self, mut batch: Vec<SpanData>) -> OTelSdkResult {
            let mut spans = self.0.lock().expect("test exporter lock");
            spans.append(&mut batch);
            Ok(())
        }
    }

    /// Builds a provider with a simple (synchronous) exporter plus a
    /// subscriber whose OTel layer records INFO-and-above spans.
    fn test_tracer() -> (
        Tracer,
        SdkTracerProvider,
        TestExporter,
        impl tracing::Subscriber,
    ) {
        let exporter = TestExporter::default();
        let provider = SdkTracerProvider::builder()
            .with_simple_exporter(exporter.clone())
            .build();
        let tracer = provider.tracer("harn-test");
        let subscriber = tracing_subscriber::registry().with(
            tracing_opentelemetry::layer()
                .with_tracer(tracer.clone())
                .with_filter(LevelFilter::INFO),
        );
        (tracer, provider, exporter, subscriber)
    }

    #[test]
    fn normalizes_trace_endpoint_suffix() {
        assert_eq!(
            normalize_otlp_traces_endpoint("http://127.0.0.1:4318"),
            "http://127.0.0.1:4318/v1/traces"
        );
        assert_eq!(
            normalize_otlp_traces_endpoint("http://127.0.0.1:4318/v1/traces"),
            "http://127.0.0.1:4318/v1/traces"
        );
    }

    #[test]
    fn parses_header_lists() {
        let headers = parse_headers("authorization=Bearer token,x-tenant-id=tenant-123;trace=true");
        assert_eq!(
            headers.get("authorization"),
            Some(&"Bearer token".to_string())
        );
        assert_eq!(headers.get("x-tenant-id"), Some(&"tenant-123".to_string()));
        assert_eq!(headers.get("trace"), Some(&"true".to_string()));
    }

    #[test]
    fn parses_span_processor_kind_defaults_to_batch() {
        assert_eq!(
            parse_span_processor_kind(None).unwrap(),
            SpanProcessorKind::Batch
        );
    }

    #[test]
    fn parses_span_processor_kind_accepts_known_values() {
        assert_eq!(
            parse_span_processor_kind(Some("batch")).unwrap(),
            SpanProcessorKind::Batch
        );
        assert_eq!(
            parse_span_processor_kind(Some("Batch")).unwrap(),
            SpanProcessorKind::Batch
        );
        assert_eq!(
            parse_span_processor_kind(Some("simple")).unwrap(),
            SpanProcessorKind::Simple
        );
        assert_eq!(
            parse_span_processor_kind(Some("SIMPLE")).unwrap(),
            SpanProcessorKind::Simple
        );
    }

    #[test]
    fn parses_span_processor_kind_rejects_unknown_values() {
        let error = parse_span_processor_kind(Some("forwarder")).unwrap_err();
        assert!(error.contains("forwarder"), "{error}");
        assert!(error.contains("batch"), "{error}");
        assert!(error.contains("simple"), "{error}");
    }

    #[test]
    fn parse_sample_ratio_defaults_to_keep_all() {
        assert_eq!(parse_sample_ratio(None).unwrap(), 1.0);
    }

    #[test]
    fn parse_sample_ratio_accepts_valid_ratio() {
        assert_eq!(parse_sample_ratio(Some("0.1")).unwrap(), 0.1);
        assert_eq!(parse_sample_ratio(Some("0")).unwrap(), 0.0);
        assert_eq!(parse_sample_ratio(Some("1")).unwrap(), 1.0);
    }

    #[test]
    fn parse_sample_ratio_rejects_invalid_values() {
        // Out of range, unparsable, and non-finite inputs must all be rejected.
        for value in ["2.0", "-1", "x", "NaN", "inf"] {
            let error = parse_sample_ratio(Some(value)).unwrap_err();
            assert!(error.contains(value), "{error}");
            assert!(error.contains("[0, 1]"), "{error}");
        }
    }

    // A link must be recorded on the span without changing its parent.
    #[test]
    fn set_span_link_exports_link_without_parenting() {
        let (_tracer, provider, exporter, subscriber) = test_tracer();
        let linked_trace_id = TraceId("1234567890abcdef1234567890abcdef".to_string());
        let linked_span_id: SpanId = "1234567890abcdef".to_string();

        tracing::subscriber::with_default(subscriber, || {
            let span = tracing::info_span!(target: "harn.vm.lifecycle", "resume");
            set_span_link(
                &span,
                &linked_trace_id,
                &linked_span_id,
                Some(HashMap::from([(
                    "harn.link.kind".to_string(),
                    "suspension".to_string(),
                )])),
            )
            .expect("set span link");
            span.in_scope(|| {});
        });

        provider.force_flush().expect("flush spans");
        drop(provider);
        let spans = exporter.0.lock().expect("exported spans lock");
        assert_eq!(spans.len(), 1);
        let resume = &spans[0];
        assert_eq!(resume.parent_span_id, opentelemetry::trace::SpanId::INVALID);
        assert_eq!(resume.links.len(), 1);
        let link = &resume.links[0];
        assert_eq!(link.span_context.trace_id().to_string(), linked_trace_id.0);
        assert_eq!(link.span_context.span_id().to_string(), linked_span_id);
        assert_eq!(link.attributes.len(), 1);
        assert_eq!(link.attributes[0].key.as_str(), "harn.link.kind");
    }
}