1use std::collections::BTreeMap;
2#[cfg(feature = "otel")]
3use std::collections::HashMap;
4use std::fs::{self, OpenOptions};
5use std::io::{self, Write};
6use std::path::PathBuf;
7use std::sync::{Arc, Mutex};
8
9#[cfg(feature = "otel")]
10use sha2::{Digest, Sha256};
11#[cfg(feature = "otel")]
12use tracing_subscriber::filter::filter_fn;
13use tracing_subscriber::fmt::MakeWriter;
14use tracing_subscriber::layer::SubscriberExt;
15#[cfg(feature = "otel")]
16use tracing_subscriber::Layer as _;
17use tracing_subscriber::{filter::LevelFilter, EnvFilter};
18
19use crate::TraceId;
20
/// Header key for an explicitly propagated parent span id.
///
/// NOTE(review): not referenced in this module; presumably callers read this header and pass
/// its value as `fallback_parent_span_id` to [`set_span_parent_from_headers`] — confirm.
pub const OTEL_PARENT_SPAN_ID_HEADER: &str = "otel_parent_span_id";
/// W3C Trace Context `traceparent` key, the key the `TraceContextPropagator` used by
/// [`inject_current_context_headers`] / [`set_span_parent_from_headers`] writes and reads.
pub const OTEL_TRACEPARENT_HEADER: &str = "traceparent";
/// W3C Trace Context `tracestate` key (companion to [`OTEL_TRACEPARENT_HEADER`]).
pub const OTEL_TRACESTATE_HEADER: &str = "tracestate";

/// Set once this module has installed the global subscriber; after that, install calls
/// return a guard that owns nothing instead of installing again.
static OBSERVABILITY_INIT: std::sync::OnceLock<()> = std::sync::OnceLock::new();
26
/// Output format for orchestrator logs.
///
/// `Json` output is written to stdout; `Text` and `Pretty` output to stderr.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LogFormat {
    /// Compact single-line output without targets; ANSI colors only when stderr is a terminal.
    Text,
    /// Multi-line human-readable output from the `tracing_subscriber` pretty formatter.
    Pretty,
    /// One flattened JSON object per event, including the current span.
    Json,
}
33
/// Settings for [`ObservabilityGuard::install_orchestrator_subscriber`].
#[derive(Clone, Debug)]
pub struct OrchestratorObservabilityConfig {
    /// Format of console output; the same bytes are mirrored into the log file, if any.
    pub log_format: LogFormat,
    /// When set, log lines are also appended to `<state_dir>/logs/orchestrator.log`, which
    /// is rotated to `orchestrator.log.1` once it would grow past 10 MiB.
    pub state_dir: Option<PathBuf>,
}
39
/// Handle returned by subscriber installation.
///
/// Owns the OTel tracer provider (when one was built) so pending spans can be flushed via
/// [`ObservabilityGuard::shutdown`], or best-effort when the guard is dropped.
pub struct ObservabilityGuard {
    /// `Some` only when this guard's install call set up an OTLP exporter.
    #[cfg(feature = "otel")]
    tracer_provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
}
44
45impl ObservabilityGuard {
46 pub fn install_orchestrator_subscriber_from_env() -> Result<Self, String> {
47 Self::install_orchestrator_subscriber(OrchestratorObservabilityConfig {
48 log_format: LogFormat::Text,
49 state_dir: None,
50 })
51 }
52
53 pub fn install_orchestrator_subscriber(
54 config: OrchestratorObservabilityConfig,
55 ) -> Result<Self, String> {
56 if OBSERVABILITY_INIT.get().is_some() {
57 return Ok(Self {
58 #[cfg(feature = "otel")]
59 tracer_provider: None,
60 });
61 }
62
63 #[cfg(feature = "otel")]
64 {
65 if let Some(provider) = build_tracer_provider_from_env()? {
66 use opentelemetry::trace::TracerProvider as _;
67
68 let writer = log_writer(&config)?;
69 match config.log_format {
70 LogFormat::Json => {
71 let tracer = provider.tracer("harn.orchestrator");
72 let telemetry = tracing_opentelemetry::layer()
73 .with_tracer(tracer)
74 .with_filter(filter_fn(|metadata| {
75 metadata.is_span() && metadata.target().starts_with("harn")
76 }));
77 let subscriber = tracing_subscriber::registry()
78 .with(env_filter())
79 .with(
80 tracing_subscriber::fmt::layer()
81 .json()
82 .flatten_event(true)
83 .with_current_span(true)
84 .with_writer(writer),
85 )
86 .with(telemetry);
87 tracing::subscriber::set_global_default(subscriber).map_err(|error| {
88 format!("failed to install global tracing subscriber: {error}")
89 })?;
90 }
91 LogFormat::Pretty => {
92 let tracer = provider.tracer("harn.orchestrator");
93 let telemetry = tracing_opentelemetry::layer()
94 .with_tracer(tracer)
95 .with_filter(filter_fn(|metadata| {
96 metadata.is_span() && metadata.target().starts_with("harn")
97 }));
98 let subscriber = tracing_subscriber::registry()
99 .with(env_filter())
100 .with(
101 tracing_subscriber::fmt::layer()
102 .pretty()
103 .with_writer(writer),
104 )
105 .with(telemetry);
106 tracing::subscriber::set_global_default(subscriber).map_err(|error| {
107 format!("failed to install global tracing subscriber: {error}")
108 })?;
109 }
110 LogFormat::Text => {
111 let tracer = provider.tracer("harn.orchestrator");
112 let telemetry = tracing_opentelemetry::layer()
113 .with_tracer(tracer)
114 .with_filter(filter_fn(|metadata| {
115 metadata.is_span() && metadata.target().starts_with("harn")
116 }));
117 let subscriber = tracing_subscriber::registry()
118 .with(env_filter())
119 .with(
120 tracing_subscriber::fmt::layer()
121 .compact()
122 .with_target(false)
123 .with_ansi(std::io::IsTerminal::is_terminal(&std::io::stderr()))
124 .with_writer(writer),
125 )
126 .with(telemetry);
127 tracing::subscriber::set_global_default(subscriber).map_err(|error| {
128 format!("failed to install global tracing subscriber: {error}")
129 })?;
130 }
131 }
132 let _ = OBSERVABILITY_INIT.set(());
133 return Ok(Self {
134 tracer_provider: Some(provider),
135 });
136 }
137 }
138
139 #[cfg(not(feature = "otel"))]
140 if std::env::var("HARN_OTEL_ENDPOINT")
141 .ok()
142 .filter(|value| !value.trim().is_empty())
143 .is_some()
144 {
145 return Err(
146 "HARN_OTEL_ENDPOINT is set, but this build was compiled without the `otel` feature"
147 .to_string(),
148 );
149 }
150
151 let writer = log_writer(&config)?;
152 match config.log_format {
153 LogFormat::Json => {
154 let subscriber = tracing_subscriber::registry().with(env_filter()).with(
155 tracing_subscriber::fmt::layer()
156 .json()
157 .flatten_event(true)
158 .with_current_span(true)
159 .with_writer(writer),
160 );
161 tracing::subscriber::set_global_default(subscriber).map_err(|error| {
162 format!("failed to install global tracing subscriber: {error}")
163 })?;
164 }
165 LogFormat::Pretty => {
166 let subscriber = tracing_subscriber::registry().with(env_filter()).with(
167 tracing_subscriber::fmt::layer()
168 .pretty()
169 .with_writer(writer),
170 );
171 tracing::subscriber::set_global_default(subscriber).map_err(|error| {
172 format!("failed to install global tracing subscriber: {error}")
173 })?;
174 }
175 LogFormat::Text => {
176 let subscriber = tracing_subscriber::registry().with(env_filter()).with(
177 tracing_subscriber::fmt::layer()
178 .compact()
179 .with_target(false)
180 .with_ansi(std::io::IsTerminal::is_terminal(&std::io::stderr()))
181 .with_writer(writer),
182 );
183 tracing::subscriber::set_global_default(subscriber).map_err(|error| {
184 format!("failed to install global tracing subscriber: {error}")
185 })?;
186 }
187 }
188 let _ = OBSERVABILITY_INIT.set(());
189 Ok(Self {
190 #[cfg(feature = "otel")]
191 tracer_provider: None,
192 })
193 }
194
195 #[cfg_attr(not(feature = "otel"), allow(unused_mut))]
196 pub fn shutdown(mut self) -> Result<(), String> {
197 #[cfg(feature = "otel")]
198 if let Some(provider) = self.tracer_provider.take() {
199 provider
200 .force_flush()
201 .map_err(|error| format!("failed to flush OTel spans: {error}"))?;
202 provider
203 .shutdown()
204 .map_err(|error| format!("failed to shut down OTel tracer provider: {error}"))?;
205 }
206 Ok(())
207 }
208}
209
210fn env_filter() -> EnvFilter {
211 EnvFilter::builder()
212 .with_default_directive(LevelFilter::INFO.into())
213 .from_env_lossy()
214}
215
216fn log_writer(config: &OrchestratorObservabilityConfig) -> Result<OrchestratorLogWriter, String> {
217 let file = if let Some(state_dir) = config.state_dir.as_ref() {
218 let log_dir = state_dir.join("logs");
219 fs::create_dir_all(&log_dir).map_err(|error| {
220 format!(
221 "failed to create orchestrator log dir {}: {error}",
222 log_dir.display()
223 )
224 })?;
225 Some(Arc::new(Mutex::new(RotatingFile::open(
226 log_dir.join("orchestrator.log"),
227 )?)))
228 } else {
229 None
230 };
231 Ok(OrchestratorLogWriter {
232 format: config.log_format,
233 file,
234 })
235}
236
/// `MakeWriter` handed to the fmt layer.
///
/// Routes each event to the console stream selected by `format` and, when `file` is set,
/// also to the shared rotating log file.
#[derive(Clone)]
struct OrchestratorLogWriter {
    /// Selects stdout (`Json`) or stderr (`Text` / `Pretty`).
    format: LogFormat,
    /// Rotating log file shared by every writer produced from this one.
    file: Option<Arc<Mutex<RotatingFile>>>,
}
242
243impl<'a> MakeWriter<'a> for OrchestratorLogWriter {
244 type Writer = OrchestratorLogLineWriter;
245
246 fn make_writer(&'a self) -> Self::Writer {
247 OrchestratorLogLineWriter {
248 format: self.format,
249 file: self.file.clone(),
250 }
251 }
252}
253
/// Per-event writer produced by `OrchestratorLogWriter::make_writer`.
struct OrchestratorLogLineWriter {
    /// Selects stdout (`Json`) or stderr (`Text` / `Pretty`).
    format: LogFormat,
    /// Shared rotating log file, present only when file logging is enabled.
    file: Option<Arc<Mutex<RotatingFile>>>,
}
258
259impl Write for OrchestratorLogLineWriter {
260 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
261 match self.format {
262 LogFormat::Json => io::stdout().write_all(buf)?,
263 LogFormat::Text | LogFormat::Pretty => io::stderr().write_all(buf)?,
264 }
265 if let Some(file) = self.file.as_ref() {
266 file.lock()
267 .expect("orchestrator log file poisoned")
268 .write_all(buf)?;
269 }
270 Ok(buf.len())
271 }
272
273 fn flush(&mut self) -> io::Result<()> {
274 match self.format {
275 LogFormat::Json => io::stdout().flush()?,
276 LogFormat::Text | LogFormat::Pretty => io::stderr().flush()?,
277 }
278 if let Some(file) = self.file.as_ref() {
279 file.lock()
280 .expect("orchestrator log file poisoned")
281 .flush()?;
282 }
283 Ok(())
284 }
285}
286
/// Append-only log file with a single backup generation.
///
/// When a write would push a non-empty file past `max_bytes`, the file is renamed with the
/// extension `log.1` (e.g. `orchestrator.log` -> `orchestrator.log.1`), replacing any older
/// backup. A fresh file is then opened at the original path.
///
/// Rotation is checked per `write` call, so a record written in several chunks may straddle
/// the two files.
struct RotatingFile {
    /// Path of the live log file.
    path: PathBuf,
    /// Append-mode handle to `path`.
    file: fs::File,
    /// Size of the live file: its length when opened plus the bytes written since.
    bytes_written: u64,
    /// Size threshold that triggers rotation.
    max_bytes: u64,
}

impl RotatingFile {
    /// Default rotation threshold: 10 MiB.
    const MAX_BYTES: u64 = 10 * 1024 * 1024;

    /// Opens `path` for appending (creating it if missing), rotating at [`Self::MAX_BYTES`].
    ///
    /// # Errors
    /// Returns a message naming the path if the file cannot be opened.
    fn open(path: PathBuf) -> Result<Self, String> {
        Self::open_with_max_bytes(path, Self::MAX_BYTES)
    }

    /// Like [`RotatingFile::open`], but rotating once the file would exceed `max_bytes`.
    ///
    /// # Errors
    /// Returns a message naming the path if the file cannot be opened.
    fn open_with_max_bytes(path: PathBuf, max_bytes: u64) -> Result<Self, String> {
        // Resume the size count of an existing file. A missing file starts at zero and is
        // created by `OpenOptions` below.
        let bytes_written = fs::metadata(&path)
            .map(|metadata| metadata.len())
            .unwrap_or(0);
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)
            .map_err(|error| {
                format!(
                    "failed to open orchestrator log {}: {error}",
                    path.display()
                )
            })?;
        Ok(Self {
            path,
            file,
            bytes_written,
            max_bytes,
        })
    }

    /// Rotates the file if appending `next_write_bytes` would exceed the size limit.
    ///
    /// An empty file is never rotated. A single write larger than the limit then goes into
    /// the fresh file, instead of moving an empty file over (and destroying) the previous
    /// backup on every such write.
    ///
    /// # Errors
    /// Propagates I/O errors from flushing, renaming, or reopening the file.
    fn rotate_if_needed(&mut self, next_write_bytes: usize) -> io::Result<()> {
        // `usize` -> `u64` is lossless on every supported target.
        let projected = self.bytes_written.saturating_add(next_write_bytes as u64);
        if self.bytes_written == 0 || projected <= self.max_bytes {
            return Ok(());
        }
        self.file.flush()?;
        let rotated = self.path.with_extension("log.1");
        // Discard the previous backup; it not existing yet is not an error.
        let _ = fs::remove_file(&rotated);
        if self.path.exists() {
            fs::rename(&self.path, rotated)?;
        }
        self.file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)?;
        self.bytes_written = 0;
        Ok(())
    }
}

impl Write for RotatingFile {
    /// Rotates first if needed, then appends `buf` and tracks the bytes actually written.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.rotate_if_needed(buf.len())?;
        let written = self.file.write(buf)?;
        self.bytes_written += written as u64;
        Ok(written)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.file.flush()
    }
}
348
349impl Drop for ObservabilityGuard {
350 fn drop(&mut self) {
351 #[cfg(feature = "otel")]
356 if let Some(provider) = self.tracer_provider.take() {
357 let _ = provider.force_flush();
358 let _ = provider.shutdown();
359 }
360 }
361}
362
363#[cfg(feature = "otel")]
364pub fn set_span_parent(
365 span: &tracing::Span,
366 trace_id: &TraceId,
367 parent_span_id: Option<&str>,
368) -> Result<(), String> {
369 use opentelemetry::trace::TraceContextExt as _;
370 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
371
372 let context = opentelemetry::Context::current()
373 .with_remote_span_context(span_context(trace_id, parent_span_id));
374 span.set_parent(context)
375 .map_err(|error| format!("failed to attach OTel parent context: {error}"))
376}
377
378#[cfg(not(feature = "otel"))]
379pub fn set_span_parent(
380 _span: &tracing::Span,
381 _trace_id: &TraceId,
382 _parent_span_id: Option<&str>,
383) -> Result<(), String> {
384 Ok(())
385}
386
387#[cfg(feature = "otel")]
388pub fn current_span_id_hex(span: &tracing::Span) -> Option<String> {
389 use opentelemetry::trace::TraceContextExt as _;
390 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
391
392 let context = span.context();
393 let binding = context.span();
394 let span_context = binding.span_context();
395 span_context
396 .is_valid()
397 .then(|| span_context.span_id().to_string())
398}
399
400#[cfg(not(feature = "otel"))]
401pub fn current_span_id_hex(_span: &tracing::Span) -> Option<String> {
402 None
403}
404
405#[cfg(feature = "otel")]
406pub fn inject_current_context_headers(
407 span: &tracing::Span,
408 headers: &mut BTreeMap<String, String>,
409) -> Result<(), String> {
410 use opentelemetry::propagation::{Injector, TextMapPropagator as _};
411 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
412
413 struct HeaderInjector<'a>(&'a mut BTreeMap<String, String>);
414
415 impl Injector for HeaderInjector<'_> {
416 fn set(&mut self, key: &str, value: String) {
417 self.0.insert(key.to_string(), value);
418 }
419 }
420
421 let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
422 propagator.inject_context(&span.context(), &mut HeaderInjector(headers));
423 Ok(())
424}
425
426#[cfg(not(feature = "otel"))]
427pub fn inject_current_context_headers(
428 _span: &tracing::Span,
429 _headers: &mut BTreeMap<String, String>,
430) -> Result<(), String> {
431 Ok(())
432}
433
434#[cfg(feature = "otel")]
435pub fn set_span_parent_from_headers(
436 span: &tracing::Span,
437 headers: &BTreeMap<String, String>,
438 trace_id: &TraceId,
439 fallback_parent_span_id: Option<&str>,
440) -> Result<(), String> {
441 use opentelemetry::propagation::{Extractor, TextMapPropagator as _};
442 use opentelemetry::trace::TraceContextExt as _;
443 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
444
445 struct HeaderExtractor<'a>(&'a BTreeMap<String, String>);
446
447 impl Extractor for HeaderExtractor<'_> {
448 fn get(&self, key: &str) -> Option<&str> {
449 self.0.get(key).map(String::as_str)
450 }
451
452 fn keys(&self) -> Vec<&str> {
453 self.0.keys().map(String::as_str).collect()
454 }
455 }
456
457 let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
458 let context = propagator.extract(&HeaderExtractor(headers));
459 let binding = context.span();
460 let span_context = binding.span_context();
461 if span_context.is_valid() {
462 return span
463 .set_parent(context)
464 .map_err(|error| format!("failed to attach OTel parent context: {error}"));
465 }
466 set_span_parent(span, trace_id, fallback_parent_span_id)
467}
468
469#[cfg(not(feature = "otel"))]
470pub fn set_span_parent_from_headers(
471 _span: &tracing::Span,
472 _headers: &BTreeMap<String, String>,
473 _trace_id: &TraceId,
474 _fallback_parent_span_id: Option<&str>,
475) -> Result<(), String> {
476 Ok(())
477}
478
/// Builds an OTLP/HTTP (JSON) tracer provider from `HARN_OTEL_*` environment variables and
/// registers it as the global OTel tracer provider.
///
/// Variables read:
/// - `HARN_OTEL_ENDPOINT`: unset or blank returns `Ok(None)`. The value is trimmed and
///   normalized to end in `/v1/traces`.
/// - `HARN_OTEL_SERVICE_NAME`: defaults to `harn-orchestrator`.
/// - `HARN_OTEL_HEADERS`: parsed by `parse_headers`.
/// - `HARN_OTEL_SPAN_PROCESSOR`: `batch` (default) or `simple`.
///
/// NOTE(review): the batch processor is built on `runtime::Tokio`, so this presumably has to
/// run inside a Tokio runtime — confirm.
///
/// # Errors
/// Returns an error for an unknown span processor kind, or if the HTTP client or span
/// exporter cannot be built.
#[cfg(feature = "otel")]
fn build_tracer_provider_from_env(
) -> Result<Option<opentelemetry_sdk::trace::SdkTracerProvider>, String> {
    use opentelemetry::global;
    use opentelemetry_otlp::{Protocol, WithExportConfig as _, WithHttpConfig as _};
    use opentelemetry_sdk::runtime;
    use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
    use opentelemetry_sdk::trace::SimpleSpanProcessor;
    use opentelemetry_sdk::Resource;

    // Reads `name` trimmed; unset, non-UTF-8 or blank values count as absent.
    let non_blank_env = |name: &str| {
        std::env::var(name)
            .ok()
            .map(|value| value.trim().to_string())
            .filter(|value| !value.is_empty())
    };

    let Some(raw_endpoint) = non_blank_env("HARN_OTEL_ENDPOINT") else {
        return Ok(None);
    };
    let endpoint = normalize_otlp_traces_endpoint(&raw_endpoint);
    let service_name = non_blank_env("HARN_OTEL_SERVICE_NAME")
        .unwrap_or_else(|| "harn-orchestrator".to_string());
    let headers = parse_headers(&std::env::var("HARN_OTEL_HEADERS").unwrap_or_default());
    let processor_kind =
        parse_span_processor_kind(non_blank_env("HARN_OTEL_SPAN_PROCESSOR").as_deref())?;

    let http_client = reqwest::Client::builder()
        .build()
        .map_err(|error| format!("failed to build OTLP HTTP client: {error}"))?;
    let exporter = opentelemetry_otlp::SpanExporter::builder()
        .with_http()
        .with_http_client(http_client)
        .with_protocol(Protocol::HttpJson)
        .with_endpoint(endpoint)
        .with_headers(headers)
        .build()
        .map_err(|error| format!("failed to build OTel span exporter: {error}"))?;

    let resource = Resource::builder().with_service_name(service_name).build();
    let provider_builder =
        opentelemetry_sdk::trace::SdkTracerProvider::builder().with_resource(resource);
    let provider_builder = match processor_kind {
        SpanProcessorKind::Batch => provider_builder
            .with_span_processor(BatchSpanProcessor::builder(exporter, runtime::Tokio).build()),
        SpanProcessorKind::Simple => {
            provider_builder.with_span_processor(SimpleSpanProcessor::new(exporter))
        }
    };
    let provider = provider_builder.build();
    global::set_tracer_provider(provider.clone());
    Ok(Some(provider))
}
542
/// Span processor selected by `HARN_OTEL_SPAN_PROCESSOR`.
#[cfg(feature = "otel")]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum SpanProcessorKind {
    /// Buffer spans and export them in batches on the Tokio runtime (the default).
    Batch,
    /// Hand each finished span to the exporter immediately via `SimpleSpanProcessor`.
    Simple,
}
549
/// Parses the (already trimmed) `HARN_OTEL_SPAN_PROCESSOR` value.
///
/// `None` selects [`SpanProcessorKind::Batch`]; matching is ASCII case-insensitive.
///
/// # Errors
/// Returns a message naming the lowercased value and the accepted options.
#[cfg(feature = "otel")]
fn parse_span_processor_kind(value: Option<&str>) -> Result<SpanProcessorKind, String> {
    let Some(raw) = value else {
        return Ok(SpanProcessorKind::Batch);
    };
    match raw.to_ascii_lowercase().as_str() {
        "batch" => Ok(SpanProcessorKind::Batch),
        "simple" => Ok(SpanProcessorKind::Simple),
        other => Err(format!(
            "unsupported HARN_OTEL_SPAN_PROCESSOR value {other:?}; expected 'batch' or 'simple'"
        )),
    }
}
563
/// Builds a sampled, remote OTel span context for `trace_id`.
///
/// The span id is `parent_span_id` (hex) when it parses to a non-invalid id. Otherwise it
/// is an id hashed from the OTel trace id's string form, so it is deterministic per trace.
#[cfg(feature = "otel")]
fn span_context(
    trace_id: &TraceId,
    parent_span_id: Option<&str>,
) -> opentelemetry::trace::SpanContext {
    use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceState};

    let otel_trace = otel_trace_id(trace_id);
    let explicit_parent = parent_span_id
        .and_then(|hex| SpanId::from_hex(hex).ok())
        .filter(|id| *id != SpanId::INVALID);
    let parent = match explicit_parent {
        Some(id) => id,
        None => hashed_span_id(otel_trace.to_string().as_bytes()),
    };
    SpanContext::new(
        otel_trace,
        parent,
        TraceFlags::SAMPLED,
        true,
        TraceState::default(),
    )
}
585
/// Maps a Harn [`TraceId`] onto an OTel trace id.
///
/// The id is reused directly when, after stripping a `trace_` prefix and all `-`, it
/// parses as a non-zero hex trace id. Otherwise it falls back to a SHA-256-derived id of
/// the full original string.
#[cfg(feature = "otel")]
fn otel_trace_id(trace_id: &TraceId) -> opentelemetry::trace::TraceId {
    use opentelemetry::trace::TraceId as OtelTraceId;

    let raw = trace_id.0.as_str();
    let hex: String = raw
        .strip_prefix("trace_")
        .unwrap_or(raw)
        .chars()
        .filter(|c| *c != '-')
        .collect();
    match OtelTraceId::from_hex(&hex) {
        Ok(parsed) if parsed != OtelTraceId::INVALID => parsed,
        _ => hashed_trace_id(raw.as_bytes()),
    }
}
602
/// Derives a deterministic trace id from the first 16 bytes of SHA-256(`input`).
///
/// As in [`hashed_span_id`], an all-zero result (OTel's invalid id) is nudged to a non-zero
/// value, so callers always receive a valid trace id.
#[cfg(feature = "otel")]
fn hashed_trace_id(input: &[u8]) -> opentelemetry::trace::TraceId {
    let digest = Sha256::digest(input);
    let mut bytes = [0_u8; 16];
    bytes.copy_from_slice(&digest[..16]);
    if bytes.iter().all(|byte| *byte == 0) {
        bytes[15] = 1;
    }
    opentelemetry::trace::TraceId::from_bytes(bytes)
}
610
/// Derives a deterministic span id from the first 8 bytes of SHA-256(`input`).
///
/// The all-zero span id is invalid in OTel, so that one result gets its last byte set to 1.
#[cfg(feature = "otel")]
fn hashed_span_id(input: &[u8]) -> opentelemetry::trace::SpanId {
    let digest = Sha256::digest(input);
    let mut id = [0_u8; 8];
    for (dst, src) in id.iter_mut().zip(digest.iter()) {
        *dst = *src;
    }
    if id == [0_u8; 8] {
        id[7] = 1;
    }
    opentelemetry::trace::SpanId::from_bytes(id)
}
621
/// Ensures an OTLP endpoint points at the traces path.
///
/// Trailing slashes are dropped, and `/v1/traces` is appended unless already present.
#[cfg(feature = "otel")]
fn normalize_otlp_traces_endpoint(endpoint: &str) -> String {
    const TRACES_PATH: &str = "/v1/traces";

    let base = endpoint.trim_end_matches('/');
    if !base.ends_with(TRACES_PATH) {
        return format!("{base}{TRACES_PATH}");
    }
    base.to_owned()
}
631
/// Parses `HARN_OTEL_HEADERS` into exporter headers.
///
/// Entries are separated by `,`, `;` or newlines. Each entry is `name=value` or
/// `name: value`, split at whichever separator appears first. Values may therefore contain
/// `=` or `:` themselves, e.g. `authorization: Basic dXNlcg==` or `x-url=http://host`.
///
/// Names and values are trimmed. Entries without a separator, or with an empty name or
/// value, are skipped. Later duplicates overwrite earlier ones.
#[cfg(feature = "otel")]
fn parse_headers(raw: &str) -> HashMap<String, String> {
    raw.split([',', '\n', ';'])
        .map(str::trim)
        .filter(|segment| !segment.is_empty())
        .filter_map(|segment| {
            // Split at the earliest separator; trying `=` first would mis-split
            // `name: value` entries whose value contains `=`.
            let (name, value) = segment.split_once(['=', ':'])?;
            let name = name.trim();
            let value = value.trim();
            (!name.is_empty() && !value.is_empty())
                .then(|| (name.to_string(), value.to_string()))
        })
        .collect()
}
650
651#[cfg(all(test, feature = "otel"))]
652mod tests {
653 use super::*;
654
655 #[test]
656 fn normalizes_trace_endpoint_suffix() {
657 assert_eq!(
658 normalize_otlp_traces_endpoint("http://127.0.0.1:4318"),
659 "http://127.0.0.1:4318/v1/traces"
660 );
661 assert_eq!(
662 normalize_otlp_traces_endpoint("http://127.0.0.1:4318/v1/traces"),
663 "http://127.0.0.1:4318/v1/traces"
664 );
665 }
666
667 #[test]
668 fn parses_header_lists() {
669 let headers = parse_headers("authorization=Bearer token,x-tenant-id=tenant-123;trace=true");
670 assert_eq!(
671 headers.get("authorization"),
672 Some(&"Bearer token".to_string())
673 );
674 assert_eq!(headers.get("x-tenant-id"), Some(&"tenant-123".to_string()));
675 assert_eq!(headers.get("trace"), Some(&"true".to_string()));
676 }
677
678 #[test]
679 fn parses_span_processor_kind_defaults_to_batch() {
680 assert_eq!(
681 parse_span_processor_kind(None).unwrap(),
682 SpanProcessorKind::Batch
683 );
684 }
685
686 #[test]
687 fn parses_span_processor_kind_accepts_known_values() {
688 assert_eq!(
689 parse_span_processor_kind(Some("batch")).unwrap(),
690 SpanProcessorKind::Batch
691 );
692 assert_eq!(
693 parse_span_processor_kind(Some("Batch")).unwrap(),
694 SpanProcessorKind::Batch
695 );
696 assert_eq!(
697 parse_span_processor_kind(Some("simple")).unwrap(),
698 SpanProcessorKind::Simple
699 );
700 assert_eq!(
701 parse_span_processor_kind(Some("SIMPLE")).unwrap(),
702 SpanProcessorKind::Simple
703 );
704 }
705
706 #[test]
707 fn parses_span_processor_kind_rejects_unknown_values() {
708 let error = parse_span_processor_kind(Some("forwarder")).unwrap_err();
709 assert!(error.contains("forwarder"), "{error}");
710 assert!(error.contains("batch"), "{error}");
711 assert!(error.contains("simple"), "{error}");
712 }
713}