Skip to main content

kaizen/store/sqlite/
event_read.rs

1use super::rows::*;
2use super::*;
3
4impl Store {
5    pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
6        self.list_events_page(session_id, 0, i64::MAX as usize)
7    }
8
9    pub(crate) fn list_latest_events_for_session(
10        &self,
11        session_id: &str,
12        limit: usize,
13    ) -> Result<Vec<Event>> {
14        let mut stmt = self.conn.prepare(LATEST_SESSION_EVENTS_SQL)?;
15        let sql_limit = limit.min(i64::MAX as usize) as i64;
16        let rows = stmt.query_map(params![session_id, sql_limit], event_row)?;
17        rows.map(|row| row.map_err(Into::into)).collect()
18    }
19
20    pub fn get_event(&self, session_id: &str, seq: u64) -> Result<Option<Event>> {
21        let mut stmt = self.conn.prepare(
22            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
23                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
24                    stop_reason, latency_ms, ttft_ms, retry_count,
25                    context_used_tokens, context_max_tokens,
26                    cache_creation_tokens, cache_read_tokens, system_prompt_tokens
27             FROM events WHERE session_id = ?1 AND seq = ?2",
28        )?;
29        stmt.query_row(params![session_id, seq as i64], event_row)
30            .optional()
31            .map_err(Into::into)
32    }
33
34    pub fn search_tool_events(
35        &self,
36        workspace: &str,
37        tool: &str,
38        since_ms: Option<u64>,
39        agent: Option<&str>,
40        limit: usize,
41    ) -> Result<Vec<(String, Event)>> {
42        let mut stmt = self.conn.prepare(
43            "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
44                    e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
45                    e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
46                    e.context_used_tokens, e.context_max_tokens,
47                    e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens,
48                    s.agent
49             FROM events e JOIN sessions s ON s.id = e.session_id
50             WHERE e.tool = ?2
51               AND (s.workspace = ?1 OR NOT EXISTS (SELECT 1 FROM sessions WHERE workspace = ?1))
52               AND (?3 IS NULL OR e.ts_ms >= ?3)
53               AND (?4 IS NULL OR s.agent = ?4)
54             ORDER BY e.ts_ms DESC, e.session_id ASC, e.seq ASC
55             LIMIT ?5",
56        )?;
57        let since = since_ms.map(|v| v as i64);
58        let rows = stmt.query_map(
59            params![workspace, tool, since, agent, limit as i64],
60            search_tool_event_row,
61        )?;
62        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
63    }
64
65    pub fn workspace_events(&self, workspace: &str) -> Result<Vec<(SessionRecord, Event)>> {
66        let sessions = self
67            .list_sessions(workspace)?
68            .into_iter()
69            .map(|session| (session.id.clone(), session))
70            .collect::<HashMap<_, _>>();
71        self.workspace_event_rows(workspace)?
72            .into_iter()
73            .map(|event| pair_event(&sessions, event))
74            .collect()
75    }
76
77    fn workspace_event_rows(&self, workspace: &str) -> Result<Vec<Event>> {
78        let mut stmt = self.conn.prepare(
79            "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source,
80                    e.tool, e.tool_call_id, e.tokens_in, e.tokens_out, e.reasoning_tokens,
81                    e.cost_usd_e6, e.payload, e.stop_reason, e.latency_ms, e.ttft_ms,
82                    e.retry_count, e.context_used_tokens, e.context_max_tokens,
83                    e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens
84             FROM events e JOIN sessions s ON s.id = e.session_id
85             WHERE s.workspace = ?1
86             ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
87        )?;
88        let rows = stmt.query_map([workspace], event_row)?;
89        rows.map(|row| row.map_err(anyhow::Error::from)).collect()
90    }
91
92    pub fn list_events_page(
93        &self,
94        session_id: &str,
95        after_seq: u64,
96        limit: usize,
97    ) -> Result<Vec<Event>> {
98        let mut stmt = self.conn.prepare(
99            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
100                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
101                    stop_reason, latency_ms, ttft_ms, retry_count,
102                    context_used_tokens, context_max_tokens,
103                    cache_creation_tokens, cache_read_tokens, system_prompt_tokens
104             FROM events
105             WHERE session_id = ?1 AND seq >= ?2
106             ORDER BY seq ASC LIMIT ?3",
107        )?;
108        let rows = stmt.query_map(
109            params![
110                session_id,
111                after_seq as i64,
112                limit.min(i64::MAX as usize) as i64
113            ],
114            event_row,
115        )?;
116        let mut events = Vec::new();
117        for row in rows {
118            events.push(row?);
119        }
120        Ok(events)
121    }
122
123    pub fn last_event_seq_for_session(&self, session_id: &str) -> Result<Option<u64>> {
124        let seq = self
125            .conn
126            .query_row(
127                "SELECT MAX(seq) FROM events WHERE session_id = ?1",
128                params![session_id],
129                |r| r.get::<_, Option<i64>>(0),
130            )?
131            .map(|v| v as u64);
132        Ok(seq)
133    }
134}
135
136const LATEST_SESSION_EVENTS_SQL: &str = "
137SELECT * FROM (
138 SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
139  tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload, stop_reason, latency_ms,
140  ttft_ms, retry_count, context_used_tokens, context_max_tokens, cache_creation_tokens,
141  cache_read_tokens, system_prompt_tokens
142 FROM events WHERE session_id = ?1 ORDER BY seq DESC LIMIT ?2
143) ORDER BY seq ASC";
144
145fn pair_event(
146    sessions: &HashMap<String, SessionRecord>,
147    event: Event,
148) -> Result<(SessionRecord, Event)> {
149    let session = sessions
150        .get(&event.session_id)
151        .cloned()
152        .ok_or_else(|| anyhow::anyhow!("event session missing: {}", event.session_id))?;
153    Ok((session, event))
154}