Skip to main content

harn_vm/
events.rs

1//! Structured event emission for observability.
2//!
3//! Provides an `EventSink` trait and a thread-local sink registry so that the
4//! VM (and especially the LLM layer) can emit structured log and span events
5//! instead of raw `eprintln!` calls.  Consumers register one or more sinks;
6//! the default `StderrSink` preserves backward-compatible stderr output.
7
8use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::rc::Rc;
11
/// Severity level for log events.
///
/// Variants are declared from least to most severe. The derived ordering
/// follows declaration order, so a threshold can be written as
/// `level >= EventLevel::Warn`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum EventLevel {
    /// Lowest severity.
    Trace,
    /// Diagnostic detail.
    Debug,
    /// Routine informational message.
    Info,
    /// Warning; `StderrSink` prints these with the `[harn] warning:` prefix.
    Warn,
    /// Error; `StderrSink` prints these with the `[harn] error:` prefix.
    Error,
}
21
/// A structured log event.
///
/// Built by [`emit_log`] and passed by reference to every registered
/// [`EventSink`].
#[derive(Clone, Debug)]
pub struct LogEvent {
    /// Severity of the event.
    pub level: EventLevel,
    /// Category label; `StderrSink` prints it in brackets for levels other
    /// than `Warn` and `Error`.
    pub category: String,
    /// Message text.
    pub message: String,
    /// Structured key/value payload attached to the event.
    pub metadata: BTreeMap<String, serde_json::Value>,
}
30
/// A structured span event (start or end).
///
/// Built by [`emit_span_start`] and passed by reference to every registered
/// [`EventSink`].
#[derive(Clone, Debug)]
pub struct SpanEvent {
    /// Identifier of this span; the same id is later handed to
    /// `EventSink::emit_span_end` to close it.
    pub span_id: u64,
    /// Identifier of the parent span, or `None` (presumably for a root
    /// span — confirm against the code that allocates span ids).
    pub parent_id: Option<u64>,
    /// Span name.
    pub name: String,
    /// Free-form kind label supplied by the caller.
    pub kind: String,
    /// Structured key/value payload attached at span start.
    pub metadata: BTreeMap<String, serde_json::Value>,
}
40
/// Trait for receiving structured events from the VM.
///
/// All methods take `&self`; sinks that accumulate state (see
/// `CollectorSink`) use interior mutability.
pub trait EventSink {
    /// Receive one log event.
    fn emit_log(&self, event: &LogEvent);
    /// Receive the start of a span.
    fn emit_span_start(&self, event: &SpanEvent);
    /// Receive the end of the span identified by `span_id`, together with
    /// the metadata supplied at span end.
    fn emit_span_end(&self, span_id: u64, metadata: &BTreeMap<String, serde_json::Value>);
}
47
48/// Default sink that writes formatted output to stderr.
49pub struct StderrSink;
50
51impl EventSink for StderrSink {
52    fn emit_log(&self, event: &LogEvent) {
53        let level_str = match event.level {
54            EventLevel::Trace => "TRACE",
55            EventLevel::Debug => "DEBUG",
56            EventLevel::Info => "INFO",
57            EventLevel::Warn => "WARN",
58            EventLevel::Error => "ERROR",
59        };
60        // "[harn]" prefix for warn/error is relied on by downstream
61        // tooling and tests that parse stderr.
62        match event.level {
63            EventLevel::Warn => {
64                eprintln!("[harn] warning: {}", event.message);
65            }
66            EventLevel::Error => {
67                eprintln!("[harn] error: {}", event.message);
68            }
69            _ => {
70                eprintln!("[{level_str}] [{}] {}", event.category, event.message);
71            }
72        }
73    }
74
75    fn emit_span_start(&self, _event: &SpanEvent) {
76        // Silent by default — spans are for observability backends.
77    }
78
79    fn emit_span_end(&self, _span_id: u64, _metadata: &BTreeMap<String, serde_json::Value>) {}
80}
81
82/// A sink that collects events for later retrieval (testing, inspection).
83pub struct CollectorSink {
84    pub logs: RefCell<Vec<LogEvent>>,
85    pub spans: RefCell<Vec<SpanEvent>>,
86}
87
88impl CollectorSink {
89    pub fn new() -> Self {
90        Self {
91            logs: RefCell::new(Vec::new()),
92            spans: RefCell::new(Vec::new()),
93        }
94    }
95}
96
97impl Default for CollectorSink {
98    fn default() -> Self {
99        Self::new()
100    }
101}
102
103impl EventSink for CollectorSink {
104    fn emit_log(&self, event: &LogEvent) {
105        self.logs.borrow_mut().push(event.clone());
106    }
107
108    fn emit_span_start(&self, event: &SpanEvent) {
109        self.spans.borrow_mut().push(event.clone());
110    }
111
112    fn emit_span_end(&self, _span_id: u64, _metadata: &BTreeMap<String, serde_json::Value>) {}
113}
114
thread_local! {
    // Per-thread sink registry. Each thread's registry starts with a single
    // `StderrSink`; because the storage is thread-local, sinks registered on
    // one thread are not seen by events emitted on another.
    static EVENT_SINKS: RefCell<Vec<Rc<dyn EventSink>>> = RefCell::new(vec![Rc::new(StderrSink)]);
}
118
119/// Register an additional event sink.
120pub fn add_event_sink(sink: Rc<dyn EventSink>) {
121    EVENT_SINKS.with(|sinks| sinks.borrow_mut().push(sink));
122}
123
124/// Remove all sinks (including the default `StderrSink`).
125pub fn clear_event_sinks() {
126    EVENT_SINKS.with(|sinks| sinks.borrow_mut().clear());
127}
128
129/// Reset sinks to just the default `StderrSink`.
130pub fn reset_event_sinks() {
131    EVENT_SINKS.with(|sinks| {
132        let mut s = sinks.borrow_mut();
133        s.clear();
134        s.push(Rc::new(StderrSink));
135    });
136}
137
138/// Emit a structured log event to all registered sinks.
139pub fn emit_log(
140    level: EventLevel,
141    category: &str,
142    message: &str,
143    metadata: BTreeMap<String, serde_json::Value>,
144) {
145    let event = LogEvent {
146        level,
147        category: category.to_string(),
148        message: message.to_string(),
149        metadata,
150    };
151    EVENT_SINKS.with(|sinks| {
152        for sink in sinks.borrow().iter() {
153            sink.emit_log(&event);
154        }
155    });
156}
157
158/// Emit a span-start event to all registered sinks.
159pub fn emit_span_start(
160    span_id: u64,
161    parent_id: Option<u64>,
162    name: &str,
163    kind: &str,
164    metadata: BTreeMap<String, serde_json::Value>,
165) {
166    let event = SpanEvent {
167        span_id,
168        parent_id,
169        name: name.to_string(),
170        kind: kind.to_string(),
171        metadata,
172    };
173    EVENT_SINKS.with(|sinks| {
174        for sink in sinks.borrow().iter() {
175            sink.emit_span_start(&event);
176        }
177    });
178}
179
180/// Emit a span-end event to all registered sinks.
181pub fn emit_span_end(span_id: u64, metadata: BTreeMap<String, serde_json::Value>) {
182    EVENT_SINKS.with(|sinks| {
183        for sink in sinks.borrow().iter() {
184            sink.emit_span_end(span_id, &metadata);
185        }
186    });
187}
188
189/// Log at Info level with no metadata.
190pub fn log_info(category: &str, message: &str) {
191    emit_log(EventLevel::Info, category, message, BTreeMap::new());
192}
193
194/// Log at Warn level with no metadata.
195pub fn log_warn(category: &str, message: &str) {
196    emit_log(EventLevel::Warn, category, message, BTreeMap::new());
197}
198
199/// Log at Error level with no metadata.
200pub fn log_error(category: &str, message: &str) {
201    emit_log(EventLevel::Error, category, message, BTreeMap::new());
202}
203
204/// Log at Debug level with no metadata.
205pub fn log_debug(category: &str, message: &str) {
206    emit_log(EventLevel::Debug, category, message, BTreeMap::new());
207}
208
209/// Log at Info level with metadata.
210pub fn log_info_meta(category: &str, message: &str, metadata: BTreeMap<String, serde_json::Value>) {
211    emit_log(EventLevel::Info, category, message, metadata);
212}
213
214/// Log at Warn level with metadata.
215pub fn log_warn_meta(category: &str, message: &str, metadata: BTreeMap<String, serde_json::Value>) {
216    emit_log(EventLevel::Warn, category, message, metadata);
217}
218
/// OpenTelemetry exporter sink. Requires the `otel` feature flag.
/// Forwards Harn log events and span lifecycle to OTLP collectors.
///
/// Active spans are stored keyed by Harn's `span_id` so that
/// `emit_span_end` can close the correct OTel span.
#[cfg(feature = "otel")]
pub struct OtelSink {
    // Tracer provider used to obtain the "harn" tracer for every emitted
    // span; it is shut down when the sink is dropped.
    provider: opentelemetry_sdk::trace::SdkTracerProvider,
    // Spans that have been started but not yet ended, keyed by Harn span id.
    // `RefCell` because the `EventSink` methods only receive `&self`.
    active_spans:
        std::cell::RefCell<std::collections::HashMap<u64, opentelemetry_sdk::trace::Span>>,
}
230
#[cfg(feature = "otel")]
impl OtelSink {
    /// Create a new OTel sink. Reads OTLP configuration from the
    /// environment (endpoint, service name, headers). Errors when the
    /// span exporter fails to initialise — a missing endpoint is **not**
    /// an error; the exporter falls back to OpenTelemetry's default
    /// (`http://localhost:4318/v1/traces`). Callers that want
    /// presence-of-endpoint to gate registration should use
    /// [`install_otel_sink_from_env`].
    ///
    /// # Errors
    ///
    /// Returns `"OTel span exporter init failed: …"` when building the
    /// span exporter fails.
    ///
    /// # Side effects
    ///
    /// Registers a clone of the new provider as the process-global tracer
    /// provider via `opentelemetry::global::set_tracer_provider`.
    pub fn new() -> Result<Self, String> {
        use opentelemetry::global;
        use opentelemetry_otlp::{
            Protocol, SpanExporter, WithExportConfig as _, WithHttpConfig as _,
        };
        use opentelemetry_sdk::runtime;
        use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
        use opentelemetry_sdk::trace::SdkTracerProvider;
        use opentelemetry_sdk::Resource;

        // Resolve all configuration up front; each helper prefers the
        // Harn-specific variable over the standard OpenTelemetry one.
        let endpoint = otel_endpoint_from_env();
        let headers = otel_headers_from_env();
        let service_name = otel_service_name_from_env();

        let mut exporter_builder = SpanExporter::builder()
            .with_http()
            .with_protocol(Protocol::HttpJson)
            .with_headers(headers);
        // Only override the endpoint when one is configured; otherwise the
        // exporter keeps its own default.
        if let Some(endpoint) = endpoint.as_deref() {
            exporter_builder =
                exporter_builder.with_endpoint(normalize_otlp_traces_endpoint(endpoint));
        }
        let exporter = exporter_builder
            .build()
            .map_err(|e| format!("OTel span exporter init failed: {e}"))?;

        // Drive the batch processor on the current Tokio runtime so
        // the exporter's reqwest client can reach the network. The
        // default SDK processor spawns its own thread, which has no
        // Tokio reactor and panics on the first send — we need the
        // async-runtime variant for the same reason the orchestrator
        // path uses it.
        let provider = SdkTracerProvider::builder()
            .with_resource(Resource::builder().with_service_name(service_name).build())
            .with_span_processor(BatchSpanProcessor::builder(exporter, runtime::Tokio).build())
            .build();

        // The sink keeps its own handle; the global registration receives a clone.
        global::set_tracer_provider(provider.clone());

        Ok(Self {
            provider,
            active_spans: std::cell::RefCell::new(std::collections::HashMap::new()),
        })
    }
}
285
/// Resolve the OTLP endpoint from the environment.
///
/// The host spawns a fresh `harn` child per session, so environment
/// variables read at startup are the only reliable opt-in for local trace
/// export. `HARN_OTEL_ENDPOINT` is consulted before the shared
/// `OTEL_EXPORTER_OTLP_ENDPOINT` so that a caller pointing an unrelated
/// process at a collector through the shared variable does not enable Harn
/// emission by accident.
///
/// Returns the first value that is non-empty after trimming, or `None`.
#[cfg(feature = "otel")]
fn otel_endpoint_from_env() -> Option<String> {
    ["HARN_OTEL_ENDPOINT", "OTEL_EXPORTER_OTLP_ENDPOINT"]
        .iter()
        .filter_map(|name| std::env::var(name).ok())
        .map(|value| value.trim().to_string())
        .find(|endpoint| !endpoint.is_empty())
}
304
/// Resolve the OTel service name: `HARN_OTEL_SERVICE_NAME`, then
/// `OTEL_SERVICE_NAME`, then the literal `"harn"`. Values that are empty
/// after trimming are skipped.
#[cfg(feature = "otel")]
fn otel_service_name_from_env() -> String {
    ["HARN_OTEL_SERVICE_NAME", "OTEL_SERVICE_NAME"]
        .iter()
        .filter_map(|name| std::env::var(name).ok())
        .map(|value| value.trim().to_string())
        .find(|service| !service.is_empty())
        .unwrap_or_else(|| "harn".to_string())
}
317
/// Parse OTLP export headers from the environment.
///
/// `HARN_OTEL_HEADERS` is consulted first, then
/// `OTEL_EXPORTER_OTLP_HEADERS`. A variable that is unset **or blank after
/// trimming** is skipped, matching how the endpoint and service-name
/// helpers treat blank values — otherwise an empty Harn-specific variable
/// would silently hide headers configured through the standard one.
///
/// The chosen value is split on `,`, `;` and newlines. Each segment is
/// split at the first `=` (or, when there is no `=`, the first `:`) into a
/// trimmed name and value; segments with no separator, an empty name, or
/// an empty value are dropped.
#[cfg(feature = "otel")]
fn otel_headers_from_env() -> std::collections::HashMap<String, String> {
    let raw = ["HARN_OTEL_HEADERS", "OTEL_EXPORTER_OTLP_HEADERS"]
        .iter()
        .filter_map(|name| std::env::var(name).ok())
        .find(|value| !value.trim().is_empty())
        .unwrap_or_default();
    raw.split([',', '\n', ';'])
        .map(str::trim)
        .filter(|segment| !segment.is_empty())
        .filter_map(|segment| {
            let (name, value) = segment
                .split_once('=')
                .or_else(|| segment.split_once(':'))?;
            let name = name.trim();
            let value = value.trim();
            if name.is_empty() || value.is_empty() {
                return None;
            }
            Some((name.to_string(), value.to_string()))
        })
        .collect()
}
340
/// Return `endpoint` with trailing slashes removed and `/v1/traces`
/// appended unless the trimmed endpoint already ends with that path.
#[cfg(feature = "otel")]
fn normalize_otlp_traces_endpoint(endpoint: &str) -> String {
    const TRACES_PATH: &str = "/v1/traces";
    let mut url = endpoint.trim_end_matches('/').to_string();
    if !url.ends_with(TRACES_PATH) {
        url.push_str(TRACES_PATH);
    }
    url
}
350
/// Stable OTel span-end attribute keys exported as top-level attributes.
///
/// Keep this list narrow: span metadata can be script-controlled, and OTel
/// backends charge or index by attribute key. Runtime code that needs another
/// top-level key should add an exact `harn.*` key here rather than letting
/// arbitrary metadata become a new exported attribute name.
///
/// Entries are compared by exact string equality in
/// `is_low_cardinality_attr_key`.
#[cfg(feature = "otel")]
const ALLOWED_SPAN_ATTR_KEYS: &[&str] = &[
    "harn.duration_ms",
    "harn.error",
    "harn.error.kind",
    "harn.kind",
    "harn.span_id",
    "harn.status",
];
366
/// Runtime-owned low-cardinality attribute namespaces.
///
/// These prefixes are for stable schema families, not dynamic suffixes such as
/// run ids, file paths, or UUIDs. Metadata outside this exact-key/prefix
/// allowlist is folded into `harn.meta_json` so OTel sees one bounded key.
///
/// Entries are matched with `str::starts_with` in
/// `is_low_cardinality_attr_key`, so any key beginning with one of these
/// prefixes is accepted regardless of its suffix.
#[cfg(feature = "otel")]
const ALLOWED_SPAN_ATTR_PREFIXES: &[&str] = &[
    "harn.cost.",
    "harn.lifecycle.",
    "harn.llm.",
    "harn.step.",
    "harn.timing.",
    "harn.token.",
    "harn.tool.",
    "harn.worker.",
];
383
/// Report whether `key` may be exported as a top-level OTel attribute:
/// either it equals an entry of `ALLOWED_SPAN_ATTR_KEYS` or it starts with
/// an entry of `ALLOWED_SPAN_ATTR_PREFIXES`.
#[cfg(feature = "otel")]
fn is_low_cardinality_attr_key(key: &str) -> bool {
    let is_exact_match = ALLOWED_SPAN_ATTR_KEYS.iter().any(|allowed| *allowed == key);
    if is_exact_match {
        return true;
    }
    ALLOWED_SPAN_ATTR_PREFIXES
        .iter()
        .any(|prefix| key.starts_with(*prefix))
}
391
/// Convert span-end metadata into the `(key, value)` attribute pairs that
/// are exported to OTel.
///
/// * Keys accepted by `is_low_cardinality_attr_key` become top-level
///   attributes. JSON strings are exported as their plain text (no
///   surrounding JSON quotes or escapes), so `harn.kind` at span end has
///   the same shape as the raw `harn.kind` set at span start; every other
///   JSON value is exported in its compact JSON form.
/// * All remaining entries are serialised together into one JSON object
///   under the single key `harn.meta_json`; that attribute is omitted when
///   there are no such entries.
///
/// Every exported value is passed through the current redaction policy
/// before it is returned.
#[cfg(feature = "otel")]
fn otel_span_end_attributes(
    metadata: &BTreeMap<String, serde_json::Value>,
) -> Vec<(String, String)> {
    let policy = crate::redact::current_policy();
    let mut attributes = Vec::new();
    // Borrowed view of the non-allowlisted entries; serialises exactly like
    // an owned map but avoids cloning keys and values.
    let mut meta_json: BTreeMap<&str, &serde_json::Value> = BTreeMap::new();

    for (key, value) in metadata {
        if is_low_cardinality_attr_key(key) {
            // `Display` on a JSON string yields the quoted/escaped form;
            // unwrap strings so backends can match on the bare value.
            let raw = match value {
                serde_json::Value::String(text) => text.clone(),
                other => other.to_string(),
            };
            let redacted = policy.redact_string(&raw).into_owned();
            attributes.push((key.clone(), redacted));
        } else {
            meta_json.insert(key.as_str(), value);
        }
    }

    if !meta_json.is_empty() {
        let raw = serde_json::to_string(&meta_json).unwrap_or_else(|_| "{}".to_string());
        let redacted = policy.redact_string(&raw).into_owned();
        attributes.push(("harn.meta_json".to_string(), redacted));
    }

    attributes
}
418
/// Idempotency guard for [`install_otel_sink_from_env`]. The first
/// caller wins; later ones become no-ops returning `Ok(false)`. The
/// stored provider keeps the batch processor's runtime alive — and
/// gives [`shutdown_otel_sink`] a handle to flush before the host
/// tokio runtime exits.
///
/// The slot is process-wide, whereas the sink itself is registered in the
/// installing thread's thread-local sink chain. `shutdown_otel_sink`
/// empties the slot again with `take()`.
#[cfg(feature = "otel")]
static OTEL_PROVIDER: std::sync::OnceLock<
    std::sync::Mutex<Option<opentelemetry_sdk::trace::SdkTracerProvider>>,
> = std::sync::OnceLock::new();
428
/// Install an [`OtelSink`] on the calling thread's sink chain if the
/// environment asks for OTLP export.
///
/// # Returns
///
/// * `Ok(true)` — a sink was created and registered.
/// * `Ok(false)` — no endpoint is configured, or the process-wide provider
///   slot is already occupied by an earlier install.
/// * `Err(_)` — the exporter failed to initialise.
///
/// # Configuration
///
/// Export is enabled iff `HARN_OTEL_ENDPOINT` or the standard
/// `OTEL_EXPORTER_OTLP_ENDPOINT` is non-empty. The service name is taken
/// from `HARN_OTEL_SERVICE_NAME` → `OTEL_SERVICE_NAME` → `"harn"`, and
/// headers from `HARN_OTEL_HEADERS` → `OTEL_EXPORTER_OTLP_HEADERS`
/// (comma/semicolon-separated `name=value` pairs).
///
/// Hosts (`harn run`, `harn serve acp`, and other embedders) should call
/// this once near process startup so spans emitted during the session
/// reach the configured collector.
///
/// # Panics
///
/// Panics if the provider mutex is poisoned.
#[cfg(feature = "otel")]
pub fn install_otel_sink_from_env() -> Result<bool, String> {
    let Some(_endpoint) = otel_endpoint_from_env() else {
        return Ok(false);
    };

    let slot = OTEL_PROVIDER.get_or_init(|| std::sync::Mutex::new(None));

    // The lock is held only for this check and released before the sink is
    // built; the provider is stored under a second acquisition below.
    let already_installed = slot
        .lock()
        .expect("otel provider mutex poisoned")
        .is_some();
    if already_installed {
        // Don't double-register; the existing sink keeps emitting.
        return Ok(false);
    }

    let sink = OtelSink::new()?;
    let provider = sink.provider.clone();
    add_event_sink(Rc::new(sink));

    *slot.lock().expect("otel provider mutex poisoned") = Some(provider);
    Ok(true)
}
470
/// Flush and tear down the auto-registered OTel sink. Hosts that
/// shut down their tokio runtime before process exit must call this
/// while the runtime is still alive — `BatchSpanProcessor` needs a
/// reactor to drain queued exports, and the [`Drop`] impl on
/// [`OtelSink`] otherwise runs after the runtime is gone. Safe to
/// call when no sink was installed.
///
/// Returns `Ok(true)` when a provider was flushed, `Ok(false)` when
/// none was installed, and `Err` when the SDK reported an export or
/// shutdown error. Errors are advisory — long-running hosts should
/// log and continue.
///
/// The provider is removed from the process-wide slot before flushing, so
/// this is the only chance to tear it down: `shutdown()` is therefore
/// attempted even when `force_flush()` fails. When both fail, the two
/// messages are joined with `"; "`.
///
/// # Panics
///
/// Panics if the provider mutex is poisoned.
#[cfg(feature = "otel")]
pub fn shutdown_otel_sink() -> Result<bool, String> {
    let Some(slot) = OTEL_PROVIDER.get() else {
        return Ok(false);
    };
    // Take the provider out while holding the lock only briefly.
    let provider = {
        let mut guard = slot.lock().expect("otel provider mutex poisoned");
        guard.take()
    };
    let Some(provider) = provider else {
        return Ok(false);
    };

    let flushed = provider
        .force_flush()
        .map_err(|error| format!("OTel force_flush failed: {error}"));
    // Always attempt shutdown: returning early on a flush error would
    // leave the provider running with nothing left to stop it.
    let stopped = provider
        .shutdown()
        .map_err(|error| format!("OTel shutdown failed: {error}"));

    match (flushed, stopped) {
        (Ok(_), Ok(_)) => Ok(true),
        (Err(flush_error), Ok(_)) => Err(flush_error),
        (Ok(_), Err(shutdown_error)) => Err(shutdown_error),
        (Err(flush_error), Err(shutdown_error)) => {
            Err(format!("{flush_error}; {shutdown_error}"))
        }
    }
}
502
/// Stand-in used when the crate is built without the `otel` feature.
/// Always reports `Ok(false)` (nothing installed) so callers need no
/// feature-specific code path.
#[cfg(not(feature = "otel"))]
pub fn install_otel_sink_from_env() -> Result<bool, String> {
    let installed = false;
    Ok(installed)
}
510
/// Stand-in used when the crate is built without the `otel` feature.
/// Always reports `Ok(false)` (no provider was flushed).
#[cfg(not(feature = "otel"))]
pub fn shutdown_otel_sink() -> Result<bool, String> {
    let flushed = false;
    Ok(flushed)
}
515
#[cfg(feature = "otel")]
impl EventSink for OtelSink {
    /// Export a log event as a zero-duration span named `log.<category>`
    /// carrying `level`, the redacted `message`, and `category` attributes.
    fn emit_log(&self, event: &LogEvent) {
        use opentelemetry::trace::{Tracer, TracerProvider};
        let tracer = self.provider.tracer("harn");
        // Apply the unified redaction policy to attribute values
        // before they leave the process. The active policy includes
        // the OAuth token catalog (HARN-OAU-001) so a Bearer header
        // or provider token that snuck into a log message gets
        // scrubbed before it lands at the OTel collector.
        let policy = crate::redact::current_policy();
        // Log events are zero-duration spans — start and immediately drop.
        let _span = tracer
            .span_builder(format!("log.{}", event.category))
            .with_attributes(vec![
                opentelemetry::KeyValue::new("level", format!("{:?}", event.level)),
                opentelemetry::KeyValue::new(
                    "message",
                    policy.redact_string(&event.message).into_owned(),
                ),
                opentelemetry::KeyValue::new("category", event.category.clone()),
            ])
            .start(&tracer);
    }

    /// Start an OTel span for `event` and remember it under `event.span_id`.
    ///
    /// When `event.parent_id` names a span that is still active in this
    /// sink, the new span is started as its child so the exported trace
    /// keeps Harn's span hierarchy. Otherwise the span is started against
    /// the current OTel context, as before.
    fn emit_span_start(&self, event: &SpanEvent) {
        use opentelemetry::trace::{Span as _, TraceContextExt as _, Tracer, TracerProvider};
        let tracer = self.provider.tracer("harn");

        // Resolve the parent before taking the mutable borrow below.
        let parent_cx = event.parent_id.and_then(|parent_id| {
            self.active_spans.borrow().get(&parent_id).map(|parent| {
                opentelemetry::Context::new()
                    .with_remote_span_context(parent.span_context().clone())
            })
        });

        let builder = tracer
            .span_builder(event.name.clone())
            .with_attributes(vec![
                opentelemetry::KeyValue::new("harn.span_id", event.span_id as i64),
                opentelemetry::KeyValue::new("harn.kind", event.kind.clone()),
            ]);
        let span = match parent_cx {
            Some(cx) => builder.start_with_context(&tracer, &cx),
            None => builder.start(&tracer),
        };
        self.active_spans.borrow_mut().insert(event.span_id, span);
    }

    /// End the OTel span registered under `span_id`, attaching the
    /// redacted, cardinality-bounded span-end attributes first. Unknown
    /// span ids are ignored.
    fn emit_span_end(&self, span_id: u64, metadata: &BTreeMap<String, serde_json::Value>) {
        use opentelemetry::trace::Span;
        // Remove the span in its own statement so the `RefCell` borrow is
        // released before any further work is done.
        let Some(mut span) = self.active_spans.borrow_mut().remove(&span_id) else {
            return;
        };
        // OTel span attributes are the fourth sink covered by the
        // OA-06 token-redaction policy (transcripts, audit
        // receipts, OTel, and system reminders). The helper also
        // bounds attribute-key cardinality by folding unknown
        // metadata into `harn.meta_json`.
        for (key, redacted) in otel_span_end_attributes(metadata) {
            span.set_attribute(opentelemetry::KeyValue::new(key, redacted));
        }
        span.end();
    }
}
569
#[cfg(feature = "otel")]
impl Drop for OtelSink {
    /// Discard any spans still tracked as active (abnormal shutdown) and
    /// shut the provider down, ignoring the shutdown result.
    ///
    /// NOTE(review): `install_otel_sink_from_env` stores a clone of this
    /// provider in `OTEL_PROVIDER`; presumably shutting it down here also
    /// affects that clone — confirm against the SDK's provider semantics.
    fn drop(&mut self) {
        // `&mut self` gives exclusive access to the map.
        self.active_spans.get_mut().clear();
        let _ = self.provider.shutdown();
    }
}
578
579#[cfg(test)]
580mod tests {
581    use super::*;
582
583    #[test]
584    fn test_collector_sink_captures_logs() {
585        let sink = Rc::new(CollectorSink::new());
586        clear_event_sinks();
587        add_event_sink(sink.clone());
588
589        log_info("llm", "test message");
590        log_warn("llm.cost", "cost warning");
591        log_error("llm.agent", "agent error");
592
593        let logs = sink.logs.borrow();
594        assert_eq!(logs.len(), 3);
595        assert_eq!(logs[0].level, EventLevel::Info);
596        assert_eq!(logs[0].category, "llm");
597        assert_eq!(logs[0].message, "test message");
598        assert_eq!(logs[1].level, EventLevel::Warn);
599        assert_eq!(logs[2].level, EventLevel::Error);
600
601        // Restore default sinks for other tests.
602        reset_event_sinks();
603    }
604
605    #[test]
606    fn test_collector_sink_captures_spans() {
607        let sink = Rc::new(CollectorSink::new());
608        clear_event_sinks();
609        add_event_sink(sink.clone());
610
611        emit_span_start(1, None, "agent_loop", "llm_call", BTreeMap::new());
612        emit_span_end(1, BTreeMap::new());
613
614        let spans = sink.spans.borrow();
615        assert_eq!(spans.len(), 1);
616        assert_eq!(spans[0].span_id, 1);
617        assert_eq!(spans[0].name, "agent_loop");
618
619        reset_event_sinks();
620    }
621
622    #[test]
623    fn test_stderr_sink_does_not_panic() {
624        let sink = StderrSink;
625        let event = LogEvent {
626            level: EventLevel::Warn,
627            category: "test".into(),
628            message: "hello".into(),
629            metadata: BTreeMap::new(),
630        };
631        sink.emit_log(&event);
632        sink.emit_span_start(&SpanEvent {
633            span_id: 1,
634            parent_id: None,
635            name: "x".into(),
636            kind: "y".into(),
637            metadata: BTreeMap::new(),
638        });
639        sink.emit_span_end(1, &BTreeMap::new());
640    }
641
642    #[test]
643    fn test_multiple_sinks() {
644        let a = Rc::new(CollectorSink::new());
645        let b = Rc::new(CollectorSink::new());
646        clear_event_sinks();
647        add_event_sink(a.clone());
648        add_event_sink(b.clone());
649
650        log_debug("x", "msg");
651
652        assert_eq!(a.logs.borrow().len(), 1);
653        assert_eq!(b.logs.borrow().len(), 1);
654
655        reset_event_sinks();
656    }
657
658    #[test]
659    fn test_log_with_metadata() {
660        let sink = Rc::new(CollectorSink::new());
661        clear_event_sinks();
662        add_event_sink(sink.clone());
663
664        let mut meta = BTreeMap::new();
665        meta.insert("tokens".into(), serde_json::json!(42));
666        log_info_meta("llm", "token usage", meta);
667
668        let logs = sink.logs.borrow();
669        assert_eq!(logs[0].metadata["tokens"], serde_json::json!(42));
670
671        reset_event_sinks();
672    }
673
674    #[cfg(feature = "otel")]
675    #[derive(Default)]
676    struct SpanAttrCollectorSink {
677        attrs: RefCell<Vec<(String, String)>>,
678    }
679
680    #[cfg(feature = "otel")]
681    impl EventSink for SpanAttrCollectorSink {
682        fn emit_log(&self, _event: &LogEvent) {}
683
684        fn emit_span_start(&self, _event: &SpanEvent) {}
685
686        fn emit_span_end(&self, _span_id: u64, metadata: &BTreeMap<String, serde_json::Value>) {
687            self.attrs
688                .borrow_mut()
689                .extend(otel_span_end_attributes(metadata));
690        }
691    }
692
693    #[cfg(feature = "otel")]
694    #[test]
695    fn span_attr_keys_are_low_cardinality() {
696        let sink = Rc::new(SpanAttrCollectorSink::default());
697        clear_event_sinks();
698        add_event_sink(sink.clone());
699
700        let rogue_key = "request.550e8400-e29b-41d4-a716-446655440000";
701        let mut metadata = BTreeMap::new();
702        metadata.insert("harn.kind".to_string(), serde_json::json!("llm_call"));
703        metadata.insert(rogue_key.to_string(), serde_json::json!("rogue-value"));
704
705        emit_span_end(42, metadata);
706        reset_event_sinks();
707
708        let attrs = sink.attrs.borrow();
709        assert!(
710            attrs
711                .iter()
712                .any(|(key, value)| key == "harn.kind" && value.contains("llm_call")),
713            "allowlisted harn.kind should remain a top-level OTel attribute: {attrs:?}",
714        );
715        assert!(
716            !attrs.iter().any(|(key, _)| key == rogue_key),
717            "rogue metadata key must not become a top-level OTel attribute: {attrs:?}",
718        );
719        let (_, meta_json) = attrs
720            .iter()
721            .find(|(key, _)| key == "harn.meta_json")
722            .expect("rogue metadata should be folded into harn.meta_json");
723        let blob: serde_json::Value =
724            serde_json::from_str(meta_json).expect("harn.meta_json should stay JSON");
725        assert_eq!(blob[rogue_key], serde_json::json!("rogue-value"));
726    }
727
728    #[cfg(feature = "otel")]
729    mod otel_env {
730        use super::super::*;
731        use std::sync::{Mutex, MutexGuard, OnceLock};
732
733        /// Serializes env-mutating tests in this module. Crate-wide
734        /// `crate::llm::env_lock()` is reserved for LLM env scopes; a
735        /// dedicated lock here keeps these tests independent.
736        fn lock() -> MutexGuard<'static, ()> {
737            static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
738            LOCK.get_or_init(|| Mutex::new(()))
739                .lock()
740                .expect("otel env lock")
741        }
742
743        /// RAII guard for a single env var. Saves the prior value on
744        /// construction and restores it on Drop so parallel tests in
745        /// the same process don't leak state.
746        struct ScopedEnvVar {
747            key: &'static str,
748            previous: Option<String>,
749        }
750
751        impl ScopedEnvVar {
752            fn set(key: &'static str, value: &str) -> Self {
753                let previous = std::env::var(key).ok();
754                // SAFETY: env mutation is serialized by the test-level
755                // `lock()` above; no other thread inspects these
756                // variables while a guard is alive.
757                unsafe { std::env::set_var(key, value) };
758                Self { key, previous }
759            }
760
761            fn remove(key: &'static str) -> Self {
762                let previous = std::env::var(key).ok();
763                // SAFETY: see `set` above.
764                unsafe { std::env::remove_var(key) };
765                Self { key, previous }
766            }
767        }
768
769        impl Drop for ScopedEnvVar {
770            fn drop(&mut self) {
771                // SAFETY: see `set` above. Restoration happens while the
772                // test still holds the module lock.
773                match &self.previous {
774                    Some(value) => unsafe { std::env::set_var(self.key, value) },
775                    None => unsafe { std::env::remove_var(self.key) },
776                }
777            }
778        }
779
780        #[test]
781        fn install_returns_false_when_endpoint_unset() {
782            let _guard = lock();
783            let _endpoint = ScopedEnvVar::remove("HARN_OTEL_ENDPOINT");
784            let _standard = ScopedEnvVar::remove("OTEL_EXPORTER_OTLP_ENDPOINT");
785
786            let installed = install_otel_sink_from_env()
787                .expect("install must not error when endpoint is unset");
788            assert!(!installed, "expected no sink registration without endpoint");
789        }
790
791        #[test]
792        fn endpoint_helper_prefers_harn_variable() {
793            let _guard = lock();
794            let _harn = ScopedEnvVar::set("HARN_OTEL_ENDPOINT", "http://harn.example.test:4318");
795            let _standard = ScopedEnvVar::set(
796                "OTEL_EXPORTER_OTLP_ENDPOINT",
797                "http://generic.example.test:4318",
798            );
799
800            assert_eq!(
801                otel_endpoint_from_env().as_deref(),
802                Some("http://harn.example.test:4318"),
803            );
804        }
805
806        #[test]
807        fn endpoint_helper_falls_back_to_standard_variable() {
808            let _guard = lock();
809            let _harn = ScopedEnvVar::remove("HARN_OTEL_ENDPOINT");
810            let _standard = ScopedEnvVar::set(
811                "OTEL_EXPORTER_OTLP_ENDPOINT",
812                "http://generic.example.test:4318",
813            );
814
815            assert_eq!(
816                otel_endpoint_from_env().as_deref(),
817                Some("http://generic.example.test:4318"),
818            );
819        }
820
821        #[test]
822        fn endpoint_helper_ignores_whitespace_only_values() {
823            let _guard = lock();
824            let _harn = ScopedEnvVar::set("HARN_OTEL_ENDPOINT", "   ");
825            let _standard = ScopedEnvVar::remove("OTEL_EXPORTER_OTLP_ENDPOINT");
826
827            assert!(otel_endpoint_from_env().is_none());
828        }
829
        /// Walks the service-name precedence from the bottom up: built-in
        /// default, then `OTEL_SERVICE_NAME`, then `HARN_OTEL_SERVICE_NAME`.
        #[test]
        fn service_name_helper_layers_defaults() {
            let _guard = lock();
            // Step 1: neither variable set -> literal default.
            let _harn = ScopedEnvVar::remove("HARN_OTEL_SERVICE_NAME");
            let _standard = ScopedEnvVar::remove("OTEL_SERVICE_NAME");
            assert_eq!(otel_service_name_from_env(), "harn");

            // Step 2: the standard variable overrides the default. Shadowing
            // `_standard` does not drop the earlier guard; both stay alive to
            // the end of the function and drop in reverse declaration order,
            // so the value saved by the first guard is what ends up restored.
            let _standard = ScopedEnvVar::set("OTEL_SERVICE_NAME", "editor");
            assert_eq!(otel_service_name_from_env(), "editor");

            // Step 3: the Harn-specific variable overrides the standard one.
            let _harn = ScopedEnvVar::set("HARN_OTEL_SERVICE_NAME", "burin-tui");
            assert_eq!(otel_service_name_from_env(), "burin-tui");
        }
843
844        #[test]
845        fn headers_helper_parses_comma_separated_pairs() {
846            let _guard = lock();
847            let _harn = ScopedEnvVar::set(
848                "HARN_OTEL_HEADERS",
849                "x-honeycomb-team=abc123, x-other=val ,blank=",
850            );
851
852            let headers = otel_headers_from_env();
853            assert_eq!(
854                headers.get("x-honeycomb-team").map(String::as_str),
855                Some("abc123"),
856            );
857            assert_eq!(headers.get("x-other").map(String::as_str), Some("val"));
858            assert!(
859                !headers.contains_key("blank"),
860                "empty values must be dropped to match the orchestrator helper",
861            );
862        }
863
864        #[test]
865        fn normalize_endpoint_appends_traces_path_when_missing() {
866            assert_eq!(
867                normalize_otlp_traces_endpoint("http://localhost:4318"),
868                "http://localhost:4318/v1/traces",
869            );
870            assert_eq!(
871                normalize_otlp_traces_endpoint("http://localhost:4318/"),
872                "http://localhost:4318/v1/traces",
873            );
874            assert_eq!(
875                normalize_otlp_traces_endpoint("http://localhost:4318/v1/traces"),
876                "http://localhost:4318/v1/traces",
877            );
878            assert_eq!(
879                normalize_otlp_traces_endpoint("http://localhost:4318/v1/traces/"),
880                "http://localhost:4318/v1/traces",
881            );
882        }
883    }
884
885    #[cfg(not(feature = "otel"))]
886    #[test]
887    fn install_otel_sink_returns_ok_false_on_non_otel_builds() {
888        let installed = install_otel_sink_from_env().expect("non-otel stub never errors");
889        assert!(!installed);
890    }
891}