use roder_api::events::{EventEnvelope, RoderEvent};
use roder_api::inference::TokenUsage;
use time::OffsetDateTime;
use crate::model::{SessionRecord, TokenUsageRecord, ToolCallRecord, TurnRecord};
use crate::store::AnalyticsStore;
/// Converts `timestamp` into whole milliseconds since the Unix epoch.
///
/// The nanosecond count is divided with integer division, so any
/// sub-millisecond remainder is discarded (truncating toward zero).
fn ms(timestamp: OffsetDateTime) -> i64 {
    const NANOS_PER_MILLI: i128 = 1_000_000;
    let millis = timestamp.unix_timestamp_nanos() / NANOS_PER_MILLI;
    millis as i64
}
/// Turns runtime events into analytics records and writes them to a
/// borrowed [`AnalyticsStore`].
///
/// The ingestor holds no data of its own besides the store reference.
pub struct AnalyticsIngestor<'a> {
/// Store that receives every upsert issued by this ingestor.
store: &'a AnalyticsStore,
}
impl<'a> AnalyticsIngestor<'a> {
pub fn new(store: &'a AnalyticsStore) -> Self {
Self { store }
}
pub fn ingest_event(&self, envelope: &EventEnvelope) -> anyhow::Result<()> {
match &envelope.event {
RoderEvent::ThreadCreated(event) => self.store.upsert_session(&SessionRecord {
thread_id: event.thread_id.clone(),
workspace_key: None,
workspace_label: None,
provider: None,
model: None,
created_at_ms: ms(event.timestamp),
updated_at_ms: ms(event.timestamp),
}),
RoderEvent::TurnStarted(event) => self.store.upsert_turn(&TurnRecord {
thread_id: event.thread_id.clone(),
turn_id: event.turn_id.clone(),
provider: None,
model: None,
runtime_profile: Some(format!("{:?}", event.runtime_profile).to_lowercase()),
started_at_ms: Some(ms(event.timestamp)),
completed_at_ms: None,
status: "running".to_string(),
error_kind: None,
}),
RoderEvent::InferenceStarted(event) => {
self.store.upsert_session(&SessionRecord {
thread_id: event.thread_id.clone(),
workspace_key: None,
workspace_label: None,
provider: Some(event.model.provider.clone()),
model: Some(event.model.model.clone()),
created_at_ms: ms(event.timestamp),
updated_at_ms: ms(event.timestamp),
})?;
self.store.upsert_turn(&TurnRecord {
thread_id: event.thread_id.clone(),
turn_id: event.turn_id.clone(),
provider: Some(event.model.provider.clone()),
model: Some(event.model.model.clone()),
runtime_profile: None,
started_at_ms: Some(ms(event.timestamp)),
completed_at_ms: None,
status: "running".to_string(),
error_kind: None,
})
}
RoderEvent::TurnCompleted(event) => {
self.store.upsert_turn(&TurnRecord {
thread_id: event.thread_id.clone(),
turn_id: event.turn_id.clone(),
provider: None,
model: None,
runtime_profile: None,
started_at_ms: None,
completed_at_ms: Some(ms(event.timestamp)),
status: "completed".to_string(),
error_kind: None,
})?;
self.record_usage(
&event.thread_id,
&event.turn_id,
event.usage.as_ref(),
event.timestamp,
)
}
RoderEvent::TurnFailed(event) => {
self.store.upsert_turn(&TurnRecord {
thread_id: event.thread_id.clone(),
turn_id: event.turn_id.clone(),
provider: None,
model: None,
runtime_profile: None,
started_at_ms: None,
completed_at_ms: Some(ms(event.timestamp)),
status: "failed".to_string(),
error_kind: Some(
event
.error_kind
.clone()
.unwrap_or_else(|| "unknown".to_string()),
),
})?;
self.record_usage(
&event.thread_id,
&event.turn_id,
event.usage.as_ref(),
event.timestamp,
)
}
RoderEvent::ToolCallStarted(event) => self.store.upsert_tool_call(&ToolCallRecord {
thread_id: event.thread_id.clone(),
turn_id: event.turn_id.clone(),
tool_id: event.tool_id.clone(),
tool_name: event.tool_name.clone(),
started_at_ms: Some(ms(event.timestamp)),
completed_at_ms: None,
duration_ms: None,
status: "running".to_string(),
is_error: false,
}),
RoderEvent::ToolCallCompleted(event) => self.store.upsert_tool_call(&ToolCallRecord {
thread_id: event.thread_id.clone(),
turn_id: event.turn_id.clone(),
tool_id: event.tool_id.clone(),
tool_name: event.tool_name.clone(),
started_at_ms: None,
completed_at_ms: Some(ms(event.timestamp)),
duration_ms: None,
status: if event.is_error { "error" } else { "success" }.to_string(),
is_error: event.is_error,
}),
_ => Ok(()),
}
}
fn record_usage(
&self,
thread_id: &str,
turn_id: &str,
usage: Option<&TokenUsage>,
timestamp: OffsetDateTime,
) -> anyhow::Result<()> {
let Some(usage) = usage else {
return Ok(());
};
if usage.total_tokens == 0 && usage.prompt_tokens == 0 && usage.completion_tokens == 0 {
return Ok(());
}
self.store.upsert_token_usage(&TokenUsageRecord {
thread_id: thread_id.to_string(),
turn_id: turn_id.to_string(),
provider: None,
model: None,
recorded_at_ms: ms(timestamp),
prompt_tokens: usage.prompt_tokens,
completion_tokens: usage.completion_tokens,
total_tokens: usage.total_tokens,
cached_prompt_tokens: usage.cached_prompt_tokens,
})
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::WorkspaceLabelMode;
    use roder_api::events::{
        EventSource, ThreadCreated, ToolCallCompleted, ToolCallStarted, TurnCompleted, TurnFailed,
        TurnStarted,
    };

    /// Wraps `event` in an envelope numbered `seq`, copying the kind and the
    /// thread/turn ids out of the event itself. The envelope timestamp is
    /// always the Unix epoch.
    fn envelope(seq: u64, event: RoderEvent) -> EventEnvelope {
        let kind = event.kind().to_string();
        let thread_id = event.thread_id().cloned();
        let turn_id = event.turn_id().cloned();
        EventEnvelope {
            event_id: format!("event-{seq}"),
            seq,
            timestamp: OffsetDateTime::UNIX_EPOCH,
            source: EventSource::Core,
            kind,
            thread_id,
            turn_id,
            event,
        }
    }

    /// Builds a timestamp that lies `ms_value` milliseconds after the Unix epoch.
    fn at(ms_value: i64) -> OffsetDateTime {
        let nanos = i128::from(ms_value) * 1_000_000;
        OffsetDateTime::from_unix_timestamp_nanos(nanos).unwrap()
    }

    /// Produces a usage sample with 20 completion tokens and the remainder of
    /// `total` as prompt tokens. `total` must be at least 20, otherwise the
    /// subtraction underflows.
    fn usage(total: u32) -> TokenUsage {
        let prompt = total - 20;
        TokenUsage {
            prompt_tokens: prompt,
            completion_tokens: 20,
            total_tokens: total,
            cached_prompt_tokens: 10,
            cache_creation_prompt_tokens: 0,
            ..TokenUsage::default()
        }
    }

    /// Opens a store under a uniquely named directory inside the system temp
    /// dir and returns it together with that directory for later cleanup.
    fn temp_store() -> (AnalyticsStore, std::path::PathBuf) {
        let unique = format!("roder-analytics-ingest-{}", uuid::Uuid::new_v4());
        let dir = std::env::temp_dir().join(unique);
        let db_path = AnalyticsStore::default_path(&dir);
        let store = AnalyticsStore::open(&db_path, WorkspaceLabelMode::FullPath).unwrap();
        (store, dir)
    }

    /// Returns the five events of one successful turn containing a single tool
    /// call, numbered 1 through 5, with timestamps offset from `base_ms`.
    pub(crate) fn fake_turn_events(
        thread_id: &str,
        turn_id: &str,
        base_ms: i64,
    ) -> Vec<EventEnvelope> {
        let thread = || thread_id.to_string();
        let turn = || turn_id.to_string();

        let sequence = vec![
            RoderEvent::ThreadCreated(ThreadCreated {
                thread_id: thread(),
                timestamp: at(base_ms),
            }),
            RoderEvent::TurnStarted(TurnStarted {
                thread_id: thread(),
                turn_id: turn(),
                runtime_profile: Default::default(),
                timestamp: at(base_ms + 10),
            }),
            RoderEvent::ToolCallStarted(ToolCallStarted {
                thread_id: thread(),
                turn_id: turn(),
                tool_id: "call-1".to_string(),
                tool_name: Some("read_file".to_string()),
                display_payload: None,
                timestamp: at(base_ms + 100),
            }),
            RoderEvent::ToolCallCompleted(ToolCallCompleted {
                thread_id: thread(),
                turn_id: turn(),
                tool_id: "call-1".to_string(),
                tool_name: Some("read_file".to_string()),
                display_payload: None,
                is_error: false,
                output: Some("secret file contents".to_string()),
                timestamp: at(base_ms + 225),
            }),
            RoderEvent::TurnCompleted(TurnCompleted {
                thread_id: thread(),
                turn_id: turn(),
                usage: Some(usage(120)),
                finish_reason: Some("stop".to_string()),
                timestamp: at(base_ms + 500),
            }),
        ];

        // Sequence numbers start at 1 and follow the order above.
        sequence
            .into_iter()
            .zip(1u64..)
            .map(|(event, seq)| envelope(seq, event))
            .collect()
    }

    #[test]
    fn fake_turn_produces_one_turn_one_duration_one_usage_record() {
        let (store, dir) = temp_store();

        // Feeds a whole batch through a fresh ingestor, failing on any error.
        let ingest_all = |batch: Vec<EventEnvelope>| {
            let ingestor = AnalyticsIngestor::new(&store);
            for item in &batch {
                ingestor.ingest_event(item).unwrap();
            }
        };

        ingest_all(fake_turn_events("t1", "u1", 10_000));

        let counts = store.counts().unwrap();
        assert_eq!(counts.sessions, 1);
        assert_eq!(counts.turns, 1);
        assert_eq!(counts.tool_calls, 1);
        assert_eq!(counts.token_usage, 1);

        {
            // The guard is released at the end of this scope, before the
            // events are ingested a second time.
            let conn = store.conn.lock().unwrap();

            let (duration, status): (i64, String) = conn
                .query_row("SELECT duration_ms, status FROM tool_calls", [], |row| {
                    Ok((row.get(0)?, row.get(1)?))
                })
                .unwrap();
            assert_eq!(duration, 125);
            assert_eq!(status, "success");

            let total: i64 = conn
                .query_row("SELECT total_tokens FROM token_usage", [], |row| row.get(0))
                .unwrap();
            assert_eq!(total, 120);

            // The tool output must not show up in the stored tool names.
            let dumped: String = conn
                .query_row(
                    "SELECT COALESCE(GROUP_CONCAT(tool_name), '') FROM tool_calls",
                    [],
                    |row| row.get(0),
                )
                .unwrap();
            assert!(!dumped.contains("secret file contents"));
        }

        // Replaying the same events must leave every row count unchanged.
        ingest_all(fake_turn_events("t1", "u1", 10_000));
        assert_eq!(store.counts().unwrap(), counts);

        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn failed_turns_and_failed_tools_are_queryable_with_error_state() {
        let (store, dir) = temp_store();
        let ingestor = AnalyticsIngestor::new(&store);

        // A tool completion that arrives without a preceding start event.
        let tool_failure = RoderEvent::ToolCallCompleted(ToolCallCompleted {
            thread_id: "t1".to_string(),
            turn_id: "u1".to_string(),
            tool_id: "call-err".to_string(),
            tool_name: Some("shell".to_string()),
            display_payload: None,
            is_error: true,
            output: None,
            timestamp: at(1_000),
        });
        let turn_failure = RoderEvent::TurnFailed(TurnFailed {
            thread_id: "t1".to_string(),
            turn_id: "u1".to_string(),
            error: "provider exploded".to_string(),
            error_kind: Some("provider".to_string()),
            usage: Some(usage(50)),
            timestamp: at(2_000),
        });

        ingestor.ingest_event(&envelope(1, tool_failure)).unwrap();
        ingestor.ingest_event(&envelope(2, turn_failure)).unwrap();

        {
            let conn = store.conn.lock().unwrap();

            let (status, error_kind): (String, String) = conn
                .query_row("SELECT status, error_kind FROM turns", [], |row| {
                    Ok((row.get(0)?, row.get(1)?))
                })
                .unwrap();
            assert_eq!(status, "failed");
            assert_eq!(error_kind, "provider");

            let (is_error, duration): (bool, Option<i64>) = conn
                .query_row("SELECT is_error, duration_ms FROM tool_calls", [], |row| {
                    Ok((row.get(0)?, row.get(1)?))
                })
                .unwrap();
            assert!(is_error);
            assert_eq!(duration, None);
        }

        let _ = std::fs::remove_dir_all(&dir);
    }
}