Skip to main content

kaizen/store/sqlite/
event_read.rs

1use super::rows::*;
2use super::*;
3
4impl Store {
5    pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
6        self.list_events_page(session_id, 0, i64::MAX as usize)
7    }
8
9    pub fn get_event(&self, session_id: &str, seq: u64) -> Result<Option<Event>> {
10        let mut stmt = self.conn.prepare(
11            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
12                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
13                    stop_reason, latency_ms, ttft_ms, retry_count,
14                    context_used_tokens, context_max_tokens,
15                    cache_creation_tokens, cache_read_tokens, system_prompt_tokens
16             FROM events WHERE session_id = ?1 AND seq = ?2",
17        )?;
18        stmt.query_row(params![session_id, seq as i64], event_row)
19            .optional()
20            .map_err(Into::into)
21    }
22
23    pub fn search_tool_events(
24        &self,
25        workspace: &str,
26        tool: &str,
27        since_ms: Option<u64>,
28        agent: Option<&str>,
29        limit: usize,
30    ) -> Result<Vec<(String, Event)>> {
31        let mut stmt = self.conn.prepare(
32            "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
33                    e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
34                    e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
35                    e.context_used_tokens, e.context_max_tokens,
36                    e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens,
37                    s.agent
38             FROM events e JOIN sessions s ON s.id = e.session_id
39             WHERE e.tool = ?2
40               AND (s.workspace = ?1 OR NOT EXISTS (SELECT 1 FROM sessions WHERE workspace = ?1))
41               AND (?3 IS NULL OR e.ts_ms >= ?3)
42               AND (?4 IS NULL OR s.agent = ?4)
43             ORDER BY e.ts_ms DESC, e.session_id ASC, e.seq ASC
44             LIMIT ?5",
45        )?;
46        let since = since_ms.map(|v| v as i64);
47        let rows = stmt.query_map(
48            params![workspace, tool, since, agent, limit as i64],
49            search_tool_event_row,
50        )?;
51        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
52    }
53
54    pub fn workspace_events(&self, workspace: &str) -> Result<Vec<(SessionRecord, Event)>> {
55        let mut out = Vec::new();
56        for session in self.list_sessions(workspace)? {
57            for event in self.list_events_for_session(&session.id)? {
58                out.push((session.clone(), event));
59            }
60        }
61        out.sort_by(|a, b| {
62            (a.1.ts_ms, &a.1.session_id, a.1.seq).cmp(&(b.1.ts_ms, &b.1.session_id, b.1.seq))
63        });
64        Ok(out)
65    }
66
67    pub fn list_events_page(
68        &self,
69        session_id: &str,
70        after_seq: u64,
71        limit: usize,
72    ) -> Result<Vec<Event>> {
73        let mut stmt = self.conn.prepare(
74            "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
75                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
76                    stop_reason, latency_ms, ttft_ms, retry_count,
77                    context_used_tokens, context_max_tokens,
78                    cache_creation_tokens, cache_read_tokens, system_prompt_tokens
79             FROM events
80             WHERE session_id = ?1 AND seq >= ?2
81             ORDER BY seq ASC LIMIT ?3",
82        )?;
83        let rows = stmt.query_map(
84            params![
85                session_id,
86                after_seq as i64,
87                limit.min(i64::MAX as usize) as i64
88            ],
89            event_row,
90        )?;
91        let mut events = Vec::new();
92        for row in rows {
93            events.push(row?);
94        }
95        Ok(events)
96    }
97
98    pub fn last_event_seq_for_session(&self, session_id: &str) -> Result<Option<u64>> {
99        let seq = self
100            .conn
101            .query_row(
102                "SELECT MAX(seq) FROM events WHERE session_id = ?1",
103                params![session_id],
104                |r| r.get::<_, Option<i64>>(0),
105            )?
106            .map(|v| v as u64);
107        Ok(seq)
108    }
109}