Skip to main content

opensession_local_db/
lib.rs

1pub mod git;
2
3use anyhow::{Context, Result};
4use opensession_api::db::migrations::{LOCAL_MIGRATIONS, MIGRATIONS};
5use opensession_core::session::{is_auxiliary_session, working_directory};
6use opensession_core::trace::Session;
7use rusqlite::{params, Connection, OptionalExtension};
8use serde_json::Value;
9use std::collections::HashSet;
10use std::fs;
11use std::path::PathBuf;
12use std::sync::Mutex;
13
14use git::GitContext;
15
/// A local session row stored in the local SQLite index/cache database.
///
/// This is the row type returned by the list/lookup queries on [`LocalDb`].
/// NOTE(review): the column-to-field mapping lives in `row_to_local_session`,
/// which is outside this view — field notes below are based on the upsert
/// statements in this file and should be confirmed against that mapper.
#[derive(Debug, Clone)]
pub struct LocalSessionRow {
    /// Session identifier (`sessions.id`).
    pub id: String,
    /// File the session was parsed from; `upsert_local_session` stores it in
    /// `session_sync.source_path`. Remote upserts do not set it.
    pub source_path: Option<String>,
    /// Sync state. Values written in this file: `local_only`, `remote_only`, `synced`.
    pub sync_status: String,
    pub last_synced_at: Option<String>,
    pub user_id: Option<String>,
    pub nickname: Option<String>,
    /// Local upserts always store the literal `personal` here.
    pub team_id: Option<String>,
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    pub title: Option<String>,
    pub description: Option<String>,
    /// For locally parsed sessions this is the session tags joined with `,`.
    pub tags: Option<String>,
    /// For locally parsed sessions this is an RFC 3339 timestamp.
    pub created_at: String,
    pub uploaded_at: Option<String>,
    pub message_count: i64,
    pub user_message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    /// Serialized list of modified files; `LogFilter::touches` matches against it with `LIKE`.
    pub files_modified: Option<String>,
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
    /// Auxiliary sessions are excluded by every listing query in this file.
    pub is_auxiliary: bool,
}
54
/// A link between a git commit and an AI session.
///
/// NOTE(review): presumably mirrors a row of `commit_session_links` (the table
/// joined by `list_sessions_log`) — confirm against the code that builds it.
#[derive(Debug, Clone)]
pub struct CommitLink {
    /// Hash of the linked git commit.
    pub commit_hash: String,
    /// Identifier of the linked session.
    pub session_id: String,
    pub repo_path: Option<String>,
    pub branch: Option<String>,
    pub created_at: String,
}
64
65/// Return true when a cached row corresponds to an OpenCode child session.
66pub fn is_opencode_child_session(row: &LocalSessionRow) -> bool {
67    row.tool == "opencode" && row.is_auxiliary
68}
69
70/// Parse `parentID` / `parentId` from an OpenCode session JSON file.
71#[deprecated(
72    note = "Use parser/core canonical session role attributes instead of runtime file inspection"
73)]
74pub fn parse_opencode_parent_session_id(source_path: &str) -> Option<String> {
75    let text = fs::read_to_string(source_path).ok()?;
76    let json: Value = serde_json::from_str(&text).ok()?;
77    lookup_parent_session_id(&json)
78}
79
80fn lookup_parent_session_id(value: &Value) -> Option<String> {
81    match value {
82        Value::Object(obj) => {
83            for (key, value) in obj {
84                if is_parent_id_key(key) {
85                    if let Some(parent_id) = value.as_str() {
86                        let parent_id = parent_id.trim();
87                        if !parent_id.is_empty() {
88                            return Some(parent_id.to_string());
89                        }
90                    }
91                }
92                if let Some(parent_id) = lookup_parent_session_id(value) {
93                    return Some(parent_id);
94                }
95            }
96            None
97        }
98        Value::Array(items) => items.iter().find_map(lookup_parent_session_id),
99        _ => None,
100    }
101}
102
/// Decide whether a JSON object key names a parent-session identifier.
///
/// The key is reduced to its ASCII letters and digits and lower-cased, so
/// `parentID`, `parent_id` and `Parent-Session-Id` all compare alike. A key
/// matches when the reduced form mentions `parent` and ends in `id` or `uuid`.
fn is_parent_id_key(key: &str) -> bool {
    let mut flat = String::with_capacity(key.len());
    for ch in key.chars() {
        if ch.is_ascii_alphanumeric() {
            flat.push(ch.to_ascii_lowercase());
        }
    }

    // The well-known spellings (`parentid`, `parentuuid`, `parentsessionid`,
    // `parentsessionuuid`, anything ending in `parentsessionid`) all mention
    // "parent" and end in "id"/"uuid", so this single rule covers them too.
    let mentions_parent = flat.contains("parent");
    let ends_like_identifier = flat.ends_with("id") || flat.ends_with("uuid");
    mentions_parent && ends_like_identifier
}
118
119/// Remove OpenCode child sessions so only parent sessions remain visible.
120pub fn hide_opencode_child_sessions(mut rows: Vec<LocalSessionRow>) -> Vec<LocalSessionRow> {
121    rows.retain(|row| !row.is_auxiliary);
122    rows
123}
124
/// Guess the originating tool from well-known directory fragments in a
/// session file path.
///
/// Matching is ASCII case-insensitive and accepts both `/` and `\` separators.
/// Codex markers are checked before Claude markers. Returns `None` when no
/// path is given or no marker is present.
fn infer_tool_from_source_path(source_path: Option<&str>) -> Option<&'static str> {
    const CODEX_MARKERS: [&str; 4] = [
        "/.codex/sessions/",
        "\\.codex\\sessions\\",
        "/codex/sessions/",
        "\\codex\\sessions\\",
    ];
    const CLAUDE_MARKERS: [&str; 4] = [
        "/.claude/projects/",
        "\\.claude\\projects\\",
        "/claude/projects/",
        "\\claude\\projects\\",
    ];

    let lowered = source_path?.to_ascii_lowercase();
    let mentions_any =
        |markers: &[&str]| markers.iter().any(|&marker| lowered.contains(marker));

    if mentions_any(&CODEX_MARKERS) {
        Some("codex")
    } else if mentions_any(&CLAUDE_MARKERS) {
        Some("claude-code")
    } else {
        None
    }
}
146
147fn normalize_tool_for_source_path(current_tool: &str, source_path: Option<&str>) -> String {
148    infer_tool_from_source_path(source_path)
149        .unwrap_or(current_tool)
150        .to_string()
151}
152
/// Filter for listing sessions from the local DB.
///
/// Every `Option` field left as `None` imposes no restriction. Auxiliary
/// sessions are always excluded by the queries that consume this filter.
#[derive(Debug, Clone)]
pub struct LocalSessionFilter {
    /// Exact match on `sessions.team_id`.
    pub team_id: Option<String>,
    /// Exact match on the sync status; rows without a sync record compare as `unknown`.
    pub sync_status: Option<String>,
    /// Exact match on `sessions.git_repo_name`.
    pub git_repo_name: Option<String>,
    /// Substring (`LIKE %…%`) search over title, description and tags.
    pub search: Option<String>,
    /// Exact match on `sessions.tool`.
    pub tool: Option<String>,
    /// Result ordering.
    pub sort: LocalSortOrder,
    /// Restrict to sessions created within a recent window.
    pub time_range: LocalTimeRange,
    /// Maximum number of rows to return.
    pub limit: Option<u32>,
    /// Rows to skip; only applied when `limit` is also set.
    pub offset: Option<u32>,
}
166
impl Default for LocalSessionFilter {
    /// Build an unrestricted filter: no criteria, most-recent-first ordering,
    /// no time window and no pagination.
    fn default() -> Self {
        Self {
            team_id: None,
            sync_status: None,
            git_repo_name: None,
            search: None,
            tool: None,
            sort: LocalSortOrder::Recent,
            time_range: LocalTimeRange::All,
            limit: None,
            offset: None,
        }
    }
}
182
/// Sort order for local session listing.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalSortOrder {
    /// Newest first (`created_at DESC`).
    #[default]
    Recent,
    /// Highest message count first, ties broken by newest first.
    Popular,
    /// Longest duration first, ties broken by newest first.
    Longest,
}
191
/// Time range filter for local session listing.
///
/// Each bounded variant keeps sessions whose `created_at` is no older than the
/// given window relative to SQLite's `datetime('now')`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalTimeRange {
    /// Last day (SQLite modifier `-1 day`).
    Hours24,
    /// Last seven days (`-7 days`).
    Days7,
    /// Last thirty days (`-30 days`).
    Days30,
    /// No time restriction.
    #[default]
    All,
}
201
/// Minimal remote session payload needed for local index/cache upsert.
///
/// Consumed by `LocalDb::upsert_remote_session`. The payload carries no
/// auxiliary-session flag and no user-message count.
#[derive(Debug, Clone)]
pub struct RemoteSessionSummary {
    /// Session identifier; primary key of the local `sessions` row.
    pub id: String,
    pub user_id: Option<String>,
    /// Not written by `upsert_remote_session` in this file.
    pub nickname: Option<String>,
    pub team_id: String,
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    pub title: Option<String>,
    pub description: Option<String>,
    pub tags: Option<String>,
    pub created_at: String,
    pub uploaded_at: String,
    pub message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    pub files_modified: Option<String>,
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
}
235
/// Extended filter for the `log` command.
///
/// Consumed by `LocalDb::list_sessions_log`. Fields left as `None` impose no
/// restriction; auxiliary sessions are always excluded.
#[derive(Debug, Default)]
pub struct LogFilter {
    /// Filter by tool name (exact match).
    pub tool: Option<String>,
    /// Filter by model (glob-like: each `*` becomes a SQL `LIKE` `%` wildcard).
    pub model: Option<String>,
    /// Keep sessions whose `created_at` is at or after this ISO8601 timestamp
    /// (compared as text).
    pub since: Option<String>,
    /// Keep sessions whose `created_at` is strictly before this ISO8601
    /// timestamp (compared as text).
    pub before: Option<String>,
    /// Filter sessions that touched this file path: matches rows whose
    /// `files_modified` text contains the path wrapped in double quotes.
    pub touches: Option<String>,
    /// Free-text search in title, description, tags.
    pub grep: Option<String>,
    /// Only sessions with errors. Only `Some(true)` filters; `Some(false)` and
    /// `None` both leave results unrestricted.
    pub has_errors: Option<bool>,
    /// Filter by working directory (prefix match).
    pub working_directory: Option<String>,
    /// Filter by git repo name.
    pub git_repo_name: Option<String>,
    /// Filter sessions linked to this git commit hash.
    pub commit: Option<String>,
    /// Maximum number of results.
    pub limit: Option<u32>,
    /// Offset for pagination; only applied when `limit` is also set.
    pub offset: Option<u32>,
}
264
/// Base FROM clause for session list queries.
///
/// Aliases: `s` = `sessions`, `ss` = `session_sync`, `u` = `users`. Both joins
/// are `LEFT JOIN`s, so sessions without a sync record or user still appear.
const FROM_CLAUSE: &str = "\
FROM sessions s \
LEFT JOIN session_sync ss ON ss.session_id = s.id \
LEFT JOIN users u ON u.id = s.user_id";
270
/// Local SQLite index/cache shared by TUI and Daemon.
/// This is not the source of truth for canonical session bodies.
/// Thread-safe: wraps the connection in a Mutex so it can be shared via `Arc<LocalDb>`.
pub struct LocalDb {
    /// The single SQLite connection; every operation locks it for its duration.
    conn: Mutex<Connection>,
}
277
278impl LocalDb {
279    /// Open (or create) the local database at the default path.
280    /// `~/.local/share/opensession/local.db`
281    pub fn open() -> Result<Self> {
282        let path = default_db_path()?;
283        Self::open_path(&path)
284    }
285
286    /// Open (or create) the local database at a specific path.
287    pub fn open_path(path: &PathBuf) -> Result<Self> {
288        if let Some(parent) = path.parent() {
289            std::fs::create_dir_all(parent)
290                .with_context(|| format!("create dir for {}", path.display()))?;
291        }
292        match open_connection_with_latest_schema(path) {
293            Ok(conn) => Ok(Self {
294                conn: Mutex::new(conn),
295            }),
296            Err(err) => {
297                if !is_schema_compat_error(&err) {
298                    return Err(err);
299                }
300
301                // Local DB is a cache. If schema migration cannot safely reconcile
302                // an incompatible/corrupted file, rotate it out and recreate latest schema.
303                rotate_legacy_db(path)?;
304
305                let conn = open_connection_with_latest_schema(path)
306                    .with_context(|| format!("recreate db {}", path.display()))?;
307                Ok(Self {
308                    conn: Mutex::new(conn),
309                })
310            }
311        }
312    }
313
    /// Lock the connection mutex and hand back the guard.
    ///
    /// # Panics
    /// Panics with "local db mutex poisoned" if the mutex is poisoned.
    fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
        self.conn.lock().expect("local db mutex poisoned")
    }
317
318    // ── Upsert local session (parsed from file) ────────────────────────
319
    /// Insert or refresh the index row for a session parsed from a local file.
    ///
    /// Writes two rows while holding the connection lock:
    /// 1. `sessions` — inserted with `team_id = 'personal'`; on an `id`
    ///    conflict the columns named in the `DO UPDATE SET` list are refreshed
    ///    (`team_id`, `created_at` and `body_storage_key` are not in that list).
    /// 2. `session_sync` — inserted as `local_only`; on conflict only
    ///    `source_path` is updated, so an existing `sync_status` is kept.
    ///
    /// The stored tool name is derived from `source_path` when it matches a
    /// known layout, otherwise taken from `session.agent.tool`.
    ///
    /// # Errors
    /// Returns the SQLite error if either statement fails. The two statements
    /// are not wrapped in an explicit transaction here.
    pub fn upsert_local_session(
        &self,
        session: &Session,
        source_path: &str,
        git: &GitContext,
    ) -> Result<()> {
        let title = session.context.title.as_deref();
        let description = session.context.description.as_deref();
        // Tags are stored as one comma-joined string; no tags is stored as NULL.
        let tags = if session.context.tags.is_empty() {
            None
        } else {
            Some(session.context.tags.join(","))
        };
        let created_at = session.context.created_at.to_rfc3339();
        let cwd = working_directory(session).map(String::from);
        let is_auxiliary = is_auxiliary_session(session);

        // Extract files_modified, files_read, and has_errors from events
        let (files_modified, files_read, has_errors) =
            opensession_core::extract::extract_file_metadata(session);
        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;
        let normalized_tool =
            normalize_tool_for_source_path(&session.agent.tool, Some(source_path));

        let conn = self.conn();
        // NOTE: `body_storage_key` is kept only for migration/schema parity.
        // Runtime lookup uses canonical body URLs and local body cache tables.
        //
        // Placeholders ?1..?25 are positional and must stay in step with the
        // `params![]` list below; `team_id` and `body_storage_key` are literals.
        conn.execute(
            "INSERT INTO sessions \
             (id, team_id, tool, agent_provider, agent_model, \
              title, description, tags, created_at, \
             message_count, user_message_count, task_count, event_count, duration_seconds, \
              total_input_tokens, total_output_tokens, body_storage_key, \
              git_remote, git_branch, git_commit, git_repo_name, working_directory, \
              files_modified, files_read, has_errors, max_active_agents, is_auxiliary) \
             VALUES (?1,'personal',?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,'',?16,?17,?18,?19,?20,?21,?22,?23,?24,?25) \
             ON CONFLICT(id) DO UPDATE SET \
              tool=excluded.tool, agent_provider=excluded.agent_provider, \
              agent_model=excluded.agent_model, \
              title=excluded.title, description=excluded.description, \
              tags=excluded.tags, \
              message_count=excluded.message_count, user_message_count=excluded.user_message_count, \
              task_count=excluded.task_count, \
              event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
              total_input_tokens=excluded.total_input_tokens, \
              total_output_tokens=excluded.total_output_tokens, \
              git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
              git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
              working_directory=excluded.working_directory, \
              files_modified=excluded.files_modified, files_read=excluded.files_read, \
              has_errors=excluded.has_errors, \
              max_active_agents=excluded.max_active_agents, \
              is_auxiliary=excluded.is_auxiliary",
            params![
                &session.session_id,
                &normalized_tool,
                &session.agent.provider,
                &session.agent.model,
                title,
                description,
                &tags,
                &created_at,
                session.stats.message_count as i64,
                session.stats.user_message_count as i64,
                session.stats.task_count as i64,
                session.stats.event_count as i64,
                session.stats.duration_seconds as i64,
                session.stats.total_input_tokens as i64,
                session.stats.total_output_tokens as i64,
                &git.remote,
                &git.branch,
                &git.commit,
                &git.repo_name,
                &cwd,
                &files_modified,
                &files_read,
                has_errors,
                max_active_agents,
                is_auxiliary as i64,
            ],
        )?;

        // Record where the session came from. An existing sync row keeps its
        // status; only the source path is refreshed.
        conn.execute(
            "INSERT INTO session_sync (session_id, source_path, sync_status) \
             VALUES (?1, ?2, 'local_only') \
             ON CONFLICT(session_id) DO UPDATE SET source_path=excluded.source_path",
            params![&session.session_id, source_path],
        )?;
        Ok(())
    }
410
411    // ── Upsert remote session (from server sync pull) ──────────────────
412
413    pub fn upsert_remote_session(&self, summary: &RemoteSessionSummary) -> Result<()> {
414        let conn = self.conn();
415        // NOTE: `body_storage_key` is kept only for migration/schema parity.
416        // Runtime lookup uses canonical body URLs and local body cache tables.
417        conn.execute(
418            "INSERT INTO sessions \
419             (id, user_id, team_id, tool, agent_provider, agent_model, \
420              title, description, tags, created_at, uploaded_at, \
421              message_count, task_count, event_count, duration_seconds, \
422              total_input_tokens, total_output_tokens, body_storage_key, \
423              git_remote, git_branch, git_commit, git_repo_name, \
424              pr_number, pr_url, working_directory, \
425              files_modified, files_read, has_errors, max_active_agents, is_auxiliary) \
426             VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,'',?18,?19,?20,?21,?22,?23,?24,?25,?26,?27,?28,0) \
427             ON CONFLICT(id) DO UPDATE SET \
428              title=excluded.title, description=excluded.description, \
429              tags=excluded.tags, uploaded_at=excluded.uploaded_at, \
430              message_count=excluded.message_count, task_count=excluded.task_count, \
431              event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
432              total_input_tokens=excluded.total_input_tokens, \
433              total_output_tokens=excluded.total_output_tokens, \
434              git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
435              git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
436              pr_number=excluded.pr_number, pr_url=excluded.pr_url, \
437              working_directory=excluded.working_directory, \
438              files_modified=excluded.files_modified, files_read=excluded.files_read, \
439              has_errors=excluded.has_errors, \
440              max_active_agents=excluded.max_active_agents, \
441              is_auxiliary=excluded.is_auxiliary",
442            params![
443                &summary.id,
444                &summary.user_id,
445                &summary.team_id,
446                &summary.tool,
447                &summary.agent_provider,
448                &summary.agent_model,
449                &summary.title,
450                &summary.description,
451                &summary.tags,
452                &summary.created_at,
453                &summary.uploaded_at,
454                summary.message_count,
455                summary.task_count,
456                summary.event_count,
457                summary.duration_seconds,
458                summary.total_input_tokens,
459                summary.total_output_tokens,
460                &summary.git_remote,
461                &summary.git_branch,
462                &summary.git_commit,
463                &summary.git_repo_name,
464                summary.pr_number,
465                &summary.pr_url,
466                &summary.working_directory,
467                &summary.files_modified,
468                &summary.files_read,
469                summary.has_errors,
470                summary.max_active_agents,
471            ],
472        )?;
473
474        conn.execute(
475            "INSERT INTO session_sync (session_id, sync_status) \
476             VALUES (?1, 'remote_only') \
477             ON CONFLICT(session_id) DO UPDATE SET \
478              sync_status = CASE WHEN session_sync.sync_status = 'local_only' THEN 'synced' ELSE session_sync.sync_status END",
479            params![&summary.id],
480        )?;
481        Ok(())
482    }
483
484    // ── List sessions ──────────────────────────────────────────────────
485
486    fn build_local_session_where_clause(
487        filter: &LocalSessionFilter,
488    ) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
489        let mut where_clauses = vec![
490            "1=1".to_string(),
491            "COALESCE(s.is_auxiliary, 0) = 0".to_string(),
492        ];
493        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
494        let mut idx = 1u32;
495
496        if let Some(ref team_id) = filter.team_id {
497            where_clauses.push(format!("s.team_id = ?{idx}"));
498            param_values.push(Box::new(team_id.clone()));
499            idx += 1;
500        }
501
502        if let Some(ref sync_status) = filter.sync_status {
503            where_clauses.push(format!("COALESCE(ss.sync_status, 'unknown') = ?{idx}"));
504            param_values.push(Box::new(sync_status.clone()));
505            idx += 1;
506        }
507
508        if let Some(ref repo) = filter.git_repo_name {
509            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
510            param_values.push(Box::new(repo.clone()));
511            idx += 1;
512        }
513
514        if let Some(ref tool) = filter.tool {
515            where_clauses.push(format!("s.tool = ?{idx}"));
516            param_values.push(Box::new(tool.clone()));
517            idx += 1;
518        }
519
520        if let Some(ref search) = filter.search {
521            let like = format!("%{search}%");
522            where_clauses.push(format!(
523                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
524                i1 = idx,
525                i2 = idx + 1,
526                i3 = idx + 2,
527            ));
528            param_values.push(Box::new(like.clone()));
529            param_values.push(Box::new(like.clone()));
530            param_values.push(Box::new(like));
531            idx += 3;
532        }
533
534        let interval = match filter.time_range {
535            LocalTimeRange::Hours24 => Some("-1 day"),
536            LocalTimeRange::Days7 => Some("-7 days"),
537            LocalTimeRange::Days30 => Some("-30 days"),
538            LocalTimeRange::All => None,
539        };
540        if let Some(interval) = interval {
541            where_clauses.push(format!("datetime(s.created_at) >= datetime('now', ?{idx})"));
542            param_values.push(Box::new(interval.to_string()));
543        }
544
545        (where_clauses.join(" AND "), param_values)
546    }
547
548    pub fn list_sessions(&self, filter: &LocalSessionFilter) -> Result<Vec<LocalSessionRow>> {
549        let (where_str, mut param_values) = Self::build_local_session_where_clause(filter);
550        let order_clause = match filter.sort {
551            LocalSortOrder::Popular => "s.message_count DESC, s.created_at DESC",
552            LocalSortOrder::Longest => "s.duration_seconds DESC, s.created_at DESC",
553            LocalSortOrder::Recent => "s.created_at DESC",
554        };
555
556        let mut sql = format!(
557            "SELECT {LOCAL_SESSION_COLUMNS} \
558             {FROM_CLAUSE} WHERE {where_str} \
559             ORDER BY {order_clause}"
560        );
561
562        if let Some(limit) = filter.limit {
563            sql.push_str(" LIMIT ?");
564            param_values.push(Box::new(limit));
565            if let Some(offset) = filter.offset {
566                sql.push_str(" OFFSET ?");
567                param_values.push(Box::new(offset));
568            }
569        }
570
571        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
572            param_values.iter().map(|p| p.as_ref()).collect();
573        let conn = self.conn();
574        let mut stmt = conn.prepare(&sql)?;
575        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
576
577        let mut result = Vec::new();
578        for row in rows {
579            result.push(row?);
580        }
581
582        Ok(result)
583    }
584
585    /// Count sessions for a given list filter (before UI-level page slicing).
586    pub fn count_sessions_filtered(&self, filter: &LocalSessionFilter) -> Result<i64> {
587        let mut count_filter = filter.clone();
588        count_filter.limit = None;
589        count_filter.offset = None;
590        let (where_str, param_values) = Self::build_local_session_where_clause(&count_filter);
591        let sql = format!("SELECT COUNT(*) {FROM_CLAUSE} WHERE {where_str}");
592        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
593            param_values.iter().map(|p| p.as_ref()).collect();
594        let conn = self.conn();
595        let count = conn.query_row(&sql, param_refs.as_slice(), |row| row.get(0))?;
596        Ok(count)
597    }
598
599    /// List distinct tool names for the current list filter (ignores active tool filter).
600    pub fn list_session_tools(&self, filter: &LocalSessionFilter) -> Result<Vec<String>> {
601        let mut tool_filter = filter.clone();
602        tool_filter.tool = None;
603        tool_filter.limit = None;
604        tool_filter.offset = None;
605        let (where_str, param_values) = Self::build_local_session_where_clause(&tool_filter);
606        let sql = format!(
607            "SELECT DISTINCT s.tool \
608             {FROM_CLAUSE} WHERE {where_str} \
609             ORDER BY s.tool ASC"
610        );
611        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
612            param_values.iter().map(|p| p.as_ref()).collect();
613        let conn = self.conn();
614        let mut stmt = conn.prepare(&sql)?;
615        let rows = stmt.query_map(param_refs.as_slice(), |row| row.get::<_, String>(0))?;
616
617        let mut tools = Vec::new();
618        for row in rows {
619            let tool = row?;
620            if !tool.trim().is_empty() {
621                tools.push(tool);
622            }
623        }
624        Ok(tools)
625    }
626
627    // ── Log query ─────────────────────────────────────────────────────
628
629    /// Query sessions with extended filters for the `log` command.
630    pub fn list_sessions_log(&self, filter: &LogFilter) -> Result<Vec<LocalSessionRow>> {
631        let mut where_clauses = vec![
632            "1=1".to_string(),
633            "COALESCE(s.is_auxiliary, 0) = 0".to_string(),
634        ];
635        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
636        let mut idx = 1u32;
637
638        if let Some(ref tool) = filter.tool {
639            where_clauses.push(format!("s.tool = ?{idx}"));
640            param_values.push(Box::new(tool.clone()));
641            idx += 1;
642        }
643
644        if let Some(ref model) = filter.model {
645            let like = model.replace('*', "%");
646            where_clauses.push(format!("s.agent_model LIKE ?{idx}"));
647            param_values.push(Box::new(like));
648            idx += 1;
649        }
650
651        if let Some(ref since) = filter.since {
652            where_clauses.push(format!("s.created_at >= ?{idx}"));
653            param_values.push(Box::new(since.clone()));
654            idx += 1;
655        }
656
657        if let Some(ref before) = filter.before {
658            where_clauses.push(format!("s.created_at < ?{idx}"));
659            param_values.push(Box::new(before.clone()));
660            idx += 1;
661        }
662
663        if let Some(ref touches) = filter.touches {
664            let like = format!("%\"{touches}\"%");
665            where_clauses.push(format!("s.files_modified LIKE ?{idx}"));
666            param_values.push(Box::new(like));
667            idx += 1;
668        }
669
670        if let Some(ref grep) = filter.grep {
671            let like = format!("%{grep}%");
672            where_clauses.push(format!(
673                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
674                i1 = idx,
675                i2 = idx + 1,
676                i3 = idx + 2,
677            ));
678            param_values.push(Box::new(like.clone()));
679            param_values.push(Box::new(like.clone()));
680            param_values.push(Box::new(like));
681            idx += 3;
682        }
683
684        if let Some(true) = filter.has_errors {
685            where_clauses.push("s.has_errors = 1".to_string());
686        }
687
688        if let Some(ref wd) = filter.working_directory {
689            where_clauses.push(format!("s.working_directory LIKE ?{idx}"));
690            param_values.push(Box::new(format!("{wd}%")));
691            idx += 1;
692        }
693
694        if let Some(ref repo) = filter.git_repo_name {
695            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
696            param_values.push(Box::new(repo.clone()));
697            idx += 1;
698        }
699
700        // Optional JOIN for commit hash filter
701        let mut extra_join = String::new();
702        if let Some(ref commit) = filter.commit {
703            extra_join =
704                " INNER JOIN commit_session_links csl ON csl.session_id = s.id".to_string();
705            where_clauses.push(format!("csl.commit_hash = ?{idx}"));
706            param_values.push(Box::new(commit.clone()));
707            idx += 1;
708        }
709
710        let _ = idx; // suppress unused warning
711
712        let where_str = where_clauses.join(" AND ");
713        let mut sql = format!(
714            "SELECT {LOCAL_SESSION_COLUMNS} \
715             {FROM_CLAUSE}{extra_join} WHERE {where_str} \
716             ORDER BY s.created_at DESC"
717        );
718
719        if let Some(limit) = filter.limit {
720            sql.push_str(" LIMIT ?");
721            param_values.push(Box::new(limit));
722            if let Some(offset) = filter.offset {
723                sql.push_str(" OFFSET ?");
724                param_values.push(Box::new(offset));
725            }
726        }
727
728        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
729            param_values.iter().map(|p| p.as_ref()).collect();
730        let conn = self.conn();
731        let mut stmt = conn.prepare(&sql)?;
732        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
733
734        let mut result = Vec::new();
735        for row in rows {
736            result.push(row?);
737        }
738        Ok(result)
739    }
740
741    /// Get the latest N sessions for a specific tool, ordered by created_at DESC.
742    pub fn get_sessions_by_tool_latest(
743        &self,
744        tool: &str,
745        count: u32,
746    ) -> Result<Vec<LocalSessionRow>> {
747        let sql = format!(
748            "SELECT {LOCAL_SESSION_COLUMNS} \
749             {FROM_CLAUSE} WHERE s.tool = ?1 AND COALESCE(s.is_auxiliary, 0) = 0 \
750             ORDER BY s.created_at DESC"
751        );
752        let conn = self.conn();
753        let mut stmt = conn.prepare(&sql)?;
754        let rows = stmt.query_map(params![tool], row_to_local_session)?;
755        let mut result = Vec::new();
756        for row in rows {
757            result.push(row?);
758        }
759
760        result.truncate(count as usize);
761        Ok(result)
762    }
763
764    /// Get the latest N sessions across all tools, ordered by created_at DESC.
765    pub fn get_sessions_latest(&self, count: u32) -> Result<Vec<LocalSessionRow>> {
766        let sql = format!(
767            "SELECT {LOCAL_SESSION_COLUMNS} \
768             {FROM_CLAUSE} WHERE COALESCE(s.is_auxiliary, 0) = 0 \
769             ORDER BY s.created_at DESC"
770        );
771        let conn = self.conn();
772        let mut stmt = conn.prepare(&sql)?;
773        let rows = stmt.query_map([], row_to_local_session)?;
774        let mut result = Vec::new();
775        for row in rows {
776            result.push(row?);
777        }
778
779        result.truncate(count as usize);
780        Ok(result)
781    }
782
783    /// Get the Nth most recent session for a specific tool (0 = HEAD, 1 = HEAD~1, etc.).
784    pub fn get_session_by_tool_offset(
785        &self,
786        tool: &str,
787        offset: u32,
788    ) -> Result<Option<LocalSessionRow>> {
789        let sql = format!(
790            "SELECT {LOCAL_SESSION_COLUMNS} \
791             {FROM_CLAUSE} WHERE s.tool = ?1 AND COALESCE(s.is_auxiliary, 0) = 0 \
792             ORDER BY s.created_at DESC"
793        );
794        let conn = self.conn();
795        let mut stmt = conn.prepare(&sql)?;
796        let rows = stmt.query_map(params![tool], row_to_local_session)?;
797        let result = rows.collect::<Result<Vec<_>, _>>()?;
798        Ok(result.into_iter().nth(offset as usize))
799    }
800
801    /// Get the Nth most recent session across all tools (0 = HEAD, 1 = HEAD~1, etc.).
802    pub fn get_session_by_offset(&self, offset: u32) -> Result<Option<LocalSessionRow>> {
803        let sql = format!(
804            "SELECT {LOCAL_SESSION_COLUMNS} \
805             {FROM_CLAUSE} WHERE COALESCE(s.is_auxiliary, 0) = 0 \
806             ORDER BY s.created_at DESC"
807        );
808        let conn = self.conn();
809        let mut stmt = conn.prepare(&sql)?;
810        let rows = stmt.query_map([], row_to_local_session)?;
811        let result = rows.collect::<Result<Vec<_>, _>>()?;
812        Ok(result.into_iter().nth(offset as usize))
813    }
814
815    /// Fetch the source path used when the session was last parsed/loaded.
816    pub fn get_session_source_path(&self, session_id: &str) -> Result<Option<String>> {
817        let conn = self.conn();
818        let result = conn
819            .query_row(
820                "SELECT source_path FROM session_sync WHERE session_id = ?1",
821                params![session_id],
822                |row| row.get(0),
823            )
824            .optional()?;
825
826        Ok(result)
827    }
828
829    /// Count total sessions in the local DB.
830    pub fn session_count(&self) -> Result<i64> {
831        let count = self
832            .conn()
833            .query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))?;
834        Ok(count)
835    }
836
837    // ── Delete session ─────────────────────────────────────────────────
838
839    pub fn delete_session(&self, session_id: &str) -> Result<()> {
840        let conn = self.conn();
841        conn.execute(
842            "DELETE FROM body_cache WHERE session_id = ?1",
843            params![session_id],
844        )?;
845        conn.execute(
846            "DELETE FROM session_sync WHERE session_id = ?1",
847            params![session_id],
848        )?;
849        conn.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
850        Ok(())
851    }
852
853    // ── Sync cursor ────────────────────────────────────────────────────
854
855    pub fn get_sync_cursor(&self, team_id: &str) -> Result<Option<String>> {
856        let cursor = self
857            .conn()
858            .query_row(
859                "SELECT cursor FROM sync_cursors WHERE team_id = ?1",
860                params![team_id],
861                |row| row.get(0),
862            )
863            .optional()?;
864        Ok(cursor)
865    }
866
867    pub fn set_sync_cursor(&self, team_id: &str, cursor: &str) -> Result<()> {
868        self.conn().execute(
869            "INSERT INTO sync_cursors (team_id, cursor, updated_at) \
870             VALUES (?1, ?2, datetime('now')) \
871             ON CONFLICT(team_id) DO UPDATE SET cursor=excluded.cursor, updated_at=datetime('now')",
872            params![team_id, cursor],
873        )?;
874        Ok(())
875    }
876
877    // ── Upload tracking ────────────────────────────────────────────────
878
879    /// Get sessions that are local_only and need to be uploaded.
880    pub fn pending_uploads(&self, team_id: &str) -> Result<Vec<LocalSessionRow>> {
881        let sql = format!(
882            "SELECT {LOCAL_SESSION_COLUMNS} \
883             FROM sessions s \
884             INNER JOIN session_sync ss ON ss.session_id = s.id \
885             LEFT JOIN users u ON u.id = s.user_id \
886             WHERE ss.sync_status = 'local_only' AND s.team_id = ?1 AND COALESCE(s.is_auxiliary, 0) = 0 \
887             ORDER BY s.created_at ASC"
888        );
889        let conn = self.conn();
890        let mut stmt = conn.prepare(&sql)?;
891        let rows = stmt.query_map(params![team_id], row_to_local_session)?;
892        let mut result = Vec::new();
893        for row in rows {
894            result.push(row?);
895        }
896        Ok(result)
897    }
898
899    pub fn mark_synced(&self, session_id: &str) -> Result<()> {
900        self.conn().execute(
901            "UPDATE session_sync SET sync_status = 'synced', last_synced_at = datetime('now') \
902             WHERE session_id = ?1",
903            params![session_id],
904        )?;
905        Ok(())
906    }
907
908    /// Check if a session was already uploaded (synced or remote_only) since the given modification time.
909    pub fn was_uploaded_after(
910        &self,
911        source_path: &str,
912        modified: &chrono::DateTime<chrono::Utc>,
913    ) -> Result<bool> {
914        let result: Option<String> = self
915            .conn()
916            .query_row(
917                "SELECT last_synced_at FROM session_sync \
918                 WHERE source_path = ?1 AND sync_status = 'synced' AND last_synced_at IS NOT NULL",
919                params![source_path],
920                |row| row.get(0),
921            )
922            .optional()?;
923
924        if let Some(synced_at) = result {
925            if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&synced_at) {
926                return Ok(dt >= *modified);
927            }
928        }
929        Ok(false)
930    }
931
932    // ── Body cache (local read acceleration) ───────────────────────────
933
934    pub fn cache_body(&self, session_id: &str, body: &[u8]) -> Result<()> {
935        self.conn().execute(
936            "INSERT INTO body_cache (session_id, body, cached_at) \
937             VALUES (?1, ?2, datetime('now')) \
938             ON CONFLICT(session_id) DO UPDATE SET body=excluded.body, cached_at=datetime('now')",
939            params![session_id, body],
940        )?;
941        Ok(())
942    }
943
944    pub fn get_cached_body(&self, session_id: &str) -> Result<Option<Vec<u8>>> {
945        let body = self
946            .conn()
947            .query_row(
948                "SELECT body FROM body_cache WHERE session_id = ?1",
949                params![session_id],
950                |row| row.get(0),
951            )
952            .optional()?;
953        Ok(body)
954    }
955
956    // ── Migration helper ───────────────────────────────────────────────
957
958    /// Migrate entries from the old state.json UploadState into the local DB.
959    /// Marks them as `synced` with no metadata (we only know the file path was uploaded).
960    pub fn migrate_from_state_json(
961        &self,
962        uploaded: &std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
963    ) -> Result<usize> {
964        let mut count = 0;
965        for (path, uploaded_at) in uploaded {
966            let exists: bool = self
967                .conn()
968                .query_row(
969                    "SELECT COUNT(*) > 0 FROM session_sync WHERE source_path = ?1",
970                    params![path],
971                    |row| row.get(0),
972                )
973                .unwrap_or(false);
974
975            if exists {
976                self.conn().execute(
977                    "UPDATE session_sync SET sync_status = 'synced', last_synced_at = ?1 \
978                     WHERE source_path = ?2 AND sync_status = 'local_only'",
979                    params![uploaded_at.to_rfc3339(), path],
980                )?;
981                count += 1;
982            }
983        }
984        Ok(count)
985    }
986
987    // ── Commit ↔ session linking ────────────────────────────────────
988
989    /// Link a git commit to an AI session.
990    pub fn link_commit_session(
991        &self,
992        commit_hash: &str,
993        session_id: &str,
994        repo_path: Option<&str>,
995        branch: Option<&str>,
996    ) -> Result<()> {
997        self.conn().execute(
998            "INSERT INTO commit_session_links (commit_hash, session_id, repo_path, branch) \
999             VALUES (?1, ?2, ?3, ?4) \
1000             ON CONFLICT(commit_hash, session_id) DO NOTHING",
1001            params![commit_hash, session_id, repo_path, branch],
1002        )?;
1003        Ok(())
1004    }
1005
1006    /// Get all sessions linked to a git commit.
1007    pub fn get_sessions_by_commit(&self, commit_hash: &str) -> Result<Vec<LocalSessionRow>> {
1008        let sql = format!(
1009            "SELECT {LOCAL_SESSION_COLUMNS} \
1010             {FROM_CLAUSE} \
1011             INNER JOIN commit_session_links csl ON csl.session_id = s.id \
1012             WHERE csl.commit_hash = ?1 AND COALESCE(s.is_auxiliary, 0) = 0 \
1013             ORDER BY s.created_at DESC"
1014        );
1015        let conn = self.conn();
1016        let mut stmt = conn.prepare(&sql)?;
1017        let rows = stmt.query_map(params![commit_hash], row_to_local_session)?;
1018        let mut result = Vec::new();
1019        for row in rows {
1020            result.push(row?);
1021        }
1022        Ok(result)
1023    }
1024
1025    /// Get all commits linked to a session.
1026    pub fn get_commits_by_session(&self, session_id: &str) -> Result<Vec<CommitLink>> {
1027        let conn = self.conn();
1028        let mut stmt = conn.prepare(
1029            "SELECT commit_hash, session_id, repo_path, branch, created_at \
1030             FROM commit_session_links WHERE session_id = ?1 \
1031             ORDER BY created_at DESC",
1032        )?;
1033        let rows = stmt.query_map(params![session_id], |row| {
1034            Ok(CommitLink {
1035                commit_hash: row.get(0)?,
1036                session_id: row.get(1)?,
1037                repo_path: row.get(2)?,
1038                branch: row.get(3)?,
1039                created_at: row.get(4)?,
1040            })
1041        })?;
1042        let mut result = Vec::new();
1043        for row in rows {
1044            result.push(row?);
1045        }
1046        Ok(result)
1047    }
1048
1049    /// Find the most recently active session for a given repo path.
1050    /// "Active" means the session's working_directory matches the repo path
1051    /// and was created within the last `since_minutes` minutes.
1052    pub fn find_active_session_for_repo(
1053        &self,
1054        repo_path: &str,
1055        since_minutes: u32,
1056    ) -> Result<Option<LocalSessionRow>> {
1057        let sql = format!(
1058            "SELECT {LOCAL_SESSION_COLUMNS} \
1059             {FROM_CLAUSE} \
1060             WHERE s.working_directory LIKE ?1 \
1061             AND COALESCE(s.is_auxiliary, 0) = 0 \
1062             AND s.created_at >= datetime('now', ?2) \
1063             ORDER BY s.created_at DESC LIMIT 1"
1064        );
1065        let since = format!("-{since_minutes} minutes");
1066        let like = format!("{repo_path}%");
1067        let conn = self.conn();
1068        let mut stmt = conn.prepare(&sql)?;
1069        let row = stmt
1070            .query_map(params![like, since], row_to_local_session)?
1071            .next()
1072            .transpose()?;
1073        Ok(row)
1074    }
1075
1076    /// Get all session IDs currently in the local DB.
1077    pub fn existing_session_ids(&self) -> std::collections::HashSet<String> {
1078        let conn = self.conn();
1079        let mut stmt = conn
1080            .prepare("SELECT id FROM sessions")
1081            .unwrap_or_else(|_| panic!("failed to prepare existing_session_ids query"));
1082        let rows = stmt.query_map([], |row| row.get::<_, String>(0));
1083        let mut set = std::collections::HashSet::new();
1084        if let Ok(rows) = rows {
1085            for row in rows.flatten() {
1086                set.insert(row);
1087            }
1088        }
1089        set
1090    }
1091
1092    /// Update only stats fields for an existing session (no git context re-extraction).
1093    pub fn update_session_stats(&self, session: &Session) -> Result<()> {
1094        let title = session.context.title.as_deref();
1095        let description = session.context.description.as_deref();
1096        let (files_modified, files_read, has_errors) =
1097            opensession_core::extract::extract_file_metadata(session);
1098        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;
1099        let is_auxiliary = is_auxiliary_session(session);
1100
1101        self.conn().execute(
1102            "UPDATE sessions SET \
1103             title=?2, description=?3, \
1104             message_count=?4, user_message_count=?5, task_count=?6, \
1105             event_count=?7, duration_seconds=?8, \
1106             total_input_tokens=?9, total_output_tokens=?10, \
1107              files_modified=?11, files_read=?12, has_errors=?13, \
1108             max_active_agents=?14, is_auxiliary=?15 \
1109             WHERE id=?1",
1110            params![
1111                &session.session_id,
1112                title,
1113                description,
1114                session.stats.message_count as i64,
1115                session.stats.user_message_count as i64,
1116                session.stats.task_count as i64,
1117                session.stats.event_count as i64,
1118                session.stats.duration_seconds as i64,
1119                session.stats.total_input_tokens as i64,
1120                session.stats.total_output_tokens as i64,
1121                &files_modified,
1122                &files_read,
1123                has_errors,
1124                max_active_agents,
1125                is_auxiliary as i64,
1126            ],
1127        )?;
1128        Ok(())
1129    }
1130
1131    /// Update only sync metadata path for an existing session.
1132    pub fn set_session_sync_path(&self, session_id: &str, source_path: &str) -> Result<()> {
1133        self.conn().execute(
1134            "INSERT INTO session_sync (session_id, source_path) \
1135             VALUES (?1, ?2) \
1136             ON CONFLICT(session_id) DO UPDATE SET source_path = excluded.source_path",
1137            params![session_id, source_path],
1138        )?;
1139        Ok(())
1140    }
1141
1142    /// Get a list of distinct git repo names present in the DB.
1143    pub fn list_repos(&self) -> Result<Vec<String>> {
1144        let conn = self.conn();
1145        let mut stmt = conn.prepare(
1146            "SELECT DISTINCT git_repo_name FROM sessions \
1147             WHERE git_repo_name IS NOT NULL AND COALESCE(is_auxiliary, 0) = 0 \
1148             ORDER BY git_repo_name ASC",
1149        )?;
1150        let rows = stmt.query_map([], |row| row.get(0))?;
1151        let mut result = Vec::new();
1152        for row in rows {
1153            result.push(row?);
1154        }
1155        Ok(result)
1156    }
1157
1158    /// Get a list of distinct, non-empty working directories present in the DB.
1159    pub fn list_working_directories(&self) -> Result<Vec<String>> {
1160        let conn = self.conn();
1161        let mut stmt = conn.prepare(
1162            "SELECT DISTINCT working_directory FROM sessions \
1163             WHERE working_directory IS NOT NULL AND TRIM(working_directory) <> '' \
1164             AND COALESCE(is_auxiliary, 0) = 0 \
1165             ORDER BY working_directory ASC",
1166        )?;
1167        let rows = stmt.query_map([], |row| row.get(0))?;
1168        let mut result = Vec::new();
1169        for row in rows {
1170            result.push(row?);
1171        }
1172        Ok(result)
1173    }
1174}
1175
1176// ── Schema backfill for existing local DB files ───────────────────────
1177
1178fn open_connection_with_latest_schema(path: &PathBuf) -> Result<Connection> {
1179    let conn = Connection::open(path).with_context(|| format!("open db {}", path.display()))?;
1180    conn.execute_batch("PRAGMA journal_mode=WAL;")?;
1181
1182    // Disable FK constraints for local DB (index/cache, not source of truth)
1183    conn.execute_batch("PRAGMA foreign_keys=OFF;")?;
1184
1185    apply_local_migrations(&conn)?;
1186
1187    // Backfill missing columns for existing local DB files where `sessions`
1188    // existed before newer fields were introduced.
1189    ensure_sessions_columns(&conn)?;
1190    repair_session_tools_from_source_path(&conn)?;
1191    validate_local_schema(&conn)?;
1192
1193    Ok(conn)
1194}
1195
1196fn apply_local_migrations(conn: &Connection) -> Result<()> {
1197    conn.execute_batch(
1198        "CREATE TABLE IF NOT EXISTS _migrations (
1199            id INTEGER PRIMARY KEY,
1200            name TEXT NOT NULL UNIQUE,
1201            applied_at TEXT NOT NULL DEFAULT (datetime('now'))
1202        );",
1203    )
1204    .context("create _migrations table for local db")?;
1205
1206    for (name, sql) in MIGRATIONS.iter().chain(LOCAL_MIGRATIONS.iter()) {
1207        let already_applied: bool = conn
1208            .query_row(
1209                "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
1210                [name],
1211                |row| row.get(0),
1212            )
1213            .unwrap_or(false);
1214
1215        if already_applied {
1216            continue;
1217        }
1218
1219        if let Err(e) = conn.execute_batch(sql) {
1220            let msg = e.to_string().to_ascii_lowercase();
1221            if !is_local_migration_compat_error(&msg) {
1222                return Err(e).with_context(|| format!("apply local migration {name}"));
1223            }
1224        }
1225
1226        conn.execute(
1227            "INSERT OR IGNORE INTO _migrations (name) VALUES (?1)",
1228            [name],
1229        )
1230        .with_context(|| format!("record local migration {name}"))?;
1231    }
1232
1233    Ok(())
1234}
1235
/// Report whether a (lower-cased) SQLite error message describes a schema
/// state that a migration can safely skip over.
///
/// Matching is a plain, case-sensitive substring search.
fn is_local_migration_compat_error(msg: &str) -> bool {
    const TOLERATED: [&str; 3] = [
        "duplicate column name",
        "no such column",
        "already exists",
    ];
    TOLERATED.iter().any(|needle| msg.contains(*needle))
}
1241
1242fn validate_local_schema(conn: &Connection) -> Result<()> {
1243    let sql = format!("SELECT {LOCAL_SESSION_COLUMNS} {FROM_CLAUSE} WHERE 1=0");
1244    conn.prepare(&sql)
1245        .map(|_| ())
1246        .context("validate local session schema")
1247}
1248
1249fn repair_session_tools_from_source_path(conn: &Connection) -> Result<()> {
1250    let mut stmt = conn.prepare(
1251        "SELECT s.id, s.tool, ss.source_path \
1252         FROM sessions s \
1253         LEFT JOIN session_sync ss ON ss.session_id = s.id \
1254         WHERE ss.source_path IS NOT NULL",
1255    )?;
1256    let rows = stmt.query_map([], |row| {
1257        Ok((
1258            row.get::<_, String>(0)?,
1259            row.get::<_, String>(1)?,
1260            row.get::<_, Option<String>>(2)?,
1261        ))
1262    })?;
1263
1264    let mut updates: Vec<(String, String)> = Vec::new();
1265    for row in rows {
1266        let (id, current_tool, source_path) = row?;
1267        let normalized = normalize_tool_for_source_path(&current_tool, source_path.as_deref());
1268        if normalized != current_tool {
1269            updates.push((id, normalized));
1270        }
1271    }
1272    drop(stmt);
1273
1274    for (id, tool) in updates {
1275        conn.execute(
1276            "UPDATE sessions SET tool = ?1 WHERE id = ?2",
1277            params![tool, id],
1278        )?;
1279    }
1280
1281    Ok(())
1282}
1283
1284fn is_schema_compat_error(err: &anyhow::Error) -> bool {
1285    let msg = format!("{err:#}").to_ascii_lowercase();
1286    msg.contains("no such column")
1287        || msg.contains("no such table")
1288        || msg.contains("cannot add a column")
1289        || msg.contains("already exists")
1290        || msg.contains("views may not be indexed")
1291        || msg.contains("malformed database schema")
1292        || msg.contains("duplicate column name")
1293}
1294
1295fn rotate_legacy_db(path: &PathBuf) -> Result<()> {
1296    if !path.exists() {
1297        return Ok(());
1298    }
1299
1300    let ts = chrono::Utc::now().format("%Y%m%d%H%M%S");
1301    let backup_name = format!(
1302        "{}.legacy-{}.bak",
1303        path.file_name()
1304            .and_then(|n| n.to_str())
1305            .unwrap_or("local.db"),
1306        ts
1307    );
1308    let backup_path = path.with_file_name(backup_name);
1309    std::fs::rename(path, &backup_path).with_context(|| {
1310        format!(
1311            "rotate local db backup {} -> {}",
1312            path.display(),
1313            backup_path.display()
1314        )
1315    })?;
1316
1317    let wal = PathBuf::from(format!("{}-wal", path.display()));
1318    let shm = PathBuf::from(format!("{}-shm", path.display()));
1319    let _ = std::fs::remove_file(wal);
1320    let _ = std::fs::remove_file(shm);
1321    Ok(())
1322}
1323
/// Columns the `sessions` table must have, as `(name, SQL declaration)` pairs.
///
/// `ensure_sessions_columns` adds every entry that is missing from an existing
/// table with `ALTER TABLE sessions ADD COLUMN {name} {decl}`.
const REQUIRED_SESSION_COLUMNS: &[(&str, &str)] = &[
    ("user_id", "TEXT"),
    ("team_id", "TEXT DEFAULT 'personal'"),
    ("tool", "TEXT DEFAULT ''"),
    ("agent_provider", "TEXT"),
    ("agent_model", "TEXT"),
    ("title", "TEXT"),
    ("description", "TEXT"),
    ("tags", "TEXT"),
    ("created_at", "TEXT DEFAULT ''"),
    ("uploaded_at", "TEXT DEFAULT ''"),
    ("message_count", "INTEGER DEFAULT 0"),
    ("user_message_count", "INTEGER DEFAULT 0"),
    ("task_count", "INTEGER DEFAULT 0"),
    ("event_count", "INTEGER DEFAULT 0"),
    ("duration_seconds", "INTEGER DEFAULT 0"),
    ("total_input_tokens", "INTEGER DEFAULT 0"),
    ("total_output_tokens", "INTEGER DEFAULT 0"),
    // Migration-only compatibility column from pre git-native body storage.
    ("body_storage_key", "TEXT DEFAULT ''"),
    ("body_url", "TEXT"),
    ("git_remote", "TEXT"),
    ("git_branch", "TEXT"),
    ("git_commit", "TEXT"),
    ("git_repo_name", "TEXT"),
    ("pr_number", "INTEGER"),
    ("pr_url", "TEXT"),
    ("working_directory", "TEXT"),
    ("files_modified", "TEXT"),
    ("files_read", "TEXT"),
    ("has_errors", "BOOLEAN DEFAULT 0"),
    ("max_active_agents", "INTEGER DEFAULT 1"),
    ("is_auxiliary", "INTEGER NOT NULL DEFAULT 0"),
];
1358
1359fn ensure_sessions_columns(conn: &Connection) -> Result<()> {
1360    let mut existing = HashSet::new();
1361    let mut stmt = conn.prepare("PRAGMA table_info(sessions)")?;
1362    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
1363    for row in rows {
1364        existing.insert(row?);
1365    }
1366
1367    for (name, decl) in REQUIRED_SESSION_COLUMNS {
1368        if existing.contains(*name) {
1369            continue;
1370        }
1371        let sql = format!("ALTER TABLE sessions ADD COLUMN {name} {decl};");
1372        conn.execute_batch(&sql)
1373            .with_context(|| format!("add sessions column '{name}'"))?;
1374    }
1375
1376    Ok(())
1377}
1378
/// Column list for SELECT queries against sessions + session_sync + users.
///
/// The expressions assume the table aliases `s` (sessions), `ss`
/// (session_sync) and `u` (users) are in scope in the FROM clause.
///
/// The order is significant: `row_to_local_session` reads these columns by
/// position (0 through 33), so any change here must be mirrored there.
pub const LOCAL_SESSION_COLUMNS: &str = "\
s.id, ss.source_path, COALESCE(ss.sync_status, 'unknown') AS sync_status, ss.last_synced_at, \
s.user_id, u.nickname, s.team_id, s.tool, s.agent_provider, s.agent_model, \
s.title, s.description, s.tags, s.created_at, s.uploaded_at, \
s.message_count, COALESCE(s.user_message_count, 0), s.task_count, s.event_count, s.duration_seconds, \
s.total_input_tokens, s.total_output_tokens, \
s.git_remote, s.git_branch, s.git_commit, s.git_repo_name, \
s.pr_number, s.pr_url, s.working_directory, \
s.files_modified, s.files_read, s.has_errors, COALESCE(s.max_active_agents, 1), COALESCE(s.is_auxiliary, 0)";
1389
/// Decode one result row selected with `LOCAL_SESSION_COLUMNS` into a
/// `LocalSessionRow`.
///
/// Columns are read by position, so the indices below must stay aligned with
/// the order of `LOCAL_SESSION_COLUMNS`. The `tool` value is passed through
/// `normalize_tool_for_source_path` together with the row's source path
/// before being stored on the result.
fn row_to_local_session(row: &rusqlite::Row) -> rusqlite::Result<LocalSessionRow> {
    // Read these two first: both feed the tool normalization below.
    let source_path: Option<String> = row.get(1)?;
    let tool: String = row.get(7)?;
    let normalized_tool = normalize_tool_for_source_path(&tool, source_path.as_deref());

    Ok(LocalSessionRow {
        id: row.get(0)?,
        source_path,
        sync_status: row.get(2)?,
        last_synced_at: row.get(3)?,
        user_id: row.get(4)?,
        nickname: row.get(5)?,
        team_id: row.get(6)?,
        tool: normalized_tool,
        agent_provider: row.get(8)?,
        agent_model: row.get(9)?,
        title: row.get(10)?,
        description: row.get(11)?,
        tags: row.get(12)?,
        created_at: row.get(13)?,
        uploaded_at: row.get(14)?,
        message_count: row.get(15)?,
        user_message_count: row.get(16)?,
        task_count: row.get(17)?,
        event_count: row.get(18)?,
        duration_seconds: row.get(19)?,
        total_input_tokens: row.get(20)?,
        total_output_tokens: row.get(21)?,
        git_remote: row.get(22)?,
        git_branch: row.get(23)?,
        git_commit: row.get(24)?,
        git_repo_name: row.get(25)?,
        pr_number: row.get(26)?,
        pr_url: row.get(27)?,
        working_directory: row.get(28)?,
        files_modified: row.get(29)?,
        files_read: row.get(30)?,
        // The last three columns fall back to a default instead of failing
        // the whole row when they cannot be decoded.
        has_errors: row.get::<_, i64>(31).unwrap_or(0) != 0,
        max_active_agents: row.get(32).unwrap_or(1),
        is_auxiliary: row.get::<_, i64>(33).unwrap_or(0) != 0,
    })
}
1432
1433fn default_db_path() -> Result<PathBuf> {
1434    let home = std::env::var("HOME")
1435        .or_else(|_| std::env::var("USERPROFILE"))
1436        .context("Could not determine home directory")?;
1437    Ok(PathBuf::from(home)
1438        .join(".local")
1439        .join("share")
1440        .join("opensession")
1441        .join("local.db"))
1442}
1443
1444#[cfg(test)]
1445mod tests {
1446    use super::*;
1447
1448    use std::fs::{create_dir_all, write};
1449    use tempfile::tempdir;
1450
1451    fn test_db() -> LocalDb {
1452        let dir = tempdir().unwrap();
1453        let path = dir.keep().join("test.db");
1454        LocalDb::open_path(&path).unwrap()
1455    }
1456
1457    fn temp_root() -> tempfile::TempDir {
1458        tempdir().unwrap()
1459    }
1460
1461    fn make_row(id: &str, tool: &str, source_path: Option<&str>) -> LocalSessionRow {
1462        LocalSessionRow {
1463            id: id.to_string(),
1464            source_path: source_path.map(String::from),
1465            sync_status: "local_only".to_string(),
1466            last_synced_at: None,
1467            user_id: None,
1468            nickname: None,
1469            team_id: None,
1470            tool: tool.to_string(),
1471            agent_provider: None,
1472            agent_model: None,
1473            title: Some("test".to_string()),
1474            description: None,
1475            tags: None,
1476            created_at: "2024-01-01T00:00:00Z".to_string(),
1477            uploaded_at: None,
1478            message_count: 0,
1479            user_message_count: 0,
1480            task_count: 0,
1481            event_count: 0,
1482            duration_seconds: 0,
1483            total_input_tokens: 0,
1484            total_output_tokens: 0,
1485            git_remote: None,
1486            git_branch: None,
1487            git_commit: None,
1488            git_repo_name: None,
1489            pr_number: None,
1490            pr_url: None,
1491            working_directory: None,
1492            files_modified: None,
1493            files_read: None,
1494            has_errors: false,
1495            max_active_agents: 1,
1496            is_auxiliary: false,
1497        }
1498    }
1499
1500    #[test]
1501    fn test_open_and_schema() {
1502        let _db = test_db();
1503    }
1504
1505    #[test]
1506    fn test_open_repairs_codex_tool_hint_from_source_path() {
1507        let dir = tempfile::tempdir().unwrap();
1508        let path = dir.path().join("repair.db");
1509
1510        {
1511            let _ = LocalDb::open_path(&path).unwrap();
1512        }
1513
1514        {
1515            let conn = Connection::open(&path).unwrap();
1516            conn.execute(
1517                "INSERT INTO sessions (id, team_id, tool, created_at, body_storage_key) VALUES (?1, 'personal', 'claude-code', ?2, '')",
1518                params!["rollout-repair", "2026-02-20T00:00:00Z"],
1519            )
1520            .unwrap();
1521            conn.execute(
1522                "INSERT INTO session_sync (session_id, source_path, sync_status) VALUES (?1, ?2, 'local_only')",
1523                params!["rollout-repair", "/Users/test/.codex/sessions/2026/02/20/rollout-repair.jsonl"],
1524            )
1525            .unwrap();
1526        }
1527
1528        let db = LocalDb::open_path(&path).unwrap();
1529        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1530        let row = rows
1531            .iter()
1532            .find(|row| row.id == "rollout-repair")
1533            .expect("repaired row");
1534        assert_eq!(row.tool, "codex");
1535    }
1536
1537    #[test]
1538    fn test_upsert_local_session_normalizes_tool_from_source_path() {
1539        let db = test_db();
1540        let mut session = Session::new(
1541            "rollout-upsert".to_string(),
1542            opensession_core::trace::Agent {
1543                provider: "openai".to_string(),
1544                model: "gpt-5".to_string(),
1545                tool: "claude-code".to_string(),
1546                tool_version: None,
1547            },
1548        );
1549        session.stats.event_count = 1;
1550
1551        db.upsert_local_session(
1552            &session,
1553            "/Users/test/.codex/sessions/2026/02/20/rollout-upsert.jsonl",
1554            &crate::git::GitContext::default(),
1555        )
1556        .unwrap();
1557
1558        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1559        let row = rows
1560            .iter()
1561            .find(|row| row.id == "rollout-upsert")
1562            .expect("upserted row");
1563        assert_eq!(row.tool, "codex");
1564    }
1565
1566    #[test]
1567    fn test_open_backfills_legacy_sessions_columns() {
1568        let dir = tempfile::tempdir().unwrap();
1569        let path = dir.path().join("legacy.db");
1570        {
1571            let conn = Connection::open(&path).unwrap();
1572            conn.execute_batch(
1573                "CREATE TABLE sessions (id TEXT PRIMARY KEY);
1574                 INSERT INTO sessions (id) VALUES ('legacy-1');",
1575            )
1576            .unwrap();
1577        }
1578
1579        let db = LocalDb::open_path(&path).unwrap();
1580        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1581        assert_eq!(rows.len(), 1);
1582        assert_eq!(rows[0].id, "legacy-1");
1583        assert_eq!(rows[0].user_message_count, 0);
1584    }
1585
1586    #[test]
1587    fn test_open_rotates_incompatible_legacy_schema() {
1588        let dir = tempfile::tempdir().unwrap();
1589        let path = dir.path().join("broken.db");
1590        {
1591            let conn = Connection::open(&path).unwrap();
1592            conn.execute_batch("CREATE VIEW sessions AS SELECT 'x' AS id;")
1593                .unwrap();
1594        }
1595
1596        let db = LocalDb::open_path(&path).unwrap();
1597        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1598        assert!(rows.is_empty());
1599
1600        let rotated = std::fs::read_dir(dir.path())
1601            .unwrap()
1602            .filter_map(Result::ok)
1603            .any(|entry| {
1604                let name = entry.file_name();
1605                let name = name.to_string_lossy();
1606                name.starts_with("broken.db.legacy-") && name.ends_with(".bak")
1607            });
1608        assert!(rotated, "expected rotated legacy backup file");
1609    }
1610
1611    #[test]
1612    fn test_is_opencode_child_session() {
1613        let root = temp_root();
1614        let dir = root.path().join("sessions");
1615        create_dir_all(&dir).unwrap();
1616        let parent_session = dir.join("parent.json");
1617        write(
1618            &parent_session,
1619            r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1620        )
1621        .unwrap();
1622        let child_session = dir.join("child.json");
1623        write(
1624            &child_session,
1625            r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1626        )
1627        .unwrap();
1628
1629        let parent = make_row(
1630            "ses_parent",
1631            "opencode",
1632            Some(parent_session.to_str().unwrap()),
1633        );
1634        let mut child = make_row(
1635            "ses_child",
1636            "opencode",
1637            Some(child_session.to_str().unwrap()),
1638        );
1639        child.is_auxiliary = true;
1640        let mut codex = make_row("ses_other", "codex", Some(child_session.to_str().unwrap()));
1641        codex.is_auxiliary = true;
1642
1643        assert!(!is_opencode_child_session(&parent));
1644        assert!(is_opencode_child_session(&child));
1645        assert!(!is_opencode_child_session(&codex));
1646    }
1647
1648    #[allow(deprecated)]
1649    #[test]
1650    fn test_parse_opencode_parent_session_id_aliases() {
1651        let root = temp_root();
1652        let dir = root.path().join("session-aliases");
1653        create_dir_all(&dir).unwrap();
1654        let child_session = dir.join("child.json");
1655        write(
1656            &child_session,
1657            r#"{"id":"ses_child","parentUUID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1658        )
1659        .unwrap();
1660        assert_eq!(
1661            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
1662            Some("ses_parent")
1663        );
1664    }
1665
1666    #[allow(deprecated)]
1667    #[test]
1668    fn test_parse_opencode_parent_session_id_nested_metadata() {
1669        let root = temp_root();
1670        let dir = root.path().join("session-nested");
1671        create_dir_all(&dir).unwrap();
1672        let child_session = dir.join("child.json");
1673        write(
1674            &child_session,
1675            r#"{"id":"ses_child","metadata":{"links":{"parentSessionId":"ses_parent","trace":"x"}}}"#,
1676        )
1677        .unwrap();
1678        assert_eq!(
1679            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
1680            Some("ses_parent")
1681        );
1682    }
1683
1684    #[test]
1685    fn test_hide_opencode_child_sessions() {
1686        let root = temp_root();
1687        let dir = root.path().join("sessions");
1688        create_dir_all(&dir).unwrap();
1689        let parent_session = dir.join("parent.json");
1690        let child_session = dir.join("child.json");
1691        let orphan_session = dir.join("orphan.json");
1692
1693        write(
1694            &parent_session,
1695            r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1696        )
1697        .unwrap();
1698        write(
1699            &child_session,
1700            r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1701        )
1702        .unwrap();
1703        write(
1704            &orphan_session,
1705            r#"{"id":"ses_orphan","time":{"created":1000,"updated":1000}}"#,
1706        )
1707        .unwrap();
1708
1709        let rows = vec![
1710            {
1711                let mut row = make_row(
1712                    "ses_child",
1713                    "opencode",
1714                    Some(child_session.to_str().unwrap()),
1715                );
1716                row.is_auxiliary = true;
1717                row
1718            },
1719            make_row(
1720                "ses_parent",
1721                "opencode",
1722                Some(parent_session.to_str().unwrap()),
1723            ),
1724            {
1725                let mut row = make_row("ses_other", "codex", None);
1726                row.user_message_count = 1;
1727                row
1728            },
1729            make_row(
1730                "ses_orphan",
1731                "opencode",
1732                Some(orphan_session.to_str().unwrap()),
1733            ),
1734        ];
1735
1736        let filtered = hide_opencode_child_sessions(rows);
1737        assert_eq!(filtered.len(), 3);
1738        assert!(filtered.iter().all(|r| r.id != "ses_child"));
1739    }
1740
1741    #[test]
1742    fn test_sync_cursor() {
1743        let db = test_db();
1744        assert_eq!(db.get_sync_cursor("team1").unwrap(), None);
1745        db.set_sync_cursor("team1", "2024-01-01T00:00:00Z").unwrap();
1746        assert_eq!(
1747            db.get_sync_cursor("team1").unwrap(),
1748            Some("2024-01-01T00:00:00Z".to_string())
1749        );
1750        // Update
1751        db.set_sync_cursor("team1", "2024-06-01T00:00:00Z").unwrap();
1752        assert_eq!(
1753            db.get_sync_cursor("team1").unwrap(),
1754            Some("2024-06-01T00:00:00Z".to_string())
1755        );
1756    }
1757
1758    #[test]
1759    fn test_body_cache() {
1760        let db = test_db();
1761        assert_eq!(db.get_cached_body("s1").unwrap(), None);
1762        db.cache_body("s1", b"hello world").unwrap();
1763        assert_eq!(
1764            db.get_cached_body("s1").unwrap(),
1765            Some(b"hello world".to_vec())
1766        );
1767    }
1768
1769    #[test]
1770    fn test_local_migrations_are_loaded_from_api_crate() {
1771        let migration_names: Vec<&str> = super::LOCAL_MIGRATIONS
1772            .iter()
1773            .map(|(name, _)| *name)
1774            .collect();
1775        assert!(
1776            migration_names.contains(&"local_0003_session_flags"),
1777            "expected local_0003_session_flags migration from opensession-api"
1778        );
1779
1780        let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
1781        let migrations_dir = manifest_dir.join("migrations");
1782        if migrations_dir.exists() {
1783            let sql_files = std::fs::read_dir(migrations_dir)
1784                .expect("read local-db migrations directory")
1785                .filter_map(Result::ok)
1786                .map(|entry| entry.file_name().to_string_lossy().to_string())
1787                .filter(|name| name.ends_with(".sql"))
1788                .collect::<Vec<_>>();
1789            assert!(
1790                sql_files.is_empty(),
1791                "local-db must not ship duplicated migration SQL files"
1792            );
1793        }
1794    }
1795
1796    #[test]
1797    fn test_local_0003_session_flags_backfills_known_auxiliary_rows() {
1798        let dir = tempdir().unwrap();
1799        let path = dir.path().join("local-legacy.db");
1800
1801        {
1802            let conn = Connection::open(&path).unwrap();
1803            conn.execute_batch("PRAGMA foreign_keys=OFF;").unwrap();
1804            conn.execute_batch(
1805                "CREATE TABLE IF NOT EXISTS _migrations (
1806                    id INTEGER PRIMARY KEY,
1807                    name TEXT NOT NULL UNIQUE,
1808                    applied_at TEXT NOT NULL DEFAULT (datetime('now'))
1809                );",
1810            )
1811            .unwrap();
1812
1813            for (name, sql) in super::MIGRATIONS.iter().chain(
1814                super::LOCAL_MIGRATIONS
1815                    .iter()
1816                    .filter(|(name, _)| *name != "local_0003_session_flags"),
1817            ) {
1818                conn.execute_batch(sql).unwrap();
1819                conn.execute(
1820                    "INSERT OR IGNORE INTO _migrations (name) VALUES (?1)",
1821                    params![name],
1822                )
1823                .unwrap();
1824            }
1825
1826            conn.execute(
1827                "INSERT INTO sessions \
1828                 (id, team_id, tool, created_at, body_storage_key, message_count, user_message_count, task_count, event_count) \
1829                 VALUES (?1, 'personal', 'codex', '2026-02-20T00:00:00Z', '', 12, 6, 6, 20)",
1830                params!["primary-visible"],
1831            )
1832            .unwrap();
1833            conn.execute(
1834                "INSERT INTO sessions \
1835                 (id, team_id, tool, created_at, body_storage_key, message_count, user_message_count, task_count, event_count) \
1836                 VALUES (?1, 'personal', 'opencode', '2026-02-20T00:00:00Z', '', 2, 0, 2, 6)",
1837                params!["opencode-heuristic-child"],
1838            )
1839            .unwrap();
1840            conn.execute(
1841                "INSERT INTO sessions \
1842                 (id, team_id, tool, created_at, body_storage_key, message_count, user_message_count, task_count, event_count) \
1843                 VALUES (?1, 'personal', 'claude-code', '2026-02-20T00:00:00Z', '', 3, 1, 3, 12)",
1844                params!["claude-subagent-child"],
1845            )
1846            .unwrap();
1847
1848            conn.execute(
1849                "INSERT INTO session_sync (session_id, source_path, sync_status) VALUES (?1, ?2, 'local_only')",
1850                params![
1851                    "opencode-heuristic-child",
1852                    "/Users/test/.opencode/sessions/opencode-heuristic-child.json"
1853                ],
1854            )
1855            .unwrap();
1856            conn.execute(
1857                "INSERT INTO session_sync (session_id, source_path, sync_status) VALUES (?1, ?2, 'local_only')",
1858                params![
1859                    "claude-subagent-child",
1860                    "/Users/test/.claude/projects/foo/subagents/agent-1.jsonl"
1861                ],
1862            )
1863            .unwrap();
1864        }
1865
1866        let db = LocalDb::open_path(&path).unwrap();
1867
1868        let flags = {
1869            let conn = db.conn();
1870            let mut stmt = conn
1871                .prepare("SELECT id, COALESCE(is_auxiliary, 0) FROM sessions ORDER BY id")
1872                .unwrap();
1873            let rows = stmt
1874                .query_map([], |row| {
1875                    Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
1876                })
1877                .unwrap();
1878            rows.collect::<Result<Vec<_>, _>>().unwrap()
1879        };
1880
1881        assert_eq!(
1882            flags,
1883            vec![
1884                ("claude-subagent-child".to_string(), 1),
1885                ("opencode-heuristic-child".to_string(), 1),
1886                ("primary-visible".to_string(), 0),
1887            ]
1888        );
1889
1890        let visible = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1891        assert_eq!(visible.len(), 1);
1892        assert_eq!(visible[0].id, "primary-visible");
1893    }
1894
1895    #[test]
1896    fn test_upsert_remote_session() {
1897        let db = test_db();
1898        let summary = RemoteSessionSummary {
1899            id: "remote-1".to_string(),
1900            user_id: Some("u1".to_string()),
1901            nickname: Some("alice".to_string()),
1902            team_id: "t1".to_string(),
1903            tool: "claude-code".to_string(),
1904            agent_provider: None,
1905            agent_model: None,
1906            title: Some("Test session".to_string()),
1907            description: None,
1908            tags: None,
1909            created_at: "2024-01-01T00:00:00Z".to_string(),
1910            uploaded_at: "2024-01-01T01:00:00Z".to_string(),
1911            message_count: 10,
1912            task_count: 2,
1913            event_count: 20,
1914            duration_seconds: 300,
1915            total_input_tokens: 1000,
1916            total_output_tokens: 500,
1917            git_remote: None,
1918            git_branch: None,
1919            git_commit: None,
1920            git_repo_name: None,
1921            pr_number: None,
1922            pr_url: None,
1923            working_directory: None,
1924            files_modified: None,
1925            files_read: None,
1926            has_errors: false,
1927            max_active_agents: 1,
1928        };
1929        db.upsert_remote_session(&summary).unwrap();
1930
1931        let sessions = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1932        assert_eq!(sessions.len(), 1);
1933        assert_eq!(sessions[0].id, "remote-1");
1934        assert_eq!(sessions[0].sync_status, "remote_only");
1935        assert_eq!(sessions[0].nickname, None); // no user in local users table
1936        assert!(!sessions[0].is_auxiliary);
1937    }
1938
1939    #[test]
1940    fn test_list_filter_by_repo() {
1941        let db = test_db();
1942        // Insert a remote session with team_id
1943        let summary1 = RemoteSessionSummary {
1944            id: "s1".to_string(),
1945            user_id: None,
1946            nickname: None,
1947            team_id: "t1".to_string(),
1948            tool: "claude-code".to_string(),
1949            agent_provider: None,
1950            agent_model: None,
1951            title: Some("Session 1".to_string()),
1952            description: None,
1953            tags: None,
1954            created_at: "2024-01-01T00:00:00Z".to_string(),
1955            uploaded_at: "2024-01-01T01:00:00Z".to_string(),
1956            message_count: 5,
1957            task_count: 0,
1958            event_count: 10,
1959            duration_seconds: 60,
1960            total_input_tokens: 100,
1961            total_output_tokens: 50,
1962            git_remote: None,
1963            git_branch: None,
1964            git_commit: None,
1965            git_repo_name: None,
1966            pr_number: None,
1967            pr_url: None,
1968            working_directory: None,
1969            files_modified: None,
1970            files_read: None,
1971            has_errors: false,
1972            max_active_agents: 1,
1973        };
1974        db.upsert_remote_session(&summary1).unwrap();
1975
1976        // Filter by team
1977        let filter = LocalSessionFilter {
1978            team_id: Some("t1".to_string()),
1979            ..Default::default()
1980        };
1981        assert_eq!(db.list_sessions(&filter).unwrap().len(), 1);
1982
1983        let filter = LocalSessionFilter {
1984            team_id: Some("t999".to_string()),
1985            ..Default::default()
1986        };
1987        assert_eq!(db.list_sessions(&filter).unwrap().len(), 0);
1988    }
1989
1990    // ── Helpers for inserting test sessions ────────────────────────────
1991
1992    fn make_summary(id: &str, tool: &str, title: &str, created_at: &str) -> RemoteSessionSummary {
1993        RemoteSessionSummary {
1994            id: id.to_string(),
1995            user_id: None,
1996            nickname: None,
1997            team_id: "t1".to_string(),
1998            tool: tool.to_string(),
1999            agent_provider: Some("anthropic".to_string()),
2000            agent_model: Some("claude-opus-4-6".to_string()),
2001            title: Some(title.to_string()),
2002            description: None,
2003            tags: None,
2004            created_at: created_at.to_string(),
2005            uploaded_at: created_at.to_string(),
2006            message_count: 5,
2007            task_count: 1,
2008            event_count: 10,
2009            duration_seconds: 300,
2010            total_input_tokens: 1000,
2011            total_output_tokens: 500,
2012            git_remote: None,
2013            git_branch: None,
2014            git_commit: None,
2015            git_repo_name: None,
2016            pr_number: None,
2017            pr_url: None,
2018            working_directory: None,
2019            files_modified: None,
2020            files_read: None,
2021            has_errors: false,
2022            max_active_agents: 1,
2023        }
2024    }
2025
2026    fn seed_sessions(db: &LocalDb) {
2027        // Insert 5 sessions across two tools, ordered by created_at
2028        db.upsert_remote_session(&make_summary(
2029            "s1",
2030            "claude-code",
2031            "First session",
2032            "2024-01-01T00:00:00Z",
2033        ))
2034        .unwrap();
2035        db.upsert_remote_session(&make_summary(
2036            "s2",
2037            "claude-code",
2038            "JWT auth work",
2039            "2024-01-02T00:00:00Z",
2040        ))
2041        .unwrap();
2042        db.upsert_remote_session(&make_summary(
2043            "s3",
2044            "gemini",
2045            "Gemini test",
2046            "2024-01-03T00:00:00Z",
2047        ))
2048        .unwrap();
2049        db.upsert_remote_session(&make_summary(
2050            "s4",
2051            "claude-code",
2052            "Error handling",
2053            "2024-01-04T00:00:00Z",
2054        ))
2055        .unwrap();
2056        db.upsert_remote_session(&make_summary(
2057            "s5",
2058            "claude-code",
2059            "Final polish",
2060            "2024-01-05T00:00:00Z",
2061        ))
2062        .unwrap();
2063    }
2064
2065    // ── list_sessions_log tests ────────────────────────────────────────
2066
2067    #[test]
2068    fn test_log_no_filters() {
2069        let db = test_db();
2070        seed_sessions(&db);
2071        let filter = LogFilter::default();
2072        let results = db.list_sessions_log(&filter).unwrap();
2073        assert_eq!(results.len(), 5);
2074        // Should be ordered by created_at DESC
2075        assert_eq!(results[0].id, "s5");
2076        assert_eq!(results[4].id, "s1");
2077    }
2078
2079    #[test]
2080    fn test_log_filter_by_tool() {
2081        let db = test_db();
2082        seed_sessions(&db);
2083        let filter = LogFilter {
2084            tool: Some("claude-code".to_string()),
2085            ..Default::default()
2086        };
2087        let results = db.list_sessions_log(&filter).unwrap();
2088        assert_eq!(results.len(), 4);
2089        assert!(results.iter().all(|s| s.tool == "claude-code"));
2090    }
2091
2092    #[test]
2093    fn test_log_filter_by_model_wildcard() {
2094        let db = test_db();
2095        seed_sessions(&db);
2096        let filter = LogFilter {
2097            model: Some("claude*".to_string()),
2098            ..Default::default()
2099        };
2100        let results = db.list_sessions_log(&filter).unwrap();
2101        assert_eq!(results.len(), 5); // all have claude-opus model
2102    }
2103
2104    #[test]
2105    fn test_log_filter_since() {
2106        let db = test_db();
2107        seed_sessions(&db);
2108        let filter = LogFilter {
2109            since: Some("2024-01-03T00:00:00Z".to_string()),
2110            ..Default::default()
2111        };
2112        let results = db.list_sessions_log(&filter).unwrap();
2113        assert_eq!(results.len(), 3); // s3, s4, s5
2114    }
2115
2116    #[test]
2117    fn test_log_filter_before() {
2118        let db = test_db();
2119        seed_sessions(&db);
2120        let filter = LogFilter {
2121            before: Some("2024-01-03T00:00:00Z".to_string()),
2122            ..Default::default()
2123        };
2124        let results = db.list_sessions_log(&filter).unwrap();
2125        assert_eq!(results.len(), 2); // s1, s2
2126    }
2127
2128    #[test]
2129    fn test_log_filter_since_and_before() {
2130        let db = test_db();
2131        seed_sessions(&db);
2132        let filter = LogFilter {
2133            since: Some("2024-01-02T00:00:00Z".to_string()),
2134            before: Some("2024-01-04T00:00:00Z".to_string()),
2135            ..Default::default()
2136        };
2137        let results = db.list_sessions_log(&filter).unwrap();
2138        assert_eq!(results.len(), 2); // s2, s3
2139    }
2140
2141    #[test]
2142    fn test_log_filter_grep() {
2143        let db = test_db();
2144        seed_sessions(&db);
2145        let filter = LogFilter {
2146            grep: Some("JWT".to_string()),
2147            ..Default::default()
2148        };
2149        let results = db.list_sessions_log(&filter).unwrap();
2150        assert_eq!(results.len(), 1);
2151        assert_eq!(results[0].id, "s2");
2152    }
2153
2154    #[test]
2155    fn test_log_limit_and_offset() {
2156        let db = test_db();
2157        seed_sessions(&db);
2158        let filter = LogFilter {
2159            limit: Some(2),
2160            offset: Some(1),
2161            ..Default::default()
2162        };
2163        let results = db.list_sessions_log(&filter).unwrap();
2164        assert_eq!(results.len(), 2);
2165        assert_eq!(results[0].id, "s4"); // second most recent
2166        assert_eq!(results[1].id, "s3");
2167    }
2168
2169    #[test]
2170    fn test_log_limit_only() {
2171        let db = test_db();
2172        seed_sessions(&db);
2173        let filter = LogFilter {
2174            limit: Some(3),
2175            ..Default::default()
2176        };
2177        let results = db.list_sessions_log(&filter).unwrap();
2178        assert_eq!(results.len(), 3);
2179    }
2180
2181    #[test]
2182    fn test_list_sessions_limit_offset() {
2183        let db = test_db();
2184        seed_sessions(&db);
2185        let filter = LocalSessionFilter {
2186            limit: Some(2),
2187            offset: Some(1),
2188            ..Default::default()
2189        };
2190        let results = db.list_sessions(&filter).unwrap();
2191        assert_eq!(results.len(), 2);
2192        assert_eq!(results[0].id, "s4");
2193        assert_eq!(results[1].id, "s3");
2194    }
2195
2196    #[test]
2197    fn test_count_sessions_filtered() {
2198        let db = test_db();
2199        seed_sessions(&db);
2200        let count = db
2201            .count_sessions_filtered(&LocalSessionFilter::default())
2202            .unwrap();
2203        assert_eq!(count, 5);
2204    }
2205
2206    #[test]
2207    fn test_list_and_count_filters_match_when_auxiliary_rows_exist() {
2208        let db = test_db();
2209        seed_sessions(&db);
2210        db.conn()
2211            .execute(
2212                "UPDATE sessions SET is_auxiliary = 1 WHERE id IN ('s2', 's3')",
2213                [],
2214            )
2215            .unwrap();
2216
2217        let default_filter = LocalSessionFilter::default();
2218        let rows = db.list_sessions(&default_filter).unwrap();
2219        let count = db.count_sessions_filtered(&default_filter).unwrap();
2220        assert_eq!(rows.len() as i64, count);
2221        assert!(rows.iter().all(|row| !row.is_auxiliary));
2222
2223        let gemini_filter = LocalSessionFilter {
2224            tool: Some("gemini".to_string()),
2225            ..Default::default()
2226        };
2227        let gemini_rows = db.list_sessions(&gemini_filter).unwrap();
2228        let gemini_count = db.count_sessions_filtered(&gemini_filter).unwrap();
2229        assert_eq!(gemini_rows.len() as i64, gemini_count);
2230        assert!(gemini_rows.is_empty());
2231        assert_eq!(gemini_count, 0);
2232    }
2233
2234    #[test]
2235    fn test_list_working_directories_distinct_non_empty() {
2236        let db = test_db();
2237
2238        let mut a = make_summary("wd-1", "claude-code", "One", "2024-01-01T00:00:00Z");
2239        a.working_directory = Some("/tmp/repo-a".to_string());
2240        let mut b = make_summary("wd-2", "claude-code", "Two", "2024-01-02T00:00:00Z");
2241        b.working_directory = Some("/tmp/repo-a".to_string());
2242        let mut c = make_summary("wd-3", "claude-code", "Three", "2024-01-03T00:00:00Z");
2243        c.working_directory = Some("/tmp/repo-b".to_string());
2244        let mut d = make_summary("wd-4", "claude-code", "Four", "2024-01-04T00:00:00Z");
2245        d.working_directory = Some("".to_string());
2246
2247        db.upsert_remote_session(&a).unwrap();
2248        db.upsert_remote_session(&b).unwrap();
2249        db.upsert_remote_session(&c).unwrap();
2250        db.upsert_remote_session(&d).unwrap();
2251
2252        let dirs = db.list_working_directories().unwrap();
2253        assert_eq!(
2254            dirs,
2255            vec!["/tmp/repo-a".to_string(), "/tmp/repo-b".to_string()]
2256        );
2257    }
2258
2259    #[test]
2260    fn test_list_session_tools() {
2261        let db = test_db();
2262        seed_sessions(&db);
2263        let tools = db
2264            .list_session_tools(&LocalSessionFilter::default())
2265            .unwrap();
2266        assert_eq!(tools, vec!["claude-code".to_string(), "gemini".to_string()]);
2267    }
2268
2269    #[test]
2270    fn test_log_combined_filters() {
2271        let db = test_db();
2272        seed_sessions(&db);
2273        let filter = LogFilter {
2274            tool: Some("claude-code".to_string()),
2275            since: Some("2024-01-03T00:00:00Z".to_string()),
2276            limit: Some(1),
2277            ..Default::default()
2278        };
2279        let results = db.list_sessions_log(&filter).unwrap();
2280        assert_eq!(results.len(), 1);
2281        assert_eq!(results[0].id, "s5"); // most recent claude-code after Jan 3
2282    }
2283
2284    // ── Session offset/latest tests ────────────────────────────────────
2285
2286    #[test]
2287    fn test_get_session_by_offset() {
2288        let db = test_db();
2289        seed_sessions(&db);
2290        let row = db.get_session_by_offset(0).unwrap().unwrap();
2291        assert_eq!(row.id, "s5"); // most recent
2292        let row = db.get_session_by_offset(2).unwrap().unwrap();
2293        assert_eq!(row.id, "s3");
2294        assert!(db.get_session_by_offset(10).unwrap().is_none());
2295    }
2296
2297    #[test]
2298    fn test_get_session_by_tool_offset() {
2299        let db = test_db();
2300        seed_sessions(&db);
2301        let row = db
2302            .get_session_by_tool_offset("claude-code", 0)
2303            .unwrap()
2304            .unwrap();
2305        assert_eq!(row.id, "s5");
2306        let row = db
2307            .get_session_by_tool_offset("claude-code", 1)
2308            .unwrap()
2309            .unwrap();
2310        assert_eq!(row.id, "s4");
2311        let row = db.get_session_by_tool_offset("gemini", 0).unwrap().unwrap();
2312        assert_eq!(row.id, "s3");
2313        assert!(db
2314            .get_session_by_tool_offset("gemini", 1)
2315            .unwrap()
2316            .is_none());
2317    }
2318
2319    #[test]
2320    fn test_get_sessions_latest() {
2321        let db = test_db();
2322        seed_sessions(&db);
2323        let rows = db.get_sessions_latest(3).unwrap();
2324        assert_eq!(rows.len(), 3);
2325        assert_eq!(rows[0].id, "s5");
2326        assert_eq!(rows[1].id, "s4");
2327        assert_eq!(rows[2].id, "s3");
2328    }
2329
2330    #[test]
2331    fn test_get_sessions_by_tool_latest() {
2332        let db = test_db();
2333        seed_sessions(&db);
2334        let rows = db.get_sessions_by_tool_latest("claude-code", 2).unwrap();
2335        assert_eq!(rows.len(), 2);
2336        assert_eq!(rows[0].id, "s5");
2337        assert_eq!(rows[1].id, "s4");
2338    }
2339
2340    #[test]
2341    fn test_get_sessions_latest_more_than_available() {
2342        let db = test_db();
2343        seed_sessions(&db);
2344        let rows = db.get_sessions_by_tool_latest("gemini", 10).unwrap();
2345        assert_eq!(rows.len(), 1); // only 1 gemini session
2346    }
2347
2348    #[test]
2349    fn test_session_count() {
2350        let db = test_db();
2351        assert_eq!(db.session_count().unwrap(), 0);
2352        seed_sessions(&db);
2353        assert_eq!(db.session_count().unwrap(), 5);
2354    }
2355
2356    // ── Commit link tests ─────────────────────────────────────────────
2357
2358    #[test]
2359    fn test_link_commit_session() {
2360        let db = test_db();
2361        seed_sessions(&db);
2362        db.link_commit_session("abc123", "s1", Some("/tmp/repo"), Some("main"))
2363            .unwrap();
2364
2365        let commits = db.get_commits_by_session("s1").unwrap();
2366        assert_eq!(commits.len(), 1);
2367        assert_eq!(commits[0].commit_hash, "abc123");
2368        assert_eq!(commits[0].session_id, "s1");
2369        assert_eq!(commits[0].repo_path.as_deref(), Some("/tmp/repo"));
2370        assert_eq!(commits[0].branch.as_deref(), Some("main"));
2371
2372        let sessions = db.get_sessions_by_commit("abc123").unwrap();
2373        assert_eq!(sessions.len(), 1);
2374        assert_eq!(sessions[0].id, "s1");
2375    }
2376
2377    #[test]
2378    fn test_get_sessions_by_commit() {
2379        let db = test_db();
2380        seed_sessions(&db);
2381        // Link multiple sessions to the same commit
2382        db.link_commit_session("abc123", "s1", None, None).unwrap();
2383        db.link_commit_session("abc123", "s2", None, None).unwrap();
2384        db.link_commit_session("abc123", "s3", None, None).unwrap();
2385
2386        let sessions = db.get_sessions_by_commit("abc123").unwrap();
2387        assert_eq!(sessions.len(), 3);
2388        // Ordered by created_at DESC
2389        assert_eq!(sessions[0].id, "s3");
2390        assert_eq!(sessions[1].id, "s2");
2391        assert_eq!(sessions[2].id, "s1");
2392    }
2393
2394    #[test]
2395    fn test_get_commits_by_session() {
2396        let db = test_db();
2397        seed_sessions(&db);
2398        // Link multiple commits to the same session
2399        db.link_commit_session("aaa111", "s1", Some("/repo"), Some("main"))
2400            .unwrap();
2401        db.link_commit_session("bbb222", "s1", Some("/repo"), Some("main"))
2402            .unwrap();
2403        db.link_commit_session("ccc333", "s1", Some("/repo"), Some("feat"))
2404            .unwrap();
2405
2406        let commits = db.get_commits_by_session("s1").unwrap();
2407        assert_eq!(commits.len(), 3);
2408        // All linked to s1
2409        assert!(commits.iter().all(|c| c.session_id == "s1"));
2410    }
2411
2412    #[test]
2413    fn test_duplicate_link_ignored() {
2414        let db = test_db();
2415        seed_sessions(&db);
2416        db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2417            .unwrap();
2418        // Inserting the same link again should not error
2419        db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2420            .unwrap();
2421
2422        let commits = db.get_commits_by_session("s1").unwrap();
2423        assert_eq!(commits.len(), 1);
2424    }
2425
2426    #[test]
2427    fn test_log_filter_by_commit() {
2428        let db = test_db();
2429        seed_sessions(&db);
2430        db.link_commit_session("abc123", "s2", None, None).unwrap();
2431        db.link_commit_session("abc123", "s4", None, None).unwrap();
2432
2433        let filter = LogFilter {
2434            commit: Some("abc123".to_string()),
2435            ..Default::default()
2436        };
2437        let results = db.list_sessions_log(&filter).unwrap();
2438        assert_eq!(results.len(), 2);
2439        assert_eq!(results[0].id, "s4");
2440        assert_eq!(results[1].id, "s2");
2441
2442        // Non-existent commit returns nothing
2443        let filter = LogFilter {
2444            commit: Some("nonexistent".to_string()),
2445            ..Default::default()
2446        };
2447        let results = db.list_sessions_log(&filter).unwrap();
2448        assert_eq!(results.len(), 0);
2449    }
2450}