mi6_storage_sqlite/
storage.rs

1//! Storage trait implementation for SqliteStorage.
2//!
3//! This module implements the `Storage` trait from mi6-core, providing
4//! the main database operations:
5//! - Event insertion with session updates
6//! - Event querying with filters
7//! - Garbage collection
8//! - Statistics computation
9//! - Transcript position tracking
10
11use std::path::Path;
12use std::time::Duration;
13
14use chrono::Utc;
15use mi6_core::{
16    Event, EventQuery, FilePosition, GitBranchInfo, Order, Session, SessionQuery, Storage,
17    StorageError, StorageStats, StorageStatsQuery,
18};
19use rusqlite::{Connection, params};
20
21use crate::query_builder::QueryBuilder;
22use crate::row_parsing::row_to_event;
23use crate::session;
24use crate::sql;
25
26/// Implement Storage trait for SqliteStorage.
27impl Storage for super::SqliteStorage {
28    fn insert(&self, event: &Event) -> Result<i64, StorageError> {
29        // Insert the event
30        self.conn
31            .execute(
32                sql::INSERT_EVENT,
33                params![
34                    event.machine_id,
35                    event.timestamp.timestamp_millis(),
36                    event.event_type.to_string(),
37                    event.session_id,
38                    event.framework,
39                    event.tool_use_id,
40                    event.spawned_agent_id,
41                    event.tool_name,
42                    event.subagent_type,
43                    event.permission_mode,
44                    event.transcript_path,
45                    event.pid,
46                    event.cwd,
47                    event.git_branch,
48                    event.model,
49                    event.tokens_input,
50                    event.tokens_output,
51                    event.tokens_cache_read,
52                    event.tokens_cache_write,
53                    event.cost_usd,
54                    event.duration_ms,
55                    event.payload,
56                    event.metadata,
57                    event.source,
58                    event.is_sidechain,
59                ],
60            )
61            .map_err(|e| StorageError::Query(Box::new(e)))?;
62
63        let event_id = self.conn.last_insert_rowid();
64
65        // Update the sessions table based on event type
66        session::update_for_event(&self.conn, event)?;
67
68        Ok(event_id)
69    }
70
71    fn query(&self, query: &EventQuery) -> Result<Vec<Event>, StorageError> {
72        let mut qb = QueryBuilder::new(
73            "SELECT id, machine_id, timestamp, event_type, session_id, framework, tool_use_id, spawned_agent_id, tool_name, subagent_type, permission_mode, transcript_path, pid, cwd, git_branch, model, tokens_input, tokens_output, tokens_cache_read, tokens_cache_write, cost_usd, duration_ms, payload, metadata, source, is_sidechain FROM events",
74        );
75
76        // Apply session filter (single or multiple)
77        if let Some(ref session_ids) = query.session_ids {
78            // Empty session_ids means no results (handled by and_in returning false)
79            if !qb.and_in("session_id", session_ids) {
80                return Ok(vec![]);
81            }
82        } else if let Some(ref session_id) = query.session_id {
83            qb.and_eq("session_id", session_id.clone());
84        }
85        if let Some(ref event_type) = query.event_type {
86            qb.and_eq_upper("event_type", event_type.clone());
87        }
88        if let Some(ref permission_mode) = query.permission_mode {
89            qb.and_eq("permission_mode", permission_mode.clone());
90        }
91        if let Some(ref framework) = query.framework {
92            qb.and_eq("framework", framework.clone());
93        }
94        if let Some(after_ts) = query.after_ts {
95            qb.and_gt("timestamp", after_ts.timestamp_millis());
96        }
97        if let Some(before_ts) = query.before_ts {
98            qb.and_lt("timestamp", before_ts.timestamp_millis());
99        }
100        if let Some(after_id) = query.after_id {
101            qb.and_gt("id", after_id);
102        }
103
104        // Filter for API requests only or exclude them
105        if query.api_requests_only {
106            qb.and_eq_upper("event_type", "ApiRequest".to_string());
107        } else if query.exclude_api_requests {
108            qb.and_neq_upper("event_type", "ApiRequest".to_string());
109        }
110
111        // Use EventQuery helper methods for ordering
112        let orders_by_id = query.orders_by_id();
113        let direction = query.effective_direction();
114        let order_clause = match (orders_by_id, direction) {
115            (true, Order::Asc) => "id ASC",
116            (true, Order::Desc) => "id DESC",
117            (false, Order::Asc) => "timestamp ASC",
118            (false, Order::Desc) => "timestamp DESC",
119        };
120        qb.order_by(order_clause);
121
122        // Apply limit if specified
123        if let Some(limit) = query.limit {
124            qb.limit(limit);
125        }
126
127        let (sql, params) = qb.build();
128
129        let mut stmt = self
130            .conn
131            .prepare(&sql)
132            .map_err(|e| StorageError::Query(Box::new(e)))?;
133
134        let events = stmt
135            .query_map(params.as_slice(), row_to_event)
136            .map_err(|e| StorageError::Query(Box::new(e)))?
137            .collect::<Result<Vec<_>, _>>()
138            .map_err(|e| StorageError::Query(Box::new(e)))?;
139
140        Ok(events)
141    }
142
143    fn gc(&self, retention: Duration) -> Result<usize, StorageError> {
144        let chrono_retention =
145            chrono::Duration::from_std(retention).map_err(|e| StorageError::Query(Box::new(e)))?;
146        let cutoff = (Utc::now() - chrono_retention).timestamp_millis();
147
148        // Delete events older than retention period (includes API requests now)
149        let deleted = self
150            .conn
151            .execute("DELETE FROM events WHERE timestamp < ?1", [cutoff])
152            .map_err(|e| StorageError::Query(Box::new(e)))?;
153
154        Ok(deleted)
155    }
156
157    fn count_expired(&self, retention: Duration) -> Result<usize, StorageError> {
158        let chrono_retention =
159            chrono::Duration::from_std(retention).map_err(|e| StorageError::Query(Box::new(e)))?;
160        let cutoff = (Utc::now() - chrono_retention).timestamp_millis();
161
162        let count: usize = self
163            .conn
164            .query_row(
165                "SELECT COUNT(*) FROM events WHERE timestamp < ?1",
166                [cutoff],
167                |row| row.get(0),
168            )
169            .map_err(|e| StorageError::Query(Box::new(e)))?;
170
171        Ok(count)
172    }
173
174    fn count(&self) -> Result<usize, StorageError> {
175        let count: usize = self
176            .conn
177            .query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))
178            .map_err(|e| StorageError::Query(Box::new(e)))?;
179
180        Ok(count)
181    }
182
183    fn list_sessions(&self, query: &SessionQuery) -> Result<Vec<Session>, StorageError> {
184        session::list(&self.conn, query)
185    }
186
187    fn get_session(&self, session_id: &str) -> Result<Option<Session>, StorageError> {
188        session::get(&self.conn, session_id)
189    }
190
191    fn get_session_by_key(
192        &self,
193        machine_id: &str,
194        session_id: &str,
195    ) -> Result<Option<Session>, StorageError> {
196        session::get_by_key(&self.conn, machine_id, session_id)
197    }
198
199    fn get_session_by_pid(&self, pid: i32) -> Result<Option<Session>, StorageError> {
200        session::get_by_pid(&self.conn, pid)
201    }
202
203    fn update_session_git_info(
204        &self,
205        session_id: &str,
206        git_info: &GitBranchInfo,
207    ) -> Result<bool, StorageError> {
208        session::update_git_info(&self.conn, session_id, git_info)
209    }
210
211    fn storage_stats(&self, query: &StorageStatsQuery) -> Result<StorageStats, StorageError> {
212        storage_stats(&self.conn, query)
213    }
214
215    fn get_transcript_position(
216        &self,
217        path: &std::path::Path,
218    ) -> Result<Option<FilePosition>, StorageError> {
219        get_transcript_position(&self.conn, path)
220    }
221
222    fn set_transcript_position(
223        &self,
224        path: &std::path::Path,
225        position: &FilePosition,
226    ) -> Result<(), StorageError> {
227        set_transcript_position(&self.conn, path, position)
228    }
229
230    fn event_exists_by_uuid(&self, session_id: &str, uuid: &str) -> Result<bool, StorageError> {
231        event_exists_by_uuid(&self.conn, session_id, uuid)
232    }
233
234    fn query_transcript_positions(&self) -> Result<Vec<(String, FilePosition)>, StorageError> {
235        query_transcript_positions(&self.conn)
236    }
237}
238
239/// Compute storage statistics across all sessions.
240pub(crate) fn storage_stats(
241    conn: &Connection,
242    query: &StorageStatsQuery,
243) -> Result<StorageStats, StorageError> {
244    // v14 schema uses unified columns directly
245    let mut qb = QueryBuilder::new(
246        r"SELECT
247            COUNT(*) as session_count,
248            COUNT(CASE WHEN last_ended_at IS NULL THEN 1 END) as active_count,
249            COALESCE(SUM(tokens_input + tokens_output + tokens_cache_read + tokens_cache_write), 0) as total_tokens,
250            COALESCE(SUM(cost_usd), 0.0) as total_cost,
251            COALESCE(SUM(api_request_count), 0) as total_requests
252        FROM sessions",
253    );
254
255    // Apply filters
256    if query.active_only {
257        qb.and_is_null("last_ended_at");
258    }
259    if let Some(ref framework) = query.framework {
260        qb.and_eq("framework", framework.clone());
261    }
262
263    let (sql, params) = qb.build();
264
265    conn.query_row(&sql, params.as_slice(), |row| {
266        Ok(StorageStats {
267            session_count: row.get::<_, i64>(0)? as u32,
268            active_session_count: row.get::<_, i64>(1)? as u32,
269            total_tokens: row.get(2)?,
270            total_cost_usd: row.get(3)?,
271            total_api_requests: row.get::<_, i64>(4)? as u32,
272        })
273    })
274    .map_err(|e| StorageError::Query(Box::new(e)))
275}
276
277/// Get the last scanned position for a transcript file.
278///
279/// Returns `None` if the file has never been scanned.
280pub(crate) fn get_transcript_position(
281    conn: &Connection,
282    path: &Path,
283) -> Result<Option<FilePosition>, StorageError> {
284    let path_str = path.to_string_lossy();
285
286    let result = conn.query_row(
287        "SELECT byte_offset, line_number, last_uuid FROM transcript_positions WHERE file_path = ?1",
288        [&path_str],
289        |row| {
290            Ok(FilePosition {
291                offset: row.get::<_, i64>(0)? as u64,
292                line_number: row.get::<_, i64>(1)? as u64,
293                last_uuid: row.get(2)?,
294            })
295        },
296    );
297
298    match result {
299        Ok(pos) => Ok(Some(pos)),
300        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
301        Err(e) => Err(StorageError::Query(Box::new(e))),
302    }
303}
304
305/// Set the scanned position for a transcript file.
306///
307/// Uses UPSERT to insert or update the position.
308pub(crate) fn set_transcript_position(
309    conn: &Connection,
310    path: &Path,
311    position: &FilePosition,
312) -> Result<(), StorageError> {
313    let path_str = path.to_string_lossy();
314    let now = std::time::SystemTime::now()
315        .duration_since(std::time::UNIX_EPOCH)
316        .map(|d| d.as_millis() as i64)
317        .unwrap_or(0);
318
319    conn.execute(
320        r"INSERT INTO transcript_positions (file_path, byte_offset, line_number, last_uuid, updated_at)
321          VALUES (?1, ?2, ?3, ?4, ?5)
322          ON CONFLICT (file_path) DO UPDATE SET
323            byte_offset = excluded.byte_offset,
324            line_number = excluded.line_number,
325            last_uuid = excluded.last_uuid,
326            updated_at = excluded.updated_at",
327        params![
328            path_str,
329            position.offset as i64,
330            position.line_number as i64,
331            position.last_uuid,
332            now,
333        ],
334    )
335    .map_err(|e| StorageError::Query(Box::new(e)))?;
336
337    Ok(())
338}
339
340/// Check if an event with the given UUID exists for a session.
341///
342/// Used for deduplication when parsing transcripts.
343pub(crate) fn event_exists_by_uuid(
344    conn: &Connection,
345    session_id: &str,
346    uuid: &str,
347) -> Result<bool, StorageError> {
348    // UUIDs are stored in metadata JSON as {"uuid": "..."}
349    let pattern = format!("%\"uuid\":\"{}\"%", uuid);
350
351    let count: i64 = conn
352        .query_row(
353            "SELECT COUNT(*) FROM events WHERE session_id = ?1 AND metadata LIKE ?2",
354            params![session_id, pattern],
355            |row| row.get(0),
356        )
357        .map_err(|e| StorageError::Query(Box::new(e)))?;
358
359    Ok(count > 0)
360}
361
362/// Query all transcript file positions.
363///
364/// Returns a list of (file_path, position) pairs.
365pub(crate) fn query_transcript_positions(
366    conn: &Connection,
367) -> Result<Vec<(String, FilePosition)>, StorageError> {
368    let mut stmt = conn
369        .prepare(
370            "SELECT file_path, byte_offset, line_number, last_uuid FROM transcript_positions ORDER BY file_path",
371        )
372        .map_err(|e| StorageError::Query(Box::new(e)))?;
373
374    let positions = stmt
375        .query_map([], |row| {
376            let path: String = row.get(0)?;
377            let position = FilePosition {
378                offset: row.get::<_, i64>(1)? as u64,
379                line_number: row.get::<_, i64>(2)? as u64,
380                last_uuid: row.get(3)?,
381            };
382            Ok((path, position))
383        })
384        .map_err(|e| StorageError::Query(Box::new(e)))?
385        .collect::<Result<Vec<_>, _>>()
386        .map_err(|e| StorageError::Query(Box::new(e)))?;
387
388    Ok(positions)
389}