1use super::rows::*;
2use super::*;
3
4impl Store {
5 pub fn list_events_for_session(&self, session_id: &str) -> Result<Vec<Event>> {
6 self.list_events_page(session_id, 0, i64::MAX as usize)
7 }
8
9 pub(crate) fn list_latest_events_for_session(
10 &self,
11 session_id: &str,
12 limit: usize,
13 ) -> Result<Vec<Event>> {
14 let mut stmt = self.conn.prepare(LATEST_SESSION_EVENTS_SQL)?;
15 let sql_limit = limit.min(i64::MAX as usize) as i64;
16 let rows = stmt.query_map(params![session_id, sql_limit], event_row)?;
17 rows.map(|row| row.map_err(Into::into)).collect()
18 }
19
20 pub fn get_event(&self, session_id: &str, seq: u64) -> Result<Option<Event>> {
21 let mut stmt = self.conn.prepare(
22 "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
23 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
24 stop_reason, latency_ms, ttft_ms, retry_count,
25 context_used_tokens, context_max_tokens,
26 cache_creation_tokens, cache_read_tokens, system_prompt_tokens
27 FROM events WHERE session_id = ?1 AND seq = ?2",
28 )?;
29 stmt.query_row(params![session_id, seq as i64], event_row)
30 .optional()
31 .map_err(Into::into)
32 }
33
34 pub fn search_tool_events(
35 &self,
36 workspace: &str,
37 tool: &str,
38 since_ms: Option<u64>,
39 agent: Option<&str>,
40 limit: usize,
41 ) -> Result<Vec<(String, Event)>> {
42 let mut stmt = self.conn.prepare(
43 "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source, e.tool, e.tool_call_id,
44 e.tokens_in, e.tokens_out, e.reasoning_tokens, e.cost_usd_e6, e.payload,
45 e.stop_reason, e.latency_ms, e.ttft_ms, e.retry_count,
46 e.context_used_tokens, e.context_max_tokens,
47 e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens,
48 s.agent
49 FROM events e JOIN sessions s ON s.id = e.session_id
50 WHERE e.tool = ?2
51 AND (s.workspace = ?1 OR NOT EXISTS (SELECT 1 FROM sessions WHERE workspace = ?1))
52 AND (?3 IS NULL OR e.ts_ms >= ?3)
53 AND (?4 IS NULL OR s.agent = ?4)
54 ORDER BY e.ts_ms DESC, e.session_id ASC, e.seq ASC
55 LIMIT ?5",
56 )?;
57 let since = since_ms.map(|v| v as i64);
58 let rows = stmt.query_map(
59 params![workspace, tool, since, agent, limit as i64],
60 search_tool_event_row,
61 )?;
62 rows.map(|r| r.map_err(anyhow::Error::from)).collect()
63 }
64
65 pub fn workspace_events(&self, workspace: &str) -> Result<Vec<(SessionRecord, Event)>> {
66 let sessions = self
67 .list_sessions(workspace)?
68 .into_iter()
69 .map(|session| (session.id.clone(), session))
70 .collect::<HashMap<_, _>>();
71 self.workspace_event_rows(workspace)?
72 .into_iter()
73 .map(|event| pair_event(&sessions, event))
74 .collect()
75 }
76
77 fn workspace_event_rows(&self, workspace: &str) -> Result<Vec<Event>> {
78 let mut stmt = self.conn.prepare(
79 "SELECT e.session_id, e.seq, e.ts_ms, COALESCE(e.ts_exact, 0), e.kind, e.source,
80 e.tool, e.tool_call_id, e.tokens_in, e.tokens_out, e.reasoning_tokens,
81 e.cost_usd_e6, e.payload, e.stop_reason, e.latency_ms, e.ttft_ms,
82 e.retry_count, e.context_used_tokens, e.context_max_tokens,
83 e.cache_creation_tokens, e.cache_read_tokens, e.system_prompt_tokens
84 FROM events e JOIN sessions s ON s.id = e.session_id
85 WHERE s.workspace = ?1
86 ORDER BY e.ts_ms ASC, e.session_id ASC, e.seq ASC",
87 )?;
88 let rows = stmt.query_map([workspace], event_row)?;
89 rows.map(|row| row.map_err(anyhow::Error::from)).collect()
90 }
91
92 pub fn list_events_page(
93 &self,
94 session_id: &str,
95 after_seq: u64,
96 limit: usize,
97 ) -> Result<Vec<Event>> {
98 let mut stmt = self.conn.prepare(
99 "SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
100 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
101 stop_reason, latency_ms, ttft_ms, retry_count,
102 context_used_tokens, context_max_tokens,
103 cache_creation_tokens, cache_read_tokens, system_prompt_tokens
104 FROM events
105 WHERE session_id = ?1 AND seq >= ?2
106 ORDER BY seq ASC LIMIT ?3",
107 )?;
108 let rows = stmt.query_map(
109 params![
110 session_id,
111 after_seq as i64,
112 limit.min(i64::MAX as usize) as i64
113 ],
114 event_row,
115 )?;
116 let mut events = Vec::new();
117 for row in rows {
118 events.push(row?);
119 }
120 Ok(events)
121 }
122
123 pub fn last_event_seq_for_session(&self, session_id: &str) -> Result<Option<u64>> {
124 let seq = self
125 .conn
126 .query_row(
127 "SELECT MAX(seq) FROM events WHERE session_id = ?1",
128 params![session_id],
129 |r| r.get::<_, Option<i64>>(0),
130 )?
131 .map(|v| v as u64);
132 Ok(seq)
133 }
134}
135
/// SQL for the most recent events of one session.
///
/// Parameters: `?1` is the session id, `?2` the maximum number of rows.
/// The inner query picks the `?2` rows with the highest `seq`; the outer query
/// re-sorts that window by ascending `seq`.
///
/// NOTE(review): the column order presumably has to match what `event_row`
/// reads by position — confirm before reordering columns.
const LATEST_SESSION_EVENTS_SQL: &str = "
SELECT * FROM (
 SELECT session_id, seq, ts_ms, COALESCE(ts_exact, 0), kind, source, tool, tool_call_id,
 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload, stop_reason, latency_ms,
 ttft_ms, retry_count, context_used_tokens, context_max_tokens, cache_creation_tokens,
 cache_read_tokens, system_prompt_tokens
 FROM events WHERE session_id = ?1 ORDER BY seq DESC LIMIT ?2
) ORDER BY seq ASC";
144
145fn pair_event(
146 sessions: &HashMap<String, SessionRecord>,
147 event: Event,
148) -> Result<(SessionRecord, Event)> {
149 let session = sessions
150 .get(&event.session_id)
151 .cloned()
152 .ok_or_else(|| anyhow::anyhow!("event session missing: {}", event.session_id))?;
153 Ok((session, event))
154}