1use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::rc::Rc;
11
/// Severity of a log event.
///
/// Variants are declared from least to most severe, so the derived ordering
/// allows threshold comparisons such as `level >= EventLevel::Warn`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum EventLevel {
    /// Lowest severity.
    Trace,
    /// Debugging detail.
    Debug,
    /// Informational message.
    Info,
    /// Warning.
    Warn,
    /// Highest severity.
    Error,
}
21
22#[derive(Clone, Debug)]
24pub struct LogEvent {
25 pub level: EventLevel,
26 pub category: String,
27 pub message: String,
28 pub metadata: BTreeMap<String, serde_json::Value>,
29}
30
31#[derive(Clone, Debug)]
33pub struct SpanEvent {
34 pub span_id: u64,
35 pub parent_id: Option<u64>,
36 pub name: String,
37 pub kind: String,
38 pub metadata: BTreeMap<String, serde_json::Value>,
39}
40
41pub trait EventSink {
43 fn emit_log(&self, event: &LogEvent);
44 fn emit_span_start(&self, event: &SpanEvent);
45 fn emit_span_end(&self, span_id: u64, metadata: &BTreeMap<String, serde_json::Value>);
46}
47
/// Sink that prints log events to standard error.
///
/// It is a stateless unit struct, so it derives the common cheap traits.
#[derive(Clone, Copy, Debug, Default)]
pub struct StderrSink;
50
51impl EventSink for StderrSink {
52 fn emit_log(&self, event: &LogEvent) {
53 let level_str = match event.level {
54 EventLevel::Trace => "TRACE",
55 EventLevel::Debug => "DEBUG",
56 EventLevel::Info => "INFO",
57 EventLevel::Warn => "WARN",
58 EventLevel::Error => "ERROR",
59 };
60 match event.level {
63 EventLevel::Warn => {
64 eprintln!("[harn] warning: {}", event.message);
65 }
66 EventLevel::Error => {
67 eprintln!("[harn] error: {}", event.message);
68 }
69 _ => {
70 eprintln!("[{level_str}] [{}] {}", event.category, event.message);
71 }
72 }
73 }
74
75 fn emit_span_start(&self, _event: &SpanEvent) {
76 }
78
79 fn emit_span_end(&self, _span_id: u64, _metadata: &BTreeMap<String, serde_json::Value>) {}
80}
81
82pub struct CollectorSink {
84 pub logs: RefCell<Vec<LogEvent>>,
85 pub spans: RefCell<Vec<SpanEvent>>,
86}
87
88impl CollectorSink {
89 pub fn new() -> Self {
90 Self {
91 logs: RefCell::new(Vec::new()),
92 spans: RefCell::new(Vec::new()),
93 }
94 }
95}
96
97impl Default for CollectorSink {
98 fn default() -> Self {
99 Self::new()
100 }
101}
102
103impl EventSink for CollectorSink {
104 fn emit_log(&self, event: &LogEvent) {
105 self.logs.borrow_mut().push(event.clone());
106 }
107
108 fn emit_span_start(&self, event: &SpanEvent) {
109 self.spans.borrow_mut().push(event.clone());
110 }
111
112 fn emit_span_end(&self, _span_id: u64, _metadata: &BTreeMap<String, serde_json::Value>) {}
113}
114
115thread_local! {
116 static EVENT_SINKS: RefCell<Vec<Rc<dyn EventSink>>> = RefCell::new(vec![Rc::new(StderrSink)]);
117}
118
119pub fn add_event_sink(sink: Rc<dyn EventSink>) {
121 EVENT_SINKS.with(|sinks| sinks.borrow_mut().push(sink));
122}
123
124pub fn clear_event_sinks() {
126 EVENT_SINKS.with(|sinks| sinks.borrow_mut().clear());
127}
128
129pub fn reset_event_sinks() {
131 EVENT_SINKS.with(|sinks| {
132 let mut s = sinks.borrow_mut();
133 s.clear();
134 s.push(Rc::new(StderrSink));
135 });
136}
137
138pub fn emit_log(
140 level: EventLevel,
141 category: &str,
142 message: &str,
143 metadata: BTreeMap<String, serde_json::Value>,
144) {
145 let event = LogEvent {
146 level,
147 category: category.to_string(),
148 message: message.to_string(),
149 metadata,
150 };
151 EVENT_SINKS.with(|sinks| {
152 for sink in sinks.borrow().iter() {
153 sink.emit_log(&event);
154 }
155 });
156}
157
158pub fn emit_span_start(
160 span_id: u64,
161 parent_id: Option<u64>,
162 name: &str,
163 kind: &str,
164 metadata: BTreeMap<String, serde_json::Value>,
165) {
166 let event = SpanEvent {
167 span_id,
168 parent_id,
169 name: name.to_string(),
170 kind: kind.to_string(),
171 metadata,
172 };
173 EVENT_SINKS.with(|sinks| {
174 for sink in sinks.borrow().iter() {
175 sink.emit_span_start(&event);
176 }
177 });
178}
179
180pub fn emit_span_end(span_id: u64, metadata: BTreeMap<String, serde_json::Value>) {
182 EVENT_SINKS.with(|sinks| {
183 for sink in sinks.borrow().iter() {
184 sink.emit_span_end(span_id, &metadata);
185 }
186 });
187}
188
189pub fn log_info(category: &str, message: &str) {
191 emit_log(EventLevel::Info, category, message, BTreeMap::new());
192}
193
194pub fn log_warn(category: &str, message: &str) {
196 emit_log(EventLevel::Warn, category, message, BTreeMap::new());
197}
198
199pub fn log_error(category: &str, message: &str) {
201 emit_log(EventLevel::Error, category, message, BTreeMap::new());
202}
203
204pub fn log_debug(category: &str, message: &str) {
206 emit_log(EventLevel::Debug, category, message, BTreeMap::new());
207}
208
209pub fn log_info_meta(category: &str, message: &str, metadata: BTreeMap<String, serde_json::Value>) {
211 emit_log(EventLevel::Info, category, message, metadata);
212}
213
214pub fn log_warn_meta(category: &str, message: &str, metadata: BTreeMap<String, serde_json::Value>) {
216 emit_log(EventLevel::Warn, category, message, metadata);
217}
218
#[cfg(feature = "otel")]
/// Sink that forwards events to OpenTelemetry as spans.
///
/// Only compiled when the `otel` feature is enabled.
pub struct OtelSink {
    /// Tracer provider used to create spans; shut down when the sink is dropped.
    provider: opentelemetry_sdk::trace::SdkTracerProvider,
    /// Spans that have been started but not yet ended, keyed by span id.
    active_spans: RefCell<std::collections::HashMap<u64, opentelemetry_sdk::trace::Span>>,
}
230
#[cfg(feature = "otel")]
impl OtelSink {
    /// Builds an OTLP/HTTP span exporter, wraps it in a batch-exporting
    /// tracer provider, installs that provider as the global tracer provider,
    /// and returns a sink backed by it.
    ///
    /// # Errors
    ///
    /// Returns a formatted message if the span exporter cannot be built.
    pub fn new() -> Result<Self, String> {
        let exporter = match opentelemetry_otlp::SpanExporter::builder()
            .with_http()
            .build()
        {
            Ok(exporter) => exporter,
            Err(err) => return Err(format!("OTel span exporter init failed: {err}")),
        };

        let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
            .with_batch_exporter(exporter)
            .build();

        // The global registration gets its own handle; this sink keeps `provider`.
        opentelemetry::global::set_tracer_provider(provider.clone());

        let active_spans = RefCell::default();
        Ok(Self {
            provider,
            active_spans,
        })
    }
}
256
#[cfg(feature = "otel")]
impl EventSink for OtelSink {
    /// Records the log event as a span named `log.<category>` carrying
    /// `level`, `message` and `category` attributes.
    ///
    /// The span handle is not retained; it goes out of scope when this
    /// function returns. Event metadata is not exported.
    fn emit_log(&self, event: &LogEvent) {
        use opentelemetry::trace::{Tracer, TracerProvider};
        let tracer = self.provider.tracer("harn");
        let _span = tracer
            .span_builder(format!("log.{}", event.category))
            .with_attributes(vec![
                opentelemetry::KeyValue::new("level", format!("{:?}", event.level)),
                opentelemetry::KeyValue::new("message", event.message.clone()),
                opentelemetry::KeyValue::new("category", event.category.clone()),
            ])
            .start(&tracer);
    }

    /// Starts a span named after the event and keeps it in `active_spans`
    /// until `emit_span_end` is called with the same id.
    ///
    /// NOTE(review): `event.parent_id` and `event.metadata` are not read
    /// here, so neither is exported at span start — confirm this is intended.
    fn emit_span_start(&self, event: &SpanEvent) {
        use opentelemetry::trace::{Tracer, TracerProvider};
        let tracer = self.provider.tracer("harn");
        let span = tracer
            .span_builder(event.name.clone())
            .with_attributes(vec![
                // `as` reinterprets the bits: ids above `i64::MAX` export as
                // negative numbers but remain unique.
                opentelemetry::KeyValue::new("harn.span_id", event.span_id as i64),
                opentelemetry::KeyValue::new("harn.kind", event.kind.clone()),
            ])
            .start(&tracer);
        self.active_spans.borrow_mut().insert(event.span_id, span);
    }

    /// Ends the span registered under `span_id`, first attaching each
    /// metadata entry as a string attribute.
    ///
    /// JSON string values are attached as their raw text, matching how other
    /// string attributes are exported; every other JSON value uses its JSON
    /// rendering. Unknown span ids are ignored.
    fn emit_span_end(&self, span_id: u64, metadata: &BTreeMap<String, serde_json::Value>) {
        use opentelemetry::trace::Span;
        // Remove in a separate statement so the `RefCell` borrow is released
        // before the span is touched.
        let finished = self.active_spans.borrow_mut().remove(&span_id);
        if let Some(mut span) = finished {
            for (key, value) in metadata {
                let rendered = match value {
                    // `Display` for a JSON string adds surrounding quotes.
                    serde_json::Value::String(text) => text.clone(),
                    other => other.to_string(),
                };
                span.set_attribute(opentelemetry::KeyValue::new(key.clone(), rendered));
            }
            span.end();
        }
    }
}
299
#[cfg(feature = "otel")]
impl Drop for OtelSink {
    /// Discards any spans still registered, then shuts the provider down.
    ///
    /// The shutdown result is deliberately ignored: there is no caller to
    /// report it to from `drop`.
    fn drop(&mut self) {
        // `&mut self` gives exclusive access, so no runtime borrow is needed.
        self.active_spans.get_mut().clear();
        let _ = self.provider.shutdown();
    }
}
308
#[cfg(test)]
mod tests {
    use super::*;

    /// Replaces this thread's sinks with one fresh collector and returns it.
    fn install_collector() -> Rc<CollectorSink> {
        let collector = Rc::new(CollectorSink::new());
        clear_event_sinks();
        add_event_sink(collector.clone());
        collector
    }

    #[test]
    fn test_collector_sink_captures_logs() {
        let collector = install_collector();

        log_info("llm", "test message");
        log_warn("llm.cost", "cost warning");
        log_error("llm.agent", "agent error");

        let captured = collector.logs.borrow();
        let levels: Vec<EventLevel> = captured.iter().map(|entry| entry.level).collect();
        assert_eq!(
            levels,
            vec![EventLevel::Info, EventLevel::Warn, EventLevel::Error]
        );
        assert_eq!(captured[0].category, "llm");
        assert_eq!(captured[0].message, "test message");

        reset_event_sinks();
    }

    #[test]
    fn test_collector_sink_captures_spans() {
        let collector = install_collector();

        emit_span_start(1, None, "agent_loop", "llm_call", BTreeMap::new());
        emit_span_end(1, BTreeMap::new());

        let captured = collector.spans.borrow();
        assert_eq!(captured.len(), 1);
        let first = &captured[0];
        assert_eq!(first.span_id, 1);
        assert_eq!(first.name, "agent_loop");

        reset_event_sinks();
    }

    #[test]
    fn test_stderr_sink_does_not_panic() {
        let sink = StderrSink;
        let warning = LogEvent {
            level: EventLevel::Warn,
            category: String::from("test"),
            message: String::from("hello"),
            metadata: BTreeMap::new(),
        };
        let span = SpanEvent {
            span_id: 1,
            parent_id: None,
            name: String::from("x"),
            kind: String::from("y"),
            metadata: BTreeMap::new(),
        };

        sink.emit_log(&warning);
        sink.emit_span_start(&span);
        sink.emit_span_end(1, &BTreeMap::new());
    }

    #[test]
    fn test_multiple_sinks() {
        let collectors = [
            Rc::new(CollectorSink::new()),
            Rc::new(CollectorSink::new()),
        ];
        clear_event_sinks();
        for collector in &collectors {
            add_event_sink(collector.clone());
        }

        log_debug("x", "msg");

        for collector in &collectors {
            assert_eq!(collector.logs.borrow().len(), 1);
        }

        reset_event_sinks();
    }

    #[test]
    fn test_log_with_metadata() {
        let collector = install_collector();

        let meta = BTreeMap::from([(String::from("tokens"), serde_json::json!(42))]);
        log_info_meta("llm", "token usage", meta);

        let captured = collector.logs.borrow();
        assert_eq!(
            captured[0].metadata.get("tokens"),
            Some(&serde_json::json!(42))
        );

        reset_event_sinks();
    }
}