1use roder_api::events::{EventEnvelope, RoderEvent};
4use roder_api::inference::TokenUsage;
5use time::OffsetDateTime;
6
7use crate::model::{SessionRecord, TokenUsageRecord, ToolCallRecord, TurnRecord};
8use crate::store::AnalyticsStore;
9
10fn ms(timestamp: OffsetDateTime) -> i64 {
11 (timestamp.unix_timestamp_nanos() / 1_000_000) as i64
12}
13
14pub struct AnalyticsIngestor<'a> {
18 store: &'a AnalyticsStore,
19}
20
21impl<'a> AnalyticsIngestor<'a> {
22 pub fn new(store: &'a AnalyticsStore) -> Self {
23 Self { store }
24 }
25
26 pub fn ingest_event(&self, envelope: &EventEnvelope) -> anyhow::Result<()> {
31 match &envelope.event {
32 RoderEvent::ThreadCreated(event) => self.store.upsert_session(&SessionRecord {
33 thread_id: event.thread_id.clone(),
34 workspace_key: None,
35 workspace_label: None,
36 provider: None,
37 model: None,
38 created_at_ms: ms(event.timestamp),
39 updated_at_ms: ms(event.timestamp),
40 }),
41 RoderEvent::TurnStarted(event) => self.store.upsert_turn(&TurnRecord {
42 thread_id: event.thread_id.clone(),
43 turn_id: event.turn_id.clone(),
44 provider: None,
45 model: None,
46 runtime_profile: Some(format!("{:?}", event.runtime_profile).to_lowercase()),
47 started_at_ms: Some(ms(event.timestamp)),
48 completed_at_ms: None,
49 status: "running".to_string(),
50 error_kind: None,
51 }),
52 RoderEvent::InferenceStarted(event) => {
53 self.store.upsert_session(&SessionRecord {
54 thread_id: event.thread_id.clone(),
55 workspace_key: None,
56 workspace_label: None,
57 provider: Some(event.model.provider.clone()),
58 model: Some(event.model.model.clone()),
59 created_at_ms: ms(event.timestamp),
60 updated_at_ms: ms(event.timestamp),
61 })?;
62 self.store.upsert_turn(&TurnRecord {
63 thread_id: event.thread_id.clone(),
64 turn_id: event.turn_id.clone(),
65 provider: Some(event.model.provider.clone()),
66 model: Some(event.model.model.clone()),
67 runtime_profile: None,
68 started_at_ms: Some(ms(event.timestamp)),
69 completed_at_ms: None,
70 status: "running".to_string(),
71 error_kind: None,
72 })
73 }
74 RoderEvent::TurnCompleted(event) => {
75 self.store.upsert_turn(&TurnRecord {
76 thread_id: event.thread_id.clone(),
77 turn_id: event.turn_id.clone(),
78 provider: None,
79 model: None,
80 runtime_profile: None,
81 started_at_ms: None,
82 completed_at_ms: Some(ms(event.timestamp)),
83 status: "completed".to_string(),
84 error_kind: None,
85 })?;
86 self.record_usage(
87 &event.thread_id,
88 &event.turn_id,
89 event.usage.as_ref(),
90 event.timestamp,
91 )
92 }
93 RoderEvent::TurnFailed(event) => {
94 self.store.upsert_turn(&TurnRecord {
95 thread_id: event.thread_id.clone(),
96 turn_id: event.turn_id.clone(),
97 provider: None,
98 model: None,
99 runtime_profile: None,
100 started_at_ms: None,
101 completed_at_ms: Some(ms(event.timestamp)),
102 status: "failed".to_string(),
103 error_kind: Some(
104 event
105 .error_kind
106 .clone()
107 .unwrap_or_else(|| "unknown".to_string()),
108 ),
109 })?;
110 self.record_usage(
111 &event.thread_id,
112 &event.turn_id,
113 event.usage.as_ref(),
114 event.timestamp,
115 )
116 }
117 RoderEvent::ToolCallStarted(event) => self.store.upsert_tool_call(&ToolCallRecord {
118 thread_id: event.thread_id.clone(),
119 turn_id: event.turn_id.clone(),
120 tool_id: event.tool_id.clone(),
121 tool_name: event.tool_name.clone(),
122 started_at_ms: Some(ms(event.timestamp)),
123 completed_at_ms: None,
124 duration_ms: None,
125 status: "running".to_string(),
126 is_error: false,
127 }),
128 RoderEvent::ToolCallCompleted(event) => self.store.upsert_tool_call(&ToolCallRecord {
129 thread_id: event.thread_id.clone(),
130 turn_id: event.turn_id.clone(),
131 tool_id: event.tool_id.clone(),
132 tool_name: event.tool_name.clone(),
133 started_at_ms: None,
134 completed_at_ms: Some(ms(event.timestamp)),
135 duration_ms: None,
136 status: if event.is_error { "error" } else { "success" }.to_string(),
139 is_error: event.is_error,
140 }),
141 _ => Ok(()),
142 }
143 }
144
145 fn record_usage(
146 &self,
147 thread_id: &str,
148 turn_id: &str,
149 usage: Option<&TokenUsage>,
150 timestamp: OffsetDateTime,
151 ) -> anyhow::Result<()> {
152 let Some(usage) = usage else {
153 return Ok(());
154 };
155 if usage.total_tokens == 0 && usage.prompt_tokens == 0 && usage.completion_tokens == 0 {
156 return Ok(());
157 }
158 self.store.upsert_token_usage(&TokenUsageRecord {
159 thread_id: thread_id.to_string(),
160 turn_id: turn_id.to_string(),
161 provider: None,
163 model: None,
164 recorded_at_ms: ms(timestamp),
165 prompt_tokens: usage.prompt_tokens,
166 completion_tokens: usage.completion_tokens,
167 total_tokens: usage.total_tokens,
168 cached_prompt_tokens: usage.cached_prompt_tokens,
169 })
170 }
171}
172
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::WorkspaceLabelMode;
    use roder_api::events::{
        EventSource, ThreadCreated, ToolCallCompleted, ToolCallStarted, TurnCompleted, TurnFailed,
        TurnStarted,
    };

    /// Wraps `event` in an envelope with sequence number `seq`.
    ///
    /// The envelope timestamp is fixed to the Unix epoch; kind, thread id and
    /// turn id are taken from the event itself.
    fn envelope(seq: u64, event: RoderEvent) -> EventEnvelope {
        // Derive the borrowed metadata first so `event` can be moved afterwards.
        let kind = event.kind().to_string();
        let thread_id = event.thread_id().cloned();
        let turn_id = event.turn_id().cloned();
        EventEnvelope {
            event_id: format!("event-{seq}"),
            seq,
            timestamp: OffsetDateTime::UNIX_EPOCH,
            source: EventSource::Core,
            kind,
            thread_id,
            turn_id,
            event,
        }
    }

    /// Builds the timestamp that lies `ms_value` milliseconds after the epoch.
    fn at(ms_value: i64) -> OffsetDateTime {
        let nanos = i128::from(ms_value) * 1_000_000;
        OffsetDateTime::from_unix_timestamp_nanos(nanos).unwrap()
    }

    /// Builds a usage sample with `total` tokens, 20 of them completion tokens.
    fn usage(total: u32) -> TokenUsage {
        let prompt_tokens = total - 20;
        TokenUsage {
            prompt_tokens,
            completion_tokens: 20,
            total_tokens: total,
            cached_prompt_tokens: 10,
            cache_creation_prompt_tokens: 0,
            ..TokenUsage::default()
        }
    }

    /// Opens a store below a freshly named directory in the system temp dir.
    ///
    /// Returns the store together with that directory so the caller can
    /// remove it afterwards.
    fn temp_store() -> (AnalyticsStore, std::path::PathBuf) {
        let unique_name = format!("roder-analytics-ingest-{}", uuid::Uuid::new_v4());
        let dir = std::env::temp_dir().join(unique_name);
        let db_path = AnalyticsStore::default_path(&dir);
        let store = AnalyticsStore::open(&db_path, WorkspaceLabelMode::FullPath).unwrap();
        (store, dir)
    }

    /// Produces the event sequence of one successful turn with one tool call.
    ///
    /// The envelopes are numbered 1 to 5 and the event timestamps are offsets
    /// from `base_ms`: thread created (+0), turn started (+10), tool call
    /// started (+100), tool call completed (+225), turn completed (+500).
    pub(crate) fn fake_turn_events(
        thread_id: &str,
        turn_id: &str,
        base_ms: i64,
    ) -> Vec<EventEnvelope> {
        let thread = || thread_id.to_string();
        let turn = || turn_id.to_string();

        let timeline = vec![
            RoderEvent::ThreadCreated(ThreadCreated {
                thread_id: thread(),
                timestamp: at(base_ms),
            }),
            RoderEvent::TurnStarted(TurnStarted {
                thread_id: thread(),
                turn_id: turn(),
                runtime_profile: Default::default(),
                timestamp: at(base_ms + 10),
            }),
            RoderEvent::ToolCallStarted(ToolCallStarted {
                thread_id: thread(),
                turn_id: turn(),
                tool_id: "call-1".to_string(),
                tool_name: Some("read_file".to_string()),
                display_payload: None,
                timestamp: at(base_ms + 100),
            }),
            RoderEvent::ToolCallCompleted(ToolCallCompleted {
                thread_id: thread(),
                turn_id: turn(),
                tool_id: "call-1".to_string(),
                tool_name: Some("read_file".to_string()),
                display_payload: None,
                is_error: false,
                output: Some("secret file contents".to_string()),
                timestamp: at(base_ms + 225),
            }),
            RoderEvent::TurnCompleted(TurnCompleted {
                thread_id: thread(),
                turn_id: turn(),
                usage: Some(usage(120)),
                finish_reason: Some("stop".to_string()),
                timestamp: at(base_ms + 500),
            }),
        ];

        // Sequence numbers start at 1 and follow the order of the timeline.
        timeline
            .into_iter()
            .zip(1u64..)
            .map(|(event, seq)| envelope(seq, event))
            .collect()
    }

    #[test]
    fn fake_turn_produces_one_turn_one_duration_one_usage_record() {
        let (store, dir) = temp_store();
        let ingestor = AnalyticsIngestor::new(&store);
        fake_turn_events("t1", "u1", 10_000)
            .iter()
            .for_each(|event| ingestor.ingest_event(event).unwrap());

        let counts = store.counts().unwrap();
        assert_eq!(counts.sessions, 1);
        assert_eq!(counts.turns, 1);
        assert_eq!(counts.tool_calls, 1);
        assert_eq!(counts.token_usage, 1);

        // The connection guard lives only inside this block so that the
        // second ingestion pass below can use the store again.
        {
            let conn = store.conn.lock().unwrap();

            // Tool call started at +100 ms and completed at +225 ms.
            let (tool_duration, tool_status): (i64, String) = conn
                .query_row("SELECT duration_ms, status FROM tool_calls", [], |row| {
                    Ok((row.get(0)?, row.get(1)?))
                })
                .unwrap();
            assert_eq!(tool_duration, 125);
            assert_eq!(tool_status, "success");

            let total_tokens: i64 = conn
                .query_row("SELECT total_tokens FROM token_usage", [], |row| row.get(0))
                .unwrap();
            assert_eq!(total_tokens, 120);

            // The tool output text must not show up in the stored tool names.
            let tool_names: String = conn
                .query_row(
                    "SELECT COALESCE(GROUP_CONCAT(tool_name), '') FROM tool_calls",
                    [],
                    |row| row.get(0),
                )
                .unwrap();
            assert!(!tool_names.contains("secret file contents"));
        }

        // Replaying the same events through a new ingestor must leave the
        // row counts unchanged.
        let ingestor = AnalyticsIngestor::new(&store);
        fake_turn_events("t1", "u1", 10_000)
            .iter()
            .for_each(|event| ingestor.ingest_event(event).unwrap());
        assert_eq!(store.counts().unwrap(), counts);

        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn failed_turns_and_failed_tools_are_queryable_with_error_state() {
        let (store, dir) = temp_store();
        let ingestor = AnalyticsIngestor::new(&store);

        // A tool completion that was never preceded by a start event.
        let tool_failure = envelope(
            1,
            RoderEvent::ToolCallCompleted(ToolCallCompleted {
                thread_id: "t1".to_string(),
                turn_id: "u1".to_string(),
                tool_id: "call-err".to_string(),
                tool_name: Some("shell".to_string()),
                display_payload: None,
                is_error: true,
                output: None,
                timestamp: at(1_000),
            }),
        );
        ingestor.ingest_event(&tool_failure).unwrap();

        let turn_failure = envelope(
            2,
            RoderEvent::TurnFailed(TurnFailed {
                thread_id: "t1".to_string(),
                turn_id: "u1".to_string(),
                error: "provider exploded".to_string(),
                error_kind: Some("provider".to_string()),
                usage: Some(usage(50)),
                timestamp: at(2_000),
            }),
        );
        ingestor.ingest_event(&turn_failure).unwrap();

        {
            let conn = store.conn.lock().unwrap();

            let (turn_status, turn_error_kind): (String, String) = conn
                .query_row("SELECT status, error_kind FROM turns", [], |row| {
                    Ok((row.get(0)?, row.get(1)?))
                })
                .unwrap();
            assert_eq!(turn_status, "failed");
            assert_eq!(turn_error_kind, "provider");

            // Without a start event there is no duration to report.
            let (tool_is_error, tool_duration): (bool, Option<i64>) = conn
                .query_row("SELECT is_error, duration_ms FROM tool_calls", [], |row| {
                    Ok((row.get(0)?, row.get(1)?))
                })
                .unwrap();
            assert!(tool_is_error);
            assert_eq!(tool_duration, None);
        }

        let _ = std::fs::remove_dir_all(&dir);
    }
}