Skip to main content

harn_vm/
events.rs

1//! Structured event emission for observability.
2//!
3//! Provides an `EventSink` trait and a thread-local sink registry so that the
4//! VM (and especially the LLM layer) can emit structured log and span events
5//! instead of raw `eprintln!` calls.  Consumers register one or more sinks;
6//! the default `StderrSink` preserves backward-compatible stderr output.
7
8use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::rc::Rc;
11
/// Severity level for log events.
///
/// Variants are declared from least to most severe, so the derived
/// `PartialOrd`/`Ord` compare by severity
/// (`Trace < Debug < Info < Warn < Error`). This lets callers filter with,
/// e.g., `event.level >= EventLevel::Warn`. `Hash` allows levels to be used as
/// map/set keys.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum EventLevel {
    Trace,
    Debug,
    Info,
    Warn,
    Error,
}
21
22/// A structured log event.
23#[derive(Clone, Debug)]
24pub struct LogEvent {
25    pub level: EventLevel,
26    pub category: String,
27    pub message: String,
28    pub metadata: BTreeMap<String, serde_json::Value>,
29}
30
31/// A structured span event (start or end).
32#[derive(Clone, Debug)]
33pub struct SpanEvent {
34    pub span_id: u64,
35    pub parent_id: Option<u64>,
36    pub name: String,
37    pub kind: String,
38    pub metadata: BTreeMap<String, serde_json::Value>,
39}
40
41/// Trait for receiving structured events from the VM.
42pub trait EventSink {
43    fn emit_log(&self, event: &LogEvent);
44    fn emit_span_start(&self, event: &SpanEvent);
45    fn emit_span_end(&self, span_id: u64, metadata: &BTreeMap<String, serde_json::Value>);
46}
47
/// Sink installed by default on every thread; writes log records to stderr
/// and ignores span events.
pub struct StderrSink;
50
51impl EventSink for StderrSink {
52    fn emit_log(&self, event: &LogEvent) {
53        let level_str = match event.level {
54            EventLevel::Trace => "TRACE",
55            EventLevel::Debug => "DEBUG",
56            EventLevel::Info => "INFO",
57            EventLevel::Warn => "WARN",
58            EventLevel::Error => "ERROR",
59        };
60        // "[harn]" prefix for warn/error is relied on by downstream
61        // tooling and tests that parse stderr.
62        match event.level {
63            EventLevel::Warn => {
64                eprintln!("[harn] warning: {}", event.message);
65            }
66            EventLevel::Error => {
67                eprintln!("[harn] error: {}", event.message);
68            }
69            _ => {
70                eprintln!("[{level_str}] [{}] {}", event.category, event.message);
71            }
72        }
73    }
74
75    fn emit_span_start(&self, _event: &SpanEvent) {
76        // Silent by default — spans are for observability backends.
77    }
78
79    fn emit_span_end(&self, _span_id: u64, _metadata: &BTreeMap<String, serde_json::Value>) {}
80}
81
82/// A sink that collects events for later retrieval (testing, inspection).
83pub struct CollectorSink {
84    pub logs: RefCell<Vec<LogEvent>>,
85    pub spans: RefCell<Vec<SpanEvent>>,
86}
87
88impl CollectorSink {
89    pub fn new() -> Self {
90        Self {
91            logs: RefCell::new(Vec::new()),
92            spans: RefCell::new(Vec::new()),
93        }
94    }
95}
96
97impl Default for CollectorSink {
98    fn default() -> Self {
99        Self::new()
100    }
101}
102
103impl EventSink for CollectorSink {
104    fn emit_log(&self, event: &LogEvent) {
105        self.logs.borrow_mut().push(event.clone());
106    }
107
108    fn emit_span_start(&self, event: &SpanEvent) {
109        self.spans.borrow_mut().push(event.clone());
110    }
111
112    fn emit_span_end(&self, _span_id: u64, _metadata: &BTreeMap<String, serde_json::Value>) {}
113}
114
115thread_local! {
116    static EVENT_SINKS: RefCell<Vec<Rc<dyn EventSink>>> = RefCell::new(vec![Rc::new(StderrSink)]);
117}
118
119/// Register an additional event sink.
120pub fn add_event_sink(sink: Rc<dyn EventSink>) {
121    EVENT_SINKS.with(|sinks| sinks.borrow_mut().push(sink));
122}
123
124/// Remove all sinks (including the default `StderrSink`).
125pub fn clear_event_sinks() {
126    EVENT_SINKS.with(|sinks| sinks.borrow_mut().clear());
127}
128
129/// Reset sinks to just the default `StderrSink`.
130pub fn reset_event_sinks() {
131    EVENT_SINKS.with(|sinks| {
132        let mut s = sinks.borrow_mut();
133        s.clear();
134        s.push(Rc::new(StderrSink));
135    });
136}
137
138/// Emit a structured log event to all registered sinks.
139pub fn emit_log(
140    level: EventLevel,
141    category: &str,
142    message: &str,
143    metadata: BTreeMap<String, serde_json::Value>,
144) {
145    let event = LogEvent {
146        level,
147        category: category.to_string(),
148        message: message.to_string(),
149        metadata,
150    };
151    EVENT_SINKS.with(|sinks| {
152        for sink in sinks.borrow().iter() {
153            sink.emit_log(&event);
154        }
155    });
156}
157
158/// Emit a span-start event to all registered sinks.
159pub fn emit_span_start(
160    span_id: u64,
161    parent_id: Option<u64>,
162    name: &str,
163    kind: &str,
164    metadata: BTreeMap<String, serde_json::Value>,
165) {
166    let event = SpanEvent {
167        span_id,
168        parent_id,
169        name: name.to_string(),
170        kind: kind.to_string(),
171        metadata,
172    };
173    EVENT_SINKS.with(|sinks| {
174        for sink in sinks.borrow().iter() {
175            sink.emit_span_start(&event);
176        }
177    });
178}
179
180/// Emit a span-end event to all registered sinks.
181pub fn emit_span_end(span_id: u64, metadata: BTreeMap<String, serde_json::Value>) {
182    EVENT_SINKS.with(|sinks| {
183        for sink in sinks.borrow().iter() {
184            sink.emit_span_end(span_id, &metadata);
185        }
186    });
187}
188
189/// Log at Info level with no metadata.
190pub fn log_info(category: &str, message: &str) {
191    emit_log(EventLevel::Info, category, message, BTreeMap::new());
192}
193
194/// Log at Warn level with no metadata.
195pub fn log_warn(category: &str, message: &str) {
196    emit_log(EventLevel::Warn, category, message, BTreeMap::new());
197}
198
199/// Log at Error level with no metadata.
200pub fn log_error(category: &str, message: &str) {
201    emit_log(EventLevel::Error, category, message, BTreeMap::new());
202}
203
204/// Log at Debug level with no metadata.
205pub fn log_debug(category: &str, message: &str) {
206    emit_log(EventLevel::Debug, category, message, BTreeMap::new());
207}
208
209/// Log at Info level with metadata.
210pub fn log_info_meta(category: &str, message: &str, metadata: BTreeMap<String, serde_json::Value>) {
211    emit_log(EventLevel::Info, category, message, metadata);
212}
213
214/// Log at Warn level with metadata.
215pub fn log_warn_meta(category: &str, message: &str, metadata: BTreeMap<String, serde_json::Value>) {
216    emit_log(EventLevel::Warn, category, message, metadata);
217}
218
/// OpenTelemetry exporter sink (only compiled with the `otel` feature).
///
/// Forwards Harn log events and span lifecycle events to an OTLP collector.
#[cfg(feature = "otel")]
pub struct OtelSink {
    /// Tracer provider owning the OTLP span exporter.
    provider: opentelemetry_sdk::trace::SdkTracerProvider,
    /// Spans started but not yet ended, keyed by Harn's `span_id` so that
    /// `emit_span_end` can close the matching OTel span.
    active_spans:
        std::cell::RefCell<std::collections::HashMap<u64, opentelemetry_sdk::trace::Span>>,
}
230
#[cfg(feature = "otel")]
impl OtelSink {
    /// Build an OTel sink backed by an HTTP OTLP span exporter with batching.
    ///
    /// The exporter's endpoint comes from `OTEL_EXPORTER_OTLP_ENDPOINT` when
    /// set, otherwise the exporter's default (localhost:4318). The provider is
    /// also installed as the global tracer provider.
    ///
    /// # Errors
    ///
    /// Returns a message string if the span exporter cannot be built.
    pub fn new() -> Result<Self, String> {
        use opentelemetry_otlp::SpanExporter;
        use opentelemetry_sdk::trace::SdkTracerProvider;

        let span_exporter = SpanExporter::builder()
            .with_http()
            .build()
            .map_err(|err| format!("OTel span exporter init failed: {err}"))?;
        let tracer_provider = SdkTracerProvider::builder()
            .with_batch_exporter(span_exporter)
            .build();

        // Make the same provider visible to code using the global OTel API.
        opentelemetry::global::set_tracer_provider(tracer_provider.clone());

        Ok(Self {
            provider: tracer_provider,
            active_spans: Default::default(),
        })
    }
}
256
/// Convert a JSON metadata value into a typed OTel attribute value.
///
/// Strings, booleans and numbers keep their native type, so a JSON string is
/// exported without surrounding quotes. Null, arrays and objects fall back to
/// their compact JSON text.
#[cfg(feature = "otel")]
fn json_to_otel_value(value: &serde_json::Value) -> opentelemetry::Value {
    match value {
        serde_json::Value::String(s) => opentelemetry::Value::from(s.clone()),
        serde_json::Value::Bool(b) => opentelemetry::Value::from(*b),
        serde_json::Value::Number(n) => {
            if let Some(i) = n.as_i64() {
                opentelemetry::Value::from(i)
            } else if let Some(f) = n.as_f64() {
                opentelemetry::Value::from(f)
            } else {
                opentelemetry::Value::from(n.to_string())
            }
        }
        other => opentelemetry::Value::from(other.to_string()),
    }
}

/// Turn a Harn metadata map into OTel attributes, one per entry, keyed by the
/// metadata key.
#[cfg(feature = "otel")]
fn metadata_attributes(
    metadata: &BTreeMap<String, serde_json::Value>,
) -> impl Iterator<Item = opentelemetry::KeyValue> + '_ {
    metadata
        .iter()
        .map(|(key, value)| opentelemetry::KeyValue::new(key.clone(), json_to_otel_value(value)))
}

#[cfg(feature = "otel")]
impl EventSink for OtelSink {
    /// Export a log record as a zero-duration span named `log.<category>`.
    ///
    /// The span carries `level`, `message` and `category` attributes, plus one
    /// attribute per metadata entry.
    fn emit_log(&self, event: &LogEvent) {
        use opentelemetry::trace::{Tracer, TracerProvider};
        let tracer = self.provider.tracer("harn");
        let mut attributes = vec![
            opentelemetry::KeyValue::new("level", format!("{:?}", event.level)),
            opentelemetry::KeyValue::new("message", event.message.clone()),
            opentelemetry::KeyValue::new("category", event.category.clone()),
        ];
        attributes.extend(metadata_attributes(&event.metadata));
        // Log events are zero-duration spans — start and immediately drop.
        let _span = tracer
            .span_builder(format!("log.{}", event.category))
            .with_attributes(attributes)
            .start(&tracer);
    }

    /// Start an OTel span and remember it under the Harn `span_id`.
    ///
    /// When `parent_id` refers to a span that is still open, the new span is
    /// started as its child. Otherwise it is started in the current context.
    /// The Harn ids, the kind and all metadata are recorded as attributes.
    fn emit_span_start(&self, event: &SpanEvent) {
        use opentelemetry::trace::{Span as _, TraceContextExt, Tracer, TracerProvider};
        let tracer = self.provider.tracer("harn");

        // OTel has no unsigned integer attribute type; ids above i64::MAX wrap.
        let mut attributes = vec![
            opentelemetry::KeyValue::new("harn.span_id", event.span_id as i64),
            opentelemetry::KeyValue::new("harn.kind", event.kind.clone()),
        ];
        if let Some(parent_id) = event.parent_id {
            attributes.push(opentelemetry::KeyValue::new("harn.parent_id", parent_id as i64));
        }
        attributes.extend(metadata_attributes(&event.metadata));

        let parent_span_context = event.parent_id.and_then(|parent_id| {
            self.active_spans
                .borrow()
                .get(&parent_id)
                .map(|parent| parent.span_context().clone())
        });
        let parent_cx = match parent_span_context {
            Some(span_context) => {
                opentelemetry::Context::current().with_remote_span_context(span_context)
            }
            None => opentelemetry::Context::current(),
        };

        let span = tracer
            .span_builder(event.name.clone())
            .with_attributes(attributes)
            .start_with_context(&tracer, &parent_cx);
        self.active_spans.borrow_mut().insert(event.span_id, span);
    }

    /// Attach `metadata` as attributes to the span opened for `span_id`, then
    /// end it. Unknown ids are ignored.
    fn emit_span_end(&self, span_id: u64, metadata: &BTreeMap<String, serde_json::Value>) {
        use opentelemetry::trace::Span;
        // Take the span out first so the map borrow is released before the
        // span is mutated and ended.
        let finished = self.active_spans.borrow_mut().remove(&span_id);
        if let Some(mut span) = finished {
            for attribute in metadata_attributes(metadata) {
                span.set_attribute(attribute);
            }
            span.end();
        }
    }
}
299
#[cfg(feature = "otel")]
impl Drop for OtelSink {
    /// Flush and shut down the exporter pipeline.
    fn drop(&mut self) {
        // `&mut self` gives exclusive access, so no runtime borrow is needed.
        // Clearing drops (and thereby ends) spans that were never closed,
        // e.g. after an abnormal shutdown.
        self.active_spans.get_mut().clear();
        // A shutdown failure cannot be reported from `drop`; ignore it.
        let _ = self.provider.shutdown();
    }
}
308
#[cfg(test)]
mod tests {
    use super::*;

    /// Replace this thread's sinks with a single fresh collector and return it.
    fn install_collector() -> Rc<CollectorSink> {
        let collector = Rc::new(CollectorSink::new());
        clear_event_sinks();
        add_event_sink(collector.clone());
        collector
    }

    #[test]
    fn test_collector_sink_captures_logs() {
        let collector = install_collector();

        log_info("llm", "test message");
        log_warn("llm.cost", "cost warning");
        log_error("llm.agent", "agent error");

        let logs = collector.logs.borrow();
        assert_eq!(logs.len(), 3);
        let levels: Vec<EventLevel> = logs.iter().map(|log| log.level).collect();
        assert_eq!(levels, [EventLevel::Info, EventLevel::Warn, EventLevel::Error]);
        assert_eq!(logs[0].category, "llm");
        assert_eq!(logs[0].message, "test message");

        // Leave the thread with the default stderr sink installed.
        reset_event_sinks();
    }

    #[test]
    fn test_collector_sink_captures_spans() {
        let collector = install_collector();

        emit_span_start(1, None, "agent_loop", "llm_call", BTreeMap::new());
        emit_span_end(1, BTreeMap::new());

        let spans = collector.spans.borrow();
        assert_eq!(spans.len(), 1);
        let span = &spans[0];
        assert_eq!(span.span_id, 1);
        assert_eq!(span.name, "agent_loop");

        reset_event_sinks();
    }

    #[test]
    fn test_stderr_sink_does_not_panic() {
        let sink = StderrSink;
        sink.emit_log(&LogEvent {
            level: EventLevel::Warn,
            category: "test".into(),
            message: "hello".into(),
            metadata: BTreeMap::new(),
        });
        sink.emit_span_start(&SpanEvent {
            span_id: 1,
            parent_id: None,
            name: "x".into(),
            kind: "y".into(),
            metadata: BTreeMap::new(),
        });
        sink.emit_span_end(1, &BTreeMap::new());
    }

    #[test]
    fn test_multiple_sinks() {
        let first = Rc::new(CollectorSink::new());
        let second = Rc::new(CollectorSink::new());
        clear_event_sinks();
        for collector in [&first, &second] {
            add_event_sink(collector.clone());
        }

        log_debug("x", "msg");

        for collector in [&first, &second] {
            assert_eq!(collector.logs.borrow().len(), 1);
        }

        reset_event_sinks();
    }

    #[test]
    fn test_log_with_metadata() {
        let collector = install_collector();

        let meta: BTreeMap<String, serde_json::Value> =
            [("tokens".to_string(), serde_json::json!(42))]
                .into_iter()
                .collect();
        log_info_meta("llm", "token usage", meta);

        let logs = collector.logs.borrow();
        assert_eq!(logs[0].metadata["tokens"], serde_json::json!(42));

        reset_event_sinks();
    }
}