1use std::path::Path;
12use std::time::Duration;
13
14use chrono::Utc;
15use mi6_core::{
16 Event, EventQuery, FilePosition, GitBranchInfo, Order, Session, SessionQuery, SessionStatus,
17 Storage, StorageError, StorageStats, StorageStatsQuery,
18};
19use rusqlite::{Connection, params};
20
21use crate::query_builder::QueryBuilder;
22use crate::row_parsing::row_to_event;
23use crate::session;
24use crate::sql;
25
26impl Storage for super::SqliteStorage {
28 fn insert(&self, event: &Event) -> Result<i64, StorageError> {
29 let tx = self
39 .conn
40 .unchecked_transaction()
41 .map_err(|e| StorageError::Query(Box::new(e)))?;
42
43 tx.execute(
45 sql::INSERT_EVENT,
46 params![
47 event.machine_id,
48 event.timestamp.timestamp_millis(),
49 event.event_type.to_string(),
50 event.session_id,
51 event.framework,
52 event.tool_use_id,
53 event.spawned_agent_id,
54 event.tool_name,
55 event.subagent_type,
56 event.permission_mode,
57 event.transcript_path,
58 event.pid,
59 event.cwd,
60 event.git_branch,
61 event.model,
62 event.tokens_input,
63 event.tokens_output,
64 event.tokens_cache_read,
65 event.tokens_cache_write,
66 event.cost_usd,
67 event.duration_ms,
68 event.payload,
69 event.metadata,
70 event.source,
71 event.is_sidechain,
72 ],
73 )
74 .map_err(|e| StorageError::Query(Box::new(e)))?;
75
76 let event_id = tx.last_insert_rowid();
77
78 session::update_for_event(&tx, event)?;
80
81 tx.commit().map_err(|e| StorageError::Query(Box::new(e)))?;
83
84 Ok(event_id)
85 }
86
87 fn query(&self, query: &EventQuery) -> Result<Vec<Event>, StorageError> {
88 let mut qb = QueryBuilder::new(
89 "SELECT id, machine_id, timestamp, event_type, session_id, framework, tool_use_id, spawned_agent_id, tool_name, subagent_type, permission_mode, transcript_path, pid, cwd, git_branch, model, tokens_input, tokens_output, tokens_cache_read, tokens_cache_write, cost_usd, duration_ms, payload, metadata, source, is_sidechain FROM events",
90 );
91
92 if let Some(ref session_ids) = query.session_ids {
94 if !qb.and_in("session_id", session_ids) {
96 return Ok(vec![]);
97 }
98 } else if let Some(ref session_id) = query.session_id {
99 qb.and_eq("session_id", session_id.clone());
100 }
101 if let Some(ref event_type) = query.event_type {
102 qb.and_eq_upper("event_type", event_type.clone());
103 }
104 if let Some(ref permission_mode) = query.permission_mode {
105 qb.and_eq("permission_mode", permission_mode.clone());
106 }
107 if let Some(ref framework) = query.framework {
108 qb.and_eq("framework", framework.clone());
109 }
110 if let Some(after_ts) = query.after_ts {
111 qb.and_gt("timestamp", after_ts.timestamp_millis());
112 }
113 if let Some(before_ts) = query.before_ts {
114 qb.and_lt("timestamp", before_ts.timestamp_millis());
115 }
116 if let Some(after_id) = query.after_id {
117 qb.and_gt("id", after_id);
118 }
119
120 if query.api_requests_only {
122 qb.and_eq_upper("event_type", "ApiRequest".to_string());
123 } else if query.exclude_api_requests {
124 qb.and_neq_upper("event_type", "ApiRequest".to_string());
125 }
126
127 let orders_by_id = query.orders_by_id();
129 let direction = query.effective_direction();
130 let order_clause = match (orders_by_id, direction) {
131 (true, Order::Asc) => "id ASC",
132 (true, Order::Desc) => "id DESC",
133 (false, Order::Asc) => "timestamp ASC",
134 (false, Order::Desc) => "timestamp DESC",
135 };
136 qb.order_by(order_clause);
137
138 if let Some(limit) = query.limit {
140 qb.limit(limit);
141 }
142
143 let (sql, params) = qb.build();
144
145 let mut stmt = self
146 .conn
147 .prepare(&sql)
148 .map_err(|e| StorageError::Query(Box::new(e)))?;
149
150 let events = stmt
151 .query_map(params.as_slice(), row_to_event)
152 .map_err(|e| StorageError::Query(Box::new(e)))?
153 .collect::<Result<Vec<_>, _>>()
154 .map_err(|e| StorageError::Query(Box::new(e)))?;
155
156 Ok(events)
157 }
158
159 fn gc(&self, retention: Duration) -> Result<usize, StorageError> {
160 let chrono_retention =
161 chrono::Duration::from_std(retention).map_err(|e| StorageError::Query(Box::new(e)))?;
162 let cutoff = (Utc::now() - chrono_retention).timestamp_millis();
163
164 let deleted = self
166 .conn
167 .execute("DELETE FROM events WHERE timestamp < ?1", [cutoff])
168 .map_err(|e| StorageError::Query(Box::new(e)))?;
169
170 Ok(deleted)
171 }
172
173 fn count_expired(&self, retention: Duration) -> Result<usize, StorageError> {
174 let chrono_retention =
175 chrono::Duration::from_std(retention).map_err(|e| StorageError::Query(Box::new(e)))?;
176 let cutoff = (Utc::now() - chrono_retention).timestamp_millis();
177
178 let count: usize = self
179 .conn
180 .query_row(
181 "SELECT COUNT(*) FROM events WHERE timestamp < ?1",
182 [cutoff],
183 |row| row.get(0),
184 )
185 .map_err(|e| StorageError::Query(Box::new(e)))?;
186
187 Ok(count)
188 }
189
190 fn count(&self) -> Result<usize, StorageError> {
191 let count: usize = self
192 .conn
193 .query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))
194 .map_err(|e| StorageError::Query(Box::new(e)))?;
195
196 Ok(count)
197 }
198
199 fn list_sessions(&self, query: &SessionQuery) -> Result<Vec<Session>, StorageError> {
200 session::list(&self.conn, query)
201 }
202
203 fn get_session(&self, session_id: &str) -> Result<Option<Session>, StorageError> {
204 session::get(&self.conn, session_id)
205 }
206
207 fn get_session_by_key(
208 &self,
209 machine_id: &str,
210 session_id: &str,
211 ) -> Result<Option<Session>, StorageError> {
212 session::get_by_key(&self.conn, machine_id, session_id)
213 }
214
215 fn get_session_by_pid(&self, pid: i32) -> Result<Option<Session>, StorageError> {
216 session::get_by_pid(&self.conn, pid)
217 }
218
219 fn update_session_git_info(
220 &self,
221 session_id: &str,
222 git_info: &GitBranchInfo,
223 ) -> Result<bool, StorageError> {
224 session::update_git_info(&self.conn, session_id, git_info)
225 }
226
227 fn update_session_github_repo(
228 &self,
229 session_id: &str,
230 github_repo: &str,
231 ) -> Result<bool, StorageError> {
232 session::update_github_repo(&self.conn, session_id, github_repo)
233 }
234
235 fn update_session_transcript_path(
236 &self,
237 machine_id: &str,
238 session_id: &str,
239 transcript_path: &str,
240 ) -> Result<bool, StorageError> {
241 session::update_transcript_path(&self.conn, machine_id, session_id, transcript_path)
242 }
243
244 fn update_session_prompt(
245 &self,
246 machine_id: &str,
247 session_id: &str,
248 prompt: &str,
249 ) -> Result<bool, StorageError> {
250 session::update_prompt(&self.conn, machine_id, session_id, prompt)
251 }
252
253 fn update_amp_session_status(
254 &self,
255 machine_id: &str,
256 session_id: &str,
257 timestamp: i64,
258 status: SessionStatus,
259 ) -> Result<bool, StorageError> {
260 session::update_amp_status(&self.conn, machine_id, session_id, timestamp, status)
261 }
262
263 fn upsert_session_git_context(
264 &self,
265 session_id: &str,
266 machine_id: &str,
267 framework: &str,
268 timestamp: i64,
269 local_git_dir: Option<&str>,
270 github_repo: Option<&str>,
271 ) -> Result<(), StorageError> {
272 session::upsert_git_context(
273 &self.conn,
274 session_id,
275 machine_id,
276 framework,
277 timestamp,
278 local_git_dir,
279 github_repo,
280 )
281 }
282
283 fn storage_stats(&self, query: &StorageStatsQuery) -> Result<StorageStats, StorageError> {
284 storage_stats(&self.conn, query)
285 }
286
287 fn get_transcript_position(
288 &self,
289 path: &std::path::Path,
290 ) -> Result<Option<FilePosition>, StorageError> {
291 get_transcript_position(&self.conn, path)
292 }
293
294 fn set_transcript_position(
295 &self,
296 path: &std::path::Path,
297 position: &FilePosition,
298 ) -> Result<(), StorageError> {
299 set_transcript_position(&self.conn, path, position)
300 }
301
302 fn event_exists_by_uuid(&self, session_id: &str, uuid: &str) -> Result<bool, StorageError> {
303 event_exists_by_uuid(&self.conn, session_id, uuid)
304 }
305
306 fn query_transcript_positions(&self) -> Result<Vec<(String, FilePosition)>, StorageError> {
307 query_transcript_positions(&self.conn)
308 }
309}
310
311pub(crate) fn storage_stats(
313 conn: &Connection,
314 query: &StorageStatsQuery,
315) -> Result<StorageStats, StorageError> {
316 let mut qb = QueryBuilder::new(
318 r"SELECT
319 COUNT(*) as session_count,
320 COUNT(CASE WHEN last_ended_at IS NULL THEN 1 END) as active_count,
321 COALESCE(SUM(tokens_input + tokens_output + tokens_cache_read + tokens_cache_write), 0) as total_tokens,
322 COALESCE(SUM(cost_usd), 0.0) as total_cost,
323 COALESCE(SUM(api_request_count), 0) as total_requests
324 FROM sessions",
325 );
326
327 if query.active_only {
329 qb.and_is_null("last_ended_at");
330 }
331 if let Some(ref framework) = query.framework {
332 qb.and_eq("framework", framework.clone());
333 }
334
335 let (sql, params) = qb.build();
336
337 conn.query_row(&sql, params.as_slice(), |row| {
338 Ok(StorageStats {
339 session_count: row.get::<_, i64>(0)? as u32,
340 active_session_count: row.get::<_, i64>(1)? as u32,
341 total_tokens: row.get(2)?,
342 total_cost_usd: row.get(3)?,
343 total_api_requests: row.get::<_, i64>(4)? as u32,
344 })
345 })
346 .map_err(|e| StorageError::Query(Box::new(e)))
347}
348
349pub(crate) fn get_transcript_position(
353 conn: &Connection,
354 path: &Path,
355) -> Result<Option<FilePosition>, StorageError> {
356 let path_str = path.to_string_lossy();
357
358 let result = conn.query_row(
359 "SELECT byte_offset, line_number, last_uuid FROM transcript_positions WHERE file_path = ?1",
360 [&path_str],
361 |row| {
362 Ok(FilePosition {
363 offset: row.get::<_, i64>(0)? as u64,
364 line_number: row.get::<_, i64>(1)? as u64,
365 last_uuid: row.get(2)?,
366 })
367 },
368 );
369
370 match result {
371 Ok(pos) => Ok(Some(pos)),
372 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
373 Err(e) => Err(StorageError::Query(Box::new(e))),
374 }
375}
376
377pub(crate) fn set_transcript_position(
381 conn: &Connection,
382 path: &Path,
383 position: &FilePosition,
384) -> Result<(), StorageError> {
385 let path_str = path.to_string_lossy();
386 let now = std::time::SystemTime::now()
387 .duration_since(std::time::UNIX_EPOCH)
388 .map(|d| d.as_millis() as i64)
389 .unwrap_or(0);
390
391 conn.execute(
392 r"INSERT INTO transcript_positions (file_path, byte_offset, line_number, last_uuid, updated_at)
393 VALUES (?1, ?2, ?3, ?4, ?5)
394 ON CONFLICT (file_path) DO UPDATE SET
395 byte_offset = excluded.byte_offset,
396 line_number = excluded.line_number,
397 last_uuid = excluded.last_uuid,
398 updated_at = excluded.updated_at",
399 params![
400 path_str,
401 position.offset as i64,
402 position.line_number as i64,
403 position.last_uuid,
404 now,
405 ],
406 )
407 .map_err(|e| StorageError::Query(Box::new(e)))?;
408
409 Ok(())
410}
411
412pub(crate) fn event_exists_by_uuid(
416 conn: &Connection,
417 session_id: &str,
418 uuid: &str,
419) -> Result<bool, StorageError> {
420 let pattern = format!("%\"uuid\":\"{}\"%", uuid);
422
423 let count: i64 = conn
424 .query_row(
425 "SELECT COUNT(*) FROM events WHERE session_id = ?1 AND metadata LIKE ?2",
426 params![session_id, pattern],
427 |row| row.get(0),
428 )
429 .map_err(|e| StorageError::Query(Box::new(e)))?;
430
431 Ok(count > 0)
432}
433
434pub(crate) fn query_transcript_positions(
438 conn: &Connection,
439) -> Result<Vec<(String, FilePosition)>, StorageError> {
440 let mut stmt = conn
441 .prepare(
442 "SELECT file_path, byte_offset, line_number, last_uuid FROM transcript_positions ORDER BY file_path",
443 )
444 .map_err(|e| StorageError::Query(Box::new(e)))?;
445
446 let positions = stmt
447 .query_map([], |row| {
448 let path: String = row.get(0)?;
449 let position = FilePosition {
450 offset: row.get::<_, i64>(1)? as u64,
451 line_number: row.get::<_, i64>(2)? as u64,
452 last_uuid: row.get(3)?,
453 };
454 Ok((path, position))
455 })
456 .map_err(|e| StorageError::Query(Box::new(e)))?
457 .collect::<Result<Vec<_>, _>>()
458 .map_err(|e| StorageError::Query(Box::new(e)))?;
459
460 Ok(positions)
461}