1use std::path::Path;
12use std::time::Duration;
13
14use chrono::Utc;
15use mi6_core::{
16 Event, EventQuery, FilePosition, GitBranchInfo, Order, Session, SessionQuery, Storage,
17 StorageError, StorageStats, StorageStatsQuery,
18};
19use rusqlite::{Connection, params};
20
21use crate::query_builder::QueryBuilder;
22use crate::row_parsing::row_to_event;
23use crate::session;
24use crate::sql;
25
26impl Storage for super::SqliteStorage {
28 fn insert(&self, event: &Event) -> Result<i64, StorageError> {
29 self.conn
31 .execute(
32 sql::INSERT_EVENT,
33 params![
34 event.machine_id,
35 event.timestamp.timestamp_millis(),
36 event.event_type.to_string(),
37 event.session_id,
38 event.framework,
39 event.tool_use_id,
40 event.spawned_agent_id,
41 event.tool_name,
42 event.subagent_type,
43 event.permission_mode,
44 event.transcript_path,
45 event.pid,
46 event.cwd,
47 event.git_branch,
48 event.model,
49 event.tokens_input,
50 event.tokens_output,
51 event.tokens_cache_read,
52 event.tokens_cache_write,
53 event.cost_usd,
54 event.duration_ms,
55 event.payload,
56 event.metadata,
57 event.source,
58 event.is_sidechain,
59 ],
60 )
61 .map_err(|e| StorageError::Query(Box::new(e)))?;
62
63 let event_id = self.conn.last_insert_rowid();
64
65 session::update_for_event(&self.conn, event)?;
67
68 Ok(event_id)
69 }
70
71 fn query(&self, query: &EventQuery) -> Result<Vec<Event>, StorageError> {
72 let mut qb = QueryBuilder::new(
73 "SELECT id, machine_id, timestamp, event_type, session_id, framework, tool_use_id, spawned_agent_id, tool_name, subagent_type, permission_mode, transcript_path, pid, cwd, git_branch, model, tokens_input, tokens_output, tokens_cache_read, tokens_cache_write, cost_usd, duration_ms, payload, metadata, source, is_sidechain FROM events",
74 );
75
76 if let Some(ref session_ids) = query.session_ids {
78 if !qb.and_in("session_id", session_ids) {
80 return Ok(vec![]);
81 }
82 } else if let Some(ref session_id) = query.session_id {
83 qb.and_eq("session_id", session_id.clone());
84 }
85 if let Some(ref event_type) = query.event_type {
86 qb.and_eq_upper("event_type", event_type.clone());
87 }
88 if let Some(ref permission_mode) = query.permission_mode {
89 qb.and_eq("permission_mode", permission_mode.clone());
90 }
91 if let Some(ref framework) = query.framework {
92 qb.and_eq("framework", framework.clone());
93 }
94 if let Some(after_ts) = query.after_ts {
95 qb.and_gt("timestamp", after_ts.timestamp_millis());
96 }
97 if let Some(before_ts) = query.before_ts {
98 qb.and_lt("timestamp", before_ts.timestamp_millis());
99 }
100 if let Some(after_id) = query.after_id {
101 qb.and_gt("id", after_id);
102 }
103
104 if query.api_requests_only {
106 qb.and_eq_upper("event_type", "ApiRequest".to_string());
107 } else if query.exclude_api_requests {
108 qb.and_neq_upper("event_type", "ApiRequest".to_string());
109 }
110
111 let orders_by_id = query.orders_by_id();
113 let direction = query.effective_direction();
114 let order_clause = match (orders_by_id, direction) {
115 (true, Order::Asc) => "id ASC",
116 (true, Order::Desc) => "id DESC",
117 (false, Order::Asc) => "timestamp ASC",
118 (false, Order::Desc) => "timestamp DESC",
119 };
120 qb.order_by(order_clause);
121
122 if let Some(limit) = query.limit {
124 qb.limit(limit);
125 }
126
127 let (sql, params) = qb.build();
128
129 let mut stmt = self
130 .conn
131 .prepare(&sql)
132 .map_err(|e| StorageError::Query(Box::new(e)))?;
133
134 let events = stmt
135 .query_map(params.as_slice(), row_to_event)
136 .map_err(|e| StorageError::Query(Box::new(e)))?
137 .collect::<Result<Vec<_>, _>>()
138 .map_err(|e| StorageError::Query(Box::new(e)))?;
139
140 Ok(events)
141 }
142
143 fn gc(&self, retention: Duration) -> Result<usize, StorageError> {
144 let chrono_retention =
145 chrono::Duration::from_std(retention).map_err(|e| StorageError::Query(Box::new(e)))?;
146 let cutoff = (Utc::now() - chrono_retention).timestamp_millis();
147
148 let deleted = self
150 .conn
151 .execute("DELETE FROM events WHERE timestamp < ?1", [cutoff])
152 .map_err(|e| StorageError::Query(Box::new(e)))?;
153
154 Ok(deleted)
155 }
156
157 fn count_expired(&self, retention: Duration) -> Result<usize, StorageError> {
158 let chrono_retention =
159 chrono::Duration::from_std(retention).map_err(|e| StorageError::Query(Box::new(e)))?;
160 let cutoff = (Utc::now() - chrono_retention).timestamp_millis();
161
162 let count: usize = self
163 .conn
164 .query_row(
165 "SELECT COUNT(*) FROM events WHERE timestamp < ?1",
166 [cutoff],
167 |row| row.get(0),
168 )
169 .map_err(|e| StorageError::Query(Box::new(e)))?;
170
171 Ok(count)
172 }
173
174 fn count(&self) -> Result<usize, StorageError> {
175 let count: usize = self
176 .conn
177 .query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))
178 .map_err(|e| StorageError::Query(Box::new(e)))?;
179
180 Ok(count)
181 }
182
183 fn list_sessions(&self, query: &SessionQuery) -> Result<Vec<Session>, StorageError> {
184 session::list(&self.conn, query)
185 }
186
187 fn get_session(&self, session_id: &str) -> Result<Option<Session>, StorageError> {
188 session::get(&self.conn, session_id)
189 }
190
191 fn get_session_by_key(
192 &self,
193 machine_id: &str,
194 session_id: &str,
195 ) -> Result<Option<Session>, StorageError> {
196 session::get_by_key(&self.conn, machine_id, session_id)
197 }
198
199 fn get_session_by_pid(&self, pid: i32) -> Result<Option<Session>, StorageError> {
200 session::get_by_pid(&self.conn, pid)
201 }
202
203 fn update_session_git_info(
204 &self,
205 session_id: &str,
206 git_info: &GitBranchInfo,
207 ) -> Result<bool, StorageError> {
208 session::update_git_info(&self.conn, session_id, git_info)
209 }
210
211 fn storage_stats(&self, query: &StorageStatsQuery) -> Result<StorageStats, StorageError> {
212 storage_stats(&self.conn, query)
213 }
214
215 fn get_transcript_position(
216 &self,
217 path: &std::path::Path,
218 ) -> Result<Option<FilePosition>, StorageError> {
219 get_transcript_position(&self.conn, path)
220 }
221
222 fn set_transcript_position(
223 &self,
224 path: &std::path::Path,
225 position: &FilePosition,
226 ) -> Result<(), StorageError> {
227 set_transcript_position(&self.conn, path, position)
228 }
229
230 fn event_exists_by_uuid(&self, session_id: &str, uuid: &str) -> Result<bool, StorageError> {
231 event_exists_by_uuid(&self.conn, session_id, uuid)
232 }
233
234 fn query_transcript_positions(&self) -> Result<Vec<(String, FilePosition)>, StorageError> {
235 query_transcript_positions(&self.conn)
236 }
237}
238
239pub(crate) fn storage_stats(
241 conn: &Connection,
242 query: &StorageStatsQuery,
243) -> Result<StorageStats, StorageError> {
244 let mut qb = QueryBuilder::new(
246 r"SELECT
247 COUNT(*) as session_count,
248 COUNT(CASE WHEN last_ended_at IS NULL THEN 1 END) as active_count,
249 COALESCE(SUM(tokens_input + tokens_output + tokens_cache_read + tokens_cache_write), 0) as total_tokens,
250 COALESCE(SUM(cost_usd), 0.0) as total_cost,
251 COALESCE(SUM(api_request_count), 0) as total_requests
252 FROM sessions",
253 );
254
255 if query.active_only {
257 qb.and_is_null("last_ended_at");
258 }
259 if let Some(ref framework) = query.framework {
260 qb.and_eq("framework", framework.clone());
261 }
262
263 let (sql, params) = qb.build();
264
265 conn.query_row(&sql, params.as_slice(), |row| {
266 Ok(StorageStats {
267 session_count: row.get::<_, i64>(0)? as u32,
268 active_session_count: row.get::<_, i64>(1)? as u32,
269 total_tokens: row.get(2)?,
270 total_cost_usd: row.get(3)?,
271 total_api_requests: row.get::<_, i64>(4)? as u32,
272 })
273 })
274 .map_err(|e| StorageError::Query(Box::new(e)))
275}
276
277pub(crate) fn get_transcript_position(
281 conn: &Connection,
282 path: &Path,
283) -> Result<Option<FilePosition>, StorageError> {
284 let path_str = path.to_string_lossy();
285
286 let result = conn.query_row(
287 "SELECT byte_offset, line_number, last_uuid FROM transcript_positions WHERE file_path = ?1",
288 [&path_str],
289 |row| {
290 Ok(FilePosition {
291 offset: row.get::<_, i64>(0)? as u64,
292 line_number: row.get::<_, i64>(1)? as u64,
293 last_uuid: row.get(2)?,
294 })
295 },
296 );
297
298 match result {
299 Ok(pos) => Ok(Some(pos)),
300 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
301 Err(e) => Err(StorageError::Query(Box::new(e))),
302 }
303}
304
305pub(crate) fn set_transcript_position(
309 conn: &Connection,
310 path: &Path,
311 position: &FilePosition,
312) -> Result<(), StorageError> {
313 let path_str = path.to_string_lossy();
314 let now = std::time::SystemTime::now()
315 .duration_since(std::time::UNIX_EPOCH)
316 .map(|d| d.as_millis() as i64)
317 .unwrap_or(0);
318
319 conn.execute(
320 r"INSERT INTO transcript_positions (file_path, byte_offset, line_number, last_uuid, updated_at)
321 VALUES (?1, ?2, ?3, ?4, ?5)
322 ON CONFLICT (file_path) DO UPDATE SET
323 byte_offset = excluded.byte_offset,
324 line_number = excluded.line_number,
325 last_uuid = excluded.last_uuid,
326 updated_at = excluded.updated_at",
327 params![
328 path_str,
329 position.offset as i64,
330 position.line_number as i64,
331 position.last_uuid,
332 now,
333 ],
334 )
335 .map_err(|e| StorageError::Query(Box::new(e)))?;
336
337 Ok(())
338}
339
340pub(crate) fn event_exists_by_uuid(
344 conn: &Connection,
345 session_id: &str,
346 uuid: &str,
347) -> Result<bool, StorageError> {
348 let pattern = format!("%\"uuid\":\"{}\"%", uuid);
350
351 let count: i64 = conn
352 .query_row(
353 "SELECT COUNT(*) FROM events WHERE session_id = ?1 AND metadata LIKE ?2",
354 params![session_id, pattern],
355 |row| row.get(0),
356 )
357 .map_err(|e| StorageError::Query(Box::new(e)))?;
358
359 Ok(count > 0)
360}
361
362pub(crate) fn query_transcript_positions(
366 conn: &Connection,
367) -> Result<Vec<(String, FilePosition)>, StorageError> {
368 let mut stmt = conn
369 .prepare(
370 "SELECT file_path, byte_offset, line_number, last_uuid FROM transcript_positions ORDER BY file_path",
371 )
372 .map_err(|e| StorageError::Query(Box::new(e)))?;
373
374 let positions = stmt
375 .query_map([], |row| {
376 let path: String = row.get(0)?;
377 let position = FilePosition {
378 offset: row.get::<_, i64>(1)? as u64,
379 line_number: row.get::<_, i64>(2)? as u64,
380 last_uuid: row.get(3)?,
381 };
382 Ok((path, position))
383 })
384 .map_err(|e| StorageError::Query(Box::new(e)))?
385 .collect::<Result<Vec<_>, _>>()
386 .map_err(|e| StorageError::Query(Box::new(e)))?;
387
388 Ok(positions)
389}