use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EventLevel {
Trace,
Debug,
Info,
Warn,
Error,
}
#[derive(Clone, Debug)]
pub struct LogEvent {
pub level: EventLevel,
pub category: String,
pub message: String,
pub metadata: BTreeMap<String, serde_json::Value>,
}
#[derive(Clone, Debug)]
pub struct SpanEvent {
pub span_id: u64,
pub parent_id: Option<u64>,
pub name: String,
pub kind: String,
pub metadata: BTreeMap<String, serde_json::Value>,
}
pub trait EventSink {
fn emit_log(&self, event: &LogEvent);
fn emit_span_start(&self, event: &SpanEvent);
fn emit_span_end(&self, span_id: u64, metadata: &BTreeMap<String, serde_json::Value>);
}
pub struct StderrSink;
impl EventSink for StderrSink {
fn emit_log(&self, event: &LogEvent) {
let level_str = match event.level {
EventLevel::Trace => "TRACE",
EventLevel::Debug => "DEBUG",
EventLevel::Info => "INFO",
EventLevel::Warn => "WARN",
EventLevel::Error => "ERROR",
};
match event.level {
EventLevel::Warn => {
eprintln!("[harn] warning: {}", event.message);
}
EventLevel::Error => {
eprintln!("[harn] error: {}", event.message);
}
_ => {
eprintln!("[{level_str}] [{}] {}", event.category, event.message);
}
}
}
fn emit_span_start(&self, _event: &SpanEvent) {
}
fn emit_span_end(&self, _span_id: u64, _metadata: &BTreeMap<String, serde_json::Value>) {
}
}
pub struct CollectorSink {
pub logs: RefCell<Vec<LogEvent>>,
pub spans: RefCell<Vec<SpanEvent>>,
}
impl CollectorSink {
pub fn new() -> Self {
Self {
logs: RefCell::new(Vec::new()),
spans: RefCell::new(Vec::new()),
}
}
}
impl Default for CollectorSink {
fn default() -> Self {
Self::new()
}
}
impl EventSink for CollectorSink {
fn emit_log(&self, event: &LogEvent) {
self.logs.borrow_mut().push(event.clone());
}
fn emit_span_start(&self, event: &SpanEvent) {
self.spans.borrow_mut().push(event.clone());
}
fn emit_span_end(&self, _span_id: u64, _metadata: &BTreeMap<String, serde_json::Value>) {
}
}
thread_local! {
static EVENT_SINKS: RefCell<Vec<Rc<dyn EventSink>>> = RefCell::new(vec![Rc::new(StderrSink)]);
}
pub fn add_event_sink(sink: Rc<dyn EventSink>) {
EVENT_SINKS.with(|sinks| sinks.borrow_mut().push(sink));
}
pub fn clear_event_sinks() {
EVENT_SINKS.with(|sinks| sinks.borrow_mut().clear());
}
pub fn reset_event_sinks() {
EVENT_SINKS.with(|sinks| {
let mut s = sinks.borrow_mut();
s.clear();
s.push(Rc::new(StderrSink));
});
}
pub fn emit_log(
level: EventLevel,
category: &str,
message: &str,
metadata: BTreeMap<String, serde_json::Value>,
) {
let event = LogEvent {
level,
category: category.to_string(),
message: message.to_string(),
metadata,
};
EVENT_SINKS.with(|sinks| {
for sink in sinks.borrow().iter() {
sink.emit_log(&event);
}
});
}
pub fn emit_span_start(
span_id: u64,
parent_id: Option<u64>,
name: &str,
kind: &str,
metadata: BTreeMap<String, serde_json::Value>,
) {
let event = SpanEvent {
span_id,
parent_id,
name: name.to_string(),
kind: kind.to_string(),
metadata,
};
EVENT_SINKS.with(|sinks| {
for sink in sinks.borrow().iter() {
sink.emit_span_start(&event);
}
});
}
pub fn emit_span_end(span_id: u64, metadata: BTreeMap<String, serde_json::Value>) {
EVENT_SINKS.with(|sinks| {
for sink in sinks.borrow().iter() {
sink.emit_span_end(span_id, &metadata);
}
});
}
pub fn log_info(category: &str, message: &str) {
emit_log(EventLevel::Info, category, message, BTreeMap::new());
}
pub fn log_warn(category: &str, message: &str) {
emit_log(EventLevel::Warn, category, message, BTreeMap::new());
}
pub fn log_error(category: &str, message: &str) {
emit_log(EventLevel::Error, category, message, BTreeMap::new());
}
pub fn log_debug(category: &str, message: &str) {
emit_log(EventLevel::Debug, category, message, BTreeMap::new());
}
pub fn log_info_meta(category: &str, message: &str, metadata: BTreeMap<String, serde_json::Value>) {
emit_log(EventLevel::Info, category, message, metadata);
}
pub fn log_warn_meta(category: &str, message: &str, metadata: BTreeMap<String, serde_json::Value>) {
emit_log(EventLevel::Warn, category, message, metadata);
}
#[cfg(feature = "otel")]
pub struct OtelSink {
provider: opentelemetry_sdk::trace::SdkTracerProvider,
active_spans:
std::cell::RefCell<std::collections::HashMap<u64, opentelemetry_sdk::trace::Span>>,
}
#[cfg(feature = "otel")]
impl OtelSink {
pub fn new() -> Result<Self, String> {
use opentelemetry_otlp::SpanExporter;
use opentelemetry_sdk::trace::SdkTracerProvider;
let exporter = SpanExporter::builder()
.with_http()
.build()
.map_err(|e| format!("OTel span exporter init failed: {e}"))?;
let provider = SdkTracerProvider::builder()
.with_batch_exporter(exporter)
.build();
opentelemetry::global::set_tracer_provider(provider.clone());
Ok(Self {
provider,
active_spans: std::cell::RefCell::new(std::collections::HashMap::new()),
})
}
}
#[cfg(feature = "otel")]
impl EventSink for OtelSink {
fn emit_log(&self, event: &LogEvent) {
use opentelemetry::trace::{Tracer, TracerProvider};
let tracer = self.provider.tracer("harn");
let _span = tracer
.span_builder(format!("log.{}", event.category))
.with_attributes(vec![
opentelemetry::KeyValue::new("level", format!("{:?}", event.level)),
opentelemetry::KeyValue::new("message", event.message.clone()),
opentelemetry::KeyValue::new("category", event.category.clone()),
])
.start(&tracer);
}
fn emit_span_start(&self, event: &SpanEvent) {
use opentelemetry::trace::{Tracer, TracerProvider};
let tracer = self.provider.tracer("harn");
let span = tracer
.span_builder(event.name.clone())
.with_attributes(vec![
opentelemetry::KeyValue::new("harn.span_id", event.span_id as i64),
opentelemetry::KeyValue::new("harn.kind", event.kind.clone()),
])
.start(&tracer);
self.active_spans.borrow_mut().insert(event.span_id, span);
}
fn emit_span_end(&self, span_id: u64, metadata: &BTreeMap<String, serde_json::Value>) {
use opentelemetry::trace::Span;
if let Some(mut span) = self.active_spans.borrow_mut().remove(&span_id) {
for (key, value) in metadata {
span.set_attribute(opentelemetry::KeyValue::new(
key.clone(),
format!("{value}"),
));
}
span.end();
}
}
}
#[cfg(feature = "otel")]
impl Drop for OtelSink {
fn drop(&mut self) {
self.active_spans.borrow_mut().clear();
let _ = self.provider.shutdown();
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_collector_sink_captures_logs() {
let sink = Rc::new(CollectorSink::new());
clear_event_sinks();
add_event_sink(sink.clone());
log_info("llm", "test message");
log_warn("llm.cost", "cost warning");
log_error("llm.agent", "agent error");
let logs = sink.logs.borrow();
assert_eq!(logs.len(), 3);
assert_eq!(logs[0].level, EventLevel::Info);
assert_eq!(logs[0].category, "llm");
assert_eq!(logs[0].message, "test message");
assert_eq!(logs[1].level, EventLevel::Warn);
assert_eq!(logs[2].level, EventLevel::Error);
reset_event_sinks();
}
#[test]
fn test_collector_sink_captures_spans() {
let sink = Rc::new(CollectorSink::new());
clear_event_sinks();
add_event_sink(sink.clone());
emit_span_start(1, None, "agent_loop", "llm_call", BTreeMap::new());
emit_span_end(1, BTreeMap::new());
let spans = sink.spans.borrow();
assert_eq!(spans.len(), 1);
assert_eq!(spans[0].span_id, 1);
assert_eq!(spans[0].name, "agent_loop");
reset_event_sinks();
}
#[test]
fn test_stderr_sink_does_not_panic() {
let sink = StderrSink;
let event = LogEvent {
level: EventLevel::Warn,
category: "test".into(),
message: "hello".into(),
metadata: BTreeMap::new(),
};
sink.emit_log(&event);
sink.emit_span_start(&SpanEvent {
span_id: 1,
parent_id: None,
name: "x".into(),
kind: "y".into(),
metadata: BTreeMap::new(),
});
sink.emit_span_end(1, &BTreeMap::new());
}
#[test]
fn test_multiple_sinks() {
let a = Rc::new(CollectorSink::new());
let b = Rc::new(CollectorSink::new());
clear_event_sinks();
add_event_sink(a.clone());
add_event_sink(b.clone());
log_debug("x", "msg");
assert_eq!(a.logs.borrow().len(), 1);
assert_eq!(b.logs.borrow().len(), 1);
reset_event_sinks();
}
#[test]
fn test_log_with_metadata() {
let sink = Rc::new(CollectorSink::new());
clear_event_sinks();
add_event_sink(sink.clone());
let mut meta = BTreeMap::new();
meta.insert("tokens".into(), serde_json::json!(42));
log_info_meta("llm", "token usage", meta);
let logs = sink.logs.borrow();
assert_eq!(logs[0].metadata["tokens"], serde_json::json!(42));
reset_event_sinks();
}
}