Skip to main content

harn_vm/
events.rs

//! Structured event emission for observability.
//!
//! Provides an `EventSink` trait and a thread-local sink registry so that the
//! VM (and especially the LLM layer) can emit structured log and span events
//! instead of raw `eprintln!` calls.  Consumers register one or more sinks;
//! the default `StderrSink` preserves backward-compatible stderr output.
7
8use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::rc::Rc;
11
12// =============================================================================
13// Event types
14// =============================================================================
15
/// Severity level for log events.
///
/// Variants are declared from least to most severe, so the derived ordering
/// yields `Trace < Debug < Info < Warn < Error`. This lets callers compare a
/// level against a threshold (for example `level >= EventLevel::Warn`) and
/// use levels as keys in hashed or ordered collections.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum EventLevel {
    /// Least severe level.
    Trace,
    /// One step above `Trace`.
    Debug,
    /// One step above `Debug`.
    Info,
    /// One step above `Info`.
    Warn,
    /// Most severe level.
    Error,
}
25
26/// A structured log event.
27#[derive(Clone, Debug)]
28pub struct LogEvent {
29    pub level: EventLevel,
30    pub category: String,
31    pub message: String,
32    pub metadata: BTreeMap<String, serde_json::Value>,
33}
34
35/// A structured span event (start or end).
36#[derive(Clone, Debug)]
37pub struct SpanEvent {
38    pub span_id: u64,
39    pub parent_id: Option<u64>,
40    pub name: String,
41    pub kind: String,
42    pub metadata: BTreeMap<String, serde_json::Value>,
43}
44
45// =============================================================================
46// EventSink trait
47// =============================================================================
48
49/// Trait for receiving structured events from the VM.
50pub trait EventSink {
51    fn emit_log(&self, event: &LogEvent);
52    fn emit_span_start(&self, event: &SpanEvent);
53    fn emit_span_end(&self, span_id: u64, metadata: &BTreeMap<String, serde_json::Value>);
54}
55
56// =============================================================================
57// StderrSink — default, backward-compatible
58// =============================================================================
59
60/// Default sink that writes formatted output to stderr (preserves current behavior).
61pub struct StderrSink;
62
63impl EventSink for StderrSink {
64    fn emit_log(&self, event: &LogEvent) {
65        let level_str = match event.level {
66            EventLevel::Trace => "TRACE",
67            EventLevel::Debug => "DEBUG",
68            EventLevel::Info => "INFO",
69            EventLevel::Warn => "WARN",
70            EventLevel::Error => "ERROR",
71        };
72        // Preserve the existing "[harn]" prefix style for warn/error so
73        // that downstream tooling and tests that parse stderr are unaffected.
74        match event.level {
75            EventLevel::Warn => {
76                eprintln!("[harn] warning: {}", event.message);
77            }
78            EventLevel::Error => {
79                eprintln!("[harn] error: {}", event.message);
80            }
81            _ => {
82                eprintln!("[{level_str}] [{}] {}", event.category, event.message);
83            }
84        }
85    }
86
87    fn emit_span_start(&self, _event: &SpanEvent) {
88        // Silent by default — spans are for observability backends.
89    }
90
91    fn emit_span_end(&self, _span_id: u64, _metadata: &BTreeMap<String, serde_json::Value>) {
92        // Silent by default.
93    }
94}
95
96// =============================================================================
97// CollectorSink — for testing and inspection
98// =============================================================================
99
100/// A sink that collects events for later retrieval (testing, inspection).
101pub struct CollectorSink {
102    pub logs: RefCell<Vec<LogEvent>>,
103    pub spans: RefCell<Vec<SpanEvent>>,
104}
105
106impl CollectorSink {
107    pub fn new() -> Self {
108        Self {
109            logs: RefCell::new(Vec::new()),
110            spans: RefCell::new(Vec::new()),
111        }
112    }
113}
114
115impl Default for CollectorSink {
116    fn default() -> Self {
117        Self::new()
118    }
119}
120
121impl EventSink for CollectorSink {
122    fn emit_log(&self, event: &LogEvent) {
123        self.logs.borrow_mut().push(event.clone());
124    }
125
126    fn emit_span_start(&self, event: &SpanEvent) {
127        self.spans.borrow_mut().push(event.clone());
128    }
129
130    fn emit_span_end(&self, _span_id: u64, _metadata: &BTreeMap<String, serde_json::Value>) {
131        // Could store end events if needed; for now just track starts.
132    }
133}
134
135// =============================================================================
136// Thread-local sink registry
137// =============================================================================
138
139thread_local! {
140    static EVENT_SINKS: RefCell<Vec<Rc<dyn EventSink>>> = RefCell::new(vec![Rc::new(StderrSink)]);
141}
142
143/// Register an additional event sink.
144pub fn add_event_sink(sink: Rc<dyn EventSink>) {
145    EVENT_SINKS.with(|sinks| sinks.borrow_mut().push(sink));
146}
147
148/// Remove all sinks (including the default `StderrSink`).
149pub fn clear_event_sinks() {
150    EVENT_SINKS.with(|sinks| sinks.borrow_mut().clear());
151}
152
153/// Reset sinks to just the default `StderrSink`.
154pub fn reset_event_sinks() {
155    EVENT_SINKS.with(|sinks| {
156        let mut s = sinks.borrow_mut();
157        s.clear();
158        s.push(Rc::new(StderrSink));
159    });
160}
161
162// =============================================================================
163// Emission helpers
164// =============================================================================
165
166/// Emit a structured log event to all registered sinks.
167pub fn emit_log(
168    level: EventLevel,
169    category: &str,
170    message: &str,
171    metadata: BTreeMap<String, serde_json::Value>,
172) {
173    let event = LogEvent {
174        level,
175        category: category.to_string(),
176        message: message.to_string(),
177        metadata,
178    };
179    EVENT_SINKS.with(|sinks| {
180        for sink in sinks.borrow().iter() {
181            sink.emit_log(&event);
182        }
183    });
184}
185
186/// Emit a span-start event to all registered sinks.
187pub fn emit_span_start(
188    span_id: u64,
189    parent_id: Option<u64>,
190    name: &str,
191    kind: &str,
192    metadata: BTreeMap<String, serde_json::Value>,
193) {
194    let event = SpanEvent {
195        span_id,
196        parent_id,
197        name: name.to_string(),
198        kind: kind.to_string(),
199        metadata,
200    };
201    EVENT_SINKS.with(|sinks| {
202        for sink in sinks.borrow().iter() {
203            sink.emit_span_start(&event);
204        }
205    });
206}
207
208/// Emit a span-end event to all registered sinks.
209pub fn emit_span_end(span_id: u64, metadata: BTreeMap<String, serde_json::Value>) {
210    EVENT_SINKS.with(|sinks| {
211        for sink in sinks.borrow().iter() {
212            sink.emit_span_end(span_id, &metadata);
213        }
214    });
215}
216
217// =============================================================================
218// Convenience functions
219// =============================================================================
220
221/// Log at Info level with no metadata.
222pub fn log_info(category: &str, message: &str) {
223    emit_log(EventLevel::Info, category, message, BTreeMap::new());
224}
225
226/// Log at Warn level with no metadata.
227pub fn log_warn(category: &str, message: &str) {
228    emit_log(EventLevel::Warn, category, message, BTreeMap::new());
229}
230
231/// Log at Error level with no metadata.
232pub fn log_error(category: &str, message: &str) {
233    emit_log(EventLevel::Error, category, message, BTreeMap::new());
234}
235
236/// Log at Debug level with no metadata.
237pub fn log_debug(category: &str, message: &str) {
238    emit_log(EventLevel::Debug, category, message, BTreeMap::new());
239}
240
241/// Log at Info level with metadata.
242pub fn log_info_meta(category: &str, message: &str, metadata: BTreeMap<String, serde_json::Value>) {
243    emit_log(EventLevel::Info, category, message, metadata);
244}
245
246/// Log at Warn level with metadata.
247pub fn log_warn_meta(category: &str, message: &str, metadata: BTreeMap<String, serde_json::Value>) {
248    emit_log(EventLevel::Warn, category, message, metadata);
249}
250
251// =============================================================================
252// OTel stub (behind feature flag)
253// =============================================================================
254
/// Placeholder for an OpenTelemetry exporter sink; only compiled when the
/// `otel` feature is enabled. Every callback is currently a no-op.
#[cfg(feature = "otel")]
pub struct OtelSink {
    // Private unit field: forces construction through `OtelSink::new`.
    _private: (),
}

#[cfg(feature = "otel")]
impl OtelSink {
    /// Construct the sink.
    ///
    /// The `Result` return leaves room for exporter start-up errors; the
    /// current stub always returns `Ok`.
    pub fn new() -> Result<Self, String> {
        // TODO: initialise opentelemetry_otlp pipeline
        let sink = OtelSink { _private: () };
        Ok(sink)
    }
}

#[cfg(feature = "otel")]
impl EventSink for OtelSink {
    /// No-op for now.
    fn emit_log(&self, _event: &LogEvent) {
        // TODO: forward to OTel log exporter
    }

    /// No-op for now.
    fn emit_span_start(&self, _event: &SpanEvent) {
        // TODO: forward to OTel trace exporter
    }

    /// No-op for now.
    fn emit_span_end(&self, _span_id: u64, _metadata: &BTreeMap<String, serde_json::Value>) {
        // TODO: forward to OTel trace exporter
    }
}
282
283// =============================================================================
284// Tests
285// =============================================================================
286
#[cfg(test)]
mod tests {
    use super::*;

    /// Replace every registered sink with one fresh collector and return it.
    fn install_collector() -> Rc<CollectorSink> {
        let collector = Rc::new(CollectorSink::new());
        clear_event_sinks();
        add_event_sink(collector.clone());
        collector
    }

    #[test]
    fn test_collector_sink_captures_logs() {
        let collector = install_collector();

        log_info("llm", "test message");
        log_warn("llm.cost", "cost warning");
        log_error("llm.agent", "agent error");

        let captured = collector.logs.borrow();
        // Comparing the whole level sequence also pins the event count at 3.
        let levels: Vec<EventLevel> = captured.iter().map(|entry| entry.level).collect();
        assert_eq!(
            levels,
            [EventLevel::Info, EventLevel::Warn, EventLevel::Error]
        );
        let first = &captured[0];
        assert_eq!(first.category, "llm");
        assert_eq!(first.message, "test message");

        // Restore default sinks for other tests.
        reset_event_sinks();
    }

    #[test]
    fn test_collector_sink_captures_spans() {
        let collector = install_collector();

        emit_span_start(1, None, "agent_loop", "llm_call", BTreeMap::new());
        emit_span_end(1, BTreeMap::new());

        let captured = collector.spans.borrow();
        // Only the start is recorded; the end event is not stored.
        assert_eq!(captured.len(), 1);
        let started = &captured[0];
        assert_eq!(started.span_id, 1);
        assert_eq!(started.name, "agent_loop");

        reset_event_sinks();
    }

    #[test]
    fn test_stderr_sink_does_not_panic() {
        let stderr_sink = StderrSink;

        let warning = LogEvent {
            level: EventLevel::Warn,
            category: String::from("test"),
            message: String::from("hello"),
            metadata: BTreeMap::new(),
        };
        stderr_sink.emit_log(&warning);

        let span = SpanEvent {
            span_id: 1,
            parent_id: None,
            name: String::from("x"),
            kind: String::from("y"),
            metadata: BTreeMap::new(),
        };
        stderr_sink.emit_span_start(&span);
        stderr_sink.emit_span_end(1, &BTreeMap::new());
    }

    #[test]
    fn test_multiple_sinks() {
        let first = Rc::new(CollectorSink::new());
        let second = Rc::new(CollectorSink::new());
        clear_event_sinks();
        add_event_sink(first.clone());
        add_event_sink(second.clone());

        log_debug("x", "msg");

        // Each registered sink receives its own copy of the event.
        for collector in [&first, &second] {
            assert_eq!(collector.logs.borrow().len(), 1);
        }

        reset_event_sinks();
    }

    #[test]
    fn test_log_with_metadata() {
        let collector = install_collector();

        let meta = BTreeMap::from([("tokens".to_string(), serde_json::json!(42))]);
        log_info_meta("llm", "token usage", meta);

        let captured = collector.logs.borrow();
        assert_eq!(
            captured[0].metadata.get("tokens"),
            Some(&serde_json::json!(42))
        );

        reset_event_sinks();
    }
}