use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use tokio::sync::Mutex;
/// One observable occurrence that can be handed to a [`TelemetrySink`].
///
/// Every variant carries an optional `session_id` so that events can be
/// attributed to a session when one is known.
#[derive(Debug, Clone)]
pub enum TelemetryEvent {
    /// Marks the start of an LLM call.
    LlmCallStart {
        /// Name of the model being called.
        model: String,
        /// Session the call belongs to, if any.
        session_id: Option<String>,
    },
    /// Marks the end of an LLM call and carries its measurements.
    LlmCallEnd {
        /// Name of the model that was called.
        model: String,
        /// Latency reported for the call.
        latency: Duration,
        /// Prompt token count reported for the call.
        prompt_tokens: u64,
        /// Completion token count reported for the call.
        completion_tokens: u64,
        /// Session the call belongs to, if any.
        session_id: Option<String>,
    },
    /// Marks the start of a tool call.
    ToolCallStart {
        /// Name of the tool being invoked.
        tool: String,
        /// Session the call belongs to, if any.
        session_id: Option<String>,
    },
    /// Marks the end of a tool call.
    ToolCallEnd {
        /// Name of the tool that was invoked.
        tool: String,
        /// Latency reported for the call.
        latency: Duration,
        /// Session the call belongs to, if any.
        session_id: Option<String>,
    },
    /// Reports an error.
    Error {
        /// Label identifying where the error came from.
        source: String,
        /// Error description.
        message: String,
        /// Session the error belongs to, if any.
        session_id: Option<String>,
    },
    /// Free-form event with an arbitrary JSON payload.
    Custom {
        /// Caller-chosen label for the event.
        kind: String,
        /// Arbitrary JSON data attached to the event.
        payload: serde_json::Value,
        /// Session the event belongs to, if any.
        session_id: Option<String>,
    },
}
/// Destination for [`TelemetryEvent`]s.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait TelemetrySink: Send + Sync {
    /// Consumes a single `event`.
    ///
    /// The method returns nothing, so an implementation has no way to report
    /// a failure back to the caller through this call.
    async fn record(&self, event: TelemetryEvent);
}
/// Lets any `Send + Sync` callable taking a [`TelemetryEvent`] act as a sink.
#[async_trait]
impl<F> TelemetrySink for F
where
    F: Fn(TelemetryEvent) + Send + Sync,
{
    /// Calls the wrapped callable with `event`; nothing is awaited.
    async fn record(&self, event: TelemetryEvent) {
        let callback = self;
        callback(event);
    }
}
/// Point-in-time copy of the totals accumulated by `InMemoryTelemetry`.
///
/// The default value has every counter at zero and every map empty.
/// `PartialEq`/`Eq` are derived so that two snapshots can be compared
/// directly (for example in tests).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct TelemetrySnapshot {
    /// Number of `LlmCallStart` events recorded.
    pub llm_calls: u64,
    /// Number of `ToolCallStart` events recorded.
    pub tool_calls: u64,
    /// Number of `Error` events recorded.
    pub errors: u64,
    /// Sum of `prompt_tokens` over recorded `LlmCallEnd` events.
    pub prompt_tokens: u64,
    /// Sum of `completion_tokens` over recorded `LlmCallEnd` events.
    pub completion_tokens: u64,
    /// `LlmCallStart` count keyed by model name.
    pub by_model: HashMap<String, u64>,
    /// `ToolCallStart` count keyed by tool name.
    pub by_tool: HashMap<String, u64>,
    /// Count of LLM- and tool-call start events keyed by session id; start
    /// events without a session id are not counted here.
    pub by_session: HashMap<String, u64>,
}
/// Sink that aggregates events into an in-process [`TelemetrySnapshot`].
#[derive(Default)]
pub struct InMemoryTelemetry {
    /// Running totals, guarded by an async mutex.
    inner: Mutex<TelemetrySnapshot>,
}
impl InMemoryTelemetry {
    /// Creates a sink whose counters are all zero and whose maps are empty.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns a clone of the totals accumulated so far.
    pub async fn snapshot(&self) -> TelemetrySnapshot {
        let totals = self.inner.lock().await;
        (*totals).clone()
    }

    /// Replaces the accumulated totals with a default (empty) snapshot.
    pub async fn reset(&self) {
        let mut totals = self.inner.lock().await;
        *totals = TelemetrySnapshot::default();
    }
}
#[async_trait]
impl TelemetrySink for InMemoryTelemetry {
    /// Folds `event` into the running totals.
    ///
    /// * `LlmCallStart` bumps `llm_calls` and the per-model count.
    /// * `LlmCallEnd` adds its token counts to the token totals.
    /// * `ToolCallStart` bumps `tool_calls` and the per-tool count.
    /// * `Error` bumps `errors`.
    /// * `ToolCallEnd` and `Custom` leave the totals untouched.
    ///
    /// Only the two `*Start` events contribute to the per-session count, and
    /// only when they carry a session id.
    async fn record(&self, event: TelemetryEvent) {
        let mut guard = self.inner.lock().await;
        let totals = &mut *guard;

        // Each arm yields the session id to count, if the event has one that
        // should be counted. Arms are listed explicitly (no `_`) so adding a
        // variant forces this match to be revisited.
        let session = match event {
            TelemetryEvent::LlmCallStart { model, session_id } => {
                totals.llm_calls += 1;
                *totals.by_model.entry(model).or_default() += 1;
                session_id
            }
            TelemetryEvent::LlmCallEnd {
                prompt_tokens,
                completion_tokens,
                ..
            } => {
                // Token counts are caller-supplied, so a bogus huge value must
                // not panic (debug) or wrap (release) the telemetry sink.
                totals.prompt_tokens = totals.prompt_tokens.saturating_add(prompt_tokens);
                totals.completion_tokens =
                    totals.completion_tokens.saturating_add(completion_tokens);
                None
            }
            TelemetryEvent::ToolCallStart { tool, session_id } => {
                totals.tool_calls += 1;
                *totals.by_tool.entry(tool).or_default() += 1;
                session_id
            }
            TelemetryEvent::Error { .. } => {
                totals.errors += 1;
                None
            }
            TelemetryEvent::ToolCallEnd { .. } | TelemetryEvent::Custom { .. } => None,
        };

        if let Some(id) = session {
            *totals.by_session.entry(id).or_default() += 1;
        }
    }
}
/// Reference-counted, type-erased handle to any [`TelemetrySink`] implementation.
pub type TelemetryHandle = Arc<dyn TelemetrySink>;
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `LlmCallStart` event for `model` with an optional session.
    fn llm_start(model: &str, session: Option<&str>) -> TelemetryEvent {
        TelemetryEvent::LlmCallStart {
            model: model.to_owned(),
            session_id: session.map(str::to_owned),
        }
    }

    /// Builds a session-less `Error` event.
    fn error_event(source: &str, message: &str) -> TelemetryEvent {
        TelemetryEvent::Error {
            source: source.to_owned(),
            message: message.to_owned(),
            session_id: None,
        }
    }

    /// Looks up `key` in a counter map, treating a missing entry as zero.
    fn count_of(map: &HashMap<String, u64>, key: &str) -> u64 {
        map.get(key).copied().unwrap_or(0)
    }

    #[tokio::test]
    async fn counters_increment() {
        let sink = InMemoryTelemetry::new();

        // Two LLM calls on the same model, in two different sessions.
        sink.record(llm_start("gpt-4o", Some("s1"))).await;
        sink.record(llm_start("gpt-4o", Some("s2"))).await;

        let llm_end = TelemetryEvent::LlmCallEnd {
            model: "gpt-4o".into(),
            latency: Duration::from_millis(100),
            prompt_tokens: 50,
            completion_tokens: 25,
            session_id: None,
        };
        sink.record(llm_end).await;

        let tool_start = TelemetryEvent::ToolCallStart {
            tool: "search".into(),
            session_id: None,
        };
        sink.record(tool_start).await;

        sink.record(error_event("openai", "rate limit")).await;

        let snap = sink.snapshot().await;
        assert_eq!(snap.llm_calls, 2);
        assert_eq!(snap.tool_calls, 1);
        assert_eq!(snap.errors, 1);
        assert_eq!(snap.prompt_tokens, 50);
        assert_eq!(snap.completion_tokens, 25);
        assert_eq!(count_of(&snap.by_model, "gpt-4o"), 2);
        assert_eq!(count_of(&snap.by_session, "s1"), 1);
    }

    #[tokio::test]
    async fn reset_clears_counters() {
        let sink = InMemoryTelemetry::new();
        sink.record(llm_start("m", None)).await;

        sink.reset().await;

        let snap = sink.snapshot().await;
        assert_eq!(snap.llm_calls, 0);
    }

    #[tokio::test]
    async fn custom_event_does_not_break_counters() {
        let sink = InMemoryTelemetry::new();
        let custom = TelemetryEvent::Custom {
            kind: "cache-hit".into(),
            payload: serde_json::json!({"size": 12}),
            session_id: None,
        };
        sink.record(custom).await;

        let snap = sink.snapshot().await;
        assert_eq!(snap.llm_calls, 0);
    }

    #[tokio::test]
    async fn closure_sink_works() {
        use std::sync::atomic::{AtomicU64, Ordering};

        let count = Arc::new(AtomicU64::new(0));
        let counter = Arc::clone(&count);
        // A plain closure is a sink through the blanket `Fn(TelemetryEvent)` impl.
        let sink = move |_event: TelemetryEvent| {
            counter.fetch_add(1, Ordering::SeqCst);
        };

        sink.record(error_event("x", "y")).await;

        assert_eq!(count.load(Ordering::SeqCst), 1);
    }
}