Skip to main content

harn_vm/
events.rs

1//! Structured event emission for observability.
2//!
3//! Provides an `EventSink` trait and a thread-local sink registry so that the
4//! VM (and especially the LLM layer) can emit structured log and span events
5//! instead of raw `eprintln!` calls.  Consumers register one or more sinks;
6//! the default `StderrSink` preserves backward-compatible stderr output.
7
8use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::rc::Rc;
11
/// Severity level for log events.
///
/// `StderrSink` renders `Warn` and `Error` with a `[harn]` prefix and every
/// other level with an upper-case tag such as `[INFO]`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EventLevel {
    /// Rendered as `TRACE` by `StderrSink`.
    Trace,
    /// Rendered as `DEBUG` by `StderrSink`.
    Debug,
    /// Rendered as `INFO` by `StderrSink`.
    Info,
    /// Rendered as `[harn] warning: …` by `StderrSink`.
    Warn,
    /// Rendered as `[harn] error: …` by `StderrSink`.
    Error,
}
21
/// A structured log event.
///
/// Built by [`emit_log`] and handed by reference to every registered sink.
#[derive(Clone, Debug)]
pub struct LogEvent {
    /// Severity of the event.
    pub level: EventLevel,
    /// Free-form category label (the tests use values like `"llm"` and
    /// `"llm.cost"`).
    pub category: String,
    /// Human-readable message text.
    pub message: String,
    /// Arbitrary structured key/value data attached to the event.
    pub metadata: BTreeMap<String, serde_json::Value>,
}
30
/// A structured span event (start or end).
///
/// Built by [`emit_span_start`]; the matching end is signalled separately
/// through `emit_span_end` using the same `span_id`.
#[derive(Clone, Debug)]
pub struct SpanEvent {
    /// Identifier used to pair this span's start with its end.
    pub span_id: u64,
    /// Identifier of the enclosing span, if any (presumably; this file only
    /// stores the value and never reads it).
    pub parent_id: Option<u64>,
    /// Span name (e.g. `"agent_loop"` in the tests).
    pub name: String,
    /// Span kind label (e.g. `"llm_call"` in the tests).
    pub kind: String,
    /// Arbitrary structured key/value data attached at span start.
    pub metadata: BTreeMap<String, serde_json::Value>,
}
40
/// Trait for receiving structured events from the VM.
///
/// All methods take `&self`, so a sink that records state needs interior
/// mutability (as `CollectorSink` does with `RefCell`).
pub trait EventSink {
    /// Called once for every log event emitted through [`emit_log`].
    fn emit_log(&self, event: &LogEvent);
    /// Called when a span starts.
    fn emit_span_start(&self, event: &SpanEvent);
    /// Called when the span identified by `span_id` ends; `metadata` carries
    /// the end-of-span key/value data.
    fn emit_span_end(&self, span_id: u64, metadata: &BTreeMap<String, serde_json::Value>);
}
47
48/// Default sink that writes formatted output to stderr.
49pub struct StderrSink;
50
51impl EventSink for StderrSink {
52    fn emit_log(&self, event: &LogEvent) {
53        let level_str = match event.level {
54            EventLevel::Trace => "TRACE",
55            EventLevel::Debug => "DEBUG",
56            EventLevel::Info => "INFO",
57            EventLevel::Warn => "WARN",
58            EventLevel::Error => "ERROR",
59        };
60        // "[harn]" prefix for warn/error is relied on by downstream
61        // tooling and tests that parse stderr.
62        match event.level {
63            EventLevel::Warn => {
64                eprintln!("[harn] warning: {}", event.message);
65            }
66            EventLevel::Error => {
67                eprintln!("[harn] error: {}", event.message);
68            }
69            _ => {
70                eprintln!("[{level_str}] [{}] {}", event.category, event.message);
71            }
72        }
73    }
74
75    fn emit_span_start(&self, _event: &SpanEvent) {
76        // Silent by default — spans are for observability backends.
77    }
78
79    fn emit_span_end(&self, _span_id: u64, _metadata: &BTreeMap<String, serde_json::Value>) {}
80}
81
82/// A sink that collects events for later retrieval (testing, inspection).
83pub struct CollectorSink {
84    pub logs: RefCell<Vec<LogEvent>>,
85    pub spans: RefCell<Vec<SpanEvent>>,
86}
87
88impl CollectorSink {
89    pub fn new() -> Self {
90        Self {
91            logs: RefCell::new(Vec::new()),
92            spans: RefCell::new(Vec::new()),
93        }
94    }
95}
96
97impl Default for CollectorSink {
98    fn default() -> Self {
99        Self::new()
100    }
101}
102
103impl EventSink for CollectorSink {
104    fn emit_log(&self, event: &LogEvent) {
105        self.logs.borrow_mut().push(event.clone());
106    }
107
108    fn emit_span_start(&self, event: &SpanEvent) {
109        self.spans.borrow_mut().push(event.clone());
110    }
111
112    fn emit_span_end(&self, _span_id: u64, _metadata: &BTreeMap<String, serde_json::Value>) {}
113}
114
thread_local! {
    // Per-thread registry of active sinks. Each thread starts with a single
    // `StderrSink`; sinks registered on one thread are not seen by others.
    static EVENT_SINKS: RefCell<Vec<Rc<dyn EventSink>>> = RefCell::new(vec![Rc::new(StderrSink)]);
}
118
119/// Register an additional event sink.
120pub fn add_event_sink(sink: Rc<dyn EventSink>) {
121    EVENT_SINKS.with(|sinks| sinks.borrow_mut().push(sink));
122}
123
124/// Remove all sinks (including the default `StderrSink`).
125pub fn clear_event_sinks() {
126    EVENT_SINKS.with(|sinks| sinks.borrow_mut().clear());
127}
128
129/// Reset sinks to just the default `StderrSink`.
130pub fn reset_event_sinks() {
131    EVENT_SINKS.with(|sinks| {
132        let mut s = sinks.borrow_mut();
133        s.clear();
134        s.push(Rc::new(StderrSink));
135    });
136}
137
138/// Emit a structured log event to all registered sinks.
139pub fn emit_log(
140    level: EventLevel,
141    category: &str,
142    message: &str,
143    metadata: BTreeMap<String, serde_json::Value>,
144) {
145    let event = LogEvent {
146        level,
147        category: category.to_string(),
148        message: message.to_string(),
149        metadata,
150    };
151    EVENT_SINKS.with(|sinks| {
152        for sink in sinks.borrow().iter() {
153            sink.emit_log(&event);
154        }
155    });
156}
157
158/// Emit a span-start event to all registered sinks.
159pub fn emit_span_start(
160    span_id: u64,
161    parent_id: Option<u64>,
162    name: &str,
163    kind: &str,
164    metadata: BTreeMap<String, serde_json::Value>,
165) {
166    let event = SpanEvent {
167        span_id,
168        parent_id,
169        name: name.to_string(),
170        kind: kind.to_string(),
171        metadata,
172    };
173    EVENT_SINKS.with(|sinks| {
174        for sink in sinks.borrow().iter() {
175            sink.emit_span_start(&event);
176        }
177    });
178}
179
180/// Emit a span-end event to all registered sinks.
181pub fn emit_span_end(span_id: u64, metadata: BTreeMap<String, serde_json::Value>) {
182    EVENT_SINKS.with(|sinks| {
183        for sink in sinks.borrow().iter() {
184            sink.emit_span_end(span_id, &metadata);
185        }
186    });
187}
188
189/// Log at Info level with no metadata.
190pub fn log_info(category: &str, message: &str) {
191    emit_log(EventLevel::Info, category, message, BTreeMap::new());
192}
193
194/// Log at Warn level with no metadata.
195pub fn log_warn(category: &str, message: &str) {
196    emit_log(EventLevel::Warn, category, message, BTreeMap::new());
197}
198
199/// Log at Error level with no metadata.
200pub fn log_error(category: &str, message: &str) {
201    emit_log(EventLevel::Error, category, message, BTreeMap::new());
202}
203
204/// Log at Debug level with no metadata.
205pub fn log_debug(category: &str, message: &str) {
206    emit_log(EventLevel::Debug, category, message, BTreeMap::new());
207}
208
/// Log at Info level with metadata.
///
/// Forwards straight to [`emit_log`]; `metadata` is moved into the event.
pub fn log_info_meta(category: &str, message: &str, metadata: BTreeMap<String, serde_json::Value>) {
    emit_log(EventLevel::Info, category, message, metadata);
}
213
/// Log at Warn level with metadata.
///
/// Forwards straight to [`emit_log`]; `metadata` is moved into the event.
pub fn log_warn_meta(category: &str, message: &str, metadata: BTreeMap<String, serde_json::Value>) {
    emit_log(EventLevel::Warn, category, message, metadata);
}
218
/// OpenTelemetry exporter sink. Requires the `otel` feature flag.
/// Forwards Harn log events and span lifecycle to OTLP collectors.
///
/// Active spans are stored keyed by Harn's `span_id` so that
/// `emit_span_end` can close the correct OTel span.
#[cfg(feature = "otel")]
pub struct OtelSink {
    // Tracer provider that owns the exporter pipeline; tracers for log and
    // span events are obtained from it on every emit.
    provider: opentelemetry_sdk::trace::SdkTracerProvider,
    // Spans that have started but not yet ended, keyed by Harn span id.
    // `RefCell` because the `EventSink` methods only receive `&self`.
    active_spans:
        std::cell::RefCell<std::collections::HashMap<u64, opentelemetry_sdk::trace::Span>>,
}
230
#[cfg(feature = "otel")]
impl OtelSink {
    /// Build an OTel sink from environment configuration (endpoint,
    /// service name, headers).
    ///
    /// Returns `Err` when the HTTP client or the span exporter cannot be
    /// constructed. An absent endpoint is **not** an error: the exporter
    /// then uses OpenTelemetry's default
    /// (`http://localhost:4318/v1/traces`). Callers that want
    /// presence-of-endpoint to gate registration should use
    /// [`install_otel_sink_from_env`].
    pub fn new() -> Result<Self, String> {
        use opentelemetry::global;
        use opentelemetry_otlp::{
            Protocol, SpanExporter, WithExportConfig as _, WithHttpConfig as _,
        };
        use opentelemetry_sdk::runtime;
        use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
        use opentelemetry_sdk::trace::SdkTracerProvider;
        use opentelemetry_sdk::Resource;

        let configured_endpoint = otel_endpoint_from_env();
        let extra_headers = otel_headers_from_env();
        let service = otel_service_name_from_env();

        // opentelemetry-otlp ships no default HTTP client: the
        // `reqwest-rustls` feature only opts the dependency in, and the
        // exporter still needs an explicit client. Use the same reqwest
        // configuration as the orchestrator-side provider so both hit the
        // collector with identical TLS + connection-pool behaviour.
        let client = reqwest::Client::builder()
            .build()
            .map_err(|error| format!("failed to build OTLP HTTP client: {error}"))?;

        let base_builder = SpanExporter::builder()
            .with_http()
            .with_http_client(client)
            .with_protocol(Protocol::HttpJson)
            .with_headers(extra_headers);
        // Only override the endpoint when one is configured; otherwise the
        // exporter keeps its built-in default.
        let exporter = match configured_endpoint.as_deref() {
            Some(endpoint) => {
                base_builder.with_endpoint(normalize_otlp_traces_endpoint(endpoint))
            }
            None => base_builder,
        }
        .build()
        .map_err(|e| format!("OTel span exporter init failed: {e}"))?;

        // The batch processor runs on the current Tokio runtime so the
        // exporter's reqwest client can reach the network. The default SDK
        // processor spawns its own thread, which has no Tokio reactor and
        // panics on the first send; the orchestrator path uses the
        // async-runtime variant for the same reason.
        let resource = Resource::builder().with_service_name(service).build();
        let batch_processor = BatchSpanProcessor::builder(exporter, runtime::Tokio).build();
        let provider = SdkTracerProvider::builder()
            .with_resource(resource)
            .with_span_processor(batch_processor)
            .build();

        global::set_tracer_provider(provider.clone());

        Ok(Self {
            provider,
            active_spans: Default::default(),
        })
    }
}
296
/// Resolve the OTLP endpoint from the environment.
///
/// Burin spawns a fresh `harn` child per session, so environment variables
/// read at startup are the only reliable opt-in for local trace export.
/// `HARN_OTEL_ENDPOINT` is consulted before the shared
/// `OTEL_EXPORTER_OTLP_ENDPOINT` so that pointing an unrelated process at a
/// collector through the generic variable does not silently take priority
/// over a Harn-specific setting. Values are trimmed; blank values are
/// skipped. Returns `None` when neither variable yields a non-empty value.
#[cfg(feature = "otel")]
fn otel_endpoint_from_env() -> Option<String> {
    ["HARN_OTEL_ENDPOINT", "OTEL_EXPORTER_OTLP_ENDPOINT"]
        .iter()
        .filter_map(|name| std::env::var(name).ok())
        .map(|value| value.trim().to_string())
        .find(|candidate| !candidate.is_empty())
}
315
/// Resolve the OTel service name: `HARN_OTEL_SERVICE_NAME`, then
/// `OTEL_SERVICE_NAME`, then the literal `"harn"`. Values are trimmed and
/// blank values are skipped.
#[cfg(feature = "otel")]
fn otel_service_name_from_env() -> String {
    ["HARN_OTEL_SERVICE_NAME", "OTEL_SERVICE_NAME"]
        .iter()
        .filter_map(|name| std::env::var(name).ok())
        .map(|value| value.trim().to_string())
        .find(|candidate| !candidate.is_empty())
        .unwrap_or_else(|| "harn".to_string())
}
328
/// Parse extra OTLP request headers from the environment.
///
/// `HARN_OTEL_HEADERS` is consulted first, then
/// `OTEL_EXPORTER_OTLP_HEADERS`. A variable that is set but blank is
/// skipped, matching how the endpoint and service-name helpers treat blank
/// values, so an empty Harn-specific variable no longer hides headers
/// supplied through the standard one.
///
/// The chosen value is split on `,`, `;` or newline into segments. Each
/// segment is split into name and value at the first `=`, or at the first
/// `:` when it contains no `=`. Names and values are trimmed; segments with
/// an empty name or value, or with no separator, are dropped.
#[cfg(feature = "otel")]
fn otel_headers_from_env() -> std::collections::HashMap<String, String> {
    let raw = ["HARN_OTEL_HEADERS", "OTEL_EXPORTER_OTLP_HEADERS"]
        .iter()
        .filter_map(|name| std::env::var(name).ok())
        .find(|value| !value.trim().is_empty())
        .unwrap_or_default();
    raw.split([',', '\n', ';'])
        .map(str::trim)
        .filter(|segment| !segment.is_empty())
        .filter_map(|segment| {
            let (name, value) = segment
                .split_once('=')
                .or_else(|| segment.split_once(':'))?;
            let name = name.trim();
            let value = value.trim();
            if name.is_empty() || value.is_empty() {
                return None;
            }
            Some((name.to_string(), value.to_string()))
        })
        .collect()
}
351
/// Return `endpoint` with trailing slashes removed and guaranteed to end in
/// exactly one `/v1/traces` suffix (appended only when not already there).
#[cfg(feature = "otel")]
fn normalize_otlp_traces_endpoint(endpoint: &str) -> String {
    let without_slash = endpoint.trim_end_matches('/');
    // Drop an existing traces suffix so it can be re-appended unconditionally.
    let trimmed = without_slash
        .strip_suffix("/v1/traces")
        .unwrap_or(without_slash);
    format!("{trimmed}/v1/traces")
}
361
/// Stable OTel span-end attribute keys exported as top-level attributes.
///
/// Keep this list narrow: span metadata can be script-controlled, and OTel
/// backends charge or index by attribute key. Runtime code that needs another
/// top-level key should add an exact `harn.*` key here rather than letting
/// arbitrary metadata become a new exported attribute name.
///
/// Keys are matched by exact string equality in
/// `is_low_cardinality_attr_key`.
#[cfg(feature = "otel")]
const ALLOWED_SPAN_ATTR_KEYS: &[&str] = &[
    "harn.duration_ms",
    "harn.error",
    "harn.error.kind",
    "harn.kind",
    "harn.span_id",
    "harn.status",
];
377
/// Runtime-owned low-cardinality attribute namespaces.
///
/// These prefixes are for stable schema families, not dynamic suffixes such as
/// run ids, file paths, or UUIDs. Metadata outside this exact-key/prefix
/// allowlist is folded into `harn.meta_json` so OTel sees one bounded key.
///
/// Matching is a plain `starts_with` check, so each entry keeps its trailing
/// `.` to avoid matching unrelated keys that merely share a stem.
#[cfg(feature = "otel")]
const ALLOWED_SPAN_ATTR_PREFIXES: &[&str] = &[
    "harn.cost.",
    "harn.lifecycle.",
    "harn.llm.",
    "harn.step.",
    "harn.timing.",
    "harn.token.",
    "harn.tool.",
    "harn.worker.",
];
394
/// Whether `key` may be exported as its own top-level OTel attribute: it is
/// either listed verbatim in `ALLOWED_SPAN_ATTR_KEYS` or begins with one of
/// the `ALLOWED_SPAN_ATTR_PREFIXES` namespaces.
#[cfg(feature = "otel")]
fn is_low_cardinality_attr_key(key: &str) -> bool {
    if ALLOWED_SPAN_ATTR_KEYS.iter().any(|exact| *exact == key) {
        return true;
    }
    ALLOWED_SPAN_ATTR_PREFIXES
        .iter()
        .any(|prefix| key.starts_with(prefix))
}
402
/// Convert span-end metadata into the `(key, value)` attribute pairs that
/// are safe to export to OTel.
///
/// * Keys accepted by `is_low_cardinality_attr_key` become top-level
///   attributes. JSON string values are exported as their bare text (no
///   surrounding quotes or JSON escaping), so `harn.kind` carries the same
///   representation at span end as the plain string set at span start.
///   Every other JSON value uses its compact JSON rendering.
/// * All remaining entries are serialised together into one
///   `harn.meta_json` attribute, which bounds attribute-key cardinality.
///
/// Every emitted value is passed through the active redaction policy.
#[cfg(feature = "otel")]
fn otel_span_end_attributes(
    metadata: &BTreeMap<String, serde_json::Value>,
) -> Vec<(String, String)> {
    let policy = crate::redact::current_policy();
    let mut attributes = Vec::new();
    let mut meta_json = BTreeMap::new();

    for (key, value) in metadata {
        if is_low_cardinality_attr_key(key) {
            let raw = match value {
                // `Display` on a JSON string would add quotes and escapes.
                serde_json::Value::String(text) => text.clone(),
                other => other.to_string(),
            };
            let redacted = policy.redact_string(&raw).into_owned();
            attributes.push((key.clone(), redacted));
        } else {
            meta_json.insert(key.clone(), value.clone());
        }
    }

    if !meta_json.is_empty() {
        // Serialising a map of JSON values is not expected to fail; fall
        // back to an empty object rather than dropping the attribute.
        let raw = serde_json::to_string(&meta_json).unwrap_or_else(|_| "{}".to_string());
        let redacted = policy.redact_string(&raw).into_owned();
        attributes.push(("harn.meta_json".to_string(), redacted));
    }

    attributes
}
429
/// Idempotency guard for [`install_otel_sink_from_env`]. The first
/// caller wins; later ones become no-ops returning `Ok(false)`. The
/// stored provider keeps the batch processor's runtime alive — and
/// gives [`shutdown_otel_sink`] a handle to flush before the host
/// tokio runtime exits.
///
/// Process-wide, unlike the thread-local sink registry: the inner `Option`
/// is `Some` while a sink is installed and is reset to `None` by
/// [`shutdown_otel_sink`].
#[cfg(feature = "otel")]
static OTEL_PROVIDER: std::sync::OnceLock<
    std::sync::Mutex<Option<opentelemetry_sdk::trace::SdkTracerProvider>>,
> = std::sync::OnceLock::new();
439
/// Register an [`OtelSink`] into the thread-local event sink chain when
/// the environment is configured for OTLP export.
///
/// Returns `Ok(true)` when a sink was installed, `Ok(false)` when no
/// OTLP endpoint is configured (or when a sink has already been
/// installed for this process), and `Err` when the exporter failed to
/// initialise.
///
/// The exporter activates iff at least one of `HARN_OTEL_ENDPOINT` or
/// the standard `OTEL_EXPORTER_OTLP_ENDPOINT` is non-empty. Service
/// name comes from `HARN_OTEL_SERVICE_NAME` → `OTEL_SERVICE_NAME` →
/// `"harn"` (in that order). Headers come from `HARN_OTEL_HEADERS` →
/// `OTEL_EXPORTER_OTLP_HEADERS` (comma/semicolon-separated
/// `name=value` pairs).
///
/// Hosts (`harn run`, `harn serve acp`, embedders like Burin) should
/// call this once near process startup so any spans emitted during the
/// session land at the configured collector.
///
/// The sink is added to the *calling thread's* registry only; the
/// already-installed check is process-wide.
///
/// # Panics
///
/// Panics if the provider mutex was poisoned by an earlier panic.
#[cfg(feature = "otel")]
pub fn install_otel_sink_from_env() -> Result<bool, String> {
    if otel_endpoint_from_env().is_none() {
        return Ok(false);
    }
    let provider_slot = OTEL_PROVIDER.get_or_init(|| std::sync::Mutex::new(None));
    // Keep the lock for the whole check-and-install sequence. Releasing it
    // between the check and the store would let two racing threads both see
    // `None` and each build and register an exporter.
    let mut installed = provider_slot.lock().expect("otel provider mutex poisoned");
    if installed.is_some() {
        // A sink was already installed in this process. Don't
        // double-register; the existing one will keep emitting.
        return Ok(false);
    }
    let sink = OtelSink::new()?;
    let provider = sink.provider.clone();
    add_event_sink(Rc::new(sink));
    *installed = Some(provider);
    Ok(true)
}
481
/// Flush and tear down the auto-registered OTel sink. Hosts that
/// shut down their tokio runtime before process exit must call this
/// while the runtime is still alive — `BatchSpanProcessor` needs a
/// reactor to drain queued exports, and the [`Drop`] impl on
/// [`OtelSink`] otherwise runs after the runtime is gone. Safe to
/// call when no sink was installed.
///
/// Returns `Ok(true)` when a provider was flushed, `Ok(false)` when
/// none was installed, and `Err` when the SDK reported an export or
/// shutdown error. Errors are advisory — long-running hosts should
/// log and continue.
///
/// `shutdown()` is attempted even when `force_flush()` fails, because the
/// provider has already been removed from the global slot and no later
/// call could shut it down. When both fail, the flush error is returned.
///
/// # Panics
///
/// Panics if the provider mutex was poisoned by an earlier panic.
#[cfg(feature = "otel")]
pub fn shutdown_otel_sink() -> Result<bool, String> {
    let Some(slot) = OTEL_PROVIDER.get() else {
        return Ok(false);
    };
    // Take the provider out under the lock, then release the lock before
    // the (potentially slow) flush and shutdown calls.
    let taken = slot.lock().expect("otel provider mutex poisoned").take();
    let Some(provider) = taken else {
        return Ok(false);
    };
    let flush_outcome = provider
        .force_flush()
        .map_err(|error| format!("OTel force_flush failed: {error}"));
    let shutdown_outcome = provider
        .shutdown()
        .map_err(|error| format!("OTel shutdown failed: {error}"));
    flush_outcome?;
    shutdown_outcome?;
    Ok(true)
}
513
/// No-op stub for builds compiled without the `otel` feature. Returns
/// `Ok(false)` so call sites can use the same code path on either
/// build.
///
/// Never reads the environment and never returns `Err`.
#[cfg(not(feature = "otel"))]
pub fn install_otel_sink_from_env() -> Result<bool, String> {
    Ok(false)
}
521
/// No-op stub for builds compiled without the `otel` feature. Always
/// returns `Ok(false)` (nothing was installed, so nothing is flushed),
/// letting call sites share one code path across both builds.
#[cfg(not(feature = "otel"))]
pub fn shutdown_otel_sink() -> Result<bool, String> {
    Ok(false)
}
526
#[cfg(feature = "otel")]
impl EventSink for OtelSink {
    /// Export a log event as a zero-duration span named `log.<category>`
    /// with `level`, `message` (redacted) and `category` attributes.
    /// The event's metadata map is not exported.
    fn emit_log(&self, event: &LogEvent) {
        use opentelemetry::trace::{Tracer, TracerProvider};
        let tracer = self.provider.tracer("harn");
        // Apply the unified redaction policy to attribute values
        // before they leave the process. The active policy includes
        // the OAuth token catalog (HARN-OAU-001) so a Bearer header
        // or provider token that snuck into a log message gets
        // scrubbed before it lands at the OTel collector.
        let policy = crate::redact::current_policy();
        // Log events are zero-duration spans — start and immediately drop.
        let _span = tracer
            .span_builder(format!("log.{}", event.category))
            .with_attributes(vec![
                opentelemetry::KeyValue::new("level", format!("{:?}", event.level)),
                opentelemetry::KeyValue::new(
                    "message",
                    policy.redact_string(&event.message).into_owned(),
                ),
                opentelemetry::KeyValue::new("category", event.category.clone()),
            ])
            .start(&tracer);
    }

    /// Start an OTel span named after `event.name` and remember it under
    /// `event.span_id` so `emit_span_end` can close it later. Only
    /// `harn.span_id` and `harn.kind` are attached at start.
    fn emit_span_start(&self, event: &SpanEvent) {
        use opentelemetry::trace::{Tracer, TracerProvider};
        let tracer = self.provider.tracer("harn");
        let span = tracer
            .span_builder(event.name.clone())
            .with_attributes(vec![
                // OTel integer attributes are i64; ids above `i64::MAX`
                // wrap to negative values under this cast.
                opentelemetry::KeyValue::new("harn.span_id", event.span_id as i64),
                opentelemetry::KeyValue::new("harn.kind", event.kind.clone()),
            ])
            .start(&tracer);
        self.active_spans.borrow_mut().insert(event.span_id, span);
    }

    /// Attach the filtered, redacted end-of-span attributes to the span
    /// stored under `span_id` and end it. Unknown ids are ignored.
    fn emit_span_end(&self, span_id: u64, metadata: &BTreeMap<String, serde_json::Value>) {
        use opentelemetry::trace::Span;
        // Remove the span in its own statement so the `RefCell` borrow is
        // released before any attribute processing or span finalisation.
        let finished = self.active_spans.borrow_mut().remove(&span_id);
        let Some(mut span) = finished else {
            return;
        };
        // OTel span attributes are the fourth sink covered by the
        // OA-06 token-redaction policy (transcripts, audit
        // receipts, OTel, and system reminders). The helper also
        // bounds attribute-key cardinality by folding unknown
        // metadata into `harn.meta_json`.
        for (key, redacted) in otel_span_end_attributes(metadata) {
            // `key` is already owned here, so it is moved rather than cloned.
            span.set_attribute(opentelemetry::KeyValue::new(key, redacted));
        }
        span.end();
    }
}
580
#[cfg(feature = "otel")]
impl Drop for OtelSink {
    /// Discard any spans still open (abnormal shutdown) and shut the
    /// provider down, ignoring any shutdown error.
    fn drop(&mut self) {
        // `&mut self` gives exclusive access, so the map can be reached
        // directly through the cell.
        self.active_spans.get_mut().clear();
        let _ = self.provider.shutdown();
    }
}
589
590#[cfg(test)]
591mod tests {
592    use super::*;
593
594    #[test]
595    fn test_collector_sink_captures_logs() {
596        let sink = Rc::new(CollectorSink::new());
597        clear_event_sinks();
598        add_event_sink(sink.clone());
599
600        log_info("llm", "test message");
601        log_warn("llm.cost", "cost warning");
602        log_error("llm.agent", "agent error");
603
604        let logs = sink.logs.borrow();
605        assert_eq!(logs.len(), 3);
606        assert_eq!(logs[0].level, EventLevel::Info);
607        assert_eq!(logs[0].category, "llm");
608        assert_eq!(logs[0].message, "test message");
609        assert_eq!(logs[1].level, EventLevel::Warn);
610        assert_eq!(logs[2].level, EventLevel::Error);
611
612        // Restore default sinks for other tests.
613        reset_event_sinks();
614    }
615
616    #[test]
617    fn test_collector_sink_captures_spans() {
618        let sink = Rc::new(CollectorSink::new());
619        clear_event_sinks();
620        add_event_sink(sink.clone());
621
622        emit_span_start(1, None, "agent_loop", "llm_call", BTreeMap::new());
623        emit_span_end(1, BTreeMap::new());
624
625        let spans = sink.spans.borrow();
626        assert_eq!(spans.len(), 1);
627        assert_eq!(spans[0].span_id, 1);
628        assert_eq!(spans[0].name, "agent_loop");
629
630        reset_event_sinks();
631    }
632
633    #[test]
634    fn test_stderr_sink_does_not_panic() {
635        let sink = StderrSink;
636        let event = LogEvent {
637            level: EventLevel::Warn,
638            category: "test".into(),
639            message: "hello".into(),
640            metadata: BTreeMap::new(),
641        };
642        sink.emit_log(&event);
643        sink.emit_span_start(&SpanEvent {
644            span_id: 1,
645            parent_id: None,
646            name: "x".into(),
647            kind: "y".into(),
648            metadata: BTreeMap::new(),
649        });
650        sink.emit_span_end(1, &BTreeMap::new());
651    }
652
653    #[test]
654    fn test_multiple_sinks() {
655        let a = Rc::new(CollectorSink::new());
656        let b = Rc::new(CollectorSink::new());
657        clear_event_sinks();
658        add_event_sink(a.clone());
659        add_event_sink(b.clone());
660
661        log_debug("x", "msg");
662
663        assert_eq!(a.logs.borrow().len(), 1);
664        assert_eq!(b.logs.borrow().len(), 1);
665
666        reset_event_sinks();
667    }
668
669    #[test]
670    fn test_log_with_metadata() {
671        let sink = Rc::new(CollectorSink::new());
672        clear_event_sinks();
673        add_event_sink(sink.clone());
674
675        let mut meta = BTreeMap::new();
676        meta.insert("tokens".into(), serde_json::json!(42));
677        log_info_meta("llm", "token usage", meta);
678
679        let logs = sink.logs.borrow();
680        assert_eq!(logs[0].metadata["tokens"], serde_json::json!(42));
681
682        reset_event_sinks();
683    }
684
685    #[cfg(feature = "otel")]
686    #[derive(Default)]
687    struct SpanAttrCollectorSink {
688        attrs: RefCell<Vec<(String, String)>>,
689    }
690
691    #[cfg(feature = "otel")]
692    impl EventSink for SpanAttrCollectorSink {
693        fn emit_log(&self, _event: &LogEvent) {}
694
695        fn emit_span_start(&self, _event: &SpanEvent) {}
696
697        fn emit_span_end(&self, _span_id: u64, metadata: &BTreeMap<String, serde_json::Value>) {
698            self.attrs
699                .borrow_mut()
700                .extend(otel_span_end_attributes(metadata));
701        }
702    }
703
704    #[cfg(feature = "otel")]
705    #[test]
706    fn span_attr_keys_are_low_cardinality() {
707        let sink = Rc::new(SpanAttrCollectorSink::default());
708        clear_event_sinks();
709        add_event_sink(sink.clone());
710
711        let rogue_key = "request.550e8400-e29b-41d4-a716-446655440000";
712        let mut metadata = BTreeMap::new();
713        metadata.insert("harn.kind".to_string(), serde_json::json!("llm_call"));
714        metadata.insert(rogue_key.to_string(), serde_json::json!("rogue-value"));
715
716        emit_span_end(42, metadata);
717        reset_event_sinks();
718
719        let attrs = sink.attrs.borrow();
720        assert!(
721            attrs
722                .iter()
723                .any(|(key, value)| key == "harn.kind" && value.contains("llm_call")),
724            "allowlisted harn.kind should remain a top-level OTel attribute: {attrs:?}",
725        );
726        assert!(
727            !attrs.iter().any(|(key, _)| key == rogue_key),
728            "rogue metadata key must not become a top-level OTel attribute: {attrs:?}",
729        );
730        let (_, meta_json) = attrs
731            .iter()
732            .find(|(key, _)| key == "harn.meta_json")
733            .expect("rogue metadata should be folded into harn.meta_json");
734        let blob: serde_json::Value =
735            serde_json::from_str(meta_json).expect("harn.meta_json should stay JSON");
736        assert_eq!(blob[rogue_key], serde_json::json!("rogue-value"));
737    }
738
739    #[cfg(feature = "otel")]
740    mod otel_env {
741        use super::super::*;
742        use std::sync::{Mutex, MutexGuard, OnceLock};
743
744        /// Serializes env-mutating tests in this module. Crate-wide
745        /// `crate::llm::env_lock()` is reserved for LLM env scopes; a
746        /// dedicated lock here keeps these tests independent.
747        fn lock() -> MutexGuard<'static, ()> {
748            static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
749            LOCK.get_or_init(|| Mutex::new(()))
750                .lock()
751                .expect("otel env lock")
752        }
753
754        /// RAII guard for a single env var. Saves the prior value on
755        /// construction and restores it on Drop so parallel tests in
756        /// the same process don't leak state.
757        struct ScopedEnvVar {
758            key: &'static str,
759            previous: Option<String>,
760        }
761
762        impl ScopedEnvVar {
763            fn set(key: &'static str, value: &str) -> Self {
764                let previous = std::env::var(key).ok();
765                // SAFETY: env mutation is serialized by the test-level
766                // `lock()` above; no other thread inspects these
767                // variables while a guard is alive.
768                unsafe { std::env::set_var(key, value) };
769                Self { key, previous }
770            }
771
772            fn remove(key: &'static str) -> Self {
773                let previous = std::env::var(key).ok();
774                // SAFETY: see `set` above.
775                unsafe { std::env::remove_var(key) };
776                Self { key, previous }
777            }
778        }
779
780        impl Drop for ScopedEnvVar {
781            fn drop(&mut self) {
782                // SAFETY: see `set` above. Restoration happens while the
783                // test still holds the module lock.
784                match &self.previous {
785                    Some(value) => unsafe { std::env::set_var(self.key, value) },
786                    None => unsafe { std::env::remove_var(self.key) },
787                }
788            }
789        }
790
791        #[test]
792        fn install_returns_false_when_endpoint_unset() {
793            let _guard = lock();
794            let _endpoint = ScopedEnvVar::remove("HARN_OTEL_ENDPOINT");
795            let _standard = ScopedEnvVar::remove("OTEL_EXPORTER_OTLP_ENDPOINT");
796
797            let installed = install_otel_sink_from_env()
798                .expect("install must not error when endpoint is unset");
799            assert!(!installed, "expected no sink registration without endpoint");
800        }
801
802        #[test]
803        fn endpoint_helper_prefers_harn_variable() {
804            let _guard = lock();
805            let _harn = ScopedEnvVar::set("HARN_OTEL_ENDPOINT", "http://harn.example.test:4318");
806            let _standard = ScopedEnvVar::set(
807                "OTEL_EXPORTER_OTLP_ENDPOINT",
808                "http://generic.example.test:4318",
809            );
810
811            assert_eq!(
812                otel_endpoint_from_env().as_deref(),
813                Some("http://harn.example.test:4318"),
814            );
815        }
816
817        #[test]
818        fn endpoint_helper_falls_back_to_standard_variable() {
819            let _guard = lock();
820            let _harn = ScopedEnvVar::remove("HARN_OTEL_ENDPOINT");
821            let _standard = ScopedEnvVar::set(
822                "OTEL_EXPORTER_OTLP_ENDPOINT",
823                "http://generic.example.test:4318",
824            );
825
826            assert_eq!(
827                otel_endpoint_from_env().as_deref(),
828                Some("http://generic.example.test:4318"),
829            );
830        }
831
832        #[test]
833        fn endpoint_helper_ignores_whitespace_only_values() {
834            let _guard = lock();
835            let _harn = ScopedEnvVar::set("HARN_OTEL_ENDPOINT", "   ");
836            let _standard = ScopedEnvVar::remove("OTEL_EXPORTER_OTLP_ENDPOINT");
837
838            assert!(otel_endpoint_from_env().is_none());
839        }
840
841        #[test]
842        fn service_name_helper_layers_defaults() {
843            let _guard = lock();
844            let _harn = ScopedEnvVar::remove("HARN_OTEL_SERVICE_NAME");
845            let _standard = ScopedEnvVar::remove("OTEL_SERVICE_NAME");
846            assert_eq!(otel_service_name_from_env(), "harn");
847
848            let _standard = ScopedEnvVar::set("OTEL_SERVICE_NAME", "burin-code");
849            assert_eq!(otel_service_name_from_env(), "burin-code");
850
851            let _harn = ScopedEnvVar::set("HARN_OTEL_SERVICE_NAME", "burin-tui");
852            assert_eq!(otel_service_name_from_env(), "burin-tui");
853        }
854
855        #[test]
856        fn headers_helper_parses_comma_separated_pairs() {
857            let _guard = lock();
858            let _harn = ScopedEnvVar::set(
859                "HARN_OTEL_HEADERS",
860                "x-honeycomb-team=abc123, x-other=val ,blank=",
861            );
862
863            let headers = otel_headers_from_env();
864            assert_eq!(
865                headers.get("x-honeycomb-team").map(String::as_str),
866                Some("abc123"),
867            );
868            assert_eq!(headers.get("x-other").map(String::as_str), Some("val"));
869            assert!(
870                !headers.contains_key("blank"),
871                "empty values must be dropped to match the orchestrator helper",
872            );
873        }
874
875        #[test]
876        fn normalize_endpoint_appends_traces_path_when_missing() {
877            assert_eq!(
878                normalize_otlp_traces_endpoint("http://localhost:4318"),
879                "http://localhost:4318/v1/traces",
880            );
881            assert_eq!(
882                normalize_otlp_traces_endpoint("http://localhost:4318/"),
883                "http://localhost:4318/v1/traces",
884            );
885            assert_eq!(
886                normalize_otlp_traces_endpoint("http://localhost:4318/v1/traces"),
887                "http://localhost:4318/v1/traces",
888            );
889            assert_eq!(
890                normalize_otlp_traces_endpoint("http://localhost:4318/v1/traces/"),
891                "http://localhost:4318/v1/traces",
892            );
893        }
894    }
895
896    #[cfg(not(feature = "otel"))]
897    #[test]
898    fn install_otel_sink_returns_ok_false_on_non_otel_builds() {
899        let installed = install_otel_sink_from_env().expect("non-otel stub never errors");
900        assert!(!installed);
901    }
902}