Skip to main content

roder_usage_analytics/
ingest.rs

1//! Projection of canonical `EventEnvelope` values into analytics records.
2
3use roder_api::events::{EventEnvelope, RoderEvent};
4use roder_api::inference::TokenUsage;
5use time::OffsetDateTime;
6
7use crate::model::{SessionRecord, TokenUsageRecord, ToolCallRecord, TurnRecord};
8use crate::store::AnalyticsStore;
9
10fn ms(timestamp: OffsetDateTime) -> i64 {
11    (timestamp.unix_timestamp_nanos() / 1_000_000) as i64
12}
13
14/// Stateless event projector. Partial pairs (start without completion or
15/// completion without start) remain visible as `running`/`partial` records
16/// — they never panic or block later events.
17pub struct AnalyticsIngestor<'a> {
18    store: &'a AnalyticsStore,
19}
20
21impl<'a> AnalyticsIngestor<'a> {
22    pub fn new(store: &'a AnalyticsStore) -> Self {
23        Self { store }
24    }
25
26    /// Projects one envelope. Unknown events are ignored. Token usage is
27    /// recorded only from terminal turn events (`TurnCompleted`/`TurnFailed`
28    /// carry the turn-total usage), so streamed per-step usage events can
29    /// never double-count a turn.
30    pub fn ingest_event(&self, envelope: &EventEnvelope) -> anyhow::Result<()> {
31        match &envelope.event {
32            RoderEvent::ThreadCreated(event) => self.store.upsert_session(&SessionRecord {
33                thread_id: event.thread_id.clone(),
34                workspace_key: None,
35                workspace_label: None,
36                provider: None,
37                model: None,
38                created_at_ms: ms(event.timestamp),
39                updated_at_ms: ms(event.timestamp),
40            }),
41            RoderEvent::TurnStarted(event) => self.store.upsert_turn(&TurnRecord {
42                thread_id: event.thread_id.clone(),
43                turn_id: event.turn_id.clone(),
44                provider: None,
45                model: None,
46                runtime_profile: Some(format!("{:?}", event.runtime_profile).to_lowercase()),
47                started_at_ms: Some(ms(event.timestamp)),
48                completed_at_ms: None,
49                status: "running".to_string(),
50                error_kind: None,
51            }),
52            RoderEvent::InferenceStarted(event) => {
53                self.store.upsert_session(&SessionRecord {
54                    thread_id: event.thread_id.clone(),
55                    workspace_key: None,
56                    workspace_label: None,
57                    provider: Some(event.model.provider.clone()),
58                    model: Some(event.model.model.clone()),
59                    created_at_ms: ms(event.timestamp),
60                    updated_at_ms: ms(event.timestamp),
61                })?;
62                self.store.upsert_turn(&TurnRecord {
63                    thread_id: event.thread_id.clone(),
64                    turn_id: event.turn_id.clone(),
65                    provider: Some(event.model.provider.clone()),
66                    model: Some(event.model.model.clone()),
67                    runtime_profile: None,
68                    started_at_ms: Some(ms(event.timestamp)),
69                    completed_at_ms: None,
70                    status: "running".to_string(),
71                    error_kind: None,
72                })
73            }
74            RoderEvent::TurnCompleted(event) => {
75                self.store.upsert_turn(&TurnRecord {
76                    thread_id: event.thread_id.clone(),
77                    turn_id: event.turn_id.clone(),
78                    provider: None,
79                    model: None,
80                    runtime_profile: None,
81                    started_at_ms: None,
82                    completed_at_ms: Some(ms(event.timestamp)),
83                    status: "completed".to_string(),
84                    error_kind: None,
85                })?;
86                self.record_usage(
87                    &event.thread_id,
88                    &event.turn_id,
89                    event.usage.as_ref(),
90                    event.timestamp,
91                )
92            }
93            RoderEvent::TurnFailed(event) => {
94                self.store.upsert_turn(&TurnRecord {
95                    thread_id: event.thread_id.clone(),
96                    turn_id: event.turn_id.clone(),
97                    provider: None,
98                    model: None,
99                    runtime_profile: None,
100                    started_at_ms: None,
101                    completed_at_ms: Some(ms(event.timestamp)),
102                    status: "failed".to_string(),
103                    error_kind: Some(
104                        event
105                            .error_kind
106                            .clone()
107                            .unwrap_or_else(|| "unknown".to_string()),
108                    ),
109                })?;
110                self.record_usage(
111                    &event.thread_id,
112                    &event.turn_id,
113                    event.usage.as_ref(),
114                    event.timestamp,
115                )
116            }
117            RoderEvent::ToolCallStarted(event) => self.store.upsert_tool_call(&ToolCallRecord {
118                thread_id: event.thread_id.clone(),
119                turn_id: event.turn_id.clone(),
120                tool_id: event.tool_id.clone(),
121                tool_name: event.tool_name.clone(),
122                started_at_ms: Some(ms(event.timestamp)),
123                completed_at_ms: None,
124                duration_ms: None,
125                status: "running".to_string(),
126                is_error: false,
127            }),
128            RoderEvent::ToolCallCompleted(event) => self.store.upsert_tool_call(&ToolCallRecord {
129                thread_id: event.thread_id.clone(),
130                turn_id: event.turn_id.clone(),
131                tool_id: event.tool_id.clone(),
132                tool_name: event.tool_name.clone(),
133                started_at_ms: None,
134                completed_at_ms: Some(ms(event.timestamp)),
135                duration_ms: None,
136                // Without a matching start event the record stays visibly
137                // partial; the store upgrades it when both halves exist.
138                status: if event.is_error { "error" } else { "success" }.to_string(),
139                is_error: event.is_error,
140            }),
141            _ => Ok(()),
142        }
143    }
144
145    fn record_usage(
146        &self,
147        thread_id: &str,
148        turn_id: &str,
149        usage: Option<&TokenUsage>,
150        timestamp: OffsetDateTime,
151    ) -> anyhow::Result<()> {
152        let Some(usage) = usage else {
153            return Ok(());
154        };
155        if usage.total_tokens == 0 && usage.prompt_tokens == 0 && usage.completion_tokens == 0 {
156            return Ok(());
157        }
158        self.store.upsert_token_usage(&TokenUsageRecord {
159            thread_id: thread_id.to_string(),
160            turn_id: turn_id.to_string(),
161            // Provider/model are joined from `turns` at query time.
162            provider: None,
163            model: None,
164            recorded_at_ms: ms(timestamp),
165            prompt_tokens: usage.prompt_tokens,
166            completion_tokens: usage.completion_tokens,
167            total_tokens: usage.total_tokens,
168            cached_prompt_tokens: usage.cached_prompt_tokens,
169        })
170    }
171}
172
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::WorkspaceLabelMode;
    use roder_api::events::{
        EventSource, ThreadCreated, ToolCallCompleted, ToolCallStarted, TurnCompleted, TurnFailed,
        TurnStarted,
    };

    /// Scratch directory that is deleted when the guard goes out of scope.
    ///
    /// Cleanup runs during unwinding too, so a failing assertion does not
    /// leave the directory behind in the system temp dir.
    struct ScratchDir(std::path::PathBuf);

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best effort: a failed cleanup must not mask the test result.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Wraps `event` in an envelope with sequence number `seq`.
    ///
    /// The envelope timestamp is fixed at the Unix epoch; the tests rely on
    /// the timestamps carried by the events themselves.
    fn envelope(seq: u64, event: RoderEvent) -> EventEnvelope {
        EventEnvelope {
            event_id: format!("event-{seq}"),
            seq,
            timestamp: OffsetDateTime::UNIX_EPOCH,
            source: EventSource::Core,
            kind: event.kind().to_string(),
            thread_id: event.thread_id().cloned(),
            turn_id: event.turn_id().cloned(),
            event,
        }
    }

    /// Builds the instant `ms_value` milliseconds after the Unix epoch.
    fn at(ms_value: i64) -> OffsetDateTime {
        OffsetDateTime::from_unix_timestamp_nanos(i128::from(ms_value) * 1_000_000).unwrap()
    }

    /// Usage fixture with 20 completion tokens and `total - 20` prompt tokens.
    ///
    /// Panics with an explicit message when `total < 20` instead of letting
    /// the subtraction wrap in builds without overflow checks.
    fn usage(total: u32) -> TokenUsage {
        let prompt_tokens = total
            .checked_sub(20)
            .expect("usage() fixture needs total >= 20");
        TokenUsage {
            prompt_tokens,
            completion_tokens: 20,
            total_tokens: total,
            cached_prompt_tokens: 10,
            cache_creation_prompt_tokens: 0,
            ..TokenUsage::default()
        }
    }

    /// Opens a store under a fresh, uniquely named scratch directory.
    ///
    /// Keep the returned guard alive for the duration of the test; dropping
    /// it removes the directory.
    fn temp_store() -> (AnalyticsStore, ScratchDir) {
        let dir =
            std::env::temp_dir().join(format!("roder-analytics-ingest-{}", uuid::Uuid::new_v4()));
        let store = AnalyticsStore::open(
            &AnalyticsStore::default_path(&dir),
            WorkspaceLabelMode::FullPath,
        )
        .unwrap();
        (store, ScratchDir(dir))
    }

    /// Event sequence for one successful turn: thread created, turn started,
    /// one `read_file` tool call (started at +100 ms, completed at +225 ms)
    /// and turn completed with 120 total tokens.
    pub(crate) fn fake_turn_events(
        thread_id: &str,
        turn_id: &str,
        base_ms: i64,
    ) -> Vec<EventEnvelope> {
        vec![
            envelope(
                1,
                RoderEvent::ThreadCreated(ThreadCreated {
                    thread_id: thread_id.to_string(),
                    timestamp: at(base_ms),
                }),
            ),
            envelope(
                2,
                RoderEvent::TurnStarted(TurnStarted {
                    thread_id: thread_id.to_string(),
                    turn_id: turn_id.to_string(),
                    runtime_profile: Default::default(),
                    timestamp: at(base_ms + 10),
                }),
            ),
            envelope(
                3,
                RoderEvent::ToolCallStarted(ToolCallStarted {
                    thread_id: thread_id.to_string(),
                    turn_id: turn_id.to_string(),
                    tool_id: "call-1".to_string(),
                    tool_name: Some("read_file".to_string()),
                    display_payload: None,
                    timestamp: at(base_ms + 100),
                }),
            ),
            envelope(
                4,
                RoderEvent::ToolCallCompleted(ToolCallCompleted {
                    thread_id: thread_id.to_string(),
                    turn_id: turn_id.to_string(),
                    tool_id: "call-1".to_string(),
                    tool_name: Some("read_file".to_string()),
                    display_payload: None,
                    is_error: false,
                    output: Some("secret file contents".to_string()),
                    timestamp: at(base_ms + 225),
                }),
            ),
            envelope(
                5,
                RoderEvent::TurnCompleted(TurnCompleted {
                    thread_id: thread_id.to_string(),
                    turn_id: turn_id.to_string(),
                    usage: Some(usage(120)),
                    finish_reason: Some("stop".to_string()),
                    timestamp: at(base_ms + 500),
                }),
            ),
        ]
    }

    #[test]
    fn fake_turn_produces_one_turn_one_duration_one_usage_record() {
        let (store, _scratch) = temp_store();
        let ingestor = AnalyticsIngestor::new(&store);
        for event in fake_turn_events("t1", "u1", 10_000) {
            ingestor.ingest_event(&event).unwrap();
        }

        let counts = store.counts().unwrap();
        assert_eq!(counts.sessions, 1);
        assert_eq!(counts.turns, 1);
        assert_eq!(counts.tool_calls, 1);
        assert_eq!(counts.token_usage, 1);

        let conn = store.conn.lock().unwrap();
        // Start at +100 ms and completion at +225 ms give a 125 ms duration.
        let (duration, status): (i64, String) = conn
            .query_row("SELECT duration_ms, status FROM tool_calls", [], |row| {
                Ok((row.get(0)?, row.get(1)?))
            })
            .unwrap();
        assert_eq!(duration, 125);
        assert_eq!(status, "success");
        let total: i64 = conn
            .query_row("SELECT total_tokens FROM token_usage", [], |row| row.get(0))
            .unwrap();
        assert_eq!(total, 120);
        // Tool output bodies never reach the database.
        let dumped: String = conn
            .query_row(
                "SELECT COALESCE(GROUP_CONCAT(tool_name), '') FROM tool_calls",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert!(!dumped.contains("secret file contents"));
        // Release the connection lock before ingesting again.
        drop(conn);

        // Replaying the same events does not double-count anything.
        let ingestor = AnalyticsIngestor::new(&store);
        for event in fake_turn_events("t1", "u1", 10_000) {
            ingestor.ingest_event(&event).unwrap();
        }
        assert_eq!(store.counts().unwrap(), counts);
    }

    #[test]
    fn failed_turns_and_failed_tools_are_queryable_with_error_state() {
        let (store, _scratch) = temp_store();
        let ingestor = AnalyticsIngestor::new(&store);
        ingestor
            .ingest_event(&envelope(
                1,
                RoderEvent::ToolCallCompleted(ToolCallCompleted {
                    thread_id: "t1".to_string(),
                    turn_id: "u1".to_string(),
                    tool_id: "call-err".to_string(),
                    tool_name: Some("shell".to_string()),
                    display_payload: None,
                    is_error: true,
                    output: None,
                    timestamp: at(1_000),
                }),
            ))
            .unwrap();
        ingestor
            .ingest_event(&envelope(
                2,
                RoderEvent::TurnFailed(TurnFailed {
                    thread_id: "t1".to_string(),
                    turn_id: "u1".to_string(),
                    error: "provider exploded".to_string(),
                    error_kind: Some("provider".to_string()),
                    usage: Some(usage(50)),
                    timestamp: at(2_000),
                }),
            ))
            .unwrap();

        let conn = store.conn.lock().unwrap();
        let (status, error_kind): (String, String) = conn
            .query_row("SELECT status, error_kind FROM turns", [], |row| {
                Ok((row.get(0)?, row.get(1)?))
            })
            .unwrap();
        assert_eq!(status, "failed");
        assert_eq!(error_kind, "provider");
        // Tool completion without a start event remains a visible partial
        // record (no duration, but counted with its error state).
        let (is_error, duration): (bool, Option<i64>) = conn
            .query_row("SELECT is_error, duration_ms FROM tool_calls", [], |row| {
                Ok((row.get(0)?, row.get(1)?))
            })
            .unwrap();
        assert!(is_error);
        assert_eq!(duration, None);
        drop(conn);
    }
}