use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use chrono::Utc;
use parking_lot::Mutex;
use serde_json::Value;
use super::events::{EventCtx, EventKind, TraceEvent};
use super::pipeline::TracePipeline;
// Task-local `(op_name, ctx)` pair. `EventEmitter::annotate` reads it to
// attribute annotation events to an operation and emits nothing when no value
// is set for the current task. Callers install a value with `CURRENT_OP.scope`.
tokio::task_local! {
pub static CURRENT_OP: (String, EventCtx);
}
/// Handle used to emit trace events for a single request.
///
/// All state lives behind a shared `Arc`, so clones of an emitter share the
/// same pipeline, request id, sequence counter and start-time table.
#[derive(Clone)]
pub struct EventEmitter {
/// Shared state; cloning the emitter only clones this `Arc`.
inner: Arc<EmitterInner>,
}
/// State shared by every clone of an `EventEmitter`.
struct EmitterInner {
/// Destination for emitted events; `None` makes the emitter inactive
/// (this is how `EventEmitter::null` is built).
pipeline: Option<Arc<TracePipeline>>,
/// Request identifier copied into every event and used as the prefix of
/// each `event_id`.
request_id: String,
/// Next sequence number to hand out; advanced by one for every built event.
seq: Mutex<u64>,
/// Start instants recorded by `op_start` and consumed by `op_end`, keyed by
/// `(op_name, ctx)`.
start_times: Mutex<HashMap<(String, EventCtx), Instant>>,
}
impl EventEmitter {
pub fn new(pipeline: Arc<TracePipeline>, request_id: impl Into<String>) -> Self {
Self {
inner: Arc::new(EmitterInner {
pipeline: Some(pipeline),
request_id: request_id.into(),
seq: Mutex::new(0),
start_times: Mutex::new(HashMap::new()),
}),
}
}
pub fn null() -> Self {
Self {
inner: Arc::new(EmitterInner {
pipeline: None,
request_id: String::new(),
seq: Mutex::new(0),
start_times: Mutex::new(HashMap::new()),
}),
}
}
pub fn is_active(&self) -> bool {
self.inner.pipeline.is_some()
}
pub fn emit(&self, event: TraceEvent) {
if let Some(p) = &self.inner.pipeline {
p.push(event);
}
}
pub fn op_start(&self, op_name: &str, ctx: &EventCtx, inputs: Value) {
if !self.is_active() {
return;
}
self.inner
.start_times
.lock()
.insert((op_name.to_string(), ctx.clone()), Instant::now());
let event = self.build(
EventKind::OpStart,
Some(op_name),
ctx,
payload_one("inputs", inputs),
);
self.emit(event);
}
pub fn op_end(
&self,
op_name: &str,
ctx: &EventCtx,
outputs: Value,
status: &str,
duration_ms: Option<f64>,
yield_count: u32,
) {
if !self.is_active() {
return;
}
let key = (op_name.to_string(), ctx.clone());
let start = self.inner.start_times.lock().remove(&key);
let duration = duration_ms.unwrap_or_else(|| match start {
Some(t) => t.elapsed().as_secs_f64() * 1000.0,
None => return_no_op(0.0),
});
if start.is_none() && duration_ms.is_none() {
return;
}
let mut payload = serde_json::Map::new();
payload.insert("outputs".into(), outputs);
payload.insert("status".into(), Value::String(status.to_string()));
payload.insert(
"duration_ms".into(),
serde_json::Number::from_f64(duration)
.map(Value::Number)
.unwrap_or(Value::Null),
);
payload.insert(
"yield_count".into(),
Value::Number(serde_json::Number::from(yield_count)),
);
let event = self.build(EventKind::OpEnd, Some(op_name), ctx, into_btree(payload));
self.emit(event);
}
pub fn op_yield(&self, op_name: &str, ctx: &EventCtx, yielded: Value, idx: u64) {
if !self.is_active() {
return;
}
let mut payload = serde_json::Map::new();
payload.insert("yielded".into(), yielded);
payload.insert("idx".into(), Value::Number(serde_json::Number::from(idx)));
let event = self.build(EventKind::OpYield, Some(op_name), ctx, into_btree(payload));
self.emit(event);
}
pub fn annotate(&self, key: &str, value: Value) {
if !self.is_active() {
return;
}
let (op_name, ctx) = match CURRENT_OP.try_with(|cur| cur.clone()) {
Ok(v) => v,
Err(_) => return,
};
let mut payload = serde_json::Map::new();
payload.insert("key".into(), Value::String(key.to_string()));
payload.insert("value".into(), value);
let event = self.build(
EventKind::Annotation,
Some(&op_name),
&ctx,
into_btree(payload),
);
self.emit(event);
}
pub fn llm_usage(
&self,
op_name: &str,
ctx: &EventCtx,
model: &str,
prompt_tokens: u64,
completion_tokens: u64,
total_tokens: u64,
cost_usd: f64,
) {
if !self.is_active() {
return;
}
let mut payload = serde_json::Map::new();
payload.insert("model".into(), Value::String(model.to_string()));
payload.insert("prompt_tokens".into(), Value::Number(prompt_tokens.into()));
payload.insert(
"completion_tokens".into(),
Value::Number(completion_tokens.into()),
);
payload.insert("total_tokens".into(), Value::Number(total_tokens.into()));
payload.insert(
"cost_usd".into(),
serde_json::Number::from_f64(cost_usd)
.map(Value::Number)
.unwrap_or(Value::Null),
);
let event = self.build(EventKind::LlmUsage, Some(op_name), ctx, into_btree(payload));
self.emit(event);
}
pub fn media_ref(
&self,
op_name: &str,
ctx: &EventCtx,
handle: &str,
mime: &str,
size_bytes: u64,
) {
if !self.is_active() {
return;
}
let mut payload = serde_json::Map::new();
payload.insert("handle".into(), Value::String(handle.to_string()));
payload.insert("mime".into(), Value::String(mime.to_string()));
payload.insert("size_bytes".into(), Value::Number(size_bytes.into()));
let event = self.build(EventKind::MediaRef, Some(op_name), ctx, into_btree(payload));
self.emit(event);
}
pub fn group<'a>(&'a self, name: &str) -> GroupGuard<'a> {
if self.is_active() {
let mut payload = serde_json::Map::new();
payload.insert("name".into(), Value::String(name.to_string()));
let event = self.build(
EventKind::GroupStart,
None,
&Vec::new(),
into_btree(payload),
);
self.emit(event);
}
GroupGuard {
emitter: self,
name: name.to_string(),
}
}
pub fn start_time_of(&self, op_name: &str, ctx: &EventCtx) -> Option<Instant> {
self.inner
.start_times
.lock()
.get(&(op_name.to_string(), ctx.clone()))
.copied()
}
fn build(
&self,
kind: EventKind,
op_name: Option<&str>,
ctx: &EventCtx,
payload: std::collections::BTreeMap<String, Value>,
) -> TraceEvent {
let mut s = self.inner.seq.lock();
let seq = *s;
*s = seq + 1;
drop(s);
TraceEvent {
event_id: format!("{}-{}", self.inner.request_id, seq),
request_id: self.inner.request_id.clone(),
kind,
op_name: op_name.map(String::from),
ctx: ctx.clone(),
timestamp: Utc::now(),
seq,
payload,
}
}
}
/// Guard returned by `EventEmitter::group`; its `Drop` implementation emits
/// the `GroupEnd` event for the group.
pub struct GroupGuard<'a> {
/// Emitter the closing event is built and emitted through.
emitter: &'a EventEmitter,
/// Group name, repeated in the closing event's payload.
name: String,
}
impl Drop for GroupGuard<'_> {
    /// Closes the group: on an active emitter, emits a `GroupEnd` event whose
    /// payload holds the group name and the fixed status `"ok"`. The event has
    /// no operation name and an empty context.
    fn drop(&mut self) {
        let emitter = self.emitter;
        if emitter.is_active() {
            let mut fields = serde_json::Map::new();
            fields.insert("name".into(), Value::String(self.name.clone()));
            fields.insert("status".into(), Value::String("ok".into()));
            let no_ctx = Vec::new();
            let closing = emitter.build(EventKind::GroupEnd, None, &no_ctx, into_btree(fields));
            emitter.emit(closing);
        }
    }
}
/// Builds a payload map containing the single entry `key -> value`.
fn payload_one(key: &str, value: Value) -> std::collections::BTreeMap<String, Value> {
    std::iter::once((key.to_string(), value)).collect()
}
/// Moves every entry of a `serde_json::Map` into a `BTreeMap`, consuming the
/// input map.
fn into_btree(m: serde_json::Map<String, Value>) -> std::collections::BTreeMap<String, Value> {
    m.into_iter().collect()
}
/// Identity helper: hands back its argument unchanged.
fn return_no_op<T>(value: T) -> T {
    std::convert::identity(value)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::tracing::pipeline::TracePipeline;

    /// Context used by every operation in these tests: the single segment `"main"`.
    fn main_ctx() -> EventCtx {
        vec!["main".into()]
    }

    /// Creates a fresh pipeline together with an active emitter bound to it.
    fn active_emitter(request_id: &str) -> (Arc<TracePipeline>, EventEmitter) {
        let pipeline = Arc::new(TracePipeline::new());
        let emitter = EventEmitter::new(pipeline.clone(), request_id);
        (pipeline, emitter)
    }

    /// Ends operation `"op"` in the `main` context with status `"ok"`, no
    /// explicit duration and no yields.
    fn finish_ok(emitter: &EventEmitter, outputs: Value) {
        emitter.op_end("op", &main_ctx(), outputs, "ok", None, 0);
    }

    /// A null emitter reports itself inactive and accepts calls without panicking.
    #[test]
    fn null_emitter_is_inactive_and_silent() {
        let emitter = EventEmitter::null();
        assert!(!emitter.is_active());
        emitter.op_start("op", &main_ctx(), serde_json::json!({}));
        finish_ok(&emitter, serde_json::json!({}));
    }

    /// A start/end pair yields two events, and the end event carries the
    /// measured duration.
    #[test]
    fn op_start_op_end_round_trip_records_duration() {
        let (pipeline, emitter) = active_emitter("req-1");
        emitter.op_start("op", &main_ctx(), serde_json::json!({"x": 1}));
        std::thread::sleep(std::time::Duration::from_millis(2));
        finish_ok(&emitter, serde_json::json!({"y": 2}));

        let events = pipeline.drain();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].kind, EventKind::OpStart);
        assert_eq!(events[1].kind, EventKind::OpEnd);
        let duration = events[1]
            .payload
            .get("duration_ms")
            .and_then(|v| v.as_f64())
            .expect("duration_ms set");
        assert!(duration >= 1.0, "duration_ms = {duration}");
    }

    /// The second `op_end` for the same operation emits nothing.
    #[test]
    fn op_end_idempotent_when_called_twice() {
        let (pipeline, emitter) = active_emitter("req-2");
        emitter.op_start("op", &main_ctx(), serde_json::json!({}));
        finish_ok(&emitter, serde_json::json!({}));
        finish_ok(&emitter, serde_json::json!({}));

        let events = pipeline.drain();
        assert_eq!(events.len(), 2);
    }

    /// Creating and dropping a group guard emits a start and an end event.
    #[test]
    fn group_guard_emits_start_and_end_on_drop() {
        let (pipeline, emitter) = active_emitter("req-3");
        {
            let _g = emitter.group("preflight");
        }

        let events = pipeline.drain();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].kind, EventKind::GroupStart);
        assert_eq!(events[1].kind, EventKind::GroupEnd);
    }

    /// `annotate` attributes the event to the operation held in `CURRENT_OP`.
    #[tokio::test]
    async fn annotate_reads_current_op_task_local() {
        let (pipeline, emitter) = active_emitter("req-4");
        let scope_op = ("the_op".to_string(), main_ctx());
        CURRENT_OP
            .scope(scope_op, async {
                emitter.annotate("user_id", serde_json::json!("u-1"));
            })
            .await;

        let events = pipeline.drain();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].kind, EventKind::Annotation);
        assert_eq!(events[0].op_name.as_deref(), Some("the_op"));
    }
}