1use super::rows::*;
2use super::*;
3
4impl Store {
5 pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
6 self.list_events_page(session_id, 0, i64::MAX as usize)
7 }
8
9 pub fn get_event(&self, session_id: &str, seq: u64) -> Result<Option<Event>> {
10 let mut stmt = self.conn.prepare(
11 "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
12 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
13 stop_reason, latency_ms, ttft_ms, retry_count,
14 context_used_tokens, context_max_tokens,
15 cache_creation_tokens, cache_read_tokens, system_prompt_tokens
16 FROM events WHERE session_id = ?1 AND seq = ?2",
17 )?;
18 stmt.query_row(params![session_id, seq as i64], event_row)
19 .optional()
20 .map_err(Into::into)
21 }
22
23 pub fn search_tool_events(
24 &self,
25 workspace: &str,
26 tool: &str,
27 since_ms: Option<u64>,
28 agent: Option<&str>,
29 limit: usize,
30 ) -> Result<Vec<(String, Event)>> {
31 let mut stmt = self.conn.prepare(
32 "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
33 e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
34 e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
35 e.context_used_tokens, e.context_max_tokens,
36 e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens,
37 s.agent
38 FROM events e JOIN sessions s ON s.id = e.session_id
39 WHERE e.tool = ?2
40 AND (s.workspace = ?1 OR NOT EXISTS (SELECT 1 FROM sessions WHERE workspace = ?1))
41 AND (?3 IS NULL OR e.ts_ms >= ?3)
42 AND (?4 IS NULL OR s.agent = ?4)
43 ORDER BY e.ts_ms DESC, e.session_id ASC, e.seq ASC
44 LIMIT ?5",
45 )?;
46 let since = since_ms.map(|v| v as i64);
47 let rows = stmt.query_map(
48 params![workspace, tool, since, agent, limit as i64],
49 search_tool_event_row,
50 )?;
51 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
52 }
53
54 pub fn workspace_events(&self, workspace: &str) -> Result<Vec<(SessionRecord, Event)>> {
55 let mut out = Vec::new();
56 for session in self.list_sessions(workspace)? {
57 for event in self.list_events_for_session(&session.id)? {
58 out.push((session.clone(), event));
59 }
60 }
61 out.sort_by(|a, b| {
62 (a.1.ts_ms, &a.1.session_id, a.1.seq).cmp(&(b.1.ts_ms, &b.1.session_id, b.1.seq))
63 });
64 Ok(out)
65 }
66
67 pub fn list_events_page(
68 &self,
69 session_id: &str,
70 after_seq: u64,
71 limit: usize,
72 ) -> Result<Vec<Event>> {
73 let mut stmt = self.conn.prepare(
74 "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
75 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
76 stop_reason, latency_ms, ttft_ms, retry_count,
77 context_used_tokens, context_max_tokens,
78 cache_creation_tokens, cache_read_tokens, system_prompt_tokens
79 FROM events
80 WHERE session_id = ?1 AND seq >= ?2
81 ORDER BY seq ASC LIMIT ?3",
82 )?;
83 let rows = stmt.query_map(
84 params![
85 session_id,
86 after_seq as i64,
87 limit.min(i64::MAX as usize) as i64
88 ],
89 event_row,
90 )?;
91 let mut events = Vec::new();
92 for row in rows {
93 events.push(row?);
94 }
95 Ok(events)
96 }
97
98 pub fn last_event_seq_for_session(&self, session_id: &str) -> Result<Option<u64>> {
99 let seq = self
100 .conn
101 .query_row(
102 "SELECT MAX(seq) FROM events WHERE session_id = ?1",
103 params![session_id],
104 |r| r.get::<_, Option<i64>>(0),
105 )?
106 .map(|v| v as u64);
107 Ok(seq)
108 }
109}