Skip to main content

harn_vm/
events.rs

1//! Structured event emission for observability.
2//!
3//! Provides an `EventSink` trait and a thread-local sink registry so that the
4//! VM (and especially the LLM layer) can emit structured log and span events
5//! instead of raw `eprintln!` calls. Consumers register one or more sinks.
6
7use std::cell::RefCell;
8use std::collections::BTreeMap;
9use std::rc::Rc;
10
/// Severity level for log events.
///
/// Variants are declared from least to most severe, so the derived ordering
/// satisfies `Trace < Debug < Info < Warn < Error`. Keep any new variant in
/// severity order so that comparison operators stay meaningful.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum EventLevel {
    /// Finest-grained diagnostics.
    Trace,
    /// Developer-facing diagnostics.
    Debug,
    /// Routine informational messages.
    Info,
    /// Something unexpected happened but execution continues.
    Warn,
    /// An operation failed.
    Error,
}
20
/// A structured log event.
///
/// Built by [`emit_log`] and passed by reference to every registered sink.
#[derive(Clone, Debug)]
pub struct LogEvent {
    /// Severity of the event.
    pub level: EventLevel,
    /// Category label supplied by the emitter (the tests in this file use
    /// values such as `"llm"` and `"llm.cost"`).
    pub category: String,
    /// Human-readable message text.
    pub message: String,
    /// Structured key/value data attached to the event.
    pub metadata: BTreeMap<String, serde_json::Value>,
}
29
/// A structured span event (start or end).
///
/// In this file the struct is only constructed by [`emit_span_start`]; span
/// ends are reported to sinks as a bare span id plus metadata.
#[derive(Clone, Debug)]
pub struct SpanEvent {
    /// Identifier of this span; sinks receive the same id in `emit_span_end`.
    pub span_id: u64,
    /// Identifier of the parent span, or `None` when the caller supplies none.
    pub parent_id: Option<u64>,
    /// Span name supplied by the emitter.
    pub name: String,
    /// Span kind label supplied by the emitter (e.g. `"llm_call"` in the tests).
    pub kind: String,
    /// Structured key/value data attached when the span starts.
    pub metadata: BTreeMap<String, serde_json::Value>,
}
39
/// Trait for receiving structured events from the VM.
///
/// Sinks are stored as `Rc<dyn EventSink>` in a thread-local registry and are
/// invoked through `&self`, so implementations that need to record state use
/// interior mutability (see `CollectorSink`).
pub trait EventSink {
    /// Called once per log event emitted through [`emit_log`].
    fn emit_log(&self, event: &LogEvent);
    /// Called when a span is opened through [`emit_span_start`].
    fn emit_span_start(&self, event: &SpanEvent);
    /// Called when the span identified by `span_id` is closed through
    /// [`emit_span_end`]; `metadata` is the data supplied at close time.
    fn emit_span_end(&self, span_id: u64, metadata: &BTreeMap<String, serde_json::Value>);
}
46
/// Default sink that writes formatted output to stderr.
///
/// The type is a stateless unit struct, so it is cheap to copy and can be
/// created with `StderrSink` or `StderrSink::default()`.
#[derive(Clone, Copy, Debug, Default)]
pub struct StderrSink;
49
50impl EventSink for StderrSink {
51    fn emit_log(&self, event: &LogEvent) {
52        if !stderr_level_enabled(event.level) {
53            return;
54        }
55        let level_str = match event.level {
56            EventLevel::Trace => "TRACE",
57            EventLevel::Debug => "DEBUG",
58            EventLevel::Info => "INFO",
59            EventLevel::Warn => "WARN",
60            EventLevel::Error => "ERROR",
61        };
62        // "[harn]" prefix for warn/error is relied on by downstream
63        // tooling and tests that parse stderr.
64        match event.level {
65            EventLevel::Warn => {
66                eprintln!("[harn] warning: {}", event.message);
67            }
68            EventLevel::Error => {
69                eprintln!("[harn] error: {}", event.message);
70            }
71            _ => {
72                eprintln!("[{level_str}] [{}] {}", event.category, event.message);
73            }
74        }
75    }
76
77    fn emit_span_start(&self, _event: &SpanEvent) {
78        // Silent by default — spans are for observability backends.
79    }
80
81    fn emit_span_end(&self, _span_id: u64, _metadata: &BTreeMap<String, serde_json::Value>) {}
82}
83
84fn stderr_level_enabled(level: EventLevel) -> bool {
85    let threshold = std::env::var("HARN_LOG_LEVEL")
86        .ok()
87        .map(|value| value.trim().to_ascii_lowercase())
88        .unwrap_or_else(|| "info".to_string());
89    let min_level = match threshold.as_str() {
90        "trace" => EventLevel::Trace,
91        "debug" => EventLevel::Debug,
92        "info" | "" => EventLevel::Info,
93        "warn" | "warning" => EventLevel::Warn,
94        "error" => EventLevel::Error,
95        "off" | "none" | "silent" => return false,
96        _ => EventLevel::Info,
97    };
98    event_level_rank(level) >= event_level_rank(min_level)
99}
100
101fn event_level_rank(level: EventLevel) -> u8 {
102    match level {
103        EventLevel::Trace => 0,
104        EventLevel::Debug => 1,
105        EventLevel::Info => 2,
106        EventLevel::Warn => 3,
107        EventLevel::Error => 4,
108    }
109}
110
111/// A sink that collects events for later retrieval (testing, inspection).
112pub struct CollectorSink {
113    pub logs: RefCell<Vec<LogEvent>>,
114    pub spans: RefCell<Vec<SpanEvent>>,
115}
116
117impl CollectorSink {
118    pub fn new() -> Self {
119        Self {
120            logs: RefCell::new(Vec::new()),
121            spans: RefCell::new(Vec::new()),
122        }
123    }
124}
125
126impl Default for CollectorSink {
127    fn default() -> Self {
128        Self::new()
129    }
130}
131
132impl EventSink for CollectorSink {
133    fn emit_log(&self, event: &LogEvent) {
134        self.logs.borrow_mut().push(event.clone());
135    }
136
137    fn emit_span_start(&self, event: &SpanEvent) {
138        self.spans.borrow_mut().push(event.clone());
139    }
140
141    fn emit_span_end(&self, _span_id: u64, _metadata: &BTreeMap<String, serde_json::Value>) {}
142}
143
// Per-thread registry of event sinks. Each thread starts with a single
// `StderrSink`; sinks registered on one thread are not visible to others.
thread_local! {
    static EVENT_SINKS: RefCell<Vec<Rc<dyn EventSink>>> = RefCell::new(vec![Rc::new(StderrSink)]);
}
147
148/// Register an additional event sink.
149pub fn add_event_sink(sink: Rc<dyn EventSink>) {
150    EVENT_SINKS.with(|sinks| sinks.borrow_mut().push(sink));
151}
152
153/// Remove all sinks (including the default `StderrSink`).
154pub fn clear_event_sinks() {
155    EVENT_SINKS.with(|sinks| sinks.borrow_mut().clear());
156}
157
158/// Reset sinks to just the default `StderrSink`.
159pub fn reset_event_sinks() {
160    EVENT_SINKS.with(|sinks| {
161        let mut s = sinks.borrow_mut();
162        s.clear();
163        s.push(Rc::new(StderrSink));
164    });
165}
166
167/// Emit a structured log event to all registered sinks.
168pub fn emit_log(
169    level: EventLevel,
170    category: &str,
171    message: &str,
172    metadata: BTreeMap<String, serde_json::Value>,
173) {
174    let event = LogEvent {
175        level,
176        category: category.to_string(),
177        message: message.to_string(),
178        metadata,
179    };
180    EVENT_SINKS.with(|sinks| {
181        for sink in sinks.borrow().iter() {
182            sink.emit_log(&event);
183        }
184    });
185}
186
187/// Emit a span-start event to all registered sinks.
188pub fn emit_span_start(
189    span_id: u64,
190    parent_id: Option<u64>,
191    name: &str,
192    kind: &str,
193    metadata: BTreeMap<String, serde_json::Value>,
194) {
195    let event = SpanEvent {
196        span_id,
197        parent_id,
198        name: name.to_string(),
199        kind: kind.to_string(),
200        metadata,
201    };
202    EVENT_SINKS.with(|sinks| {
203        for sink in sinks.borrow().iter() {
204            sink.emit_span_start(&event);
205        }
206    });
207}
208
209/// Emit a span-end event to all registered sinks.
210pub fn emit_span_end(span_id: u64, metadata: BTreeMap<String, serde_json::Value>) {
211    EVENT_SINKS.with(|sinks| {
212        for sink in sinks.borrow().iter() {
213            sink.emit_span_end(span_id, &metadata);
214        }
215    });
216}
217
218/// Log at Info level with no metadata.
219pub fn log_info(category: &str, message: &str) {
220    emit_log(EventLevel::Info, category, message, BTreeMap::new());
221}
222
223/// Log at Warn level with no metadata.
224pub fn log_warn(category: &str, message: &str) {
225    emit_log(EventLevel::Warn, category, message, BTreeMap::new());
226}
227
228/// Log at Error level with no metadata.
229pub fn log_error(category: &str, message: &str) {
230    emit_log(EventLevel::Error, category, message, BTreeMap::new());
231}
232
233/// Log at Debug level with no metadata.
234pub fn log_debug(category: &str, message: &str) {
235    emit_log(EventLevel::Debug, category, message, BTreeMap::new());
236}
237
238/// Log at Info level with metadata.
239pub fn log_info_meta(category: &str, message: &str, metadata: BTreeMap<String, serde_json::Value>) {
240    emit_log(EventLevel::Info, category, message, metadata);
241}
242
243/// Log at Debug level with metadata.
244pub fn log_debug_meta(
245    category: &str,
246    message: &str,
247    metadata: BTreeMap<String, serde_json::Value>,
248) {
249    emit_log(EventLevel::Debug, category, message, metadata);
250}
251
252/// Log at Warn level with metadata.
253pub fn log_warn_meta(category: &str, message: &str, metadata: BTreeMap<String, serde_json::Value>) {
254    emit_log(EventLevel::Warn, category, message, metadata);
255}
256
/// OpenTelemetry exporter sink. Requires the `otel` feature flag.
/// Forwards Harn log events and span lifecycle to OTLP collectors.
///
/// Active spans are stored keyed by Harn's `span_id` so that
/// `emit_span_end` can close the correct OTel span.
#[cfg(feature = "otel")]
pub struct OtelSink {
    // Tracer provider built in `OtelSink::new`; every tracer used by this
    // sink is obtained from it, and it is shut down when the sink is dropped.
    provider: opentelemetry_sdk::trace::SdkTracerProvider,
    // Spans opened by `emit_span_start` and not yet closed. Held in a
    // `RefCell` because the `EventSink` methods take `&self`.
    active_spans:
        std::cell::RefCell<std::collections::HashMap<u64, opentelemetry_sdk::trace::Span>>,
}
268
#[cfg(feature = "otel")]
impl OtelSink {
    /// Build an OTel sink from environment configuration (endpoint, service
    /// name, headers) and install its provider as the global tracer provider.
    ///
    /// A missing endpoint is **not** an error: no endpoint is passed to the
    /// exporter builder, which then uses OpenTelemetry's default
    /// (`http://localhost:4318/v1/traces`). Callers that want the presence
    /// of an endpoint to gate registration should use
    /// [`install_otel_sink_from_env`].
    ///
    /// # Errors
    ///
    /// Returns a message when the span exporter fails to initialise.
    pub fn new() -> Result<Self, String> {
        use opentelemetry::global;
        use opentelemetry_otlp::{
            Protocol, SpanExporter, WithExportConfig as _, WithHttpConfig as _,
        };
        use opentelemetry_sdk::runtime;
        use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
        use opentelemetry_sdk::trace::SdkTracerProvider;
        use opentelemetry_sdk::Resource;

        let configured_endpoint = otel_endpoint_from_env();
        let extra_headers = otel_headers_from_env();
        let service = otel_service_name_from_env();

        let http_builder = SpanExporter::builder()
            .with_http()
            .with_protocol(Protocol::HttpJson)
            .with_headers(extra_headers);
        // Only override the endpoint when one was configured.
        let http_builder = match configured_endpoint.as_deref() {
            Some(url) => http_builder.with_endpoint(normalize_otlp_traces_endpoint(url)),
            None => http_builder,
        };
        let exporter = http_builder
            .build()
            .map_err(|e| format!("OTel span exporter init failed: {e}"))?;

        // The batch processor must run on the current Tokio runtime so the
        // exporter's reqwest client can reach the network. The SDK's default
        // processor spawns its own thread, which has no Tokio reactor and
        // panics on the first send; the orchestrator path uses the
        // async-runtime variant for the same reason.
        let resource = Resource::builder().with_service_name(service).build();
        let batch_processor = BatchSpanProcessor::builder(exporter, runtime::Tokio).build();
        let provider = SdkTracerProvider::builder()
            .with_resource(resource)
            .with_span_processor(batch_processor)
            .build();

        global::set_tracer_provider(provider.clone());

        let active_spans = std::cell::RefCell::new(std::collections::HashMap::new());
        Ok(Self {
            provider,
            active_spans,
        })
    }
}
323
324/// The host spawns a fresh `harn` child per session, so the only
325/// reliable way to opt into local trace export is via environment
326/// variables read at startup. Prefer the Harn-specific variable so a
327/// caller that points an unrelated process at an OTLP collector via
328/// the shared OpenTelemetry variable doesn't accidentally enable Harn
329/// emission too.
330#[cfg(feature = "otel")]
fn otel_endpoint_from_env() -> Option<String> {
    // Candidates are checked in priority order; the first one whose trimmed
    // value is non-empty wins, and later variables are then never read.
    ["HARN_OTEL_ENDPOINT", "OTEL_EXPORTER_OTLP_ENDPOINT"]
        .iter()
        .filter_map(|name| std::env::var(name).ok())
        .map(|value| value.trim().to_string())
        .find(|candidate| !candidate.is_empty())
}
342
343#[cfg(feature = "otel")]
/// Resolve the OTel service name: `HARN_OTEL_SERVICE_NAME` first, then
/// `OTEL_SERVICE_NAME`, skipping values that are blank after trimming, and
/// finally the literal `"harn"`.
fn otel_service_name_from_env() -> String {
    ["HARN_OTEL_SERVICE_NAME", "OTEL_SERVICE_NAME"]
        .iter()
        .filter_map(|name| std::env::var(name).ok())
        .map(|value| value.trim().to_string())
        .find(|candidate| !candidate.is_empty())
        .unwrap_or_else(|| "harn".to_string())
}
355
356#[cfg(feature = "otel")]
/// Parse extra OTLP exporter headers from the environment.
///
/// `HARN_OTEL_HEADERS` takes priority over `OTEL_EXPORTER_OTLP_HEADERS`. A
/// variable whose value is empty or whitespace-only is treated as unset, so
/// it does not block the fallback; this matches how the endpoint and
/// service-name helpers treat blank values.
///
/// The value is a list of pairs separated by `,`, `;` or newlines. Each pair
/// is split at the first `=`, or at the first `:` when it contains no `=`.
/// Names and values are trimmed; pairs with an empty name or value, and
/// segments with no separator, are dropped.
fn otel_headers_from_env() -> std::collections::HashMap<String, String> {
    let raw = ["HARN_OTEL_HEADERS", "OTEL_EXPORTER_OTLP_HEADERS"]
        .iter()
        .filter_map(|name| std::env::var(name).ok())
        .find(|value| !value.trim().is_empty())
        .unwrap_or_default();
    raw.split([',', '\n', ';'])
        .map(str::trim)
        .filter(|segment| !segment.is_empty())
        .filter_map(|segment| {
            let (name, value) = segment
                .split_once('=')
                .or_else(|| segment.split_once(':'))?;
            let name = name.trim();
            let value = value.trim();
            if name.is_empty() || value.is_empty() {
                return None;
            }
            Some((name.to_string(), value.to_string()))
        })
        .collect()
}
378
379#[cfg(feature = "otel")]
/// Return `endpoint` with trailing slashes removed and `/v1/traces` appended
/// unless the trimmed value already ends with that path.
fn normalize_otlp_traces_endpoint(endpoint: &str) -> String {
    let base = endpoint.trim_end_matches('/');
    let mut normalized = String::from(base);
    if !base.ends_with("/v1/traces") {
        normalized.push_str("/v1/traces");
    }
    normalized
}
388
/// Stable OTel span-end attribute keys exported as top-level attributes.
///
/// Keep this list narrow: span metadata can be script-controlled, and OTel
/// backends charge or index by attribute key. Runtime code that needs another
/// top-level key should add an exact `harn.*` key here rather than letting
/// arbitrary metadata become a new exported attribute name.
///
/// Entries are compared against metadata keys by exact string equality.
#[cfg(feature = "otel")]
const ALLOWED_SPAN_ATTR_KEYS: &[&str] = &[
    "harn.duration_ms",
    "harn.error",
    "harn.error.kind",
    "harn.kind",
    "harn.span_id",
    "harn.status",
];
404
/// Runtime-owned low-cardinality attribute namespaces.
///
/// These prefixes are for stable schema families, not dynamic suffixes such as
/// run ids, file paths, or UUIDs. Metadata outside this exact-key/prefix
/// allowlist is folded into `harn.meta_json` so OTel sees one bounded key.
///
/// A metadata key is accepted when it starts with any entry; each entry ends
/// with `.` so only keys inside the namespace match.
#[cfg(feature = "otel")]
const ALLOWED_SPAN_ATTR_PREFIXES: &[&str] = &[
    "harn.cost.",
    "harn.lifecycle.",
    "harn.llm.",
    "harn.step.",
    "harn.timing.",
    "harn.token.",
    "harn.tool.",
    "harn.worker.",
];
421
/// Report whether `key` may be exported as its own top-level span attribute:
/// either an exact entry of `ALLOWED_SPAN_ATTR_KEYS` or a key that starts
/// with one of `ALLOWED_SPAN_ATTR_PREFIXES`.
#[cfg(feature = "otel")]
fn is_low_cardinality_attr_key(key: &str) -> bool {
    if ALLOWED_SPAN_ATTR_KEYS.iter().any(|allowed| *allowed == key) {
        return true;
    }
    ALLOWED_SPAN_ATTR_PREFIXES
        .iter()
        .any(|namespace| key.starts_with(namespace))
}
429
/// Turn span-end metadata into the `(key, value)` attribute list exported to
/// OTel.
///
/// Allowlisted keys become individual attributes whose value is the JSON
/// rendering of the metadata value (so JSON strings keep their quotes). All
/// other entries are serialised together into a single `harn.meta_json`
/// attribute, appended last and only when at least one such entry exists.
/// Every emitted value passes through the current redaction policy.
#[cfg(feature = "otel")]
fn otel_span_end_attributes(
    metadata: &BTreeMap<String, serde_json::Value>,
) -> Vec<(String, String)> {
    let policy = crate::redact::current_policy();
    let mut folded = BTreeMap::new();
    let mut attributes = Vec::new();

    for (key, value) in metadata {
        if !is_low_cardinality_attr_key(key) {
            // Unknown keys are collected and exported under one bounded key.
            folded.insert(key.clone(), value.clone());
            continue;
        }
        let rendered = value.to_string();
        let scrubbed = policy.redact_string(&rendered).into_owned();
        attributes.push((key.clone(), scrubbed));
    }

    if folded.is_empty() {
        return attributes;
    }

    let blob = serde_json::to_string(&folded).unwrap_or_else(|_| "{}".to_string());
    let scrubbed = policy.redact_string(&blob).into_owned();
    attributes.push(("harn.meta_json".to_string(), scrubbed));
    attributes
}
456
/// Idempotency guard for [`install_otel_sink_from_env`]. The first
/// caller wins; later ones become no-ops returning `Ok(false)`. The
/// stored provider keeps the batch processor's runtime alive — and
/// gives [`shutdown_otel_sink`] a handle to flush before the host
/// tokio runtime exits.
///
/// The cell is initialised lazily on the first install attempt that finds
/// an endpoint. A `None` inside the mutex means no provider is currently
/// stored: either none was installed yet or [`shutdown_otel_sink`] already
/// took it.
#[cfg(feature = "otel")]
static OTEL_PROVIDER: std::sync::OnceLock<
    std::sync::Mutex<Option<opentelemetry_sdk::trace::SdkTracerProvider>>,
> = std::sync::OnceLock::new();
466
/// Register an [`OtelSink`] into the thread-local event sink chain when
/// the environment is configured for OTLP export.
///
/// Returns `Ok(true)` when a sink was installed, `Ok(false)` when no
/// OTLP endpoint is configured (or when a sink has already been
/// installed for this process), and `Err` when the exporter failed to
/// initialise.
///
/// The exporter activates iff at least one of `HARN_OTEL_ENDPOINT` or
/// the standard `OTEL_EXPORTER_OTLP_ENDPOINT` is non-empty. Service
/// name comes from `HARN_OTEL_SERVICE_NAME` → `OTEL_SERVICE_NAME` →
/// `"harn"` (in that order). Headers come from `HARN_OTEL_HEADERS` →
/// `OTEL_EXPORTER_OTLP_HEADERS` (pairs separated by commas, semicolons
/// or newlines, each written `name=value` or `name:value`).
///
/// Hosts (`harn run`, `harn serve acp`, and other embedders) should
/// call this once near process startup so any spans emitted during the
/// session land at the configured collector. The sink registry is
/// thread-local, so the sink is added only to the calling thread's chain.
///
/// # Panics
///
/// Panics if the provider mutex was poisoned by a panic in another thread.
#[cfg(feature = "otel")]
pub fn install_otel_sink_from_env() -> Result<bool, String> {
    if otel_endpoint_from_env().is_none() {
        return Ok(false);
    }
    let provider_slot = OTEL_PROVIDER.get_or_init(|| std::sync::Mutex::new(None));
    // Hold the lock across the whole check-and-install sequence. Releasing
    // it between the check and the store would let two threads both see an
    // empty slot, both build an exporter and both replace the global tracer
    // provider.
    let mut installed = provider_slot.lock().expect("otel provider mutex poisoned");
    if installed.is_some() {
        // A sink was already installed in this process. Don't
        // double-register; the existing one will keep emitting.
        return Ok(false);
    }
    // On failure the `?` returns with the slot still empty, so a later call
    // can retry.
    let sink = OtelSink::new()?;
    *installed = Some(sink.provider.clone());
    add_event_sink(Rc::new(sink));
    Ok(true)
}
508
/// Flush and tear down the auto-registered OTel sink. Hosts that
/// shut down their tokio runtime before process exit must call this
/// while the runtime is still alive — `BatchSpanProcessor` needs a
/// reactor to drain queued exports, and the [`Drop`] impl on
/// [`OtelSink`] otherwise runs after the runtime is gone. Safe to
/// call when no sink was installed.
///
/// Returns `Ok(true)` when a provider was flushed, `Ok(false)` when
/// none was installed, and `Err` when the SDK reported an export or
/// shutdown error. Errors are advisory — long-running hosts should
/// log and continue.
///
/// The stored provider is removed from the slot before flushing, so
/// `shutdown()` is always attempted even when `force_flush()` fails;
/// otherwise a flush error would leave the provider neither stored nor shut
/// down by this function. When both steps fail, the two messages are joined
/// with `"; "`.
///
/// # Panics
///
/// Panics if the provider mutex was poisoned by a panic in another thread.
#[cfg(feature = "otel")]
pub fn shutdown_otel_sink() -> Result<bool, String> {
    let Some(slot) = OTEL_PROVIDER.get() else {
        return Ok(false);
    };
    // Take the provider inside a short scope so the lock is not held while
    // the exporter flushes.
    let provider = {
        let mut guard = slot.lock().expect("otel provider mutex poisoned");
        guard.take()
    };
    let Some(provider) = provider else {
        return Ok(false);
    };
    let flushed = provider
        .force_flush()
        .map_err(|error| format!("OTel force_flush failed: {error}"));
    let stopped = provider
        .shutdown()
        .map_err(|error| format!("OTel shutdown failed: {error}"));
    match (flushed, stopped) {
        (Ok(()), Ok(())) => Ok(true),
        (Err(message), Ok(())) | (Ok(()), Err(message)) => Err(message),
        (Err(flush_message), Err(shutdown_message)) => {
            Err(format!("{flush_message}; {shutdown_message}"))
        }
    }
}
540
/// Stand-in used when the crate is built without the `otel` feature.
/// It never installs anything and always yields `Ok(false)`, which lets
/// call sites stay identical across both build configurations.
#[cfg(not(feature = "otel"))]
pub fn install_otel_sink_from_env() -> Result<bool, String> {
    let installed = false;
    Ok(installed)
}
548
/// Stand-in used when the crate is built without the `otel` feature.
/// There is never a provider to flush, so this always yields `Ok(false)`.
#[cfg(not(feature = "otel"))]
pub fn shutdown_otel_sink() -> Result<bool, String> {
    let flushed = false;
    Ok(flushed)
}
553
#[cfg(feature = "otel")]
impl EventSink for OtelSink {
    /// Export a log event as a span named `log.<category>` that is started
    /// and immediately dropped. The message is redacted before export;
    /// event metadata is not exported.
    fn emit_log(&self, event: &LogEvent) {
        use opentelemetry::trace::{Tracer, TracerProvider};
        let tracer = self.provider.tracer("harn");
        // Apply the unified redaction policy to attribute values
        // before they leave the process. The active policy includes
        // the OAuth token catalog (HARN-OAU-001) so a Bearer header
        // or provider token that snuck into a log message gets
        // scrubbed before it lands at the OTel collector.
        let policy = crate::redact::current_policy();
        // Log events are zero-duration spans — start and immediately drop.
        let _span = tracer
            .span_builder(format!("log.{}", event.category))
            .with_attributes(vec![
                opentelemetry::KeyValue::new("level", format!("{:?}", event.level)),
                opentelemetry::KeyValue::new(
                    "message",
                    policy.redact_string(&event.message).into_owned(),
                ),
                opentelemetry::KeyValue::new("category", event.category.clone()),
            ])
            .start(&tracer);
    }

    /// Start an OTel span named after the event and remember it under the
    /// Harn span id so `emit_span_end` can close it. Only `span_id` and
    /// `kind` are attached as attributes; `parent_id` and the start metadata
    /// are not forwarded.
    fn emit_span_start(&self, event: &SpanEvent) {
        use opentelemetry::trace::{Tracer, TracerProvider};
        let tracer = self.provider.tracer("harn");
        let span = tracer
            .span_builder(event.name.clone())
            .with_attributes(vec![
                // OTel integer attributes are i64; the `as` cast wraps ids
                // above `i64::MAX` to negative values.
                opentelemetry::KeyValue::new("harn.span_id", event.span_id as i64),
                opentelemetry::KeyValue::new("harn.kind", event.kind.clone()),
            ])
            .start(&tracer);
        self.active_spans.borrow_mut().insert(event.span_id, span);
    }

    /// Close the OTel span registered under `span_id`, attaching the
    /// redacted, cardinality-bounded attributes derived from `metadata`.
    /// Unknown span ids are ignored.
    fn emit_span_end(&self, span_id: u64, metadata: &BTreeMap<String, serde_json::Value>) {
        use opentelemetry::trace::Span;
        // Remove the span in its own statement so the `RefCell` borrow ends
        // here instead of staying alive while attributes are computed.
        let finished = self.active_spans.borrow_mut().remove(&span_id);
        let Some(mut span) = finished else {
            return;
        };
        // OTel span attributes are the fourth sink covered by the
        // OA-06 token-redaction policy (transcripts, audit
        // receipts, OTel, and system reminders). The helper also
        // bounds attribute-key cardinality by folding unknown
        // metadata into `harn.meta_json`.
        for (key, redacted) in otel_span_end_attributes(metadata) {
            // `key` is already owned, so it moves into the attribute.
            span.set_attribute(opentelemetry::KeyValue::new(key, redacted));
        }
        span.end();
    }
}
607
#[cfg(feature = "otel")]
impl Drop for OtelSink {
    /// Discard any spans still open and shut the provider down, ignoring the
    /// shutdown result.
    fn drop(&mut self) {
        // Spans left over from an abnormal shutdown are dropped here
        // (presumably ending them — confirm against the SDK's `Span` drop
        // behaviour). `&mut self` gives direct access to the map.
        let open_spans = self.active_spans.get_mut();
        open_spans.clear();
        let _ = self.provider.shutdown();
    }
}
616
#[cfg(test)]
mod tests {
    use super::*;

    /// Info, warn and error helpers all reach a registered collector, in
    /// emission order.
    #[test]
    fn test_collector_sink_captures_logs() {
        let sink = Rc::new(CollectorSink::new());
        clear_event_sinks();
        add_event_sink(sink.clone());

        log_info("llm", "test message");
        log_warn("llm.cost", "cost warning");
        log_error("llm.agent", "agent error");

        let logs = sink.logs.borrow();
        assert_eq!(logs.len(), 3);
        assert_eq!(logs[0].level, EventLevel::Info);
        assert_eq!(logs[0].category, "llm");
        assert_eq!(logs[0].message, "test message");
        assert_eq!(logs[1].level, EventLevel::Warn);
        assert_eq!(logs[2].level, EventLevel::Error);

        // Restore default sinks for other tests.
        reset_event_sinks();
    }

    /// A span start is recorded by the collector; the span end is not.
    #[test]
    fn test_collector_sink_captures_spans() {
        let sink = Rc::new(CollectorSink::new());
        clear_event_sinks();
        add_event_sink(sink.clone());

        emit_span_start(1, None, "agent_loop", "llm_call", BTreeMap::new());
        emit_span_end(1, BTreeMap::new());

        let spans = sink.spans.borrow();
        assert_eq!(spans.len(), 1);
        assert_eq!(spans[0].span_id, 1);
        assert_eq!(spans[0].name, "agent_loop");

        reset_event_sinks();
    }

    /// Smoke test: every `StderrSink` method can be called directly.
    #[test]
    fn test_stderr_sink_does_not_panic() {
        let sink = StderrSink;
        let event = LogEvent {
            level: EventLevel::Warn,
            category: "test".into(),
            message: "hello".into(),
            metadata: BTreeMap::new(),
        };
        sink.emit_log(&event);
        sink.emit_span_start(&SpanEvent {
            span_id: 1,
            parent_id: None,
            name: "x".into(),
            kind: "y".into(),
            metadata: BTreeMap::new(),
        });
        sink.emit_span_end(1, &BTreeMap::new());
    }

    /// Every registered sink receives each event.
    #[test]
    fn test_multiple_sinks() {
        let a = Rc::new(CollectorSink::new());
        let b = Rc::new(CollectorSink::new());
        clear_event_sinks();
        add_event_sink(a.clone());
        add_event_sink(b.clone());

        log_debug("x", "msg");

        assert_eq!(a.logs.borrow().len(), 1);
        assert_eq!(b.logs.borrow().len(), 1);

        reset_event_sinks();
    }

    /// Metadata passed to `log_info_meta` arrives at the sink unchanged.
    #[test]
    fn test_log_with_metadata() {
        let sink = Rc::new(CollectorSink::new());
        clear_event_sinks();
        add_event_sink(sink.clone());

        let mut meta = BTreeMap::new();
        meta.insert("tokens".into(), serde_json::json!(42));
        log_info_meta("llm", "token usage", meta);

        let logs = sink.logs.borrow();
        assert_eq!(logs[0].metadata["tokens"], serde_json::json!(42));

        reset_event_sinks();
    }

    /// Test sink that records the attribute list `otel_span_end_attributes`
    /// produces for each span end.
    #[cfg(feature = "otel")]
    #[derive(Default)]
    struct SpanAttrCollectorSink {
        attrs: RefCell<Vec<(String, String)>>,
    }

    #[cfg(feature = "otel")]
    impl EventSink for SpanAttrCollectorSink {
        fn emit_log(&self, _event: &LogEvent) {}

        fn emit_span_start(&self, _event: &SpanEvent) {}

        fn emit_span_end(&self, _span_id: u64, metadata: &BTreeMap<String, serde_json::Value>) {
            self.attrs
                .borrow_mut()
                .extend(otel_span_end_attributes(metadata));
        }
    }

    /// Allowlisted keys stay top-level attributes; any other key is folded
    /// into the single `harn.meta_json` attribute.
    #[cfg(feature = "otel")]
    #[test]
    fn span_attr_keys_are_low_cardinality() {
        let sink = Rc::new(SpanAttrCollectorSink::default());
        clear_event_sinks();
        add_event_sink(sink.clone());

        let rogue_key = "request.550e8400-e29b-41d4-a716-446655440000";
        let mut metadata = BTreeMap::new();
        metadata.insert("harn.kind".to_string(), serde_json::json!("llm_call"));
        metadata.insert(rogue_key.to_string(), serde_json::json!("rogue-value"));

        emit_span_end(42, metadata);
        reset_event_sinks();

        let attrs = sink.attrs.borrow();
        assert!(
            attrs
                .iter()
                .any(|(key, value)| key == "harn.kind" && value.contains("llm_call")),
            "allowlisted harn.kind should remain a top-level OTel attribute: {attrs:?}",
        );
        assert!(
            !attrs.iter().any(|(key, _)| key == rogue_key),
            "rogue metadata key must not become a top-level OTel attribute: {attrs:?}",
        );
        let (_, meta_json) = attrs
            .iter()
            .find(|(key, _)| key == "harn.meta_json")
            .expect("rogue metadata should be folded into harn.meta_json");
        let blob: serde_json::Value =
            serde_json::from_str(meta_json).expect("harn.meta_json should stay JSON");
        assert_eq!(blob[rogue_key], serde_json::json!("rogue-value"));
    }

    #[cfg(feature = "otel")]
    mod otel_env {
        use super::super::*;
        use std::sync::{Mutex, MutexGuard, OnceLock, PoisonError};

        /// Serializes env-mutating tests in this module. Crate-wide
        /// `crate::llm::env_lock()` is reserved for LLM env scopes; a
        /// dedicated lock here keeps these tests independent.
        ///
        /// A poisoned mutex is recovered instead of panicking: the lock
        /// guards no data, and `ScopedEnvVar` restores the environment while
        /// unwinding, so one failing test must not cascade into every later
        /// test in this module.
        fn lock() -> MutexGuard<'static, ()> {
            static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
            LOCK.get_or_init(|| Mutex::new(()))
                .lock()
                .unwrap_or_else(PoisonError::into_inner)
        }

        /// RAII guard for a single env var. Saves the prior value on
        /// construction and restores it on Drop so parallel tests in
        /// the same process don't leak state.
        struct ScopedEnvVar {
            key: &'static str,
            previous: Option<String>,
        }

        impl ScopedEnvVar {
            fn set(key: &'static str, value: &str) -> Self {
                let previous = std::env::var(key).ok();
                // SAFETY: env mutation is serialized by the test-level
                // `lock()` above; no other thread inspects these
                // variables while a guard is alive.
                unsafe { std::env::set_var(key, value) };
                Self { key, previous }
            }

            fn remove(key: &'static str) -> Self {
                let previous = std::env::var(key).ok();
                // SAFETY: see `set` above.
                unsafe { std::env::remove_var(key) };
                Self { key, previous }
            }
        }

        impl Drop for ScopedEnvVar {
            fn drop(&mut self) {
                // SAFETY: see `set` above. Restoration happens while the
                // test still holds the module lock.
                match &self.previous {
                    Some(value) => unsafe { std::env::set_var(self.key, value) },
                    None => unsafe { std::env::remove_var(self.key) },
                }
            }
        }

        #[test]
        fn install_returns_false_when_endpoint_unset() {
            let _guard = lock();
            let _endpoint = ScopedEnvVar::remove("HARN_OTEL_ENDPOINT");
            let _standard = ScopedEnvVar::remove("OTEL_EXPORTER_OTLP_ENDPOINT");

            let installed = install_otel_sink_from_env()
                .expect("install must not error when endpoint is unset");
            assert!(!installed, "expected no sink registration without endpoint");
        }

        #[test]
        fn endpoint_helper_prefers_harn_variable() {
            let _guard = lock();
            let _harn = ScopedEnvVar::set("HARN_OTEL_ENDPOINT", "http://harn.example.test:4318");
            let _standard = ScopedEnvVar::set(
                "OTEL_EXPORTER_OTLP_ENDPOINT",
                "http://generic.example.test:4318",
            );

            assert_eq!(
                otel_endpoint_from_env().as_deref(),
                Some("http://harn.example.test:4318"),
            );
        }

        #[test]
        fn endpoint_helper_falls_back_to_standard_variable() {
            let _guard = lock();
            let _harn = ScopedEnvVar::remove("HARN_OTEL_ENDPOINT");
            let _standard = ScopedEnvVar::set(
                "OTEL_EXPORTER_OTLP_ENDPOINT",
                "http://generic.example.test:4318",
            );

            assert_eq!(
                otel_endpoint_from_env().as_deref(),
                Some("http://generic.example.test:4318"),
            );
        }

        #[test]
        fn endpoint_helper_ignores_whitespace_only_values() {
            let _guard = lock();
            let _harn = ScopedEnvVar::set("HARN_OTEL_ENDPOINT", "   ");
            let _standard = ScopedEnvVar::remove("OTEL_EXPORTER_OTLP_ENDPOINT");

            assert!(otel_endpoint_from_env().is_none());
        }

        #[test]
        fn service_name_helper_layers_defaults() {
            let _guard = lock();
            let _harn = ScopedEnvVar::remove("HARN_OTEL_SERVICE_NAME");
            let _standard = ScopedEnvVar::remove("OTEL_SERVICE_NAME");
            assert_eq!(otel_service_name_from_env(), "harn");

            // Shadowing keeps the earlier guards alive until the end of the
            // test; guards drop in reverse order, so the original values
            // are restored last.
            let _standard = ScopedEnvVar::set("OTEL_SERVICE_NAME", "editor");
            assert_eq!(otel_service_name_from_env(), "editor");

            let _harn = ScopedEnvVar::set("HARN_OTEL_SERVICE_NAME", "burin-tui");
            assert_eq!(otel_service_name_from_env(), "burin-tui");
        }

        #[test]
        fn headers_helper_parses_comma_separated_pairs() {
            let _guard = lock();
            let _harn = ScopedEnvVar::set(
                "HARN_OTEL_HEADERS",
                "x-honeycomb-team=abc123, x-other=val ,blank=",
            );

            let headers = otel_headers_from_env();
            assert_eq!(
                headers.get("x-honeycomb-team").map(String::as_str),
                Some("abc123"),
            );
            assert_eq!(headers.get("x-other").map(String::as_str), Some("val"));
            assert!(
                !headers.contains_key("blank"),
                "empty values must be dropped to match the orchestrator helper",
            );
        }

        #[test]
        fn normalize_endpoint_appends_traces_path_when_missing() {
            assert_eq!(
                normalize_otlp_traces_endpoint("http://localhost:4318"),
                "http://localhost:4318/v1/traces",
            );
            assert_eq!(
                normalize_otlp_traces_endpoint("http://localhost:4318/"),
                "http://localhost:4318/v1/traces",
            );
            assert_eq!(
                normalize_otlp_traces_endpoint("http://localhost:4318/v1/traces"),
                "http://localhost:4318/v1/traces",
            );
            assert_eq!(
                normalize_otlp_traces_endpoint("http://localhost:4318/v1/traces/"),
                "http://localhost:4318/v1/traces",
            );
        }
    }

    #[cfg(not(feature = "otel"))]
    #[test]
    fn install_otel_sink_returns_ok_false_on_non_otel_builds() {
        let installed = install_otel_sink_from_env().expect("non-otel stub never errors");
        assert!(!installed);
    }
}