use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;
/// Severity attached to a [`LogEvent`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventLevel {
    /// Rendered by `StderrSink` with the `TRACE` tag.
    Trace,
    /// Rendered by `StderrSink` with the `DEBUG` tag.
    Debug,
    /// Rendered by `StderrSink` with the `INFO` tag.
    Info,
    /// Rendered by `StderrSink` as `[harn] warning: …`.
    Warn,
    /// Rendered by `StderrSink` as `[harn] error: …`.
    Error,
}
/// One structured log record delivered to every registered [`EventSink`].
#[derive(Debug, Clone)]
pub struct LogEvent {
    /// Severity of the record.
    pub level: EventLevel,
    /// Free-form category string supplied by the caller (e.g. `"llm"` in the tests).
    pub category: String,
    /// Human-readable message text.
    pub message: String,
    /// Extra key/value data supplied by the caller; empty for the plain `log_*` helpers.
    pub metadata: BTreeMap<String, serde_json::Value>,
}
/// Description of a span at the moment it starts, delivered to every registered [`EventSink`].
#[derive(Debug, Clone)]
pub struct SpanEvent {
    /// Caller-assigned identifier; the same id is later passed to `emit_span_end`.
    pub span_id: u64,
    /// Identifier of the enclosing span, if any.
    pub parent_id: Option<u64>,
    /// Span name.
    pub name: String,
    /// Caller-defined kind label (e.g. `"llm_call"` in the tests).
    pub kind: String,
    /// Extra key/value data supplied when the span starts.
    pub metadata: BTreeMap<String, serde_json::Value>,
}
/// Destination for structured log and span events.
///
/// Every method takes `&self`; implementations that record state use interior
/// mutability, as `CollectorSink` does with `RefCell`.
pub trait EventSink {
/// Handles one log event.
fn emit_log(&self, event: &LogEvent);
/// Handles the start of the span described by `event`.
fn emit_span_start(&self, event: &SpanEvent);
/// Handles the end of the span identified by `span_id`; `metadata` is the data supplied at end time.
fn emit_span_end(&self, span_id: u64, metadata: &BTreeMap<String, serde_json::Value>);
}
/// Sink that prints log events to standard error and ignores span events.
pub struct StderrSink;

impl EventSink for StderrSink {
    /// Writes one line to stderr for `event`.
    ///
    /// Warnings and errors use the `[harn] warning: …` / `[harn] error: …` form, which
    /// omits the category. Every other level prints `[LEVEL] [category] message`.
    fn emit_log(&self, event: &LogEvent) {
        let tag = match event.level {
            EventLevel::Warn => {
                eprintln!("[harn] warning: {}", event.message);
                return;
            }
            EventLevel::Error => {
                eprintln!("[harn] error: {}", event.message);
                return;
            }
            EventLevel::Trace => "TRACE",
            EventLevel::Debug => "DEBUG",
            EventLevel::Info => "INFO",
        };
        eprintln!("[{tag}] [{}] {}", event.category, event.message);
    }

    /// Span starts are not printed.
    fn emit_span_start(&self, _event: &SpanEvent) {}

    /// Span ends are not printed.
    fn emit_span_end(&self, _span_id: u64, _metadata: &BTreeMap<String, serde_json::Value>) {}
}
/// Sink that keeps every log event and every span-start event in memory.
///
/// The vectors sit behind `RefCell` because [`EventSink`] methods only receive `&self`.
#[derive(Default)]
pub struct CollectorSink {
    /// Log events, in the order they were emitted.
    pub logs: RefCell<Vec<LogEvent>>,
    /// Span-start events, in the order they were emitted.
    pub spans: RefCell<Vec<SpanEvent>>,
}

impl CollectorSink {
    /// Creates a collector with no recorded events.
    pub fn new() -> Self {
        Self::default()
    }
}

impl EventSink for CollectorSink {
    /// Stores a copy of `event` at the end of `logs`.
    fn emit_log(&self, event: &LogEvent) {
        let recorded = event.clone();
        self.logs.borrow_mut().push(recorded);
    }

    /// Stores a copy of `event` at the end of `spans`.
    fn emit_span_start(&self, event: &SpanEvent) {
        let recorded = event.clone();
        self.spans.borrow_mut().push(recorded);
    }

    /// Span ends are not recorded.
    fn emit_span_end(&self, _span_id: u64, _metadata: &BTreeMap<String, serde_json::Value>) {}
}
thread_local! {
    /// Per-thread list of active sinks; each thread starts with a single `StderrSink`.
    static EVENT_SINKS: RefCell<Vec<Rc<dyn EventSink>>> = {
        let default_sink: Rc<dyn EventSink> = Rc::new(StderrSink);
        RefCell::new(vec![default_sink])
    };
}
/// Appends `sink` to the current thread's sink list; it receives all later events.
pub fn add_event_sink(sink: Rc<dyn EventSink>) {
    EVENT_SINKS.with(|registry| {
        registry.borrow_mut().push(sink);
    });
}
/// Removes every sink from the current thread's list, including the default `StderrSink`.
pub fn clear_event_sinks() {
    EVENT_SINKS.with(|registry| {
        let mut active = registry.borrow_mut();
        active.clear();
    });
}
pub fn reset_event_sinks() {
EVENT_SINKS.with(|sinks| {
let mut s = sinks.borrow_mut();
s.clear();
s.push(Rc::new(StderrSink));
});
}
pub fn emit_log(
level: EventLevel,
category: &str,
message: &str,
metadata: BTreeMap<String, serde_json::Value>,
) {
let event = LogEvent {
level,
category: category.to_string(),
message: message.to_string(),
metadata,
};
EVENT_SINKS.with(|sinks| {
for sink in sinks.borrow().iter() {
sink.emit_log(&event);
}
});
}
pub fn emit_span_start(
span_id: u64,
parent_id: Option<u64>,
name: &str,
kind: &str,
metadata: BTreeMap<String, serde_json::Value>,
) {
let event = SpanEvent {
span_id,
parent_id,
name: name.to_string(),
kind: kind.to_string(),
metadata,
};
EVENT_SINKS.with(|sinks| {
for sink in sinks.borrow().iter() {
sink.emit_span_start(&event);
}
});
}
pub fn emit_span_end(span_id: u64, metadata: BTreeMap<String, serde_json::Value>) {
EVENT_SINKS.with(|sinks| {
for sink in sinks.borrow().iter() {
sink.emit_span_end(span_id, &metadata);
}
});
}
/// Emits an [`EventLevel::Info`] log event with empty metadata.
pub fn log_info(category: &str, message: &str) {
    let no_metadata = BTreeMap::new();
    emit_log(EventLevel::Info, category, message, no_metadata);
}
/// Emits an [`EventLevel::Warn`] log event with empty metadata.
pub fn log_warn(category: &str, message: &str) {
    let no_metadata = BTreeMap::new();
    emit_log(EventLevel::Warn, category, message, no_metadata);
}
pub fn log_error(category: &str, message: &str) {
emit_log(EventLevel::Error, category, message, BTreeMap::new());
}
/// Emits an [`EventLevel::Debug`] log event with empty metadata.
pub fn log_debug(category: &str, message: &str) {
    let no_metadata = BTreeMap::new();
    emit_log(EventLevel::Debug, category, message, no_metadata);
}
/// Emits an [`EventLevel::Info`] log event carrying the given `metadata`.
pub fn log_info_meta(category: &str, message: &str, metadata: BTreeMap<String, serde_json::Value>) {
    let level = EventLevel::Info;
    emit_log(level, category, message, metadata);
}
/// Emits an [`EventLevel::Warn`] log event carrying the given `metadata`.
pub fn log_warn_meta(category: &str, message: &str, metadata: BTreeMap<String, serde_json::Value>) {
    let level = EventLevel::Warn;
    emit_log(level, category, message, metadata);
}
/// Event sink that turns log and span events into OpenTelemetry spans.
#[cfg(feature = "otel")]
pub struct OtelSink {
// Tracer provider used to create every span this sink emits.
provider: opentelemetry_sdk::trace::SdkTracerProvider,
// Spans started through `emit_span_start` and not yet ended, keyed by the event's `span_id`.
active_spans:
std::cell::RefCell<std::collections::HashMap<u64, opentelemetry_sdk::trace::Span>>,
}
#[cfg(feature = "otel")]
impl OtelSink {
    /// Builds a sink that exports spans over OTLP/HTTP with JSON encoding.
    ///
    /// Headers, endpoint and service name come from the environment (see the
    /// `otel_*_from_env` helpers). When no endpoint is configured the exporter keeps its own
    /// default. The new provider is also registered as the global tracer provider.
    ///
    /// The batch processor is built with the Tokio runtime; NOTE(review): this presumably
    /// requires an active Tokio runtime at call time — confirm against the SDK.
    ///
    /// # Errors
    ///
    /// Returns a descriptive message when the HTTP client or the span exporter cannot be built.
    pub fn new() -> Result<Self, String> {
        use opentelemetry_otlp::{
            Protocol, SpanExporter, WithExportConfig as _, WithHttpConfig as _,
        };
        use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
        use opentelemetry_sdk::trace::SdkTracerProvider;

        let client = reqwest::Client::builder()
            .build()
            .map_err(|error| format!("failed to build OTLP HTTP client: {error}"))?;

        let mut builder = SpanExporter::builder()
            .with_http()
            .with_http_client(client)
            .with_protocol(Protocol::HttpJson)
            .with_headers(otel_headers_from_env());
        if let Some(endpoint) = otel_endpoint_from_env() {
            builder = builder.with_endpoint(normalize_otlp_traces_endpoint(&endpoint));
        }
        let exporter = builder
            .build()
            .map_err(|error| format!("OTel span exporter init failed: {error}"))?;

        let resource = opentelemetry_sdk::Resource::builder()
            .with_service_name(otel_service_name_from_env())
            .build();
        let processor =
            BatchSpanProcessor::builder(exporter, opentelemetry_sdk::runtime::Tokio).build();
        let provider = SdkTracerProvider::builder()
            .with_resource(resource)
            .with_span_processor(processor)
            .build();

        opentelemetry::global::set_tracer_provider(provider.clone());
        Ok(Self {
            provider,
            active_spans: std::cell::RefCell::new(std::collections::HashMap::new()),
        })
    }
}
#[cfg(feature = "otel")]
/// Returns the configured OTLP endpoint, trimmed, or `None` when none is set.
///
/// `HARN_OTEL_ENDPOINT` takes precedence over `OTEL_EXPORTER_OTLP_ENDPOINT`; a variable that
/// is unset, not valid Unicode, or blank after trimming is skipped.
fn otel_endpoint_from_env() -> Option<String> {
    ["HARN_OTEL_ENDPOINT", "OTEL_EXPORTER_OTLP_ENDPOINT"]
        .iter()
        .filter_map(|name| std::env::var(name).ok())
        .map(|value| value.trim().to_string())
        .find(|value| !value.is_empty())
}
#[cfg(feature = "otel")]
/// Returns the service name to report, trimmed, defaulting to `"harn"`.
///
/// `HARN_OTEL_SERVICE_NAME` takes precedence over `OTEL_SERVICE_NAME`; a variable that is
/// unset, not valid Unicode, or blank after trimming is skipped.
fn otel_service_name_from_env() -> String {
    ["HARN_OTEL_SERVICE_NAME", "OTEL_SERVICE_NAME"]
        .iter()
        .filter_map(|name| std::env::var(name).ok())
        .map(|value| value.trim().to_string())
        .find(|value| !value.is_empty())
        .unwrap_or_else(|| "harn".to_string())
}
#[cfg(feature = "otel")]
/// Parses extra OTLP request headers from the environment.
///
/// `HARN_OTEL_HEADERS` takes precedence over `OTEL_EXPORTER_OTLP_HEADERS`. A variable that is
/// unset, not valid Unicode, or blank after trimming is skipped, matching how the endpoint
/// and service-name helpers treat blank values.
///
/// The chosen value is split on `,`, `;` and newlines. Each segment is split into name and
/// value at the first `=`, or at the first `:` when the segment contains no `=`. Segments
/// with no separator, an empty name, or an empty value are dropped.
fn otel_headers_from_env() -> std::collections::HashMap<String, String> {
    let raw = ["HARN_OTEL_HEADERS", "OTEL_EXPORTER_OTLP_HEADERS"]
        .iter()
        .filter_map(|name| std::env::var(name).ok())
        // A set-but-blank override must not hide the standard variable.
        .find(|value| !value.trim().is_empty())
        .unwrap_or_default();
    raw.split([',', '\n', ';'])
        .map(str::trim)
        .filter(|segment| !segment.is_empty())
        .filter_map(|segment| {
            let (name, value) = segment
                .split_once('=')
                .or_else(|| segment.split_once(':'))?;
            let name = name.trim();
            let value = value.trim();
            if name.is_empty() || value.is_empty() {
                return None;
            }
            Some((name.to_string(), value.to_string()))
        })
        .collect()
}
#[cfg(feature = "otel")]
/// Returns `endpoint` with trailing slashes removed and guaranteed to end in `/v1/traces`.
///
/// An endpoint that already ends in `/v1/traces` is returned without a second copy of the path.
fn normalize_otlp_traces_endpoint(endpoint: &str) -> String {
    const TRACES_PATH: &str = "/v1/traces";
    let without_slashes = endpoint.trim_end_matches('/');
    // Strip an existing suffix and always re-append it, so both cases share one exit.
    let base = without_slashes
        .strip_suffix(TRACES_PATH)
        .unwrap_or(without_slashes);
    [base, TRACES_PATH].concat()
}
// Process-wide slot for the tracer provider stored by `install_otel_sink_from_env`;
// `shutdown_otel_sink` takes it back out. `None` means no provider is currently stored.
#[cfg(feature = "otel")]
static OTEL_PROVIDER: std::sync::OnceLock<
std::sync::Mutex<Option<opentelemetry_sdk::trace::SdkTracerProvider>>,
> = std::sync::OnceLock::new();
/// Installs an [`OtelSink`] on the current thread when an OTLP endpoint is configured.
///
/// Returns `Ok(true)` when a sink was created and registered. Returns `Ok(false)` when no
/// endpoint is configured or a provider is already stored in the process-wide slot.
///
/// # Errors
///
/// Propagates the message from [`OtelSink::new`] when the sink cannot be built.
///
/// # Panics
///
/// Panics if the provider mutex is poisoned.
#[cfg(feature = "otel")]
pub fn install_otel_sink_from_env() -> Result<bool, String> {
    if otel_endpoint_from_env().is_none() {
        return Ok(false);
    }

    let slot = OTEL_PROVIDER.get_or_init(|| std::sync::Mutex::new(None));
    // The guard is a temporary here, so the lock is not held while the sink is built.
    let already_installed = slot
        .lock()
        .expect("otel provider mutex poisoned")
        .is_some();
    if already_installed {
        return Ok(false);
    }

    let sink = OtelSink::new()?;
    let provider = sink.provider.clone();
    add_event_sink(Rc::new(sink));
    *slot.lock().expect("otel provider mutex poisoned") = Some(provider);
    Ok(true)
}
/// Flushes and shuts down the provider stored by `install_otel_sink_from_env`.
///
/// Returns `Ok(true)` when a provider was stored and both steps succeeded, and `Ok(false)`
/// when nothing was stored. The provider is removed from the slot before any work is done,
/// so a second call returns `Ok(false)`.
///
/// # Errors
///
/// Returns the `force_flush` error if flushing failed, otherwise the `shutdown` error.
/// Shutdown is attempted even after a failed flush: the provider has already left the slot,
/// so this is the only point at which this function can still shut it down.
///
/// # Panics
///
/// Panics if the provider mutex is poisoned.
#[cfg(feature = "otel")]
pub fn shutdown_otel_sink() -> Result<bool, String> {
    let Some(slot) = OTEL_PROVIDER.get() else {
        return Ok(false);
    };
    // Take the provider out under the lock, then release the lock before the slow calls.
    let provider = {
        let mut guard = slot.lock().expect("otel provider mutex poisoned");
        guard.take()
    };
    let Some(provider) = provider else {
        return Ok(false);
    };
    let flushed = provider
        .force_flush()
        .map_err(|error| format!("OTel force_flush failed: {error}"));
    let shut_down = provider
        .shutdown()
        .map_err(|error| format!("OTel shutdown failed: {error}"));
    // `and` keeps the flush error when there is one, otherwise yields the shutdown result.
    flushed.and(shut_down).map(|()| true)
}
/// Stub used when the `otel` feature is disabled: never installs a sink, never fails.
#[cfg(not(feature = "otel"))]
pub fn install_otel_sink_from_env() -> Result<bool, String> {
    let installed = false;
    Ok(installed)
}
/// Stub used when the `otel` feature is disabled: there is nothing to shut down.
#[cfg(not(feature = "otel"))]
pub fn shutdown_otel_sink() -> Result<bool, String> {
    let shut_down = false;
    Ok(shut_down)
}
#[cfg(feature = "otel")]
impl EventSink for OtelSink {
    /// Records the log event as a span named `log.<category>`.
    ///
    /// The span carries `level`, `message` (passed through the current redaction policy) and
    /// `category` attributes. It is dropped when this function returns. Event metadata is
    /// not exported.
    fn emit_log(&self, event: &LogEvent) {
        use opentelemetry::trace::{Tracer, TracerProvider};
        use opentelemetry::KeyValue;

        let tracer = self.provider.tracer("harn");
        let policy = crate::redact::current_policy();
        let attributes = vec![
            KeyValue::new("level", format!("{:?}", event.level)),
            KeyValue::new("message", policy.redact_string(&event.message).into_owned()),
            KeyValue::new("category", event.category.clone()),
        ];
        let _span = tracer
            .span_builder(format!("log.{}", event.category))
            .with_attributes(attributes)
            .start(&tracer);
    }

    /// Starts a span named after the event and keeps it open until `emit_span_end`.
    ///
    /// Only `harn.span_id` and `harn.kind` are attached; `parent_id` and the start metadata
    /// are not exported.
    fn emit_span_start(&self, event: &SpanEvent) {
        use opentelemetry::trace::{Tracer, TracerProvider};
        use opentelemetry::KeyValue;

        let tracer = self.provider.tracer("harn");
        let attributes = vec![
            KeyValue::new("harn.span_id", event.span_id as i64),
            KeyValue::new("harn.kind", event.kind.clone()),
        ];
        let span = tracer
            .span_builder(event.name.clone())
            .with_attributes(attributes)
            .start(&tracer);
        self.active_spans.borrow_mut().insert(event.span_id, span);
    }

    /// Ends the open span for `span_id`, attaching each metadata entry as a redacted string.
    ///
    /// Values are rendered with `serde_json::Value`'s `Display` output before redaction.
    /// Unknown span ids are ignored.
    fn emit_span_end(&self, span_id: u64, metadata: &BTreeMap<String, serde_json::Value>) {
        use opentelemetry::trace::Span;

        let finished = self.active_spans.borrow_mut().remove(&span_id);
        let Some(mut span) = finished else {
            return;
        };
        let policy = crate::redact::current_policy();
        for (key, value) in metadata {
            let rendered = value.to_string();
            let redacted = policy.redact_string(&rendered).into_owned();
            span.set_attribute(opentelemetry::KeyValue::new(key.clone(), redacted));
        }
        span.end();
    }
}
#[cfg(feature = "otel")]
impl Drop for OtelSink {
    /// Discards any spans still open, then shuts the provider down on a best-effort basis.
    fn drop(&mut self) {
        // `&mut self` gives exclusive access, so the map can be reached without a runtime borrow.
        self.active_spans.get_mut().clear();
        // A shutdown failure cannot be reported from `drop`, so the result is ignored.
        let _ = self.provider.shutdown();
    }
}
// Unit tests for the sink registry and, with the `otel` feature, the environment helpers.
// Registry tests replace the current thread's sinks with collectors and call
// `reset_event_sinks` at the end to restore the default.
#[cfg(test)]
mod tests {
use super::*;
// The `log_*` helpers deliver events to a registered collector in emission order.
#[test]
fn test_collector_sink_captures_logs() {
let sink = Rc::new(CollectorSink::new());
clear_event_sinks();
add_event_sink(sink.clone());
log_info("llm", "test message");
log_warn("llm.cost", "cost warning");
log_error("llm.agent", "agent error");
let logs = sink.logs.borrow();
assert_eq!(logs.len(), 3);
assert_eq!(logs[0].level, EventLevel::Info);
assert_eq!(logs[0].category, "llm");
assert_eq!(logs[0].message, "test message");
assert_eq!(logs[1].level, EventLevel::Warn);
assert_eq!(logs[2].level, EventLevel::Error);
reset_event_sinks();
}
// Only the span start is recorded by the collector; the span end adds no entry.
#[test]
fn test_collector_sink_captures_spans() {
let sink = Rc::new(CollectorSink::new());
clear_event_sinks();
add_event_sink(sink.clone());
emit_span_start(1, None, "agent_loop", "llm_call", BTreeMap::new());
emit_span_end(1, BTreeMap::new());
let spans = sink.spans.borrow();
assert_eq!(spans.len(), 1);
assert_eq!(spans[0].span_id, 1);
assert_eq!(spans[0].name, "agent_loop");
reset_event_sinks();
}
// Smoke test: calls every `StderrSink` method directly, bypassing the registry.
#[test]
fn test_stderr_sink_does_not_panic() {
let sink = StderrSink;
let event = LogEvent {
level: EventLevel::Warn,
category: "test".into(),
message: "hello".into(),
metadata: BTreeMap::new(),
};
sink.emit_log(&event);
sink.emit_span_start(&SpanEvent {
span_id: 1,
parent_id: None,
name: "x".into(),
kind: "y".into(),
metadata: BTreeMap::new(),
});
sink.emit_span_end(1, &BTreeMap::new());
}
// One emitted event reaches every registered sink.
#[test]
fn test_multiple_sinks() {
let a = Rc::new(CollectorSink::new());
let b = Rc::new(CollectorSink::new());
clear_event_sinks();
add_event_sink(a.clone());
add_event_sink(b.clone());
log_debug("x", "msg");
assert_eq!(a.logs.borrow().len(), 1);
assert_eq!(b.logs.borrow().len(), 1);
reset_event_sinks();
}
// Metadata passed to `log_info_meta` arrives unchanged on the recorded event.
#[test]
fn test_log_with_metadata() {
let sink = Rc::new(CollectorSink::new());
clear_event_sinks();
add_event_sink(sink.clone());
let mut meta = BTreeMap::new();
meta.insert("tokens".into(), serde_json::json!(42));
log_info_meta("llm", "token usage", meta);
let logs = sink.logs.borrow();
assert_eq!(logs[0].metadata["tokens"], serde_json::json!(42));
reset_event_sinks();
}
// Tests for the environment-driven OTel helpers. They mutate process-wide environment
// variables, so each env-touching test first takes the module-wide lock below.
#[cfg(feature = "otel")]
mod otel_env {
use super::super::*;
use std::sync::{Mutex, MutexGuard, OnceLock};
// Serializes the env-mutating tests in this module; hold the returned guard for the whole test.
fn lock() -> MutexGuard<'static, ()> {
static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
LOCK.get_or_init(|| Mutex::new(()))
.lock()
.expect("otel env lock")
}
// Guard that sets or removes an environment variable and restores the prior state on drop.
struct ScopedEnvVar {
key: &'static str,
// Value observed before the change; `None` if it was unset or not valid Unicode.
previous: Option<String>,
}
impl ScopedEnvVar {
// Sets `key` to `value`, remembering the previous value.
fn set(key: &'static str, value: &str) -> Self {
let previous = std::env::var(key).ok();
// SAFETY: tests in this module take `lock()` before touching the environment.
// NOTE(review): threads outside this module reading the environment concurrently
// would still be a hazard — confirm none do.
unsafe { std::env::set_var(key, value) };
Self { key, previous }
}
// Removes `key`, remembering the previous value.
fn remove(key: &'static str) -> Self {
let previous = std::env::var(key).ok();
// SAFETY: same reasoning as in `set`.
unsafe { std::env::remove_var(key) };
Self { key, previous }
}
}
impl Drop for ScopedEnvVar {
// Restores the remembered value, or removes the variable if there was none.
fn drop(&mut self) {
match &self.previous {
// SAFETY: same reasoning as in `set`; guards are dropped before the test's lock
// guard because they are declared after it.
Some(value) => unsafe { std::env::set_var(self.key, value) },
None => unsafe { std::env::remove_var(self.key) },
}
}
}
#[test]
fn install_returns_false_when_endpoint_unset() {
let _guard = lock();
let _endpoint = ScopedEnvVar::remove("HARN_OTEL_ENDPOINT");
let _standard = ScopedEnvVar::remove("OTEL_EXPORTER_OTLP_ENDPOINT");
let installed = install_otel_sink_from_env()
.expect("install must not error when endpoint is unset");
assert!(!installed, "expected no sink registration without endpoint");
}
#[test]
fn endpoint_helper_prefers_harn_variable() {
let _guard = lock();
let _harn = ScopedEnvVar::set("HARN_OTEL_ENDPOINT", "http://harn.example.test:4318");
let _standard = ScopedEnvVar::set(
"OTEL_EXPORTER_OTLP_ENDPOINT",
"http://generic.example.test:4318",
);
assert_eq!(
otel_endpoint_from_env().as_deref(),
Some("http://harn.example.test:4318"),
);
}
#[test]
fn endpoint_helper_falls_back_to_standard_variable() {
let _guard = lock();
let _harn = ScopedEnvVar::remove("HARN_OTEL_ENDPOINT");
let _standard = ScopedEnvVar::set(
"OTEL_EXPORTER_OTLP_ENDPOINT",
"http://generic.example.test:4318",
);
assert_eq!(
otel_endpoint_from_env().as_deref(),
Some("http://generic.example.test:4318"),
);
}
#[test]
fn endpoint_helper_ignores_whitespace_only_values() {
let _guard = lock();
let _harn = ScopedEnvVar::set("HARN_OTEL_ENDPOINT", " ");
let _standard = ScopedEnvVar::remove("OTEL_EXPORTER_OTLP_ENDPOINT");
assert!(otel_endpoint_from_env().is_none());
}
#[test]
fn service_name_helper_layers_defaults() {
let _guard = lock();
let _harn = ScopedEnvVar::remove("HARN_OTEL_SERVICE_NAME");
let _standard = ScopedEnvVar::remove("OTEL_SERVICE_NAME");
assert_eq!(otel_service_name_from_env(), "harn");
// The shadowed guards stay alive until the end of the test and drop in reverse
// declaration order, so the original values are restored last.
let _standard = ScopedEnvVar::set("OTEL_SERVICE_NAME", "burin-code");
assert_eq!(otel_service_name_from_env(), "burin-code");
let _harn = ScopedEnvVar::set("HARN_OTEL_SERVICE_NAME", "burin-tui");
assert_eq!(otel_service_name_from_env(), "burin-tui");
}
#[test]
fn headers_helper_parses_comma_separated_pairs() {
let _guard = lock();
let _harn = ScopedEnvVar::set(
"HARN_OTEL_HEADERS",
"x-honeycomb-team=abc123, x-other=val ,blank=",
);
let headers = otel_headers_from_env();
assert_eq!(
headers.get("x-honeycomb-team").map(String::as_str),
Some("abc123"),
);
assert_eq!(headers.get("x-other").map(String::as_str), Some("val"));
assert!(
!headers.contains_key("blank"),
"empty values must be dropped to match the orchestrator helper",
);
}
// Pure string test: no environment access, so the lock is not taken.
#[test]
fn normalize_endpoint_appends_traces_path_when_missing() {
assert_eq!(
normalize_otlp_traces_endpoint("http://localhost:4318"),
"http://localhost:4318/v1/traces",
);
assert_eq!(
normalize_otlp_traces_endpoint("http://localhost:4318/"),
"http://localhost:4318/v1/traces",
);
assert_eq!(
normalize_otlp_traces_endpoint("http://localhost:4318/v1/traces"),
"http://localhost:4318/v1/traces",
);
assert_eq!(
normalize_otlp_traces_endpoint("http://localhost:4318/v1/traces/"),
"http://localhost:4318/v1/traces",
);
}
}
// Without the `otel` feature the installer is a stub that always reports "not installed".
#[cfg(not(feature = "otel"))]
#[test]
fn install_otel_sink_returns_ok_false_on_non_otel_builds() {
let installed = install_otel_sink_from_env().expect("non-otel stub never errors");
assert!(!installed);
}
}