Skip to main content

harn_vm/
events.rs

1//! Structured event emission for observability.
2//!
3//! Provides an `EventSink` trait and a thread-local sink registry so that the
4//! VM (and especially the LLM layer) can emit structured log and span events
5//! instead of raw `eprintln!` calls.  Consumers register one or more sinks;
6//! the default `StderrSink` preserves backward-compatible stderr output.
7
8use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::rc::Rc;
11
12// =============================================================================
13// Event types
14// =============================================================================
15
/// Severity level for log events.
///
/// Variants are declared from least to most severe, so the derived
/// `PartialOrd`/`Ord` ordering supports threshold checks such as
/// `level >= EventLevel::Warn`. `Hash` allows levels to be used as map or
/// set keys.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum EventLevel {
    Trace,
    Debug,
    Info,
    Warn,
    Error,
}
25
/// A structured log event.
///
/// Built by `emit_log` (and the `log_*` helpers that wrap it) and passed by
/// reference to every registered sink.
#[derive(Clone, Debug)]
pub struct LogEvent {
    /// Severity of the event.
    pub level: EventLevel,
    /// Free-form category tag; this module's tests use names such as `llm`
    /// or `llm.cost`.
    pub category: String,
    /// Human-readable message text.
    pub message: String,
    /// Structured key/value payload. `StderrSink` does not print it.
    pub metadata: BTreeMap<String, serde_json::Value>,
}
34
/// A structured span-start event.
///
/// Only span *starts* are described by this type. The end of a span is
/// reported through `EventSink::emit_span_end`, which receives just the
/// `span_id` and a metadata map, not a `SpanEvent`.
#[derive(Clone, Debug)]
pub struct SpanEvent {
    /// Identifier chosen by the caller of `emit_span_start`; the matching
    /// `emit_span_end` call refers to the span by this id.
    pub span_id: u64,
    /// Id of the parent span, if any.
    pub parent_id: Option<u64>,
    /// Span name (this module's tests use e.g. `agent_loop`).
    pub name: String,
    /// Free-form span kind (this module's tests use e.g. `llm_call`).
    pub kind: String,
    /// Structured attributes supplied when the span starts.
    pub metadata: BTreeMap<String, serde_json::Value>,
}
44
45// =============================================================================
46// EventSink trait
47// =============================================================================
48
49/// Trait for receiving structured events from the VM.
50pub trait EventSink {
51    fn emit_log(&self, event: &LogEvent);
52    fn emit_span_start(&self, event: &SpanEvent);
53    fn emit_span_end(&self, span_id: u64, metadata: &BTreeMap<String, serde_json::Value>);
54}
55
56// =============================================================================
57// StderrSink — default, backward-compatible
58// =============================================================================
59
60/// Default sink that writes formatted output to stderr (preserves current behavior).
61pub struct StderrSink;
62
63impl EventSink for StderrSink {
64    fn emit_log(&self, event: &LogEvent) {
65        let level_str = match event.level {
66            EventLevel::Trace => "TRACE",
67            EventLevel::Debug => "DEBUG",
68            EventLevel::Info => "INFO",
69            EventLevel::Warn => "WARN",
70            EventLevel::Error => "ERROR",
71        };
72        // Preserve the existing "[harn]" prefix style for warn/error so
73        // that downstream tooling and tests that parse stderr are unaffected.
74        match event.level {
75            EventLevel::Warn => {
76                eprintln!("[harn] warning: {}", event.message);
77            }
78            EventLevel::Error => {
79                eprintln!("[harn] error: {}", event.message);
80            }
81            _ => {
82                eprintln!("[{level_str}] [{}] {}", event.category, event.message);
83            }
84        }
85    }
86
87    fn emit_span_start(&self, _event: &SpanEvent) {
88        // Silent by default — spans are for observability backends.
89    }
90
91    fn emit_span_end(&self, _span_id: u64, _metadata: &BTreeMap<String, serde_json::Value>) {
92        // Silent by default.
93    }
94}
95
96// =============================================================================
97// CollectorSink — for testing and inspection
98// =============================================================================
99
100/// A sink that collects events for later retrieval (testing, inspection).
101pub struct CollectorSink {
102    pub logs: RefCell<Vec<LogEvent>>,
103    pub spans: RefCell<Vec<SpanEvent>>,
104}
105
106impl CollectorSink {
107    pub fn new() -> Self {
108        Self {
109            logs: RefCell::new(Vec::new()),
110            spans: RefCell::new(Vec::new()),
111        }
112    }
113}
114
115impl Default for CollectorSink {
116    fn default() -> Self {
117        Self::new()
118    }
119}
120
121impl EventSink for CollectorSink {
122    fn emit_log(&self, event: &LogEvent) {
123        self.logs.borrow_mut().push(event.clone());
124    }
125
126    fn emit_span_start(&self, event: &SpanEvent) {
127        self.spans.borrow_mut().push(event.clone());
128    }
129
130    fn emit_span_end(&self, _span_id: u64, _metadata: &BTreeMap<String, serde_json::Value>) {
131        // Could store end events if needed; for now just track starts.
132    }
133}
134
135// =============================================================================
136// Thread-local sink registry
137// =============================================================================
138
139thread_local! {
140    static EVENT_SINKS: RefCell<Vec<Rc<dyn EventSink>>> = RefCell::new(vec![Rc::new(StderrSink)]);
141}
142
143/// Register an additional event sink.
144pub fn add_event_sink(sink: Rc<dyn EventSink>) {
145    EVENT_SINKS.with(|sinks| sinks.borrow_mut().push(sink));
146}
147
148/// Remove all sinks (including the default `StderrSink`).
149pub fn clear_event_sinks() {
150    EVENT_SINKS.with(|sinks| sinks.borrow_mut().clear());
151}
152
153/// Reset sinks to just the default `StderrSink`.
154pub fn reset_event_sinks() {
155    EVENT_SINKS.with(|sinks| {
156        let mut s = sinks.borrow_mut();
157        s.clear();
158        s.push(Rc::new(StderrSink));
159    });
160}
161
162// =============================================================================
163// Emission helpers
164// =============================================================================
165
166/// Emit a structured log event to all registered sinks.
167pub fn emit_log(
168    level: EventLevel,
169    category: &str,
170    message: &str,
171    metadata: BTreeMap<String, serde_json::Value>,
172) {
173    let event = LogEvent {
174        level,
175        category: category.to_string(),
176        message: message.to_string(),
177        metadata,
178    };
179    EVENT_SINKS.with(|sinks| {
180        for sink in sinks.borrow().iter() {
181            sink.emit_log(&event);
182        }
183    });
184}
185
186/// Emit a span-start event to all registered sinks.
187pub fn emit_span_start(
188    span_id: u64,
189    parent_id: Option<u64>,
190    name: &str,
191    kind: &str,
192    metadata: BTreeMap<String, serde_json::Value>,
193) {
194    let event = SpanEvent {
195        span_id,
196        parent_id,
197        name: name.to_string(),
198        kind: kind.to_string(),
199        metadata,
200    };
201    EVENT_SINKS.with(|sinks| {
202        for sink in sinks.borrow().iter() {
203            sink.emit_span_start(&event);
204        }
205    });
206}
207
208/// Emit a span-end event to all registered sinks.
209pub fn emit_span_end(span_id: u64, metadata: BTreeMap<String, serde_json::Value>) {
210    EVENT_SINKS.with(|sinks| {
211        for sink in sinks.borrow().iter() {
212            sink.emit_span_end(span_id, &metadata);
213        }
214    });
215}
216
217// =============================================================================
218// Convenience functions
219// =============================================================================
220
221/// Log at Info level with no metadata.
222pub fn log_info(category: &str, message: &str) {
223    emit_log(EventLevel::Info, category, message, BTreeMap::new());
224}
225
226/// Log at Warn level with no metadata.
227pub fn log_warn(category: &str, message: &str) {
228    emit_log(EventLevel::Warn, category, message, BTreeMap::new());
229}
230
231/// Log at Error level with no metadata.
232pub fn log_error(category: &str, message: &str) {
233    emit_log(EventLevel::Error, category, message, BTreeMap::new());
234}
235
236/// Log at Debug level with no metadata.
237pub fn log_debug(category: &str, message: &str) {
238    emit_log(EventLevel::Debug, category, message, BTreeMap::new());
239}
240
241/// Log at Info level with metadata.
242pub fn log_info_meta(category: &str, message: &str, metadata: BTreeMap<String, serde_json::Value>) {
243    emit_log(EventLevel::Info, category, message, metadata);
244}
245
246/// Log at Warn level with metadata.
247pub fn log_warn_meta(category: &str, message: &str, metadata: BTreeMap<String, serde_json::Value>) {
248    emit_log(EventLevel::Warn, category, message, metadata);
249}
250
251// =============================================================================
252// OTel stub (behind feature flag)
253// =============================================================================
254
/// OpenTelemetry exporter sink. Requires the `otel` feature flag.
/// Forwards Harn log events and span lifecycle to OTLP collectors.
///
/// Active spans are stored keyed by Harn's `span_id` so that
/// `emit_span_end` can close the correct OTel span.
///
/// The span map lives in a `RefCell`, so this sink is not `Sync`. It is
/// registered like any other sink, as an `Rc<dyn EventSink>` via
/// `add_event_sink`.
#[cfg(feature = "otel")]
pub struct OtelSink {
    // Tracer provider created in `OtelSink::new`, which also installs a clone
    // as the global tracer provider. It is shut down when the sink is dropped.
    provider: opentelemetry_sdk::trace::SdkTracerProvider,
    // Spans started by `emit_span_start` and not yet ended, keyed by Harn
    // `span_id`. `emit_span_end` removes entries; any left over are cleared on
    // drop.
    active_spans:
        std::cell::RefCell<std::collections::HashMap<u64, opentelemetry_sdk::trace::Span>>,
}
266
#[cfg(feature = "otel")]
impl OtelSink {
    /// Create a new OTel sink backed by an OTLP/HTTP span exporter with batch
    /// export.
    ///
    /// The endpoint comes from the exporter's defaults: `OTEL_EXPORTER_OTLP_ENDPOINT`,
    /// or localhost:4318. The provider is also installed as the process-wide
    /// global tracer provider.
    ///
    /// # Errors
    /// Returns a message string if the span exporter cannot be built.
    pub fn new() -> Result<Self, String> {
        use opentelemetry_otlp::SpanExporter;
        use opentelemetry_sdk::trace::SdkTracerProvider;

        let exporter = match SpanExporter::builder().with_http().build() {
            Ok(exporter) => exporter,
            Err(e) => return Err(format!("OTel span exporter init failed: {e}")),
        };

        let provider = SdkTracerProvider::builder()
            .with_batch_exporter(exporter)
            .build();
        opentelemetry::global::set_tracer_provider(provider.clone());

        let active_spans = std::cell::RefCell::new(std::collections::HashMap::new());
        Ok(Self {
            provider,
            active_spans,
        })
    }
}
292
/// Convert a Harn metadata value into an OTel string attribute value.
///
/// `serde_json::Value`'s `Display` wraps strings in JSON quotes (`gpt-4`
/// would become `"gpt-4"` with literal quotes), so strings are passed through
/// verbatim. Every other value (numbers, booleans, null, arrays, objects) uses
/// its compact JSON text.
#[cfg(feature = "otel")]
fn otel_attr_value(value: &serde_json::Value) -> String {
    match value {
        serde_json::Value::String(text) => text.clone(),
        other => other.to_string(),
    }
}

/// Convert a metadata map into OTel key/value attributes, using keys verbatim.
#[cfg(feature = "otel")]
fn otel_attributes(
    metadata: &BTreeMap<String, serde_json::Value>,
) -> impl Iterator<Item = opentelemetry::KeyValue> + '_ {
    metadata
        .iter()
        .map(|(key, value)| opentelemetry::KeyValue::new(key.clone(), otel_attr_value(value)))
}

#[cfg(feature = "otel")]
impl EventSink for OtelSink {
    /// Export a log event as a zero-duration span named `log.<category>`. The
    /// span carries level, message, category and the event's metadata as
    /// attributes.
    fn emit_log(&self, event: &LogEvent) {
        use opentelemetry::trace::{Tracer, TracerProvider};
        let tracer = self.provider.tracer("harn");
        let mut attributes = vec![
            opentelemetry::KeyValue::new("level", format!("{:?}", event.level)),
            opentelemetry::KeyValue::new("message", event.message.clone()),
            opentelemetry::KeyValue::new("category", event.category.clone()),
        ];
        attributes.extend(otel_attributes(&event.metadata));
        // Log events are zero-duration spans: `_span` is dropped at the end of
        // this scope, immediately after it is started.
        let _span = tracer
            .span_builder(format!("log.{}", event.category))
            .with_attributes(attributes)
            .start(&tracer);
    }

    /// Start an OTel span for a Harn span and remember it by `span_id`.
    ///
    /// If `parent_id` names a span that is still open in this sink, the new
    /// span is started as its child, so exported traces nest the way Harn's
    /// spans do. Otherwise the span starts from the current OTel context, as
    /// before.
    fn emit_span_start(&self, event: &SpanEvent) {
        use opentelemetry::trace::{Span as _, TraceContextExt, Tracer, TracerProvider};
        let tracer = self.provider.tracer("harn");
        let mut attributes = vec![
            // `as` reinterprets the bits: ids above i64::MAX wrap but stay unique.
            opentelemetry::KeyValue::new("harn.span_id", event.span_id as i64),
            opentelemetry::KeyValue::new("harn.kind", event.kind.clone()),
        ];
        if let Some(parent_id) = event.parent_id {
            attributes.push(opentelemetry::KeyValue::new("harn.parent_id", parent_id as i64));
        }
        attributes.extend(otel_attributes(&event.metadata));
        let builder = tracer
            .span_builder(event.name.clone())
            .with_attributes(attributes);

        // Build a parent context from the still-open parent span, if present.
        // The shared borrow of the map ends inside this closure.
        let parent_cx = event.parent_id.and_then(|parent_id| {
            self.active_spans.borrow().get(&parent_id).map(|parent| {
                opentelemetry::Context::new()
                    .with_remote_span_context(parent.span_context().clone())
            })
        });
        let span = match parent_cx {
            Some(cx) => builder.start_with_context(&tracer, &cx),
            None => builder.start(&tracer),
        };
        self.active_spans.borrow_mut().insert(event.span_id, span);
    }

    /// End the OTel span started for `span_id`, first attaching `metadata` as
    /// attributes. Ids with no open span (never started, or already ended) are
    /// ignored.
    fn emit_span_end(&self, span_id: u64, metadata: &BTreeMap<String, serde_json::Value>) {
        use opentelemetry::trace::Span;
        // Take the span out first, so the map is not borrowed while the span
        // is annotated and ended.
        let finished = self.active_spans.borrow_mut().remove(&span_id);
        if let Some(mut span) = finished {
            for attribute in otel_attributes(metadata) {
                span.set_attribute(attribute);
            }
            span.end();
        }
    }
}
335
#[cfg(feature = "otel")]
impl Drop for OtelSink {
    /// Finish any spans that were never closed (abnormal shutdown), then shut
    /// the tracer provider down.
    fn drop(&mut self) {
        use opentelemetry::trace::Span;
        // `&mut self` gives exclusive access, so no runtime `RefCell` borrow
        // is needed to empty the map.
        for (_, mut orphan) in self.active_spans.get_mut().drain() {
            orphan.end();
        }
        // Best-effort: `drop` has no way to report a shutdown failure.
        let _ = self.provider.shutdown();
    }
}
344
345// =============================================================================
346// Tests
347// =============================================================================
348
#[cfg(test)]
mod tests {
    use super::*;

    /// Replace this thread's sinks with one fresh `CollectorSink` and return a
    /// handle to it. Each test restores the default sinks when it finishes.
    fn install_collector() -> Rc<CollectorSink> {
        let collector = Rc::new(CollectorSink::new());
        clear_event_sinks();
        add_event_sink(Rc::clone(&collector));
        collector
    }

    #[test]
    fn test_collector_sink_captures_logs() {
        let sink = install_collector();

        log_info("llm", "test message");
        log_warn("llm.cost", "cost warning");
        log_error("llm.agent", "agent error");

        {
            let logs = sink.logs.borrow();
            let levels: Vec<EventLevel> = logs.iter().map(|log| log.level).collect();
            assert_eq!(
                levels,
                [EventLevel::Info, EventLevel::Warn, EventLevel::Error]
            );
            let first = &logs[0];
            assert_eq!(first.category, "llm");
            assert_eq!(first.message, "test message");
        }

        // Restore default sinks for other tests.
        reset_event_sinks();
    }

    #[test]
    fn test_collector_sink_captures_spans() {
        let sink = install_collector();

        emit_span_start(1, None, "agent_loop", "llm_call", BTreeMap::new());
        emit_span_end(1, BTreeMap::new());

        {
            let spans = sink.spans.borrow();
            assert_eq!(spans.len(), 1);
            let span = &spans[0];
            assert_eq!((span.span_id, span.name.as_str()), (1, "agent_loop"));
        }

        reset_event_sinks();
    }

    #[test]
    fn test_stderr_sink_does_not_panic() {
        let sink = StderrSink;
        let warning = LogEvent {
            level: EventLevel::Warn,
            category: "test".into(),
            message: "hello".into(),
            metadata: BTreeMap::new(),
        };
        let span = SpanEvent {
            span_id: 1,
            parent_id: None,
            name: "x".into(),
            kind: "y".into(),
            metadata: BTreeMap::new(),
        };

        sink.emit_log(&warning);
        sink.emit_span_start(&span);
        sink.emit_span_end(1, &BTreeMap::new());
    }

    #[test]
    fn test_multiple_sinks() {
        let collectors = [Rc::new(CollectorSink::new()), Rc::new(CollectorSink::new())];
        clear_event_sinks();
        for collector in &collectors {
            add_event_sink(Rc::clone(collector));
        }

        log_debug("x", "msg");

        for collector in &collectors {
            assert_eq!(collector.logs.borrow().len(), 1);
        }

        reset_event_sinks();
    }

    #[test]
    fn test_log_with_metadata() {
        let sink = install_collector();

        let meta = BTreeMap::from([("tokens".to_string(), serde_json::json!(42))]);
        log_info_meta("llm", "token usage", meta);

        {
            let logs = sink.logs.borrow();
            assert_eq!(logs[0].metadata["tokens"], serde_json::json!(42));
        }

        reset_event_sinks();
    }
}
443}