1use std::collections::{BTreeMap, HashMap};
2use std::fs::{self, OpenOptions};
3use std::io::{self, Write};
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex};
6
7#[cfg(feature = "otel")]
8use sha2::{Digest, Sha256};
9#[cfg(feature = "otel")]
10use tracing_subscriber::filter::filter_fn;
11use tracing_subscriber::fmt::MakeWriter;
12use tracing_subscriber::layer::SubscriberExt;
13#[cfg(feature = "otel")]
14use tracing_subscriber::Layer as _;
15use tracing_subscriber::{filter::LevelFilter, EnvFilter};
16
17use crate::TraceId;
18
19pub const OTEL_PARENT_SPAN_ID_HEADER: &str = "otel_parent_span_id";
20pub const OTEL_TRACEPARENT_HEADER: &str = "traceparent";
21pub const OTEL_TRACESTATE_HEADER: &str = "tracestate";
22pub type SpanRef = tracing::Span;
23pub type SpanId = String;
24
25static OBSERVABILITY_INIT: std::sync::OnceLock<()> = std::sync::OnceLock::new();
26
27#[derive(Clone, Copy, Debug, PartialEq, Eq)]
28pub enum LogFormat {
29 Text,
30 Pretty,
31 Json,
32}
33
34#[derive(Clone, Debug)]
35pub struct OrchestratorObservabilityConfig {
36 pub log_format: LogFormat,
37 pub state_dir: Option<PathBuf>,
38}
39
40pub struct ObservabilityGuard {
41 #[cfg(feature = "otel")]
42 tracer_provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
43}
44
45impl ObservabilityGuard {
46 pub fn install_orchestrator_subscriber_from_env() -> Result<Self, String> {
47 Self::install_orchestrator_subscriber(OrchestratorObservabilityConfig {
48 log_format: LogFormat::Text,
49 state_dir: None,
50 })
51 }
52
53 pub fn install_orchestrator_subscriber(
54 config: OrchestratorObservabilityConfig,
55 ) -> Result<Self, String> {
56 if OBSERVABILITY_INIT.get().is_some() {
57 return Ok(Self {
58 #[cfg(feature = "otel")]
59 tracer_provider: None,
60 });
61 }
62
63 #[cfg(feature = "otel")]
64 {
65 if let Some(provider) = build_tracer_provider_from_env()? {
66 use opentelemetry::trace::TracerProvider as _;
67
68 let writer = log_writer(&config)?;
69 match config.log_format {
70 LogFormat::Json => {
71 let tracer = provider.tracer("harn.orchestrator");
72 let telemetry = tracing_opentelemetry::layer()
73 .with_tracer(tracer)
74 .with_filter(filter_fn(|metadata| {
75 metadata.is_span() && metadata.target().starts_with("harn")
76 }));
77 let subscriber = tracing_subscriber::registry()
78 .with(env_filter())
79 .with(
80 tracing_subscriber::fmt::layer()
81 .json()
82 .flatten_event(true)
83 .with_current_span(true)
84 .with_writer(writer),
85 )
86 .with(telemetry);
87 tracing::subscriber::set_global_default(subscriber).map_err(|error| {
88 format!("failed to install global tracing subscriber: {error}")
89 })?;
90 }
91 LogFormat::Pretty => {
92 let tracer = provider.tracer("harn.orchestrator");
93 let telemetry = tracing_opentelemetry::layer()
94 .with_tracer(tracer)
95 .with_filter(filter_fn(|metadata| {
96 metadata.is_span() && metadata.target().starts_with("harn")
97 }));
98 let subscriber = tracing_subscriber::registry()
99 .with(env_filter())
100 .with(
101 tracing_subscriber::fmt::layer()
102 .pretty()
103 .with_writer(writer),
104 )
105 .with(telemetry);
106 tracing::subscriber::set_global_default(subscriber).map_err(|error| {
107 format!("failed to install global tracing subscriber: {error}")
108 })?;
109 }
110 LogFormat::Text => {
111 let tracer = provider.tracer("harn.orchestrator");
112 let telemetry = tracing_opentelemetry::layer()
113 .with_tracer(tracer)
114 .with_filter(filter_fn(|metadata| {
115 metadata.is_span() && metadata.target().starts_with("harn")
116 }));
117 let subscriber = tracing_subscriber::registry()
118 .with(env_filter())
119 .with(
120 tracing_subscriber::fmt::layer()
121 .compact()
122 .with_target(false)
123 .with_ansi(std::io::IsTerminal::is_terminal(&std::io::stderr()))
124 .with_writer(writer),
125 )
126 .with(telemetry);
127 tracing::subscriber::set_global_default(subscriber).map_err(|error| {
128 format!("failed to install global tracing subscriber: {error}")
129 })?;
130 }
131 }
132 let _ = OBSERVABILITY_INIT.set(());
133 return Ok(Self {
134 tracer_provider: Some(provider),
135 });
136 }
137 }
138
139 #[cfg(not(feature = "otel"))]
140 if std::env::var("HARN_OTEL_ENDPOINT")
141 .ok()
142 .is_some_and(|value| !value.trim().is_empty())
143 {
144 return Err(
145 "HARN_OTEL_ENDPOINT is set, but this build was compiled without the `otel` feature"
146 .to_string(),
147 );
148 }
149
150 let writer = log_writer(&config)?;
151 match config.log_format {
152 LogFormat::Json => {
153 let subscriber = tracing_subscriber::registry().with(env_filter()).with(
154 tracing_subscriber::fmt::layer()
155 .json()
156 .flatten_event(true)
157 .with_current_span(true)
158 .with_writer(writer),
159 );
160 tracing::subscriber::set_global_default(subscriber).map_err(|error| {
161 format!("failed to install global tracing subscriber: {error}")
162 })?;
163 }
164 LogFormat::Pretty => {
165 let subscriber = tracing_subscriber::registry().with(env_filter()).with(
166 tracing_subscriber::fmt::layer()
167 .pretty()
168 .with_writer(writer),
169 );
170 tracing::subscriber::set_global_default(subscriber).map_err(|error| {
171 format!("failed to install global tracing subscriber: {error}")
172 })?;
173 }
174 LogFormat::Text => {
175 let subscriber = tracing_subscriber::registry().with(env_filter()).with(
176 tracing_subscriber::fmt::layer()
177 .compact()
178 .with_target(false)
179 .with_ansi(std::io::IsTerminal::is_terminal(&std::io::stderr()))
180 .with_writer(writer),
181 );
182 tracing::subscriber::set_global_default(subscriber).map_err(|error| {
183 format!("failed to install global tracing subscriber: {error}")
184 })?;
185 }
186 }
187 let _ = OBSERVABILITY_INIT.set(());
188 Ok(Self {
189 #[cfg(feature = "otel")]
190 tracer_provider: None,
191 })
192 }
193
194 #[cfg_attr(not(feature = "otel"), allow(unused_mut))]
195 pub fn shutdown(mut self) -> Result<(), String> {
196 #[cfg(feature = "otel")]
197 if let Some(provider) = self.tracer_provider.take() {
198 provider
199 .force_flush()
200 .map_err(|error| format!("failed to flush OTel spans: {error}"))?;
201 provider
202 .shutdown()
203 .map_err(|error| format!("failed to shut down OTel tracer provider: {error}"))?;
204 }
205 Ok(())
206 }
207}
208
209fn env_filter() -> EnvFilter {
210 EnvFilter::builder()
211 .with_default_directive(LevelFilter::INFO.into())
212 .from_env_lossy()
213}
214
215fn log_writer(config: &OrchestratorObservabilityConfig) -> Result<OrchestratorLogWriter, String> {
216 let file = if let Some(state_dir) = config.state_dir.as_ref() {
217 let log_dir = state_dir.join("logs");
218 fs::create_dir_all(&log_dir).map_err(|error| {
219 format!(
220 "failed to create orchestrator log dir {}: {error}",
221 log_dir.display()
222 )
223 })?;
224 Some(Arc::new(Mutex::new(RotatingFile::open(
225 log_dir.join("orchestrator.log"),
226 )?)))
227 } else {
228 None
229 };
230 Ok(OrchestratorLogWriter {
231 format: config.log_format,
232 file,
233 })
234}
235
236#[derive(Clone)]
237struct OrchestratorLogWriter {
238 format: LogFormat,
239 file: Option<Arc<Mutex<RotatingFile>>>,
240}
241
242impl<'a> MakeWriter<'a> for OrchestratorLogWriter {
243 type Writer = OrchestratorLogLineWriter;
244
245 fn make_writer(&'a self) -> Self::Writer {
246 OrchestratorLogLineWriter {
247 format: self.format,
248 file: self.file.clone(),
249 }
250 }
251}
252
253struct OrchestratorLogLineWriter {
254 format: LogFormat,
255 file: Option<Arc<Mutex<RotatingFile>>>,
256}
257
258impl Write for OrchestratorLogLineWriter {
259 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
260 match self.format {
261 LogFormat::Json => io::stdout().write_all(buf)?,
262 LogFormat::Text | LogFormat::Pretty => io::stderr().write_all(buf)?,
263 }
264 if let Some(file) = self.file.as_ref() {
265 file.lock()
266 .expect("orchestrator log file poisoned")
267 .write_all(buf)?;
268 }
269 Ok(buf.len())
270 }
271
272 fn flush(&mut self) -> io::Result<()> {
273 match self.format {
274 LogFormat::Json => io::stdout().flush()?,
275 LogFormat::Text | LogFormat::Pretty => io::stderr().flush()?,
276 }
277 if let Some(file) = self.file.as_ref() {
278 file.lock()
279 .expect("orchestrator log file poisoned")
280 .flush()?;
281 }
282 Ok(())
283 }
284}
285
286struct RotatingFile {
287 path: PathBuf,
288 file: fs::File,
289 bytes_written: u64,
290}
291
292impl RotatingFile {
293 const MAX_BYTES: u64 = 10 * 1024 * 1024;
294
295 fn open(path: PathBuf) -> Result<Self, String> {
296 let bytes_written = fs::metadata(&path)
297 .map(|metadata| metadata.len())
298 .unwrap_or(0);
299 let file = OpenOptions::new()
300 .create(true)
301 .append(true)
302 .open(&path)
303 .map_err(|error| {
304 format!(
305 "failed to open orchestrator log {}: {error}",
306 path.display()
307 )
308 })?;
309 Ok(Self {
310 path,
311 file,
312 bytes_written,
313 })
314 }
315
316 fn rotate_if_needed(&mut self, next_write_bytes: usize) -> io::Result<()> {
317 if self.bytes_written + next_write_bytes as u64 <= Self::MAX_BYTES {
318 return Ok(());
319 }
320 self.file.flush()?;
321 let rotated = self.path.with_extension("log.1");
322 let _ = fs::remove_file(&rotated);
323 if self.path.exists() {
324 fs::rename(&self.path, rotated)?;
325 }
326 self.file = OpenOptions::new()
327 .create(true)
328 .append(true)
329 .open(&self.path)?;
330 self.bytes_written = 0;
331 Ok(())
332 }
333}
334
335impl Write for RotatingFile {
336 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
337 self.rotate_if_needed(buf.len())?;
338 let written = self.file.write(buf)?;
339 self.bytes_written += written as u64;
340 Ok(written)
341 }
342
343 fn flush(&mut self) -> io::Result<()> {
344 self.file.flush()
345 }
346}
347
348impl Drop for ObservabilityGuard {
349 fn drop(&mut self) {
350 #[cfg(feature = "otel")]
355 if let Some(provider) = self.tracer_provider.take() {
356 let _ = provider.force_flush();
357 let _ = provider.shutdown();
358 }
359 }
360}
361
362#[cfg(feature = "otel")]
363pub fn set_span_parent(
364 span: &tracing::Span,
365 trace_id: &TraceId,
366 parent_span_id: Option<&str>,
367) -> Result<(), String> {
368 use opentelemetry::trace::TraceContextExt as _;
369 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
370
371 let context = opentelemetry::Context::current()
372 .with_remote_span_context(span_context(trace_id, parent_span_id));
373 span.set_parent(context)
374 .map_err(|error| format!("failed to attach OTel parent context: {error}"))
375}
376
377#[cfg(not(feature = "otel"))]
378pub fn set_span_parent(
379 _span: &tracing::Span,
380 _trace_id: &TraceId,
381 _parent_span_id: Option<&str>,
382) -> Result<(), String> {
383 Ok(())
384}
385
386#[cfg(feature = "otel")]
387pub fn set_span_link(
388 span: &SpanRef,
389 linked_trace_id: &TraceId,
390 linked_span_id: &SpanId,
391 attributes: Option<HashMap<String, String>>,
392) -> Result<(), String> {
393 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
394
395 let attributes = attributes
396 .unwrap_or_default()
397 .into_iter()
398 .map(|(key, value)| opentelemetry::KeyValue::new(key, value))
399 .collect();
400 span.add_link_with_attributes(
401 span_context(linked_trace_id, Some(linked_span_id.as_str())),
402 attributes,
403 );
404 Ok(())
405}
406
407#[cfg(not(feature = "otel"))]
408pub fn set_span_link(
409 _span: &SpanRef,
410 _linked_trace_id: &TraceId,
411 _linked_span_id: &SpanId,
412 _attributes: Option<HashMap<String, String>>,
413) -> Result<(), String> {
414 Ok(())
415}
416
417#[cfg(feature = "otel")]
418pub fn current_span_id_hex(span: &tracing::Span) -> Option<String> {
419 use opentelemetry::trace::TraceContextExt as _;
420 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
421
422 let context = span.context();
423 let binding = context.span();
424 let span_context = binding.span_context();
425 span_context
426 .is_valid()
427 .then(|| span_context.span_id().to_string())
428}
429
430#[cfg(not(feature = "otel"))]
431pub fn current_span_id_hex(_span: &tracing::Span) -> Option<String> {
432 None
433}
434
435#[cfg(feature = "otel")]
436pub fn current_span_context_hex(span: &tracing::Span) -> Option<(String, String)> {
437 use opentelemetry::trace::TraceContextExt as _;
438 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
439
440 let context = span.context();
441 let binding = context.span();
442 let span_context = binding.span_context();
443 span_context.is_valid().then(|| {
444 (
445 span_context.trace_id().to_string(),
446 span_context.span_id().to_string(),
447 )
448 })
449}
450
451#[cfg(not(feature = "otel"))]
452pub fn current_span_context_hex(_span: &tracing::Span) -> Option<(String, String)> {
453 None
454}
455
456#[cfg(feature = "otel")]
457pub fn inject_current_context_headers(
458 span: &tracing::Span,
459 headers: &mut BTreeMap<String, String>,
460) -> Result<(), String> {
461 use opentelemetry::propagation::{Injector, TextMapPropagator as _};
462 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
463
464 struct HeaderInjector<'a>(&'a mut BTreeMap<String, String>);
465
466 impl Injector for HeaderInjector<'_> {
467 fn set(&mut self, key: &str, value: String) {
468 self.0.insert(key.to_string(), value);
469 }
470 }
471
472 let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
473 propagator.inject_context(&span.context(), &mut HeaderInjector(headers));
474 Ok(())
475}
476
477#[cfg(not(feature = "otel"))]
478pub fn inject_current_context_headers(
479 _span: &tracing::Span,
480 _headers: &mut BTreeMap<String, String>,
481) -> Result<(), String> {
482 Ok(())
483}
484
485#[cfg(feature = "otel")]
486pub fn set_span_parent_from_headers(
487 span: &tracing::Span,
488 headers: &BTreeMap<String, String>,
489 trace_id: &TraceId,
490 fallback_parent_span_id: Option<&str>,
491) -> Result<(), String> {
492 use opentelemetry::propagation::{Extractor, TextMapPropagator as _};
493 use opentelemetry::trace::TraceContextExt as _;
494 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
495
496 struct HeaderExtractor<'a>(&'a BTreeMap<String, String>);
497
498 impl Extractor for HeaderExtractor<'_> {
499 fn get(&self, key: &str) -> Option<&str> {
500 self.0.get(key).map(String::as_str)
501 }
502
503 fn keys(&self) -> Vec<&str> {
504 self.0.keys().map(String::as_str).collect()
505 }
506 }
507
508 let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
509 let context = propagator.extract(&HeaderExtractor(headers));
510 let binding = context.span();
511 let span_context = binding.span_context();
512 if span_context.is_valid() {
513 return span
514 .set_parent(context)
515 .map_err(|error| format!("failed to attach OTel parent context: {error}"));
516 }
517 set_span_parent(span, trace_id, fallback_parent_span_id)
518}
519
520#[cfg(not(feature = "otel"))]
521pub fn set_span_parent_from_headers(
522 _span: &tracing::Span,
523 _headers: &BTreeMap<String, String>,
524 _trace_id: &TraceId,
525 _fallback_parent_span_id: Option<&str>,
526) -> Result<(), String> {
527 Ok(())
528}
529
530#[cfg(feature = "otel")]
531fn build_tracer_provider_from_env(
532) -> Result<Option<opentelemetry_sdk::trace::SdkTracerProvider>, String> {
533 use opentelemetry::global;
534 use opentelemetry_otlp::{Protocol, WithExportConfig as _, WithHttpConfig as _};
535 use opentelemetry_sdk::runtime;
536 use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
537 use opentelemetry_sdk::trace::{Sampler, SimpleSpanProcessor};
538 use opentelemetry_sdk::Resource;
539
540 let Some(raw_endpoint) = std::env::var("HARN_OTEL_ENDPOINT")
541 .ok()
542 .map(|value| value.trim().to_string())
543 .filter(|value| !value.is_empty())
544 else {
545 return Ok(None);
546 };
547
548 let endpoint = normalize_otlp_traces_endpoint(&raw_endpoint);
549 let service_name = std::env::var("HARN_OTEL_SERVICE_NAME")
550 .ok()
551 .map(|value| value.trim().to_string())
552 .filter(|value| !value.is_empty())
553 .unwrap_or_else(|| "harn-orchestrator".to_string());
554 let headers = parse_headers(&std::env::var("HARN_OTEL_HEADERS").unwrap_or_default());
555 let processor_kind = parse_span_processor_kind(
556 std::env::var("HARN_OTEL_SPAN_PROCESSOR")
557 .ok()
558 .as_deref()
559 .map(str::trim)
560 .filter(|value| !value.is_empty()),
561 )?;
562 let sample_ratio = parse_sample_ratio(
563 std::env::var("HARN_OTEL_SAMPLE_RATIO")
564 .ok()
565 .as_deref()
566 .map(str::trim)
567 .filter(|value| !value.is_empty()),
568 )?;
569
570 let exporter = opentelemetry_otlp::SpanExporter::builder()
571 .with_http()
572 .with_http_client(
573 reqwest::Client::builder()
574 .build()
575 .map_err(|error| format!("failed to build OTLP HTTP client: {error}"))?,
576 )
577 .with_protocol(Protocol::HttpJson)
578 .with_endpoint(endpoint)
579 .with_headers(headers)
580 .build()
581 .map_err(|error| format!("failed to build OTel span exporter: {error}"))?;
582
583 let mut builder = opentelemetry_sdk::trace::SdkTracerProvider::builder()
584 .with_resource(Resource::builder().with_service_name(service_name).build());
585 if sample_ratio < 1.0 {
586 builder = builder.with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(
587 sample_ratio,
588 ))));
589 }
590 builder = match processor_kind {
591 SpanProcessorKind::Batch => builder
592 .with_span_processor(BatchSpanProcessor::builder(exporter, runtime::Tokio).build()),
593 SpanProcessorKind::Simple => {
598 builder.with_span_processor(SimpleSpanProcessor::new(exporter))
599 }
600 };
601 let provider = builder.build();
602 global::set_tracer_provider(provider.clone());
603 Ok(Some(provider))
604}
605
606#[cfg(feature = "otel")]
607fn parse_sample_ratio(value: Option<&str>) -> Result<f64, String> {
608 let Some(value) = value else {
609 return Ok(1.0);
610 };
611 let ratio = value.parse::<f64>().map_err(|error| {
612 format!(
613 "invalid HARN_OTEL_SAMPLE_RATIO value {value:?}; expected a number in [0, 1]: {error}"
614 )
615 })?;
616 if ratio.is_finite() && (0.0..=1.0).contains(&ratio) {
617 Ok(ratio)
618 } else {
619 Err(format!(
620 "invalid HARN_OTEL_SAMPLE_RATIO value {value:?}; expected a number in [0, 1]"
621 ))
622 }
623}
624
625#[cfg(feature = "otel")]
626#[derive(Clone, Copy, Debug, PartialEq, Eq)]
627enum SpanProcessorKind {
628 Batch,
629 Simple,
630}
631
632#[cfg(feature = "otel")]
633fn parse_span_processor_kind(value: Option<&str>) -> Result<SpanProcessorKind, String> {
634 match value {
635 None => Ok(SpanProcessorKind::Batch),
636 Some(value) => match value.to_ascii_lowercase().as_str() {
637 "batch" => Ok(SpanProcessorKind::Batch),
638 "simple" => Ok(SpanProcessorKind::Simple),
639 other => Err(format!(
640 "unsupported HARN_OTEL_SPAN_PROCESSOR value {other:?}; expected 'batch' or 'simple'"
641 )),
642 },
643 }
644}
645
646#[cfg(feature = "otel")]
647fn span_context(
648 trace_id: &TraceId,
649 parent_span_id: Option<&str>,
650) -> opentelemetry::trace::SpanContext {
651 use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceState};
652
653 let trace_id = otel_trace_id(trace_id);
654 let span_id = parent_span_id
655 .and_then(|value| SpanId::from_hex(value).ok())
656 .filter(|value| *value != SpanId::INVALID)
657 .unwrap_or_else(|| hashed_span_id(trace_id.to_string().as_bytes()));
658
659 SpanContext::new(
660 trace_id,
661 span_id,
662 TraceFlags::SAMPLED,
663 true,
664 TraceState::default(),
665 )
666}
667
668#[cfg(feature = "otel")]
669fn otel_trace_id(trace_id: &TraceId) -> opentelemetry::trace::TraceId {
670 use opentelemetry::trace::TraceId as OtelTraceId;
671
672 let normalized = trace_id
673 .0
674 .strip_prefix("trace_")
675 .unwrap_or(trace_id.0.as_str())
676 .replace('-', "");
677 if let Ok(trace_id) = OtelTraceId::from_hex(&normalized) {
678 if trace_id != OtelTraceId::INVALID {
679 return trace_id;
680 }
681 }
682 hashed_trace_id(trace_id.0.as_bytes())
683}
684
685#[cfg(feature = "otel")]
686fn hashed_trace_id(input: &[u8]) -> opentelemetry::trace::TraceId {
687 let digest = Sha256::digest(input);
688 let mut bytes = [0_u8; 16];
689 bytes.copy_from_slice(&digest[..16]);
690 opentelemetry::trace::TraceId::from_bytes(bytes)
691}
692
693#[cfg(feature = "otel")]
694fn hashed_span_id(input: &[u8]) -> opentelemetry::trace::SpanId {
695 let digest = Sha256::digest(input);
696 let mut bytes = [0_u8; 8];
697 bytes.copy_from_slice(&digest[..8]);
698 if bytes.iter().all(|byte| *byte == 0) {
699 bytes[7] = 1;
700 }
701 opentelemetry::trace::SpanId::from_bytes(bytes)
702}
703
704#[cfg(feature = "otel")]
705fn normalize_otlp_traces_endpoint(endpoint: &str) -> String {
706 let trimmed = endpoint.trim_end_matches('/');
707 if trimmed.ends_with("/v1/traces") {
708 trimmed.to_string()
709 } else {
710 format!("{trimmed}/v1/traces")
711 }
712}
713
714#[cfg(feature = "otel")]
715fn parse_headers(raw: &str) -> HashMap<String, String> {
716 raw.split([',', '\n', ';'])
717 .map(str::trim)
718 .filter(|segment| !segment.is_empty())
719 .filter_map(|segment| {
720 let (name, value) = segment
721 .split_once('=')
722 .or_else(|| segment.split_once(':'))?;
723 let name = name.trim();
724 let value = value.trim();
725 if name.is_empty() || value.is_empty() {
726 return None;
727 }
728 Some((name.to_string(), value.to_string()))
729 })
730 .collect()
731}
732
733#[cfg(all(test, feature = "otel"))]
734mod tests {
735 use super::*;
736 use opentelemetry::trace::TracerProvider as _;
737 use opentelemetry_sdk::error::OTelSdkResult;
738 use opentelemetry_sdk::trace::{SdkTracerProvider, SpanData, SpanExporter, Tracer};
739 use std::sync::{Arc, Mutex};
740
741 #[derive(Clone, Default, Debug)]
742 struct TestExporter(Arc<Mutex<Vec<SpanData>>>);
743
744 impl SpanExporter for TestExporter {
745 async fn export(&self, mut batch: Vec<SpanData>) -> OTelSdkResult {
746 let mut spans = self.0.lock().expect("test exporter lock");
747 spans.append(&mut batch);
748 Ok(())
749 }
750 }
751
752 fn test_tracer() -> (
753 Tracer,
754 SdkTracerProvider,
755 TestExporter,
756 impl tracing::Subscriber,
757 ) {
758 let exporter = TestExporter::default();
759 let provider = SdkTracerProvider::builder()
760 .with_simple_exporter(exporter.clone())
761 .build();
762 let tracer = provider.tracer("harn-test");
763 let subscriber = tracing_subscriber::registry().with(
764 tracing_opentelemetry::layer()
765 .with_tracer(tracer.clone())
766 .with_filter(LevelFilter::INFO),
767 );
768 (tracer, provider, exporter, subscriber)
769 }
770
771 #[test]
772 fn normalizes_trace_endpoint_suffix() {
773 assert_eq!(
774 normalize_otlp_traces_endpoint("http://127.0.0.1:4318"),
775 "http://127.0.0.1:4318/v1/traces"
776 );
777 assert_eq!(
778 normalize_otlp_traces_endpoint("http://127.0.0.1:4318/v1/traces"),
779 "http://127.0.0.1:4318/v1/traces"
780 );
781 }
782
783 #[test]
784 fn parses_header_lists() {
785 let headers = parse_headers("authorization=Bearer token,x-tenant-id=tenant-123;trace=true");
786 assert_eq!(
787 headers.get("authorization"),
788 Some(&"Bearer token".to_string())
789 );
790 assert_eq!(headers.get("x-tenant-id"), Some(&"tenant-123".to_string()));
791 assert_eq!(headers.get("trace"), Some(&"true".to_string()));
792 }
793
794 #[test]
795 fn parses_span_processor_kind_defaults_to_batch() {
796 assert_eq!(
797 parse_span_processor_kind(None).unwrap(),
798 SpanProcessorKind::Batch
799 );
800 }
801
802 #[test]
803 fn parses_span_processor_kind_accepts_known_values() {
804 assert_eq!(
805 parse_span_processor_kind(Some("batch")).unwrap(),
806 SpanProcessorKind::Batch
807 );
808 assert_eq!(
809 parse_span_processor_kind(Some("Batch")).unwrap(),
810 SpanProcessorKind::Batch
811 );
812 assert_eq!(
813 parse_span_processor_kind(Some("simple")).unwrap(),
814 SpanProcessorKind::Simple
815 );
816 assert_eq!(
817 parse_span_processor_kind(Some("SIMPLE")).unwrap(),
818 SpanProcessorKind::Simple
819 );
820 }
821
822 #[test]
823 fn parses_span_processor_kind_rejects_unknown_values() {
824 let error = parse_span_processor_kind(Some("forwarder")).unwrap_err();
825 assert!(error.contains("forwarder"), "{error}");
826 assert!(error.contains("batch"), "{error}");
827 assert!(error.contains("simple"), "{error}");
828 }
829
830 #[test]
831 fn parse_sample_ratio_defaults_to_keep_all() {
832 assert_eq!(parse_sample_ratio(None).unwrap(), 1.0);
833 }
834
835 #[test]
836 fn parse_sample_ratio_accepts_valid_ratio() {
837 assert_eq!(parse_sample_ratio(Some("0.1")).unwrap(), 0.1);
838 assert_eq!(parse_sample_ratio(Some("0")).unwrap(), 0.0);
839 assert_eq!(parse_sample_ratio(Some("1")).unwrap(), 1.0);
840 }
841
842 #[test]
843 fn parse_sample_ratio_rejects_invalid_values() {
844 for value in ["2.0", "-1", "x", "NaN", "inf"] {
845 let error = parse_sample_ratio(Some(value)).unwrap_err();
846 assert!(error.contains(value), "{error}");
847 assert!(error.contains("[0, 1]"), "{error}");
848 }
849 }
850
851 #[test]
852 fn set_span_link_exports_link_without_parenting() {
853 let (_tracer, provider, exporter, subscriber) = test_tracer();
854 let linked_trace_id = TraceId("1234567890abcdef1234567890abcdef".to_string());
855 let linked_span_id: SpanId = "1234567890abcdef".to_string();
856
857 tracing::subscriber::with_default(subscriber, || {
858 let span = tracing::info_span!(target: "harn.vm.lifecycle", "resume");
859 set_span_link(
860 &span,
861 &linked_trace_id,
862 &linked_span_id,
863 Some(HashMap::from([(
864 "harn.link.kind".to_string(),
865 "suspension".to_string(),
866 )])),
867 )
868 .expect("set span link");
869 span.in_scope(|| {});
870 });
871
872 provider.force_flush().expect("flush spans");
873 drop(provider);
874 let spans = exporter.0.lock().expect("exported spans lock");
875 assert_eq!(spans.len(), 1);
876 let resume = &spans[0];
877 assert_eq!(resume.parent_span_id, opentelemetry::trace::SpanId::INVALID);
878 assert_eq!(resume.links.len(), 1);
879 let link = &resume.links[0];
880 assert_eq!(link.span_context.trace_id().to_string(), linked_trace_id.0);
881 assert_eq!(link.span_context.span_id().to_string(), linked_span_id);
882 assert_eq!(link.attributes.len(), 1);
883 assert_eq!(link.attributes[0].key.as_str(), "harn.link.kind");
884 }
885}