1use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::rc::Rc;
11
/// Severity of a [`LogEvent`], declared from least (`Trace`) to most (`Error`) severe.
///
/// The derived ordering follows declaration order, so levels can be compared
/// directly (for example `level >= EventLevel::Warn`) and used as map or set keys.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum EventLevel {
    /// Lowest severity.
    Trace,
    /// Diagnostic detail.
    Debug,
    /// Informational message.
    Info,
    /// Warning.
    Warn,
    /// Highest severity.
    Error,
}
21
22#[derive(Clone, Debug)]
24pub struct LogEvent {
25 pub level: EventLevel,
26 pub category: String,
27 pub message: String,
28 pub metadata: BTreeMap<String, serde_json::Value>,
29}
30
31#[derive(Clone, Debug)]
33pub struct SpanEvent {
34 pub span_id: u64,
35 pub parent_id: Option<u64>,
36 pub name: String,
37 pub kind: String,
38 pub metadata: BTreeMap<String, serde_json::Value>,
39}
40
41pub trait EventSink {
43 fn emit_log(&self, event: &LogEvent);
44 fn emit_span_start(&self, event: &SpanEvent);
45 fn emit_span_end(&self, span_id: u64, metadata: &BTreeMap<String, serde_json::Value>);
46}
47
/// Sink that prints log events to standard error and ignores span events.
#[derive(Clone, Copy, Debug, Default)]
pub struct StderrSink;
50
51impl EventSink for StderrSink {
52 fn emit_log(&self, event: &LogEvent) {
53 let level_str = match event.level {
54 EventLevel::Trace => "TRACE",
55 EventLevel::Debug => "DEBUG",
56 EventLevel::Info => "INFO",
57 EventLevel::Warn => "WARN",
58 EventLevel::Error => "ERROR",
59 };
60 match event.level {
63 EventLevel::Warn => {
64 eprintln!("[harn] warning: {}", event.message);
65 }
66 EventLevel::Error => {
67 eprintln!("[harn] error: {}", event.message);
68 }
69 _ => {
70 eprintln!("[{level_str}] [{}] {}", event.category, event.message);
71 }
72 }
73 }
74
75 fn emit_span_start(&self, _event: &SpanEvent) {
76 }
78
79 fn emit_span_end(&self, _span_id: u64, _metadata: &BTreeMap<String, serde_json::Value>) {}
80}
81
82pub struct CollectorSink {
84 pub logs: RefCell<Vec<LogEvent>>,
85 pub spans: RefCell<Vec<SpanEvent>>,
86}
87
88impl CollectorSink {
89 pub fn new() -> Self {
90 Self {
91 logs: RefCell::new(Vec::new()),
92 spans: RefCell::new(Vec::new()),
93 }
94 }
95}
96
97impl Default for CollectorSink {
98 fn default() -> Self {
99 Self::new()
100 }
101}
102
103impl EventSink for CollectorSink {
104 fn emit_log(&self, event: &LogEvent) {
105 self.logs.borrow_mut().push(event.clone());
106 }
107
108 fn emit_span_start(&self, event: &SpanEvent) {
109 self.spans.borrow_mut().push(event.clone());
110 }
111
112 fn emit_span_end(&self, _span_id: u64, _metadata: &BTreeMap<String, serde_json::Value>) {}
113}
114
115thread_local! {
116 static EVENT_SINKS: RefCell<Vec<Rc<dyn EventSink>>> = RefCell::new(vec![Rc::new(StderrSink)]);
117}
118
119pub fn add_event_sink(sink: Rc<dyn EventSink>) {
121 EVENT_SINKS.with(|sinks| sinks.borrow_mut().push(sink));
122}
123
124pub fn clear_event_sinks() {
126 EVENT_SINKS.with(|sinks| sinks.borrow_mut().clear());
127}
128
129pub fn reset_event_sinks() {
131 EVENT_SINKS.with(|sinks| {
132 let mut s = sinks.borrow_mut();
133 s.clear();
134 s.push(Rc::new(StderrSink));
135 });
136}
137
138pub fn emit_log(
140 level: EventLevel,
141 category: &str,
142 message: &str,
143 metadata: BTreeMap<String, serde_json::Value>,
144) {
145 let event = LogEvent {
146 level,
147 category: category.to_string(),
148 message: message.to_string(),
149 metadata,
150 };
151 EVENT_SINKS.with(|sinks| {
152 for sink in sinks.borrow().iter() {
153 sink.emit_log(&event);
154 }
155 });
156}
157
158pub fn emit_span_start(
160 span_id: u64,
161 parent_id: Option<u64>,
162 name: &str,
163 kind: &str,
164 metadata: BTreeMap<String, serde_json::Value>,
165) {
166 let event = SpanEvent {
167 span_id,
168 parent_id,
169 name: name.to_string(),
170 kind: kind.to_string(),
171 metadata,
172 };
173 EVENT_SINKS.with(|sinks| {
174 for sink in sinks.borrow().iter() {
175 sink.emit_span_start(&event);
176 }
177 });
178}
179
180pub fn emit_span_end(span_id: u64, metadata: BTreeMap<String, serde_json::Value>) {
182 EVENT_SINKS.with(|sinks| {
183 for sink in sinks.borrow().iter() {
184 sink.emit_span_end(span_id, &metadata);
185 }
186 });
187}
188
189pub fn log_info(category: &str, message: &str) {
191 emit_log(EventLevel::Info, category, message, BTreeMap::new());
192}
193
194pub fn log_warn(category: &str, message: &str) {
196 emit_log(EventLevel::Warn, category, message, BTreeMap::new());
197}
198
199pub fn log_error(category: &str, message: &str) {
201 emit_log(EventLevel::Error, category, message, BTreeMap::new());
202}
203
204pub fn log_debug(category: &str, message: &str) {
206 emit_log(EventLevel::Debug, category, message, BTreeMap::new());
207}
208
209pub fn log_info_meta(category: &str, message: &str, metadata: BTreeMap<String, serde_json::Value>) {
211 emit_log(EventLevel::Info, category, message, metadata);
212}
213
214pub fn log_warn_meta(category: &str, message: &str, metadata: BTreeMap<String, serde_json::Value>) {
216 emit_log(EventLevel::Warn, category, message, metadata);
217}
218
#[cfg(feature = "otel")]
/// Sink that turns log and span events into OpenTelemetry spans.
///
/// Only compiled with the `otel` feature.
pub struct OtelSink {
    /// Tracer provider used to create every span emitted by this sink.
    provider: opentelemetry_sdk::trace::SdkTracerProvider,
    /// Spans that have started but not yet ended, keyed by `SpanEvent::span_id`.
    active_spans: RefCell<std::collections::HashMap<u64, opentelemetry_sdk::trace::Span>>,
}
230
#[cfg(feature = "otel")]
impl OtelSink {
    /// Builds an OTLP/HTTP (JSON) span exporter from the environment and wraps
    /// it in a batch-processing tracer provider.
    ///
    /// Endpoint, headers and service name come from `otel_endpoint_from_env`,
    /// `otel_headers_from_env` and `otel_service_name_from_env`. When an
    /// endpoint is configured it is normalised with
    /// `normalize_otlp_traces_endpoint`; otherwise no endpoint is set on the
    /// exporter builder. The provider is also installed as the global tracer
    /// provider.
    ///
    /// NOTE(review): the batch processor is built with `runtime::Tokio`, so this
    /// presumably has to run inside a Tokio runtime — confirm.
    ///
    /// # Errors
    ///
    /// Returns a message when the HTTP client or the span exporter cannot be
    /// built.
    pub fn new() -> Result<Self, String> {
        use opentelemetry::global;
        use opentelemetry_otlp::{
            Protocol, SpanExporter, WithExportConfig as _, WithHttpConfig as _,
        };
        use opentelemetry_sdk::runtime;
        use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
        use opentelemetry_sdk::trace::SdkTracerProvider;
        use opentelemetry_sdk::Resource;

        let endpoint = otel_endpoint_from_env();
        let headers = otel_headers_from_env();
        let service_name = otel_service_name_from_env();

        let http_client = match reqwest::Client::builder().build() {
            Ok(client) => client,
            Err(error) => return Err(format!("failed to build OTLP HTTP client: {error}")),
        };

        let base_builder = SpanExporter::builder()
            .with_http()
            .with_http_client(http_client)
            .with_protocol(Protocol::HttpJson)
            .with_headers(headers);
        // Only override the exporter endpoint when one was configured.
        let exporter_builder = match endpoint.as_deref() {
            Some(url) => base_builder.with_endpoint(normalize_otlp_traces_endpoint(url)),
            None => base_builder,
        };
        let exporter = exporter_builder
            .build()
            .map_err(|e| format!("OTel span exporter init failed: {e}"))?;

        let resource = Resource::builder().with_service_name(service_name).build();
        let span_processor = BatchSpanProcessor::builder(exporter, runtime::Tokio).build();
        let provider = SdkTracerProvider::builder()
            .with_resource(resource)
            .with_span_processor(span_processor)
            .build();

        global::set_tracer_provider(provider.clone());

        let active_spans = std::cell::RefCell::new(std::collections::HashMap::new());
        Ok(Self {
            provider,
            active_spans,
        })
    }
}
296
297#[cfg(feature = "otel")]
/// Returns the configured OTLP endpoint, if any.
///
/// `HARN_OTEL_ENDPOINT` is consulted first, then `OTEL_EXPORTER_OTLP_ENDPOINT`.
/// The first value that is non-empty after trimming whitespace is returned
/// (trimmed); unset, non-Unicode and blank values are skipped.
fn otel_endpoint_from_env() -> Option<String> {
    ["HARN_OTEL_ENDPOINT", "OTEL_EXPORTER_OTLP_ENDPOINT"]
        .iter()
        .filter_map(|name| std::env::var(name).ok())
        .map(|value| value.trim().to_string())
        .find(|candidate| !candidate.is_empty())
}
315
316#[cfg(feature = "otel")]
/// Returns the service name to report.
///
/// `HARN_OTEL_SERVICE_NAME` is consulted first, then `OTEL_SERVICE_NAME`; the
/// first value that is non-empty after trimming is returned (trimmed). Falls
/// back to `"harn"` when neither yields a usable value.
fn otel_service_name_from_env() -> String {
    let configured = ["HARN_OTEL_SERVICE_NAME", "OTEL_SERVICE_NAME"]
        .iter()
        .find_map(|name| {
            let value = std::env::var(name).ok()?;
            let trimmed = value.trim();
            if trimmed.is_empty() {
                None
            } else {
                Some(trimmed.to_string())
            }
        });
    configured.unwrap_or_else(|| "harn".to_string())
}
328
329#[cfg(feature = "otel")]
/// Parses extra exporter headers from the environment.
///
/// `HARN_OTEL_HEADERS` is consulted first, then `OTEL_EXPORTER_OTLP_HEADERS`.
/// As with the endpoint and service-name helpers, a variable that is unset,
/// non-Unicode or blank after trimming is skipped, so a blank
/// `HARN_OTEL_HEADERS` no longer hides the standard variable.
///
/// The chosen value is split on `,`, newline and `;`. Each segment is split
/// into name and value at the first `=`, or at the first `:` when it contains
/// no `=`. Names and values are trimmed; segments with an empty name or an
/// empty value are dropped.
fn otel_headers_from_env() -> std::collections::HashMap<String, String> {
    let raw = ["HARN_OTEL_HEADERS", "OTEL_EXPORTER_OTLP_HEADERS"]
        .iter()
        .filter_map(|name| std::env::var(name).ok())
        .find(|value| !value.trim().is_empty())
        .unwrap_or_default();
    raw.split([',', '\n', ';'])
        .map(str::trim)
        .filter(|segment| !segment.is_empty())
        .filter_map(|segment| {
            let (name, value) = segment
                .split_once('=')
                .or_else(|| segment.split_once(':'))?;
            let name = name.trim();
            let value = value.trim();
            if name.is_empty() || value.is_empty() {
                return None;
            }
            Some((name.to_string(), value.to_string()))
        })
        .collect()
}
351
352#[cfg(feature = "otel")]
/// Ensures `endpoint` ends with the `/v1/traces` path.
///
/// Trailing slashes are removed first; `/v1/traces` is appended unless the
/// remaining string already ends with it.
fn normalize_otlp_traces_endpoint(endpoint: &str) -> String {
    const TRACES_PATH: &str = "/v1/traces";
    let base = endpoint.trim_end_matches('/');
    match base.strip_suffix(TRACES_PATH) {
        Some(_) => base.to_owned(),
        None => [base, TRACES_PATH].concat(),
    }
}
361
#[cfg(feature = "otel")]
/// Process-wide slot holding a clone of the installed tracer provider.
///
/// It is lazily initialised to `None`, filled by `install_otel_sink_from_env`
/// and emptied by `shutdown_otel_sink`.
static OTEL_PROVIDER: std::sync::OnceLock<
    std::sync::Mutex<Option<opentelemetry_sdk::trace::SdkTracerProvider>>,
> = std::sync::OnceLock::new();
371
#[cfg(feature = "otel")]
/// Installs an [`OtelSink`] on the current thread when an OTLP endpoint is
/// configured in the environment.
///
/// Returns `Ok(true)` when a sink was created and registered. Returns
/// `Ok(false)` when no endpoint is configured, or when the process-wide
/// provider slot is already filled.
///
/// The provider slot stays locked from the "already installed?" check until
/// the new provider is stored. Two concurrent callers therefore cannot both
/// pass the check, build two providers and overwrite each other's slot entry,
/// which would leave the first provider unreachable for `shutdown_otel_sink`.
///
/// NOTE(review): sinks are registered per thread while the slot is
/// process-wide, so a second thread calling this gets `Ok(false)` and no sink
/// of its own — confirm that is intended.
///
/// # Errors
///
/// Propagates the message from [`OtelSink::new`]; the slot is left empty in
/// that case.
///
/// # Panics
///
/// Panics if the provider mutex is poisoned.
pub fn install_otel_sink_from_env() -> Result<bool, String> {
    if otel_endpoint_from_env().is_none() {
        return Ok(false);
    }
    let provider_slot = OTEL_PROVIDER.get_or_init(|| std::sync::Mutex::new(None));
    // Keep the guard alive across construction to make check-and-store atomic.
    // `OtelSink::new` is synchronous and never touches this mutex.
    let mut installed = provider_slot.lock().expect("otel provider mutex poisoned");
    if installed.is_some() {
        return Ok(false);
    }
    let sink = OtelSink::new()?;
    *installed = Some(sink.provider.clone());
    add_event_sink(Rc::new(sink));
    Ok(true)
}
413
#[cfg(feature = "otel")]
/// Flushes and shuts down the tracer provider stored by
/// `install_otel_sink_from_env`.
///
/// Returns `Ok(false)` when no provider is stored, and `Ok(true)` after a
/// successful flush and shutdown. The provider is removed from the slot before
/// flushing, so a second call returns `Ok(false)`.
///
/// `shutdown` is always attempted, even when `force_flush` fails. The provider
/// has already been taken out of the slot at that point, so returning early
/// would leave it with no later chance of an explicit shutdown.
///
/// # Errors
///
/// Returns the flush error if flushing failed, otherwise the shutdown error if
/// shutting down failed.
///
/// # Panics
///
/// Panics if the provider mutex is poisoned.
pub fn shutdown_otel_sink() -> Result<bool, String> {
    let Some(slot) = OTEL_PROVIDER.get() else {
        return Ok(false);
    };
    // Take the provider in its own scope so the lock is released before the
    // potentially slow flush and shutdown calls.
    let provider = {
        let mut guard = slot.lock().expect("otel provider mutex poisoned");
        guard.take()
    };
    let Some(provider) = provider else {
        return Ok(false);
    };
    let flush_result = provider
        .force_flush()
        .map_err(|error| format!("OTel force_flush failed: {error}"));
    let shutdown_result = provider
        .shutdown()
        .map_err(|error| format!("OTel shutdown failed: {error}"));
    flush_result?;
    shutdown_result?;
    Ok(true)
}
445
#[cfg(not(feature = "otel"))]
/// Stand-in used when the `otel` feature is disabled: installs nothing and
/// always returns `Ok(false)`.
pub fn install_otel_sink_from_env() -> Result<bool, String> {
    let installed = false;
    Ok(installed)
}
453
#[cfg(not(feature = "otel"))]
/// Stand-in used when the `otel` feature is disabled: there is nothing to shut
/// down, so it always returns `Ok(false)`.
pub fn shutdown_otel_sink() -> Result<bool, String> {
    let shut_down = false;
    Ok(shut_down)
}
458
#[cfg(feature = "otel")]
impl EventSink for OtelSink {
    /// Records the log event as a span named `log.<category>` with `level`,
    /// `message` (passed through the current redaction policy) and `category`
    /// attributes.
    ///
    /// The span handle is dropped when this function returns. `event.metadata`
    /// is not attached.
    fn emit_log(&self, event: &LogEvent) {
        use opentelemetry::trace::{Tracer, TracerProvider};
        use opentelemetry::KeyValue;

        let tracer = self.provider.tracer("harn");
        let policy = crate::redact::current_policy();
        let attributes = vec![
            KeyValue::new("level", format!("{:?}", event.level)),
            KeyValue::new(
                "message",
                policy.redact_string(&event.message).into_owned(),
            ),
            KeyValue::new("category", event.category.clone()),
        ];
        let _span = tracer
            .span_builder(format!("log.{}", event.category))
            .with_attributes(attributes)
            .start(&tracer);
    }

    /// Starts a span named after `event.name` and remembers it under
    /// `event.span_id` until the matching `emit_span_end`.
    ///
    /// Only `harn.span_id` and `harn.kind` are attached; `parent_id` and
    /// `metadata` are not used here.
    fn emit_span_start(&self, event: &SpanEvent) {
        use opentelemetry::trace::{Tracer, TracerProvider};
        use opentelemetry::KeyValue;

        let tracer = self.provider.tracer("harn");
        let attributes = vec![
            // NOTE(review): `as i64` wraps for ids above `i64::MAX` — confirm ids stay below that.
            KeyValue::new("harn.span_id", event.span_id as i64),
            KeyValue::new("harn.kind", event.kind.clone()),
        ];
        let started = tracer
            .span_builder(event.name.clone())
            .with_attributes(attributes)
            .start(&tracer);
        self.active_spans.borrow_mut().insert(event.span_id, started);
    }

    /// Ends the span stored under `span_id`, first attaching every `metadata`
    /// entry as a redacted string attribute. Unknown ids are ignored.
    ///
    /// NOTE(review): values are rendered with `Display` for
    /// `serde_json::Value`, which presumably yields JSON text, so string values
    /// would keep their quotes — confirm this is intended.
    fn emit_span_end(&self, span_id: u64, metadata: &BTreeMap<String, serde_json::Value>) {
        use opentelemetry::trace::Span;
        use opentelemetry::KeyValue;

        let mut active = self.active_spans.borrow_mut();
        let Some(mut span) = active.remove(&span_id) else {
            return;
        };
        let policy = crate::redact::current_policy();
        for (key, value) in metadata.iter() {
            let rendered = value.to_string();
            let redacted = policy.redact_string(&rendered).into_owned();
            span.set_attribute(KeyValue::new(key.clone(), redacted));
        }
        span.end();
    }
}
515
#[cfg(feature = "otel")]
impl Drop for OtelSink {
    /// Drops any spans that never received an end event, then shuts the
    /// provider down, ignoring the result.
    fn drop(&mut self) {
        // `&mut self` gives exclusive access, so no runtime borrow is needed.
        self.active_spans.get_mut().clear();
        let _ = self.provider.shutdown();
    }
}
524
#[cfg(test)]
mod tests {
    use super::*;

    /// Replaces the current thread's sinks with the given collectors and
    /// restores the default sink list when dropped.
    ///
    /// Restoring in `Drop` keeps the registry clean even when an assertion
    /// panics part-way through a test.
    struct SinkOverride;

    impl SinkOverride {
        fn install(collectors: &[Rc<CollectorSink>]) -> Self {
            clear_event_sinks();
            for collector in collectors {
                add_event_sink(collector.clone());
            }
            Self
        }
    }

    impl Drop for SinkOverride {
        fn drop(&mut self) {
            reset_event_sinks();
        }
    }

    #[test]
    fn test_collector_sink_captures_logs() {
        let sink = Rc::new(CollectorSink::new());
        let _sinks = SinkOverride::install(&[sink.clone()]);

        log_info("llm", "test message");
        log_warn("llm.cost", "cost warning");
        log_error("llm.agent", "agent error");

        let logs = sink.logs.borrow();
        assert_eq!(logs.len(), 3);
        assert_eq!(logs[0].level, EventLevel::Info);
        assert_eq!(logs[0].category, "llm");
        assert_eq!(logs[0].message, "test message");
        assert_eq!(logs[1].level, EventLevel::Warn);
        assert_eq!(logs[2].level, EventLevel::Error);
    }

    #[test]
    fn test_collector_sink_captures_spans() {
        let sink = Rc::new(CollectorSink::new());
        let _sinks = SinkOverride::install(&[sink.clone()]);

        emit_span_start(1, None, "agent_loop", "llm_call", BTreeMap::new());
        emit_span_end(1, BTreeMap::new());

        let spans = sink.spans.borrow();
        assert_eq!(spans.len(), 1);
        assert_eq!(spans[0].span_id, 1);
        assert_eq!(spans[0].name, "agent_loop");
    }

    #[test]
    fn test_stderr_sink_does_not_panic() {
        let sink = StderrSink;
        let event = LogEvent {
            level: EventLevel::Warn,
            category: "test".into(),
            message: "hello".into(),
            metadata: BTreeMap::new(),
        };
        sink.emit_log(&event);
        sink.emit_span_start(&SpanEvent {
            span_id: 1,
            parent_id: None,
            name: "x".into(),
            kind: "y".into(),
            metadata: BTreeMap::new(),
        });
        sink.emit_span_end(1, &BTreeMap::new());
    }

    #[test]
    fn test_multiple_sinks() {
        let a = Rc::new(CollectorSink::new());
        let b = Rc::new(CollectorSink::new());
        let _sinks = SinkOverride::install(&[a.clone(), b.clone()]);

        log_debug("x", "msg");

        assert_eq!(a.logs.borrow().len(), 1);
        assert_eq!(b.logs.borrow().len(), 1);
    }

    #[test]
    fn test_log_with_metadata() {
        let sink = Rc::new(CollectorSink::new());
        let _sinks = SinkOverride::install(&[sink.clone()]);

        let mut meta = BTreeMap::new();
        meta.insert("tokens".into(), serde_json::json!(42));
        log_info_meta("llm", "token usage", meta);

        let logs = sink.logs.borrow();
        assert_eq!(logs[0].metadata["tokens"], serde_json::json!(42));
    }

    #[cfg(feature = "otel")]
    mod otel_env {
        use super::super::*;
        use std::sync::{Mutex, MutexGuard, OnceLock, PoisonError};

        /// Serialises the tests in this module that mutate environment
        /// variables.
        ///
        /// A poisoned lock is recovered rather than unwrapped: the mutex guards
        /// no data, and panicking here would turn one failing test into a
        /// failure of every later environment test.
        fn lock() -> MutexGuard<'static, ()> {
            static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
            LOCK.get_or_init(|| Mutex::new(()))
                .lock()
                .unwrap_or_else(PoisonError::into_inner)
        }

        /// Sets or removes an environment variable and restores its previous
        /// state when dropped.
        struct ScopedEnvVar {
            key: &'static str,
            previous: Option<String>,
        }

        impl ScopedEnvVar {
            fn set(key: &'static str, value: &str) -> Self {
                let previous = std::env::var(key).ok();
                // SAFETY: callers in this module hold the guard from `lock()`,
                // so these tests never mutate the environment concurrently.
                unsafe { std::env::set_var(key, value) };
                Self { key, previous }
            }

            fn remove(key: &'static str) -> Self {
                let previous = std::env::var(key).ok();
                // SAFETY: callers in this module hold the guard from `lock()`,
                // so these tests never mutate the environment concurrently.
                unsafe { std::env::remove_var(key) };
                Self { key, previous }
            }
        }

        impl Drop for ScopedEnvVar {
            fn drop(&mut self) {
                // SAFETY: every `ScopedEnvVar` is declared after the `lock()`
                // guard in its test, so it is dropped while the guard is held.
                match &self.previous {
                    Some(value) => unsafe { std::env::set_var(self.key, value) },
                    None => unsafe { std::env::remove_var(self.key) },
                }
            }
        }

        #[test]
        fn install_returns_false_when_endpoint_unset() {
            let _guard = lock();
            let _endpoint = ScopedEnvVar::remove("HARN_OTEL_ENDPOINT");
            let _standard = ScopedEnvVar::remove("OTEL_EXPORTER_OTLP_ENDPOINT");

            let installed = install_otel_sink_from_env()
                .expect("install must not error when endpoint is unset");
            assert!(!installed, "expected no sink registration without endpoint");
        }

        #[test]
        fn endpoint_helper_prefers_harn_variable() {
            let _guard = lock();
            let _harn = ScopedEnvVar::set("HARN_OTEL_ENDPOINT", "http://harn.example.test:4318");
            let _standard = ScopedEnvVar::set(
                "OTEL_EXPORTER_OTLP_ENDPOINT",
                "http://generic.example.test:4318",
            );

            assert_eq!(
                otel_endpoint_from_env().as_deref(),
                Some("http://harn.example.test:4318"),
            );
        }

        #[test]
        fn endpoint_helper_falls_back_to_standard_variable() {
            let _guard = lock();
            let _harn = ScopedEnvVar::remove("HARN_OTEL_ENDPOINT");
            let _standard = ScopedEnvVar::set(
                "OTEL_EXPORTER_OTLP_ENDPOINT",
                "http://generic.example.test:4318",
            );

            assert_eq!(
                otel_endpoint_from_env().as_deref(),
                Some("http://generic.example.test:4318"),
            );
        }

        #[test]
        fn endpoint_helper_ignores_whitespace_only_values() {
            let _guard = lock();
            let _harn = ScopedEnvVar::set("HARN_OTEL_ENDPOINT", "   ");
            let _standard = ScopedEnvVar::remove("OTEL_EXPORTER_OTLP_ENDPOINT");

            assert!(otel_endpoint_from_env().is_none());
        }

        #[test]
        fn service_name_helper_layers_defaults() {
            let _guard = lock();
            let _harn = ScopedEnvVar::remove("HARN_OTEL_SERVICE_NAME");
            let _standard = ScopedEnvVar::remove("OTEL_SERVICE_NAME");
            assert_eq!(otel_service_name_from_env(), "harn");

            // The shadowed guards stay alive until the end of the test and are
            // dropped in reverse order, which unwinds the changes correctly.
            let _standard = ScopedEnvVar::set("OTEL_SERVICE_NAME", "burin-code");
            assert_eq!(otel_service_name_from_env(), "burin-code");

            let _harn = ScopedEnvVar::set("HARN_OTEL_SERVICE_NAME", "burin-tui");
            assert_eq!(otel_service_name_from_env(), "burin-tui");
        }

        #[test]
        fn headers_helper_parses_comma_separated_pairs() {
            let _guard = lock();
            let _harn = ScopedEnvVar::set(
                "HARN_OTEL_HEADERS",
                "x-honeycomb-team=abc123, x-other=val ,blank=",
            );

            let headers = otel_headers_from_env();
            assert_eq!(
                headers.get("x-honeycomb-team").map(String::as_str),
                Some("abc123"),
            );
            assert_eq!(headers.get("x-other").map(String::as_str), Some("val"));
            assert!(
                !headers.contains_key("blank"),
                "empty values must be dropped to match the orchestrator helper",
            );
        }

        #[test]
        fn normalize_endpoint_appends_traces_path_when_missing() {
            assert_eq!(
                normalize_otlp_traces_endpoint("http://localhost:4318"),
                "http://localhost:4318/v1/traces",
            );
            assert_eq!(
                normalize_otlp_traces_endpoint("http://localhost:4318/"),
                "http://localhost:4318/v1/traces",
            );
            assert_eq!(
                normalize_otlp_traces_endpoint("http://localhost:4318/v1/traces"),
                "http://localhost:4318/v1/traces",
            );
            assert_eq!(
                normalize_otlp_traces_endpoint("http://localhost:4318/v1/traces/"),
                "http://localhost:4318/v1/traces",
            );
        }
    }

    #[cfg(not(feature = "otel"))]
    #[test]
    fn install_otel_sink_returns_ok_false_on_non_otel_builds() {
        let installed = install_otel_sink_from_env().expect("non-otel stub never errors");
        assert!(!installed);
    }
}