Skip to main content

opensession_local_db/
lib.rs

1pub mod git;
2
3use anyhow::{Context, Result};
4use opensession_core::trace::Session;
5use rusqlite::{params, Connection, OptionalExtension};
6use serde_json::Value;
7use std::collections::HashSet;
8use std::fs;
9use std::path::PathBuf;
10use std::sync::Mutex;
11
12use git::GitContext;
13
14type Migration = (&'static str, &'static str);
15
16// Keep local index/cache schema aligned with remote/session schema migrations.
17const REMOTE_MIGRATIONS: &[Migration] = &[
18    ("0001_schema", include_str!("../migrations/0001_schema.sql")),
19    (
20        "0003_max_active_agents",
21        include_str!("../migrations/0003_max_active_agents.sql"),
22    ),
23    (
24        "0004_oauth_states_provider",
25        include_str!("../migrations/0004_oauth_states_provider.sql"),
26    ),
27    (
28        "0005_sessions_body_url_backfill",
29        include_str!("../migrations/0005_sessions_body_url_backfill.sql"),
30    ),
31    (
32        "0006_sessions_remove_fk_constraints",
33        include_str!("../migrations/0006_sessions_remove_fk_constraints.sql"),
34    ),
35    (
36        "0007_sessions_list_perf_indexes",
37        include_str!("../migrations/0007_sessions_list_perf_indexes.sql"),
38    ),
39    (
40        "0009_session_score_plugin",
41        include_str!("../migrations/0009_session_score_plugin.sql"),
42    ),
43    (
44        "0010_api_keys_issuance",
45        include_str!("../migrations/0010_api_keys_issuance.sql"),
46    ),
47];
48
49const LOCAL_MIGRATIONS: &[Migration] = &[
50    (
51        "local_0001_schema",
52        include_str!("../migrations/local_0001_schema.sql"),
53    ),
54    (
55        "local_0002_drop_unused_local_sessions",
56        include_str!("../migrations/local_0002_drop_unused_local_sessions.sql"),
57    ),
58];
59
60/// A local session row stored in the local SQLite index/cache database.
61#[derive(Debug, Clone)]
62pub struct LocalSessionRow {
63    pub id: String,
64    pub source_path: Option<String>,
65    pub sync_status: String,
66    pub last_synced_at: Option<String>,
67    pub user_id: Option<String>,
68    pub nickname: Option<String>,
69    pub team_id: Option<String>,
70    pub tool: String,
71    pub agent_provider: Option<String>,
72    pub agent_model: Option<String>,
73    pub title: Option<String>,
74    pub description: Option<String>,
75    pub tags: Option<String>,
76    pub created_at: String,
77    pub uploaded_at: Option<String>,
78    pub message_count: i64,
79    pub user_message_count: i64,
80    pub task_count: i64,
81    pub event_count: i64,
82    pub duration_seconds: i64,
83    pub total_input_tokens: i64,
84    pub total_output_tokens: i64,
85    pub git_remote: Option<String>,
86    pub git_branch: Option<String>,
87    pub git_commit: Option<String>,
88    pub git_repo_name: Option<String>,
89    pub pr_number: Option<i64>,
90    pub pr_url: Option<String>,
91    pub working_directory: Option<String>,
92    pub files_modified: Option<String>,
93    pub files_read: Option<String>,
94    pub has_errors: bool,
95    pub max_active_agents: i64,
96}
97
98/// A link between a git commit and an AI session.
99#[derive(Debug, Clone)]
100pub struct CommitLink {
101    pub commit_hash: String,
102    pub session_id: String,
103    pub repo_path: Option<String>,
104    pub branch: Option<String>,
105    pub created_at: String,
106}
107
108/// Return true when a cached row corresponds to an OpenCode child session.
109pub fn is_opencode_child_session(row: &LocalSessionRow) -> bool {
110    if row.tool != "opencode" {
111        return false;
112    }
113
114    // Strong heuristic: some orchestration artifacts are generated as sessions
115    // that do not carry a user-visible request surface and only contain system
116    // or agent-only events.
117    if row.user_message_count <= 0
118        && row.message_count <= 4
119        && row.task_count <= 4
120        && row.event_count > 0
121        && row.event_count <= 16
122    {
123        return true;
124    }
125
126    if is_opencode_subagent_source(row.source_path.as_deref()) {
127        return true;
128    }
129
130    let source_path = match row.source_path.as_deref() {
131        Some(path) if !path.trim().is_empty() => path,
132        _ => return false,
133    };
134
135    parse_opencode_parent_session_id(source_path)
136        .is_some_and(|parent_id| !parent_id.trim().is_empty())
137}
138
139/// Parse `parentID` / `parentId` from an OpenCode session JSON file.
140pub fn parse_opencode_parent_session_id(source_path: &str) -> Option<String> {
141    let text = fs::read_to_string(source_path).ok()?;
142    let json: Value = serde_json::from_str(&text).ok()?;
143    lookup_parent_session_id(&json)
144}
145
146fn lookup_parent_session_id(value: &Value) -> Option<String> {
147    match value {
148        Value::Object(obj) => {
149            for (key, value) in obj {
150                if is_parent_id_key(key) {
151                    if let Some(parent_id) = value.as_str() {
152                        let parent_id = parent_id.trim();
153                        if !parent_id.is_empty() {
154                            return Some(parent_id.to_string());
155                        }
156                    }
157                }
158                if let Some(parent_id) = lookup_parent_session_id(value) {
159                    return Some(parent_id);
160                }
161            }
162            None
163        }
164        Value::Array(items) => items.iter().find_map(lookup_parent_session_id),
165        _ => None,
166    }
167}
168
169fn is_parent_id_key(key: &str) -> bool {
170    let flat = key
171        .chars()
172        .filter(|c| c.is_ascii_alphanumeric())
173        .map(|c| c.to_ascii_lowercase())
174        .collect::<String>();
175
176    flat == "parentid"
177        || flat == "parentuuid"
178        || flat == "parentsessionid"
179        || flat == "parentsessionuuid"
180        || flat.ends_with("parentsessionid")
181        || (flat.contains("parent") && flat.ends_with("id"))
182        || (flat.contains("parent") && flat.ends_with("uuid"))
183}
184
185/// Remove OpenCode child sessions so only parent sessions remain visible.
186pub fn hide_opencode_child_sessions(mut rows: Vec<LocalSessionRow>) -> Vec<LocalSessionRow> {
187    rows.retain(|row| !is_opencode_child_session(row) && !is_claude_subagent_session(row));
188    rows
189}
190
191fn is_opencode_subagent_source(source_path: Option<&str>) -> bool {
192    is_subagent_source(source_path)
193}
194
195fn is_claude_subagent_session(row: &LocalSessionRow) -> bool {
196    if row.tool != "claude-code" {
197        return false;
198    }
199
200    is_subagent_source(row.source_path.as_deref())
201}
202
203fn is_subagent_source(source_path: Option<&str>) -> bool {
204    let Some(source_path) = source_path.map(|path| path.to_ascii_lowercase()) else {
205        return false;
206    };
207
208    if source_path.contains("/subagents/") || source_path.contains("\\subagents\\") {
209        return true;
210    }
211
212    let filename = match std::path::Path::new(&source_path).file_name() {
213        Some(name) => name.to_string_lossy(),
214        None => return false,
215    };
216
217    filename.starts_with("agent-")
218        || filename.starts_with("agent_")
219        || filename.starts_with("subagent-")
220        || filename.starts_with("subagent_")
221}
222
223fn infer_tool_from_source_path(source_path: Option<&str>) -> Option<&'static str> {
224    let source_path = source_path.map(|path| path.to_ascii_lowercase())?;
225
226    if source_path.contains("/.codex/sessions/")
227        || source_path.contains("\\.codex\\sessions\\")
228        || source_path.contains("/codex/sessions/")
229        || source_path.contains("\\codex\\sessions\\")
230    {
231        return Some("codex");
232    }
233
234    if source_path.contains("/.claude/projects/")
235        || source_path.contains("\\.claude\\projects\\")
236        || source_path.contains("/claude/projects/")
237        || source_path.contains("\\claude\\projects\\")
238    {
239        return Some("claude-code");
240    }
241
242    None
243}
244
245fn normalize_tool_for_source_path(current_tool: &str, source_path: Option<&str>) -> String {
246    infer_tool_from_source_path(source_path)
247        .unwrap_or(current_tool)
248        .to_string()
249}
250
251/// Filter for listing sessions from the local DB.
252#[derive(Debug, Clone)]
253pub struct LocalSessionFilter {
254    pub team_id: Option<String>,
255    pub sync_status: Option<String>,
256    pub git_repo_name: Option<String>,
257    pub search: Option<String>,
258    pub tool: Option<String>,
259    pub sort: LocalSortOrder,
260    pub time_range: LocalTimeRange,
261    pub limit: Option<u32>,
262    pub offset: Option<u32>,
263}
264
265impl Default for LocalSessionFilter {
266    fn default() -> Self {
267        Self {
268            team_id: None,
269            sync_status: None,
270            git_repo_name: None,
271            search: None,
272            tool: None,
273            sort: LocalSortOrder::Recent,
274            time_range: LocalTimeRange::All,
275            limit: None,
276            offset: None,
277        }
278    }
279}
280
281/// Sort order for local session listing.
282#[derive(Debug, Clone, Default, PartialEq, Eq)]
283pub enum LocalSortOrder {
284    #[default]
285    Recent,
286    Popular,
287    Longest,
288}
289
290/// Time range filter for local session listing.
291#[derive(Debug, Clone, Default, PartialEq, Eq)]
292pub enum LocalTimeRange {
293    Hours24,
294    Days7,
295    Days30,
296    #[default]
297    All,
298}
299
300/// Minimal remote session payload needed for local index/cache upsert.
301#[derive(Debug, Clone)]
302pub struct RemoteSessionSummary {
303    pub id: String,
304    pub user_id: Option<String>,
305    pub nickname: Option<String>,
306    pub team_id: String,
307    pub tool: String,
308    pub agent_provider: Option<String>,
309    pub agent_model: Option<String>,
310    pub title: Option<String>,
311    pub description: Option<String>,
312    pub tags: Option<String>,
313    pub created_at: String,
314    pub uploaded_at: String,
315    pub message_count: i64,
316    pub task_count: i64,
317    pub event_count: i64,
318    pub duration_seconds: i64,
319    pub total_input_tokens: i64,
320    pub total_output_tokens: i64,
321    pub git_remote: Option<String>,
322    pub git_branch: Option<String>,
323    pub git_commit: Option<String>,
324    pub git_repo_name: Option<String>,
325    pub pr_number: Option<i64>,
326    pub pr_url: Option<String>,
327    pub working_directory: Option<String>,
328    pub files_modified: Option<String>,
329    pub files_read: Option<String>,
330    pub has_errors: bool,
331    pub max_active_agents: i64,
332}
333
334/// Extended filter for the `log` command.
335#[derive(Debug, Default)]
336pub struct LogFilter {
337    /// Filter by tool name (exact match).
338    pub tool: Option<String>,
339    /// Filter by model (glob-like, uses LIKE).
340    pub model: Option<String>,
341    /// Filter sessions created after this ISO8601 timestamp.
342    pub since: Option<String>,
343    /// Filter sessions created before this ISO8601 timestamp.
344    pub before: Option<String>,
345    /// Filter sessions that touched this file path (searches files_modified JSON).
346    pub touches: Option<String>,
347    /// Free-text search in title, description, tags.
348    pub grep: Option<String>,
349    /// Only sessions with errors.
350    pub has_errors: Option<bool>,
351    /// Filter by working directory (prefix match).
352    pub working_directory: Option<String>,
353    /// Filter by git repo name.
354    pub git_repo_name: Option<String>,
355    /// Filter sessions linked to this git commit hash.
356    pub commit: Option<String>,
357    /// Maximum number of results.
358    pub limit: Option<u32>,
359    /// Offset for pagination.
360    pub offset: Option<u32>,
361}
362
363/// Base FROM clause for session list queries.
364const FROM_CLAUSE: &str = "\
365FROM sessions s \
366LEFT JOIN session_sync ss ON ss.session_id = s.id \
367LEFT JOIN users u ON u.id = s.user_id";
368
369/// Local SQLite index/cache shared by TUI and Daemon.
370/// This is not the source of truth for canonical session bodies.
371/// Thread-safe: wraps the connection in a Mutex so it can be shared via `Arc<LocalDb>`.
372pub struct LocalDb {
373    conn: Mutex<Connection>,
374}
375
376impl LocalDb {
377    /// Open (or create) the local database at the default path.
378    /// `~/.local/share/opensession/local.db`
379    pub fn open() -> Result<Self> {
380        let path = default_db_path()?;
381        Self::open_path(&path)
382    }
383
384    /// Open (or create) the local database at a specific path.
385    pub fn open_path(path: &PathBuf) -> Result<Self> {
386        if let Some(parent) = path.parent() {
387            std::fs::create_dir_all(parent)
388                .with_context(|| format!("create dir for {}", path.display()))?;
389        }
390        match open_connection_with_latest_schema(path) {
391            Ok(conn) => Ok(Self {
392                conn: Mutex::new(conn),
393            }),
394            Err(err) => {
395                if !is_schema_compat_error(&err) {
396                    return Err(err);
397                }
398
399                // Local DB is a cache. If schema migration cannot safely reconcile
400                // an incompatible/corrupted file, rotate it out and recreate latest schema.
401                rotate_legacy_db(path)?;
402
403                let conn = open_connection_with_latest_schema(path)
404                    .with_context(|| format!("recreate db {}", path.display()))?;
405                Ok(Self {
406                    conn: Mutex::new(conn),
407                })
408            }
409        }
410    }
411
412    fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
413        self.conn.lock().expect("local db mutex poisoned")
414    }
415
416    // ── Upsert local session (parsed from file) ────────────────────────
417
418    pub fn upsert_local_session(
419        &self,
420        session: &Session,
421        source_path: &str,
422        git: &GitContext,
423    ) -> Result<()> {
424        let title = session.context.title.as_deref();
425        let description = session.context.description.as_deref();
426        let tags = if session.context.tags.is_empty() {
427            None
428        } else {
429            Some(session.context.tags.join(","))
430        };
431        let created_at = session.context.created_at.to_rfc3339();
432        let cwd = session
433            .context
434            .attributes
435            .get("cwd")
436            .or_else(|| session.context.attributes.get("working_directory"))
437            .and_then(|v| v.as_str().map(String::from));
438
439        // Extract files_modified, files_read, and has_errors from events
440        let (files_modified, files_read, has_errors) =
441            opensession_core::extract::extract_file_metadata(session);
442        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;
443        let normalized_tool =
444            normalize_tool_for_source_path(&session.agent.tool, Some(source_path));
445
446        let conn = self.conn();
447        // NOTE: `body_storage_key` is kept only for migration/schema parity.
448        // Runtime lookup uses canonical body URLs and local body cache tables.
449        conn.execute(
450            "INSERT INTO sessions \
451             (id, team_id, tool, agent_provider, agent_model, \
452              title, description, tags, created_at, \
453              message_count, user_message_count, task_count, event_count, duration_seconds, \
454              total_input_tokens, total_output_tokens, body_storage_key, \
455              git_remote, git_branch, git_commit, git_repo_name, working_directory, \
456              files_modified, files_read, has_errors, max_active_agents) \
457             VALUES (?1,'personal',?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,'',?16,?17,?18,?19,?20,?21,?22,?23,?24) \
458             ON CONFLICT(id) DO UPDATE SET \
459              tool=excluded.tool, agent_provider=excluded.agent_provider, \
460              agent_model=excluded.agent_model, \
461              title=excluded.title, description=excluded.description, \
462              tags=excluded.tags, \
463              message_count=excluded.message_count, user_message_count=excluded.user_message_count, \
464              task_count=excluded.task_count, \
465              event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
466              total_input_tokens=excluded.total_input_tokens, \
467              total_output_tokens=excluded.total_output_tokens, \
468              git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
469              git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
470              working_directory=excluded.working_directory, \
471              files_modified=excluded.files_modified, files_read=excluded.files_read, \
472              has_errors=excluded.has_errors, \
473              max_active_agents=excluded.max_active_agents",
474            params![
475                &session.session_id,
476                &normalized_tool,
477                &session.agent.provider,
478                &session.agent.model,
479                title,
480                description,
481                &tags,
482                &created_at,
483                session.stats.message_count as i64,
484                session.stats.user_message_count as i64,
485                session.stats.task_count as i64,
486                session.stats.event_count as i64,
487                session.stats.duration_seconds as i64,
488                session.stats.total_input_tokens as i64,
489                session.stats.total_output_tokens as i64,
490                &git.remote,
491                &git.branch,
492                &git.commit,
493                &git.repo_name,
494                &cwd,
495                &files_modified,
496                &files_read,
497                has_errors,
498                max_active_agents,
499            ],
500        )?;
501
502        conn.execute(
503            "INSERT INTO session_sync (session_id, source_path, sync_status) \
504             VALUES (?1, ?2, 'local_only') \
505             ON CONFLICT(session_id) DO UPDATE SET source_path=excluded.source_path",
506            params![&session.session_id, source_path],
507        )?;
508        Ok(())
509    }
510
511    // ── Upsert remote session (from server sync pull) ──────────────────
512
513    pub fn upsert_remote_session(&self, summary: &RemoteSessionSummary) -> Result<()> {
514        let conn = self.conn();
515        // NOTE: `body_storage_key` is kept only for migration/schema parity.
516        // Runtime lookup uses canonical body URLs and local body cache tables.
517        conn.execute(
518            "INSERT INTO sessions \
519             (id, user_id, team_id, tool, agent_provider, agent_model, \
520              title, description, tags, created_at, uploaded_at, \
521              message_count, task_count, event_count, duration_seconds, \
522              total_input_tokens, total_output_tokens, body_storage_key, \
523              git_remote, git_branch, git_commit, git_repo_name, \
524              pr_number, pr_url, working_directory, \
525              files_modified, files_read, has_errors, max_active_agents) \
526             VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,'',?18,?19,?20,?21,?22,?23,?24,?25,?26,?27,?28) \
527             ON CONFLICT(id) DO UPDATE SET \
528              title=excluded.title, description=excluded.description, \
529              tags=excluded.tags, uploaded_at=excluded.uploaded_at, \
530              message_count=excluded.message_count, task_count=excluded.task_count, \
531              event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
532              total_input_tokens=excluded.total_input_tokens, \
533              total_output_tokens=excluded.total_output_tokens, \
534              git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
535              git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
536              pr_number=excluded.pr_number, pr_url=excluded.pr_url, \
537              working_directory=excluded.working_directory, \
538              files_modified=excluded.files_modified, files_read=excluded.files_read, \
539              has_errors=excluded.has_errors, \
540              max_active_agents=excluded.max_active_agents",
541            params![
542                &summary.id,
543                &summary.user_id,
544                &summary.team_id,
545                &summary.tool,
546                &summary.agent_provider,
547                &summary.agent_model,
548                &summary.title,
549                &summary.description,
550                &summary.tags,
551                &summary.created_at,
552                &summary.uploaded_at,
553                summary.message_count,
554                summary.task_count,
555                summary.event_count,
556                summary.duration_seconds,
557                summary.total_input_tokens,
558                summary.total_output_tokens,
559                &summary.git_remote,
560                &summary.git_branch,
561                &summary.git_commit,
562                &summary.git_repo_name,
563                summary.pr_number,
564                &summary.pr_url,
565                &summary.working_directory,
566                &summary.files_modified,
567                &summary.files_read,
568                summary.has_errors,
569                summary.max_active_agents,
570            ],
571        )?;
572
573        conn.execute(
574            "INSERT INTO session_sync (session_id, sync_status) \
575             VALUES (?1, 'remote_only') \
576             ON CONFLICT(session_id) DO UPDATE SET \
577              sync_status = CASE WHEN session_sync.sync_status = 'local_only' THEN 'synced' ELSE session_sync.sync_status END",
578            params![&summary.id],
579        )?;
580        Ok(())
581    }
582
583    // ── List sessions ──────────────────────────────────────────────────
584
585    fn build_local_session_where_clause(
586        filter: &LocalSessionFilter,
587    ) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
588        let mut where_clauses = vec![
589            "1=1".to_string(),
590            "NOT (s.tool = 'claude-code' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
591            "NOT (s.tool = 'opencode' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
592            "NOT (s.tool = 'opencode' AND COALESCE(s.user_message_count, 0) <= 0 AND COALESCE(s.message_count, 0) <= 4 AND COALESCE(s.task_count, 0) <= 4 AND COALESCE(s.event_count, 0) > 0 AND COALESCE(s.event_count, 0) <= 16)".to_string(),
593        ];
594        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
595        let mut idx = 1u32;
596
597        if let Some(ref team_id) = filter.team_id {
598            where_clauses.push(format!("s.team_id = ?{idx}"));
599            param_values.push(Box::new(team_id.clone()));
600            idx += 1;
601        }
602
603        if let Some(ref sync_status) = filter.sync_status {
604            where_clauses.push(format!("COALESCE(ss.sync_status, 'unknown') = ?{idx}"));
605            param_values.push(Box::new(sync_status.clone()));
606            idx += 1;
607        }
608
609        if let Some(ref repo) = filter.git_repo_name {
610            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
611            param_values.push(Box::new(repo.clone()));
612            idx += 1;
613        }
614
615        if let Some(ref tool) = filter.tool {
616            where_clauses.push(format!("s.tool = ?{idx}"));
617            param_values.push(Box::new(tool.clone()));
618            idx += 1;
619        }
620
621        if let Some(ref search) = filter.search {
622            let like = format!("%{search}%");
623            where_clauses.push(format!(
624                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
625                i1 = idx,
626                i2 = idx + 1,
627                i3 = idx + 2,
628            ));
629            param_values.push(Box::new(like.clone()));
630            param_values.push(Box::new(like.clone()));
631            param_values.push(Box::new(like));
632            idx += 3;
633        }
634
635        let interval = match filter.time_range {
636            LocalTimeRange::Hours24 => Some("-1 day"),
637            LocalTimeRange::Days7 => Some("-7 days"),
638            LocalTimeRange::Days30 => Some("-30 days"),
639            LocalTimeRange::All => None,
640        };
641        if let Some(interval) = interval {
642            where_clauses.push(format!("datetime(s.created_at) >= datetime('now', ?{idx})"));
643            param_values.push(Box::new(interval.to_string()));
644        }
645
646        (where_clauses.join(" AND "), param_values)
647    }
648
649    pub fn list_sessions(&self, filter: &LocalSessionFilter) -> Result<Vec<LocalSessionRow>> {
650        let (where_str, mut param_values) = Self::build_local_session_where_clause(filter);
651        let order_clause = match filter.sort {
652            LocalSortOrder::Popular => "s.message_count DESC, s.created_at DESC",
653            LocalSortOrder::Longest => "s.duration_seconds DESC, s.created_at DESC",
654            LocalSortOrder::Recent => "s.created_at DESC",
655        };
656
657        let mut sql = format!(
658            "SELECT {LOCAL_SESSION_COLUMNS} \
659             {FROM_CLAUSE} WHERE {where_str} \
660             ORDER BY {order_clause}"
661        );
662
663        if let Some(limit) = filter.limit {
664            sql.push_str(" LIMIT ?");
665            param_values.push(Box::new(limit));
666            if let Some(offset) = filter.offset {
667                sql.push_str(" OFFSET ?");
668                param_values.push(Box::new(offset));
669            }
670        }
671
672        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
673            param_values.iter().map(|p| p.as_ref()).collect();
674        let conn = self.conn();
675        let mut stmt = conn.prepare(&sql)?;
676        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
677
678        let mut result = Vec::new();
679        for row in rows {
680            result.push(row?);
681        }
682
683        Ok(hide_opencode_child_sessions(result))
684    }
685
686    /// Count sessions for a given list filter (before UI-level page slicing).
687    pub fn count_sessions_filtered(&self, filter: &LocalSessionFilter) -> Result<i64> {
688        let mut count_filter = filter.clone();
689        count_filter.limit = None;
690        count_filter.offset = None;
691        let (where_str, param_values) = Self::build_local_session_where_clause(&count_filter);
692        let sql = format!("SELECT COUNT(*) {FROM_CLAUSE} WHERE {where_str}");
693        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
694            param_values.iter().map(|p| p.as_ref()).collect();
695        let conn = self.conn();
696        let count = conn.query_row(&sql, param_refs.as_slice(), |row| row.get(0))?;
697        Ok(count)
698    }
699
700    /// List distinct tool names for the current list filter (ignores active tool filter).
701    pub fn list_session_tools(&self, filter: &LocalSessionFilter) -> Result<Vec<String>> {
702        let mut tool_filter = filter.clone();
703        tool_filter.tool = None;
704        tool_filter.limit = None;
705        tool_filter.offset = None;
706        let (where_str, param_values) = Self::build_local_session_where_clause(&tool_filter);
707        let sql = format!(
708            "SELECT DISTINCT s.tool \
709             {FROM_CLAUSE} WHERE {where_str} \
710             ORDER BY s.tool ASC"
711        );
712        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
713            param_values.iter().map(|p| p.as_ref()).collect();
714        let conn = self.conn();
715        let mut stmt = conn.prepare(&sql)?;
716        let rows = stmt.query_map(param_refs.as_slice(), |row| row.get::<_, String>(0))?;
717
718        let mut tools = Vec::new();
719        for row in rows {
720            let tool = row?;
721            if !tool.trim().is_empty() {
722                tools.push(tool);
723            }
724        }
725        Ok(tools)
726    }
727
728    // ── Log query ─────────────────────────────────────────────────────
729
730    /// Query sessions with extended filters for the `log` command.
731    pub fn list_sessions_log(&self, filter: &LogFilter) -> Result<Vec<LocalSessionRow>> {
732        let mut where_clauses = vec!["1=1".to_string()];
733        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
734        let mut idx = 1u32;
735
736        if let Some(ref tool) = filter.tool {
737            where_clauses.push(format!("s.tool = ?{idx}"));
738            param_values.push(Box::new(tool.clone()));
739            idx += 1;
740        }
741
742        if let Some(ref model) = filter.model {
743            let like = model.replace('*', "%");
744            where_clauses.push(format!("s.agent_model LIKE ?{idx}"));
745            param_values.push(Box::new(like));
746            idx += 1;
747        }
748
749        if let Some(ref since) = filter.since {
750            where_clauses.push(format!("s.created_at >= ?{idx}"));
751            param_values.push(Box::new(since.clone()));
752            idx += 1;
753        }
754
755        if let Some(ref before) = filter.before {
756            where_clauses.push(format!("s.created_at < ?{idx}"));
757            param_values.push(Box::new(before.clone()));
758            idx += 1;
759        }
760
761        if let Some(ref touches) = filter.touches {
762            let like = format!("%\"{touches}\"%");
763            where_clauses.push(format!("s.files_modified LIKE ?{idx}"));
764            param_values.push(Box::new(like));
765            idx += 1;
766        }
767
768        if let Some(ref grep) = filter.grep {
769            let like = format!("%{grep}%");
770            where_clauses.push(format!(
771                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
772                i1 = idx,
773                i2 = idx + 1,
774                i3 = idx + 2,
775            ));
776            param_values.push(Box::new(like.clone()));
777            param_values.push(Box::new(like.clone()));
778            param_values.push(Box::new(like));
779            idx += 3;
780        }
781
782        if let Some(true) = filter.has_errors {
783            where_clauses.push("s.has_errors = 1".to_string());
784        }
785
786        if let Some(ref wd) = filter.working_directory {
787            where_clauses.push(format!("s.working_directory LIKE ?{idx}"));
788            param_values.push(Box::new(format!("{wd}%")));
789            idx += 1;
790        }
791
792        if let Some(ref repo) = filter.git_repo_name {
793            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
794            param_values.push(Box::new(repo.clone()));
795            idx += 1;
796        }
797
798        // Optional JOIN for commit hash filter
799        let mut extra_join = String::new();
800        if let Some(ref commit) = filter.commit {
801            extra_join =
802                " INNER JOIN commit_session_links csl ON csl.session_id = s.id".to_string();
803            where_clauses.push(format!("csl.commit_hash = ?{idx}"));
804            param_values.push(Box::new(commit.clone()));
805            idx += 1;
806        }
807
808        let _ = idx; // suppress unused warning
809
810        let where_str = where_clauses.join(" AND ");
811        let mut sql = format!(
812            "SELECT {LOCAL_SESSION_COLUMNS} \
813             {FROM_CLAUSE}{extra_join} WHERE {where_str} \
814             ORDER BY s.created_at DESC"
815        );
816
817        if let Some(limit) = filter.limit {
818            sql.push_str(" LIMIT ?");
819            param_values.push(Box::new(limit));
820            if let Some(offset) = filter.offset {
821                sql.push_str(" OFFSET ?");
822                param_values.push(Box::new(offset));
823            }
824        }
825
826        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
827            param_values.iter().map(|p| p.as_ref()).collect();
828        let conn = self.conn();
829        let mut stmt = conn.prepare(&sql)?;
830        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
831
832        let mut result = Vec::new();
833        for row in rows {
834            result.push(row?);
835        }
836        Ok(hide_opencode_child_sessions(result))
837    }
838
839    /// Get the latest N sessions for a specific tool, ordered by created_at DESC.
840    pub fn get_sessions_by_tool_latest(
841        &self,
842        tool: &str,
843        count: u32,
844    ) -> Result<Vec<LocalSessionRow>> {
845        let sql = format!(
846            "SELECT {LOCAL_SESSION_COLUMNS} \
847             {FROM_CLAUSE} WHERE s.tool = ?1 \
848             ORDER BY s.created_at DESC"
849        );
850        let conn = self.conn();
851        let mut stmt = conn.prepare(&sql)?;
852        let rows = stmt.query_map(params![tool], row_to_local_session)?;
853        let mut result = Vec::new();
854        for row in rows {
855            result.push(row?);
856        }
857
858        let mut filtered = hide_opencode_child_sessions(result);
859        filtered.truncate(count as usize);
860        Ok(filtered)
861    }
862
863    /// Get the latest N sessions across all tools, ordered by created_at DESC.
864    pub fn get_sessions_latest(&self, count: u32) -> Result<Vec<LocalSessionRow>> {
865        let sql = format!(
866            "SELECT {LOCAL_SESSION_COLUMNS} \
867             {FROM_CLAUSE} \
868             ORDER BY s.created_at DESC"
869        );
870        let conn = self.conn();
871        let mut stmt = conn.prepare(&sql)?;
872        let rows = stmt.query_map([], row_to_local_session)?;
873        let mut result = Vec::new();
874        for row in rows {
875            result.push(row?);
876        }
877
878        let mut filtered = hide_opencode_child_sessions(result);
879        filtered.truncate(count as usize);
880        Ok(filtered)
881    }
882
883    /// Get the Nth most recent session for a specific tool (0 = HEAD, 1 = HEAD~1, etc.).
884    pub fn get_session_by_tool_offset(
885        &self,
886        tool: &str,
887        offset: u32,
888    ) -> Result<Option<LocalSessionRow>> {
889        let sql = format!(
890            "SELECT {LOCAL_SESSION_COLUMNS} \
891             {FROM_CLAUSE} WHERE s.tool = ?1 \
892             ORDER BY s.created_at DESC"
893        );
894        let conn = self.conn();
895        let mut stmt = conn.prepare(&sql)?;
896        let rows = stmt.query_map(params![tool], row_to_local_session)?;
897        let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
898        Ok(filtered.into_iter().nth(offset as usize))
899    }
900
901    /// Get the Nth most recent session across all tools (0 = HEAD, 1 = HEAD~1, etc.).
902    pub fn get_session_by_offset(&self, offset: u32) -> Result<Option<LocalSessionRow>> {
903        let sql = format!(
904            "SELECT {LOCAL_SESSION_COLUMNS} \
905             {FROM_CLAUSE} \
906             ORDER BY s.created_at DESC"
907        );
908        let conn = self.conn();
909        let mut stmt = conn.prepare(&sql)?;
910        let rows = stmt.query_map([], row_to_local_session)?;
911        let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
912        Ok(filtered.into_iter().nth(offset as usize))
913    }
914
915    /// Fetch the source path used when the session was last parsed/loaded.
916    pub fn get_session_source_path(&self, session_id: &str) -> Result<Option<String>> {
917        let conn = self.conn();
918        let result = conn
919            .query_row(
920                "SELECT source_path FROM session_sync WHERE session_id = ?1",
921                params![session_id],
922                |row| row.get(0),
923            )
924            .optional()?;
925
926        Ok(result)
927    }
928
929    /// Count total sessions in the local DB.
930    pub fn session_count(&self) -> Result<i64> {
931        let count = self
932            .conn()
933            .query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))?;
934        Ok(count)
935    }
936
937    // ── Delete session ─────────────────────────────────────────────────
938
939    pub fn delete_session(&self, session_id: &str) -> Result<()> {
940        let conn = self.conn();
941        conn.execute(
942            "DELETE FROM body_cache WHERE session_id = ?1",
943            params![session_id],
944        )?;
945        conn.execute(
946            "DELETE FROM session_sync WHERE session_id = ?1",
947            params![session_id],
948        )?;
949        conn.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
950        Ok(())
951    }
952
953    // ── Sync cursor ────────────────────────────────────────────────────
954
955    pub fn get_sync_cursor(&self, team_id: &str) -> Result<Option<String>> {
956        let cursor = self
957            .conn()
958            .query_row(
959                "SELECT cursor FROM sync_cursors WHERE team_id = ?1",
960                params![team_id],
961                |row| row.get(0),
962            )
963            .optional()?;
964        Ok(cursor)
965    }
966
967    pub fn set_sync_cursor(&self, team_id: &str, cursor: &str) -> Result<()> {
968        self.conn().execute(
969            "INSERT INTO sync_cursors (team_id, cursor, updated_at) \
970             VALUES (?1, ?2, datetime('now')) \
971             ON CONFLICT(team_id) DO UPDATE SET cursor=excluded.cursor, updated_at=datetime('now')",
972            params![team_id, cursor],
973        )?;
974        Ok(())
975    }
976
977    // ── Upload tracking ────────────────────────────────────────────────
978
979    /// Get sessions that are local_only and need to be uploaded.
980    pub fn pending_uploads(&self, team_id: &str) -> Result<Vec<LocalSessionRow>> {
981        let sql = format!(
982            "SELECT {LOCAL_SESSION_COLUMNS} \
983             FROM sessions s \
984             INNER JOIN session_sync ss ON ss.session_id = s.id \
985             LEFT JOIN users u ON u.id = s.user_id \
986             WHERE ss.sync_status = 'local_only' AND s.team_id = ?1 \
987             ORDER BY s.created_at ASC"
988        );
989        let conn = self.conn();
990        let mut stmt = conn.prepare(&sql)?;
991        let rows = stmt.query_map(params![team_id], row_to_local_session)?;
992        let mut result = Vec::new();
993        for row in rows {
994            result.push(row?);
995        }
996        Ok(result)
997    }
998
999    pub fn mark_synced(&self, session_id: &str) -> Result<()> {
1000        self.conn().execute(
1001            "UPDATE session_sync SET sync_status = 'synced', last_synced_at = datetime('now') \
1002             WHERE session_id = ?1",
1003            params![session_id],
1004        )?;
1005        Ok(())
1006    }
1007
1008    /// Check if a session was already uploaded (synced or remote_only) since the given modification time.
1009    pub fn was_uploaded_after(
1010        &self,
1011        source_path: &str,
1012        modified: &chrono::DateTime<chrono::Utc>,
1013    ) -> Result<bool> {
1014        let result: Option<String> = self
1015            .conn()
1016            .query_row(
1017                "SELECT last_synced_at FROM session_sync \
1018                 WHERE source_path = ?1 AND sync_status = 'synced' AND last_synced_at IS NOT NULL",
1019                params![source_path],
1020                |row| row.get(0),
1021            )
1022            .optional()?;
1023
1024        if let Some(synced_at) = result {
1025            if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&synced_at) {
1026                return Ok(dt >= *modified);
1027            }
1028        }
1029        Ok(false)
1030    }
1031
1032    // ── Body cache (local read acceleration) ───────────────────────────
1033
1034    pub fn cache_body(&self, session_id: &str, body: &[u8]) -> Result<()> {
1035        self.conn().execute(
1036            "INSERT INTO body_cache (session_id, body, cached_at) \
1037             VALUES (?1, ?2, datetime('now')) \
1038             ON CONFLICT(session_id) DO UPDATE SET body=excluded.body, cached_at=datetime('now')",
1039            params![session_id, body],
1040        )?;
1041        Ok(())
1042    }
1043
1044    pub fn get_cached_body(&self, session_id: &str) -> Result<Option<Vec<u8>>> {
1045        let body = self
1046            .conn()
1047            .query_row(
1048                "SELECT body FROM body_cache WHERE session_id = ?1",
1049                params![session_id],
1050                |row| row.get(0),
1051            )
1052            .optional()?;
1053        Ok(body)
1054    }
1055
1056    // ── Migration helper ───────────────────────────────────────────────
1057
1058    /// Migrate entries from the old state.json UploadState into the local DB.
1059    /// Marks them as `synced` with no metadata (we only know the file path was uploaded).
1060    pub fn migrate_from_state_json(
1061        &self,
1062        uploaded: &std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
1063    ) -> Result<usize> {
1064        let mut count = 0;
1065        for (path, uploaded_at) in uploaded {
1066            let exists: bool = self
1067                .conn()
1068                .query_row(
1069                    "SELECT COUNT(*) > 0 FROM session_sync WHERE source_path = ?1",
1070                    params![path],
1071                    |row| row.get(0),
1072                )
1073                .unwrap_or(false);
1074
1075            if exists {
1076                self.conn().execute(
1077                    "UPDATE session_sync SET sync_status = 'synced', last_synced_at = ?1 \
1078                     WHERE source_path = ?2 AND sync_status = 'local_only'",
1079                    params![uploaded_at.to_rfc3339(), path],
1080                )?;
1081                count += 1;
1082            }
1083        }
1084        Ok(count)
1085    }
1086
1087    // ── Commit ↔ session linking ────────────────────────────────────
1088
1089    /// Link a git commit to an AI session.
1090    pub fn link_commit_session(
1091        &self,
1092        commit_hash: &str,
1093        session_id: &str,
1094        repo_path: Option<&str>,
1095        branch: Option<&str>,
1096    ) -> Result<()> {
1097        self.conn().execute(
1098            "INSERT INTO commit_session_links (commit_hash, session_id, repo_path, branch) \
1099             VALUES (?1, ?2, ?3, ?4) \
1100             ON CONFLICT(commit_hash, session_id) DO NOTHING",
1101            params![commit_hash, session_id, repo_path, branch],
1102        )?;
1103        Ok(())
1104    }
1105
1106    /// Get all sessions linked to a git commit.
1107    pub fn get_sessions_by_commit(&self, commit_hash: &str) -> Result<Vec<LocalSessionRow>> {
1108        let sql = format!(
1109            "SELECT {LOCAL_SESSION_COLUMNS} \
1110             {FROM_CLAUSE} \
1111             INNER JOIN commit_session_links csl ON csl.session_id = s.id \
1112             WHERE csl.commit_hash = ?1 \
1113             ORDER BY s.created_at DESC"
1114        );
1115        let conn = self.conn();
1116        let mut stmt = conn.prepare(&sql)?;
1117        let rows = stmt.query_map(params![commit_hash], row_to_local_session)?;
1118        let mut result = Vec::new();
1119        for row in rows {
1120            result.push(row?);
1121        }
1122        Ok(result)
1123    }
1124
1125    /// Get all commits linked to a session.
1126    pub fn get_commits_by_session(&self, session_id: &str) -> Result<Vec<CommitLink>> {
1127        let conn = self.conn();
1128        let mut stmt = conn.prepare(
1129            "SELECT commit_hash, session_id, repo_path, branch, created_at \
1130             FROM commit_session_links WHERE session_id = ?1 \
1131             ORDER BY created_at DESC",
1132        )?;
1133        let rows = stmt.query_map(params![session_id], |row| {
1134            Ok(CommitLink {
1135                commit_hash: row.get(0)?,
1136                session_id: row.get(1)?,
1137                repo_path: row.get(2)?,
1138                branch: row.get(3)?,
1139                created_at: row.get(4)?,
1140            })
1141        })?;
1142        let mut result = Vec::new();
1143        for row in rows {
1144            result.push(row?);
1145        }
1146        Ok(result)
1147    }
1148
1149    /// Find the most recently active session for a given repo path.
1150    /// "Active" means the session's working_directory matches the repo path
1151    /// and was created within the last `since_minutes` minutes.
1152    pub fn find_active_session_for_repo(
1153        &self,
1154        repo_path: &str,
1155        since_minutes: u32,
1156    ) -> Result<Option<LocalSessionRow>> {
1157        let sql = format!(
1158            "SELECT {LOCAL_SESSION_COLUMNS} \
1159             {FROM_CLAUSE} \
1160             WHERE s.working_directory LIKE ?1 \
1161             AND s.created_at >= datetime('now', ?2) \
1162             ORDER BY s.created_at DESC LIMIT 1"
1163        );
1164        let since = format!("-{since_minutes} minutes");
1165        let like = format!("{repo_path}%");
1166        let conn = self.conn();
1167        let mut stmt = conn.prepare(&sql)?;
1168        let row = stmt
1169            .query_map(params![like, since], row_to_local_session)?
1170            .next()
1171            .transpose()?;
1172        Ok(row)
1173    }
1174
1175    /// Get all session IDs currently in the local DB.
1176    pub fn existing_session_ids(&self) -> std::collections::HashSet<String> {
1177        let conn = self.conn();
1178        let mut stmt = conn
1179            .prepare("SELECT id FROM sessions")
1180            .unwrap_or_else(|_| panic!("failed to prepare existing_session_ids query"));
1181        let rows = stmt.query_map([], |row| row.get::<_, String>(0));
1182        let mut set = std::collections::HashSet::new();
1183        if let Ok(rows) = rows {
1184            for row in rows.flatten() {
1185                set.insert(row);
1186            }
1187        }
1188        set
1189    }
1190
1191    /// Update only stats fields for an existing session (no git context re-extraction).
1192    pub fn update_session_stats(&self, session: &Session) -> Result<()> {
1193        let title = session.context.title.as_deref();
1194        let description = session.context.description.as_deref();
1195        let (files_modified, files_read, has_errors) =
1196            opensession_core::extract::extract_file_metadata(session);
1197        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;
1198
1199        self.conn().execute(
1200            "UPDATE sessions SET \
1201             title=?2, description=?3, \
1202             message_count=?4, user_message_count=?5, task_count=?6, \
1203             event_count=?7, duration_seconds=?8, \
1204             total_input_tokens=?9, total_output_tokens=?10, \
1205             files_modified=?11, files_read=?12, has_errors=?13, \
1206             max_active_agents=?14 \
1207             WHERE id=?1",
1208            params![
1209                &session.session_id,
1210                title,
1211                description,
1212                session.stats.message_count as i64,
1213                session.stats.user_message_count as i64,
1214                session.stats.task_count as i64,
1215                session.stats.event_count as i64,
1216                session.stats.duration_seconds as i64,
1217                session.stats.total_input_tokens as i64,
1218                session.stats.total_output_tokens as i64,
1219                &files_modified,
1220                &files_read,
1221                has_errors,
1222                max_active_agents,
1223            ],
1224        )?;
1225        Ok(())
1226    }
1227
1228    /// Update only sync metadata path for an existing session.
1229    pub fn set_session_sync_path(&self, session_id: &str, source_path: &str) -> Result<()> {
1230        self.conn().execute(
1231            "INSERT INTO session_sync (session_id, source_path) \
1232             VALUES (?1, ?2) \
1233             ON CONFLICT(session_id) DO UPDATE SET source_path = excluded.source_path",
1234            params![session_id, source_path],
1235        )?;
1236        Ok(())
1237    }
1238
1239    /// Get a list of distinct git repo names present in the DB.
1240    pub fn list_repos(&self) -> Result<Vec<String>> {
1241        let conn = self.conn();
1242        let mut stmt = conn.prepare(
1243            "SELECT DISTINCT git_repo_name FROM sessions \
1244             WHERE git_repo_name IS NOT NULL ORDER BY git_repo_name ASC",
1245        )?;
1246        let rows = stmt.query_map([], |row| row.get(0))?;
1247        let mut result = Vec::new();
1248        for row in rows {
1249            result.push(row?);
1250        }
1251        Ok(result)
1252    }
1253
1254    /// Get a list of distinct, non-empty working directories present in the DB.
1255    pub fn list_working_directories(&self) -> Result<Vec<String>> {
1256        let conn = self.conn();
1257        let mut stmt = conn.prepare(
1258            "SELECT DISTINCT working_directory FROM sessions \
1259             WHERE working_directory IS NOT NULL AND TRIM(working_directory) <> '' \
1260             ORDER BY working_directory ASC",
1261        )?;
1262        let rows = stmt.query_map([], |row| row.get(0))?;
1263        let mut result = Vec::new();
1264        for row in rows {
1265            result.push(row?);
1266        }
1267        Ok(result)
1268    }
1269}
1270
1271// ── Schema backfill for existing local DB files ───────────────────────
1272
1273fn open_connection_with_latest_schema(path: &PathBuf) -> Result<Connection> {
1274    let conn = Connection::open(path).with_context(|| format!("open db {}", path.display()))?;
1275    conn.execute_batch("PRAGMA journal_mode=WAL;")?;
1276
1277    // Disable FK constraints for local DB (index/cache, not source of truth)
1278    conn.execute_batch("PRAGMA foreign_keys=OFF;")?;
1279
1280    apply_local_migrations(&conn)?;
1281
1282    // Backfill missing columns for existing local DB files where `sessions`
1283    // existed before newer fields were introduced.
1284    ensure_sessions_columns(&conn)?;
1285    repair_session_tools_from_source_path(&conn)?;
1286    validate_local_schema(&conn)?;
1287
1288    Ok(conn)
1289}
1290
1291fn apply_local_migrations(conn: &Connection) -> Result<()> {
1292    conn.execute_batch(
1293        "CREATE TABLE IF NOT EXISTS _migrations (
1294            id INTEGER PRIMARY KEY,
1295            name TEXT NOT NULL UNIQUE,
1296            applied_at TEXT NOT NULL DEFAULT (datetime('now'))
1297        );",
1298    )
1299    .context("create _migrations table for local db")?;
1300
1301    for (name, sql) in REMOTE_MIGRATIONS.iter().chain(LOCAL_MIGRATIONS.iter()) {
1302        let already_applied: bool = conn
1303            .query_row(
1304                "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
1305                [name],
1306                |row| row.get(0),
1307            )
1308            .unwrap_or(false);
1309
1310        if already_applied {
1311            continue;
1312        }
1313
1314        if let Err(e) = conn.execute_batch(sql) {
1315            let msg = e.to_string().to_ascii_lowercase();
1316            if !is_local_migration_compat_error(&msg) {
1317                return Err(e).with_context(|| format!("apply local migration {name}"));
1318            }
1319        }
1320
1321        conn.execute(
1322            "INSERT OR IGNORE INTO _migrations (name) VALUES (?1)",
1323            [name],
1324        )
1325        .with_context(|| format!("record local migration {name}"))?;
1326    }
1327
1328    Ok(())
1329}
1330
1331fn is_local_migration_compat_error(msg: &str) -> bool {
1332    msg.contains("duplicate column name")
1333        || msg.contains("no such column")
1334        || msg.contains("already exists")
1335}
1336
1337fn validate_local_schema(conn: &Connection) -> Result<()> {
1338    let sql = format!("SELECT {LOCAL_SESSION_COLUMNS} {FROM_CLAUSE} WHERE 1=0");
1339    conn.prepare(&sql)
1340        .map(|_| ())
1341        .context("validate local session schema")
1342}
1343
1344fn repair_session_tools_from_source_path(conn: &Connection) -> Result<()> {
1345    let mut stmt = conn.prepare(
1346        "SELECT s.id, s.tool, ss.source_path \
1347         FROM sessions s \
1348         LEFT JOIN session_sync ss ON ss.session_id = s.id \
1349         WHERE ss.source_path IS NOT NULL",
1350    )?;
1351    let rows = stmt.query_map([], |row| {
1352        Ok((
1353            row.get::<_, String>(0)?,
1354            row.get::<_, String>(1)?,
1355            row.get::<_, Option<String>>(2)?,
1356        ))
1357    })?;
1358
1359    let mut updates: Vec<(String, String)> = Vec::new();
1360    for row in rows {
1361        let (id, current_tool, source_path) = row?;
1362        let normalized = normalize_tool_for_source_path(&current_tool, source_path.as_deref());
1363        if normalized != current_tool {
1364            updates.push((id, normalized));
1365        }
1366    }
1367    drop(stmt);
1368
1369    for (id, tool) in updates {
1370        conn.execute(
1371            "UPDATE sessions SET tool = ?1 WHERE id = ?2",
1372            params![tool, id],
1373        )?;
1374    }
1375
1376    Ok(())
1377}
1378
1379fn is_schema_compat_error(err: &anyhow::Error) -> bool {
1380    let msg = format!("{err:#}").to_ascii_lowercase();
1381    msg.contains("no such column")
1382        || msg.contains("no such table")
1383        || msg.contains("cannot add a column")
1384        || msg.contains("already exists")
1385        || msg.contains("views may not be indexed")
1386        || msg.contains("malformed database schema")
1387        || msg.contains("duplicate column name")
1388}
1389
1390fn rotate_legacy_db(path: &PathBuf) -> Result<()> {
1391    if !path.exists() {
1392        return Ok(());
1393    }
1394
1395    let ts = chrono::Utc::now().format("%Y%m%d%H%M%S");
1396    let backup_name = format!(
1397        "{}.legacy-{}.bak",
1398        path.file_name()
1399            .and_then(|n| n.to_str())
1400            .unwrap_or("local.db"),
1401        ts
1402    );
1403    let backup_path = path.with_file_name(backup_name);
1404    std::fs::rename(path, &backup_path).with_context(|| {
1405        format!(
1406            "rotate local db backup {} -> {}",
1407            path.display(),
1408            backup_path.display()
1409        )
1410    })?;
1411
1412    let wal = PathBuf::from(format!("{}-wal", path.display()));
1413    let shm = PathBuf::from(format!("{}-shm", path.display()));
1414    let _ = std::fs::remove_file(wal);
1415    let _ = std::fs::remove_file(shm);
1416    Ok(())
1417}
1418
1419const REQUIRED_SESSION_COLUMNS: &[(&str, &str)] = &[
1420    ("user_id", "TEXT"),
1421    ("team_id", "TEXT DEFAULT 'personal'"),
1422    ("tool", "TEXT DEFAULT ''"),
1423    ("agent_provider", "TEXT"),
1424    ("agent_model", "TEXT"),
1425    ("title", "TEXT"),
1426    ("description", "TEXT"),
1427    ("tags", "TEXT"),
1428    ("created_at", "TEXT DEFAULT ''"),
1429    ("uploaded_at", "TEXT DEFAULT ''"),
1430    ("message_count", "INTEGER DEFAULT 0"),
1431    ("user_message_count", "INTEGER DEFAULT 0"),
1432    ("task_count", "INTEGER DEFAULT 0"),
1433    ("event_count", "INTEGER DEFAULT 0"),
1434    ("duration_seconds", "INTEGER DEFAULT 0"),
1435    ("total_input_tokens", "INTEGER DEFAULT 0"),
1436    ("total_output_tokens", "INTEGER DEFAULT 0"),
1437    // Migration-only compatibility column from pre git-native body storage.
1438    ("body_storage_key", "TEXT DEFAULT ''"),
1439    ("body_url", "TEXT"),
1440    ("git_remote", "TEXT"),
1441    ("git_branch", "TEXT"),
1442    ("git_commit", "TEXT"),
1443    ("git_repo_name", "TEXT"),
1444    ("pr_number", "INTEGER"),
1445    ("pr_url", "TEXT"),
1446    ("working_directory", "TEXT"),
1447    ("files_modified", "TEXT"),
1448    ("files_read", "TEXT"),
1449    ("has_errors", "BOOLEAN DEFAULT 0"),
1450    ("max_active_agents", "INTEGER DEFAULT 1"),
1451];
1452
1453fn ensure_sessions_columns(conn: &Connection) -> Result<()> {
1454    let mut existing = HashSet::new();
1455    let mut stmt = conn.prepare("PRAGMA table_info(sessions)")?;
1456    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
1457    for row in rows {
1458        existing.insert(row?);
1459    }
1460
1461    for (name, decl) in REQUIRED_SESSION_COLUMNS {
1462        if existing.contains(*name) {
1463            continue;
1464        }
1465        let sql = format!("ALTER TABLE sessions ADD COLUMN {name} {decl};");
1466        conn.execute_batch(&sql)
1467            .with_context(|| format!("add sessions column '{name}'"))?;
1468    }
1469
1470    Ok(())
1471}
1472
1473/// Column list for SELECT queries against sessions + session_sync + users.
1474pub const LOCAL_SESSION_COLUMNS: &str = "\
1475s.id, ss.source_path, COALESCE(ss.sync_status, 'unknown') AS sync_status, ss.last_synced_at, \
1476s.user_id, u.nickname, s.team_id, s.tool, s.agent_provider, s.agent_model, \
1477s.title, s.description, s.tags, s.created_at, s.uploaded_at, \
1478s.message_count, COALESCE(s.user_message_count, 0), s.task_count, s.event_count, s.duration_seconds, \
1479s.total_input_tokens, s.total_output_tokens, \
1480s.git_remote, s.git_branch, s.git_commit, s.git_repo_name, \
1481s.pr_number, s.pr_url, s.working_directory, \
1482s.files_modified, s.files_read, s.has_errors, COALESCE(s.max_active_agents, 1)";
1483
1484fn row_to_local_session(row: &rusqlite::Row) -> rusqlite::Result<LocalSessionRow> {
1485    let source_path: Option<String> = row.get(1)?;
1486    let tool: String = row.get(7)?;
1487    let normalized_tool = normalize_tool_for_source_path(&tool, source_path.as_deref());
1488
1489    Ok(LocalSessionRow {
1490        id: row.get(0)?,
1491        source_path,
1492        sync_status: row.get(2)?,
1493        last_synced_at: row.get(3)?,
1494        user_id: row.get(4)?,
1495        nickname: row.get(5)?,
1496        team_id: row.get(6)?,
1497        tool: normalized_tool,
1498        agent_provider: row.get(8)?,
1499        agent_model: row.get(9)?,
1500        title: row.get(10)?,
1501        description: row.get(11)?,
1502        tags: row.get(12)?,
1503        created_at: row.get(13)?,
1504        uploaded_at: row.get(14)?,
1505        message_count: row.get(15)?,
1506        user_message_count: row.get(16)?,
1507        task_count: row.get(17)?,
1508        event_count: row.get(18)?,
1509        duration_seconds: row.get(19)?,
1510        total_input_tokens: row.get(20)?,
1511        total_output_tokens: row.get(21)?,
1512        git_remote: row.get(22)?,
1513        git_branch: row.get(23)?,
1514        git_commit: row.get(24)?,
1515        git_repo_name: row.get(25)?,
1516        pr_number: row.get(26)?,
1517        pr_url: row.get(27)?,
1518        working_directory: row.get(28)?,
1519        files_modified: row.get(29)?,
1520        files_read: row.get(30)?,
1521        has_errors: row.get::<_, i64>(31).unwrap_or(0) != 0,
1522        max_active_agents: row.get(32).unwrap_or(1),
1523    })
1524}
1525
1526fn default_db_path() -> Result<PathBuf> {
1527    let home = std::env::var("HOME")
1528        .or_else(|_| std::env::var("USERPROFILE"))
1529        .context("Could not determine home directory")?;
1530    Ok(PathBuf::from(home)
1531        .join(".local")
1532        .join("share")
1533        .join("opensession")
1534        .join("local.db"))
1535}
1536
1537#[cfg(test)]
1538mod tests {
1539    use super::*;
1540
1541    use std::collections::BTreeSet;
1542    use std::fs::{create_dir_all, write};
1543    use tempfile::tempdir;
1544
1545    fn test_db() -> LocalDb {
1546        let dir = tempdir().unwrap();
1547        let path = dir.keep().join("test.db");
1548        LocalDb::open_path(&path).unwrap()
1549    }
1550
1551    fn temp_root() -> tempfile::TempDir {
1552        tempdir().unwrap()
1553    }
1554
1555    fn make_row(id: &str, tool: &str, source_path: Option<&str>) -> LocalSessionRow {
1556        LocalSessionRow {
1557            id: id.to_string(),
1558            source_path: source_path.map(String::from),
1559            sync_status: "local_only".to_string(),
1560            last_synced_at: None,
1561            user_id: None,
1562            nickname: None,
1563            team_id: None,
1564            tool: tool.to_string(),
1565            agent_provider: None,
1566            agent_model: None,
1567            title: Some("test".to_string()),
1568            description: None,
1569            tags: None,
1570            created_at: "2024-01-01T00:00:00Z".to_string(),
1571            uploaded_at: None,
1572            message_count: 0,
1573            user_message_count: 0,
1574            task_count: 0,
1575            event_count: 0,
1576            duration_seconds: 0,
1577            total_input_tokens: 0,
1578            total_output_tokens: 0,
1579            git_remote: None,
1580            git_branch: None,
1581            git_commit: None,
1582            git_repo_name: None,
1583            pr_number: None,
1584            pr_url: None,
1585            working_directory: None,
1586            files_modified: None,
1587            files_read: None,
1588            has_errors: false,
1589            max_active_agents: 1,
1590        }
1591    }
1592
1593    #[test]
1594    fn test_open_and_schema() {
1595        let _db = test_db();
1596    }
1597
1598    #[test]
1599    fn test_open_repairs_codex_tool_hint_from_source_path() {
1600        let dir = tempfile::tempdir().unwrap();
1601        let path = dir.path().join("repair.db");
1602
1603        {
1604            let _ = LocalDb::open_path(&path).unwrap();
1605        }
1606
1607        {
1608            let conn = Connection::open(&path).unwrap();
1609            conn.execute(
1610                "INSERT INTO sessions (id, team_id, tool, created_at, body_storage_key) VALUES (?1, 'personal', 'claude-code', ?2, '')",
1611                params!["rollout-repair", "2026-02-20T00:00:00Z"],
1612            )
1613            .unwrap();
1614            conn.execute(
1615                "INSERT INTO session_sync (session_id, source_path, sync_status) VALUES (?1, ?2, 'local_only')",
1616                params!["rollout-repair", "/Users/test/.codex/sessions/2026/02/20/rollout-repair.jsonl"],
1617            )
1618            .unwrap();
1619        }
1620
1621        let db = LocalDb::open_path(&path).unwrap();
1622        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1623        let row = rows
1624            .iter()
1625            .find(|row| row.id == "rollout-repair")
1626            .expect("repaired row");
1627        assert_eq!(row.tool, "codex");
1628    }
1629
1630    #[test]
1631    fn test_upsert_local_session_normalizes_tool_from_source_path() {
1632        let db = test_db();
1633        let mut session = Session::new(
1634            "rollout-upsert".to_string(),
1635            opensession_core::trace::Agent {
1636                provider: "openai".to_string(),
1637                model: "gpt-5".to_string(),
1638                tool: "claude-code".to_string(),
1639                tool_version: None,
1640            },
1641        );
1642        session.stats.event_count = 1;
1643
1644        db.upsert_local_session(
1645            &session,
1646            "/Users/test/.codex/sessions/2026/02/20/rollout-upsert.jsonl",
1647            &crate::git::GitContext::default(),
1648        )
1649        .unwrap();
1650
1651        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1652        let row = rows
1653            .iter()
1654            .find(|row| row.id == "rollout-upsert")
1655            .expect("upserted row");
1656        assert_eq!(row.tool, "codex");
1657    }
1658
1659    #[test]
1660    fn test_open_backfills_legacy_sessions_columns() {
1661        let dir = tempfile::tempdir().unwrap();
1662        let path = dir.path().join("legacy.db");
1663        {
1664            let conn = Connection::open(&path).unwrap();
1665            conn.execute_batch(
1666                "CREATE TABLE sessions (id TEXT PRIMARY KEY);
1667                 INSERT INTO sessions (id) VALUES ('legacy-1');",
1668            )
1669            .unwrap();
1670        }
1671
1672        let db = LocalDb::open_path(&path).unwrap();
1673        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1674        assert_eq!(rows.len(), 1);
1675        assert_eq!(rows[0].id, "legacy-1");
1676        assert_eq!(rows[0].user_message_count, 0);
1677    }
1678
1679    #[test]
1680    fn test_open_rotates_incompatible_legacy_schema() {
1681        let dir = tempfile::tempdir().unwrap();
1682        let path = dir.path().join("broken.db");
1683        {
1684            let conn = Connection::open(&path).unwrap();
1685            conn.execute_batch("CREATE VIEW sessions AS SELECT 'x' AS id;")
1686                .unwrap();
1687        }
1688
1689        let db = LocalDb::open_path(&path).unwrap();
1690        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1691        assert!(rows.is_empty());
1692
1693        let rotated = std::fs::read_dir(dir.path())
1694            .unwrap()
1695            .filter_map(Result::ok)
1696            .any(|entry| {
1697                let name = entry.file_name();
1698                let name = name.to_string_lossy();
1699                name.starts_with("broken.db.legacy-") && name.ends_with(".bak")
1700            });
1701        assert!(rotated, "expected rotated legacy backup file");
1702    }
1703
1704    #[test]
1705    fn test_is_opencode_child_session() {
1706        let root = temp_root();
1707        let dir = root.path().join("sessions");
1708        create_dir_all(&dir).unwrap();
1709        let parent_session = dir.join("parent.json");
1710        write(
1711            &parent_session,
1712            r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1713        )
1714        .unwrap();
1715        let child_session = dir.join("child.json");
1716        write(
1717            &child_session,
1718            r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1719        )
1720        .unwrap();
1721
1722        let parent = make_row(
1723            "ses_parent",
1724            "opencode",
1725            Some(parent_session.to_str().unwrap()),
1726        );
1727        let child = make_row(
1728            "ses_child",
1729            "opencode",
1730            Some(child_session.to_str().unwrap()),
1731        );
1732        let codex = make_row("ses_other", "codex", Some(child_session.to_str().unwrap()));
1733
1734        assert!(!is_opencode_child_session(&parent));
1735        assert!(is_opencode_child_session(&child));
1736        assert!(!is_opencode_child_session(&codex));
1737    }
1738
1739    #[test]
1740    fn test_is_opencode_child_session_uses_event_shape_heuristic() {
1741        let child = LocalSessionRow {
1742            id: "sess_child".to_string(),
1743            source_path: None,
1744            sync_status: "local_only".to_string(),
1745            last_synced_at: None,
1746            user_id: None,
1747            nickname: None,
1748            team_id: None,
1749            tool: "opencode".to_string(),
1750            agent_provider: None,
1751            agent_model: None,
1752            title: None,
1753            description: None,
1754            tags: None,
1755            created_at: "2024-01-01T00:00:00Z".to_string(),
1756            uploaded_at: None,
1757            message_count: 1,
1758            user_message_count: 0,
1759            task_count: 4,
1760            event_count: 4,
1761            duration_seconds: 0,
1762            total_input_tokens: 0,
1763            total_output_tokens: 0,
1764            git_remote: None,
1765            git_branch: None,
1766            git_commit: None,
1767            git_repo_name: None,
1768            pr_number: None,
1769            pr_url: None,
1770            working_directory: None,
1771            files_modified: None,
1772            files_read: None,
1773            has_errors: false,
1774            max_active_agents: 1,
1775        };
1776
1777        let parent = LocalSessionRow {
1778            id: "sess_parent".to_string(),
1779            source_path: None,
1780            sync_status: "local_only".to_string(),
1781            last_synced_at: None,
1782            user_id: None,
1783            nickname: None,
1784            team_id: None,
1785            tool: "opencode".to_string(),
1786            agent_provider: None,
1787            agent_model: None,
1788            title: Some("regular".to_string()),
1789            description: None,
1790            tags: None,
1791            created_at: "2024-01-01T00:00:00Z".to_string(),
1792            uploaded_at: None,
1793            message_count: 1,
1794            user_message_count: 1,
1795            task_count: 2,
1796            event_count: 20,
1797            duration_seconds: 0,
1798            total_input_tokens: 0,
1799            total_output_tokens: 0,
1800            git_remote: None,
1801            git_branch: None,
1802            git_commit: None,
1803            git_repo_name: None,
1804            pr_number: None,
1805            pr_url: None,
1806            working_directory: None,
1807            files_modified: None,
1808            files_read: None,
1809            has_errors: false,
1810            max_active_agents: 1,
1811        };
1812
1813        assert!(is_opencode_child_session(&child));
1814        assert!(!is_opencode_child_session(&parent));
1815    }
1816
1817    #[test]
1818    fn test_is_opencode_child_session_with_more_messages_is_hidden_if_task_count_small() {
1819        let child = LocalSessionRow {
1820            id: "sess_child_2".to_string(),
1821            source_path: None,
1822            sync_status: "local_only".to_string(),
1823            last_synced_at: None,
1824            user_id: None,
1825            nickname: None,
1826            team_id: None,
1827            tool: "opencode".to_string(),
1828            agent_provider: None,
1829            agent_model: None,
1830            title: None,
1831            description: None,
1832            tags: None,
1833            created_at: "2024-01-01T00:00:00Z".to_string(),
1834            uploaded_at: None,
1835            message_count: 2,
1836            user_message_count: 0,
1837            task_count: 4,
1838            event_count: 4,
1839            duration_seconds: 0,
1840            total_input_tokens: 0,
1841            total_output_tokens: 0,
1842            git_remote: None,
1843            git_branch: None,
1844            git_commit: None,
1845            git_repo_name: None,
1846            pr_number: None,
1847            pr_url: None,
1848            working_directory: None,
1849            files_modified: None,
1850            files_read: None,
1851            has_errors: false,
1852            max_active_agents: 1,
1853        };
1854
1855        let parent = LocalSessionRow {
1856            id: "sess_parent".to_string(),
1857            source_path: None,
1858            sync_status: "local_only".to_string(),
1859            last_synced_at: None,
1860            user_id: None,
1861            nickname: None,
1862            team_id: None,
1863            tool: "opencode".to_string(),
1864            agent_provider: None,
1865            agent_model: None,
1866            title: Some("regular".to_string()),
1867            description: None,
1868            tags: None,
1869            created_at: "2024-01-01T00:00:00Z".to_string(),
1870            uploaded_at: None,
1871            message_count: 2,
1872            user_message_count: 1,
1873            task_count: 5,
1874            event_count: 20,
1875            duration_seconds: 0,
1876            total_input_tokens: 0,
1877            total_output_tokens: 0,
1878            git_remote: None,
1879            git_branch: None,
1880            git_commit: None,
1881            git_repo_name: None,
1882            pr_number: None,
1883            pr_url: None,
1884            working_directory: None,
1885            files_modified: None,
1886            files_read: None,
1887            has_errors: false,
1888            max_active_agents: 1,
1889        };
1890
1891        assert!(is_opencode_child_session(&child));
1892        assert!(!is_opencode_child_session(&parent));
1893    }
1894
1895    #[test]
1896    fn test_is_opencode_child_session_with_more_messages_but_few_tasks() {
1897        let child = LocalSessionRow {
1898            id: "sess_child_3".to_string(),
1899            source_path: None,
1900            sync_status: "local_only".to_string(),
1901            last_synced_at: None,
1902            user_id: None,
1903            nickname: None,
1904            team_id: None,
1905            tool: "opencode".to_string(),
1906            agent_provider: None,
1907            agent_model: None,
1908            title: None,
1909            description: None,
1910            tags: None,
1911            created_at: "2024-01-01T00:00:00Z".to_string(),
1912            uploaded_at: None,
1913            message_count: 3,
1914            user_message_count: 0,
1915            task_count: 2,
1916            event_count: 6,
1917            duration_seconds: 0,
1918            total_input_tokens: 0,
1919            total_output_tokens: 0,
1920            git_remote: None,
1921            git_branch: None,
1922            git_commit: None,
1923            git_repo_name: None,
1924            pr_number: None,
1925            pr_url: None,
1926            working_directory: None,
1927            files_modified: None,
1928            files_read: None,
1929            has_errors: false,
1930            max_active_agents: 1,
1931        };
1932
1933        assert!(is_opencode_child_session(&child));
1934    }
1935
1936    #[test]
1937    fn test_parse_opencode_parent_session_id_aliases() {
1938        let root = temp_root();
1939        let dir = root.path().join("session-aliases");
1940        create_dir_all(&dir).unwrap();
1941        let child_session = dir.join("child.json");
1942        write(
1943            &child_session,
1944            r#"{"id":"ses_child","parentUUID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1945        )
1946        .unwrap();
1947        assert_eq!(
1948            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
1949            Some("ses_parent")
1950        );
1951    }
1952
1953    #[test]
1954    fn test_parse_opencode_parent_session_id_nested_metadata() {
1955        let root = temp_root();
1956        let dir = root.path().join("session-nested");
1957        create_dir_all(&dir).unwrap();
1958        let child_session = dir.join("child.json");
1959        write(
1960            &child_session,
1961            r#"{"id":"ses_child","metadata":{"links":{"parentSessionId":"ses_parent","trace":"x"}}}"#,
1962        )
1963        .unwrap();
1964        assert_eq!(
1965            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
1966            Some("ses_parent")
1967        );
1968    }
1969
1970    #[test]
1971    fn test_is_claude_subagent_session() {
1972        let row = make_row(
1973            "ses_parent",
1974            "claude-code",
1975            Some("/Users/test/.claude/projects/foo/subagents/agent-abc.jsonl"),
1976        );
1977        assert!(!is_opencode_child_session(&row));
1978        assert!(is_claude_subagent_session(&row));
1979        assert!(hide_opencode_child_sessions(vec![row]).is_empty());
1980    }
1981
1982    #[test]
1983    fn test_hide_opencode_child_sessions() {
1984        let root = temp_root();
1985        let dir = root.path().join("sessions");
1986        create_dir_all(&dir).unwrap();
1987        let parent_session = dir.join("parent.json");
1988        let child_session = dir.join("child.json");
1989        let orphan_session = dir.join("orphan.json");
1990
1991        write(
1992            &parent_session,
1993            r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1994        )
1995        .unwrap();
1996        write(
1997            &child_session,
1998            r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1999        )
2000        .unwrap();
2001        write(
2002            &orphan_session,
2003            r#"{"id":"ses_orphan","time":{"created":1000,"updated":1000}}"#,
2004        )
2005        .unwrap();
2006
2007        let rows = vec![
2008            make_row(
2009                "ses_child",
2010                "opencode",
2011                Some(child_session.to_str().unwrap()),
2012            ),
2013            make_row(
2014                "ses_parent",
2015                "opencode",
2016                Some(parent_session.to_str().unwrap()),
2017            ),
2018            {
2019                let mut row = make_row("ses_other", "codex", None);
2020                row.user_message_count = 1;
2021                row
2022            },
2023            make_row(
2024                "ses_orphan",
2025                "opencode",
2026                Some(orphan_session.to_str().unwrap()),
2027            ),
2028        ];
2029
2030        let filtered = hide_opencode_child_sessions(rows);
2031        assert_eq!(filtered.len(), 3);
2032        assert!(filtered.iter().all(|r| r.id != "ses_child"));
2033    }
2034
2035    #[test]
2036    fn test_sync_cursor() {
2037        let db = test_db();
2038        assert_eq!(db.get_sync_cursor("team1").unwrap(), None);
2039        db.set_sync_cursor("team1", "2024-01-01T00:00:00Z").unwrap();
2040        assert_eq!(
2041            db.get_sync_cursor("team1").unwrap(),
2042            Some("2024-01-01T00:00:00Z".to_string())
2043        );
2044        // Update
2045        db.set_sync_cursor("team1", "2024-06-01T00:00:00Z").unwrap();
2046        assert_eq!(
2047            db.get_sync_cursor("team1").unwrap(),
2048            Some("2024-06-01T00:00:00Z".to_string())
2049        );
2050    }
2051
2052    #[test]
2053    fn test_body_cache() {
2054        let db = test_db();
2055        assert_eq!(db.get_cached_body("s1").unwrap(), None);
2056        db.cache_body("s1", b"hello world").unwrap();
2057        assert_eq!(
2058            db.get_cached_body("s1").unwrap(),
2059            Some(b"hello world".to_vec())
2060        );
2061    }
2062
2063    #[test]
2064    fn test_local_migration_files_match_api_local_migrations() {
2065        fn collect_local_sql(dir: PathBuf) -> BTreeSet<String> {
2066            std::fs::read_dir(dir)
2067                .expect("read migrations directory")
2068                .filter_map(Result::ok)
2069                .map(|entry| entry.file_name().to_string_lossy().to_string())
2070                .filter(|name| name.starts_with("local_") && name.ends_with(".sql"))
2071                .collect()
2072        }
2073
2074        let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
2075        let local_files = collect_local_sql(manifest_dir.join("migrations"));
2076        let api_files = collect_local_sql(manifest_dir.join("../api/migrations"));
2077
2078        assert_eq!(
2079            local_files, api_files,
2080            "local-db local migrations must stay in parity with api local migrations"
2081        );
2082    }
2083
2084    #[test]
2085    fn test_upsert_remote_session() {
2086        let db = test_db();
2087        let summary = RemoteSessionSummary {
2088            id: "remote-1".to_string(),
2089            user_id: Some("u1".to_string()),
2090            nickname: Some("alice".to_string()),
2091            team_id: "t1".to_string(),
2092            tool: "claude-code".to_string(),
2093            agent_provider: None,
2094            agent_model: None,
2095            title: Some("Test session".to_string()),
2096            description: None,
2097            tags: None,
2098            created_at: "2024-01-01T00:00:00Z".to_string(),
2099            uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2100            message_count: 10,
2101            task_count: 2,
2102            event_count: 20,
2103            duration_seconds: 300,
2104            total_input_tokens: 1000,
2105            total_output_tokens: 500,
2106            git_remote: None,
2107            git_branch: None,
2108            git_commit: None,
2109            git_repo_name: None,
2110            pr_number: None,
2111            pr_url: None,
2112            working_directory: None,
2113            files_modified: None,
2114            files_read: None,
2115            has_errors: false,
2116            max_active_agents: 1,
2117        };
2118        db.upsert_remote_session(&summary).unwrap();
2119
2120        let sessions = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2121        assert_eq!(sessions.len(), 1);
2122        assert_eq!(sessions[0].id, "remote-1");
2123        assert_eq!(sessions[0].sync_status, "remote_only");
2124        assert_eq!(sessions[0].nickname, None); // no user in local users table
2125    }
2126
2127    #[test]
2128    fn test_list_filter_by_repo() {
2129        let db = test_db();
2130        // Insert a remote session with team_id
2131        let summary1 = RemoteSessionSummary {
2132            id: "s1".to_string(),
2133            user_id: None,
2134            nickname: None,
2135            team_id: "t1".to_string(),
2136            tool: "claude-code".to_string(),
2137            agent_provider: None,
2138            agent_model: None,
2139            title: Some("Session 1".to_string()),
2140            description: None,
2141            tags: None,
2142            created_at: "2024-01-01T00:00:00Z".to_string(),
2143            uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2144            message_count: 5,
2145            task_count: 0,
2146            event_count: 10,
2147            duration_seconds: 60,
2148            total_input_tokens: 100,
2149            total_output_tokens: 50,
2150            git_remote: None,
2151            git_branch: None,
2152            git_commit: None,
2153            git_repo_name: None,
2154            pr_number: None,
2155            pr_url: None,
2156            working_directory: None,
2157            files_modified: None,
2158            files_read: None,
2159            has_errors: false,
2160            max_active_agents: 1,
2161        };
2162        db.upsert_remote_session(&summary1).unwrap();
2163
2164        // Filter by team
2165        let filter = LocalSessionFilter {
2166            team_id: Some("t1".to_string()),
2167            ..Default::default()
2168        };
2169        assert_eq!(db.list_sessions(&filter).unwrap().len(), 1);
2170
2171        let filter = LocalSessionFilter {
2172            team_id: Some("t999".to_string()),
2173            ..Default::default()
2174        };
2175        assert_eq!(db.list_sessions(&filter).unwrap().len(), 0);
2176    }
2177
2178    // ── Helpers for inserting test sessions ────────────────────────────
2179
2180    fn make_summary(id: &str, tool: &str, title: &str, created_at: &str) -> RemoteSessionSummary {
2181        RemoteSessionSummary {
2182            id: id.to_string(),
2183            user_id: None,
2184            nickname: None,
2185            team_id: "t1".to_string(),
2186            tool: tool.to_string(),
2187            agent_provider: Some("anthropic".to_string()),
2188            agent_model: Some("claude-opus-4-6".to_string()),
2189            title: Some(title.to_string()),
2190            description: None,
2191            tags: None,
2192            created_at: created_at.to_string(),
2193            uploaded_at: created_at.to_string(),
2194            message_count: 5,
2195            task_count: 1,
2196            event_count: 10,
2197            duration_seconds: 300,
2198            total_input_tokens: 1000,
2199            total_output_tokens: 500,
2200            git_remote: None,
2201            git_branch: None,
2202            git_commit: None,
2203            git_repo_name: None,
2204            pr_number: None,
2205            pr_url: None,
2206            working_directory: None,
2207            files_modified: None,
2208            files_read: None,
2209            has_errors: false,
2210            max_active_agents: 1,
2211        }
2212    }
2213
2214    fn seed_sessions(db: &LocalDb) {
2215        // Insert 5 sessions across two tools, ordered by created_at
2216        db.upsert_remote_session(&make_summary(
2217            "s1",
2218            "claude-code",
2219            "First session",
2220            "2024-01-01T00:00:00Z",
2221        ))
2222        .unwrap();
2223        db.upsert_remote_session(&make_summary(
2224            "s2",
2225            "claude-code",
2226            "JWT auth work",
2227            "2024-01-02T00:00:00Z",
2228        ))
2229        .unwrap();
2230        db.upsert_remote_session(&make_summary(
2231            "s3",
2232            "gemini",
2233            "Gemini test",
2234            "2024-01-03T00:00:00Z",
2235        ))
2236        .unwrap();
2237        db.upsert_remote_session(&make_summary(
2238            "s4",
2239            "claude-code",
2240            "Error handling",
2241            "2024-01-04T00:00:00Z",
2242        ))
2243        .unwrap();
2244        db.upsert_remote_session(&make_summary(
2245            "s5",
2246            "claude-code",
2247            "Final polish",
2248            "2024-01-05T00:00:00Z",
2249        ))
2250        .unwrap();
2251    }
2252
2253    // ── list_sessions_log tests ────────────────────────────────────────
2254
2255    #[test]
2256    fn test_log_no_filters() {
2257        let db = test_db();
2258        seed_sessions(&db);
2259        let filter = LogFilter::default();
2260        let results = db.list_sessions_log(&filter).unwrap();
2261        assert_eq!(results.len(), 5);
2262        // Should be ordered by created_at DESC
2263        assert_eq!(results[0].id, "s5");
2264        assert_eq!(results[4].id, "s1");
2265    }
2266
2267    #[test]
2268    fn test_log_filter_by_tool() {
2269        let db = test_db();
2270        seed_sessions(&db);
2271        let filter = LogFilter {
2272            tool: Some("claude-code".to_string()),
2273            ..Default::default()
2274        };
2275        let results = db.list_sessions_log(&filter).unwrap();
2276        assert_eq!(results.len(), 4);
2277        assert!(results.iter().all(|s| s.tool == "claude-code"));
2278    }
2279
2280    #[test]
2281    fn test_log_filter_by_model_wildcard() {
2282        let db = test_db();
2283        seed_sessions(&db);
2284        let filter = LogFilter {
2285            model: Some("claude*".to_string()),
2286            ..Default::default()
2287        };
2288        let results = db.list_sessions_log(&filter).unwrap();
2289        assert_eq!(results.len(), 5); // all have claude-opus model
2290    }
2291
2292    #[test]
2293    fn test_log_filter_since() {
2294        let db = test_db();
2295        seed_sessions(&db);
2296        let filter = LogFilter {
2297            since: Some("2024-01-03T00:00:00Z".to_string()),
2298            ..Default::default()
2299        };
2300        let results = db.list_sessions_log(&filter).unwrap();
2301        assert_eq!(results.len(), 3); // s3, s4, s5
2302    }
2303
2304    #[test]
2305    fn test_log_filter_before() {
2306        let db = test_db();
2307        seed_sessions(&db);
2308        let filter = LogFilter {
2309            before: Some("2024-01-03T00:00:00Z".to_string()),
2310            ..Default::default()
2311        };
2312        let results = db.list_sessions_log(&filter).unwrap();
2313        assert_eq!(results.len(), 2); // s1, s2
2314    }
2315
2316    #[test]
2317    fn test_log_filter_since_and_before() {
2318        let db = test_db();
2319        seed_sessions(&db);
2320        let filter = LogFilter {
2321            since: Some("2024-01-02T00:00:00Z".to_string()),
2322            before: Some("2024-01-04T00:00:00Z".to_string()),
2323            ..Default::default()
2324        };
2325        let results = db.list_sessions_log(&filter).unwrap();
2326        assert_eq!(results.len(), 2); // s2, s3
2327    }
2328
2329    #[test]
2330    fn test_log_filter_grep() {
2331        let db = test_db();
2332        seed_sessions(&db);
2333        let filter = LogFilter {
2334            grep: Some("JWT".to_string()),
2335            ..Default::default()
2336        };
2337        let results = db.list_sessions_log(&filter).unwrap();
2338        assert_eq!(results.len(), 1);
2339        assert_eq!(results[0].id, "s2");
2340    }
2341
2342    #[test]
2343    fn test_log_limit_and_offset() {
2344        let db = test_db();
2345        seed_sessions(&db);
2346        let filter = LogFilter {
2347            limit: Some(2),
2348            offset: Some(1),
2349            ..Default::default()
2350        };
2351        let results = db.list_sessions_log(&filter).unwrap();
2352        assert_eq!(results.len(), 2);
2353        assert_eq!(results[0].id, "s4"); // second most recent
2354        assert_eq!(results[1].id, "s3");
2355    }
2356
2357    #[test]
2358    fn test_log_limit_only() {
2359        let db = test_db();
2360        seed_sessions(&db);
2361        let filter = LogFilter {
2362            limit: Some(3),
2363            ..Default::default()
2364        };
2365        let results = db.list_sessions_log(&filter).unwrap();
2366        assert_eq!(results.len(), 3);
2367    }
2368
2369    #[test]
2370    fn test_list_sessions_limit_offset() {
2371        let db = test_db();
2372        seed_sessions(&db);
2373        let filter = LocalSessionFilter {
2374            limit: Some(2),
2375            offset: Some(1),
2376            ..Default::default()
2377        };
2378        let results = db.list_sessions(&filter).unwrap();
2379        assert_eq!(results.len(), 2);
2380        assert_eq!(results[0].id, "s4");
2381        assert_eq!(results[1].id, "s3");
2382    }
2383
2384    #[test]
2385    fn test_count_sessions_filtered() {
2386        let db = test_db();
2387        seed_sessions(&db);
2388        let count = db
2389            .count_sessions_filtered(&LocalSessionFilter::default())
2390            .unwrap();
2391        assert_eq!(count, 5);
2392    }
2393
2394    #[test]
2395    fn test_list_working_directories_distinct_non_empty() {
2396        let db = test_db();
2397
2398        let mut a = make_summary("wd-1", "claude-code", "One", "2024-01-01T00:00:00Z");
2399        a.working_directory = Some("/tmp/repo-a".to_string());
2400        let mut b = make_summary("wd-2", "claude-code", "Two", "2024-01-02T00:00:00Z");
2401        b.working_directory = Some("/tmp/repo-a".to_string());
2402        let mut c = make_summary("wd-3", "claude-code", "Three", "2024-01-03T00:00:00Z");
2403        c.working_directory = Some("/tmp/repo-b".to_string());
2404        let mut d = make_summary("wd-4", "claude-code", "Four", "2024-01-04T00:00:00Z");
2405        d.working_directory = Some("".to_string());
2406
2407        db.upsert_remote_session(&a).unwrap();
2408        db.upsert_remote_session(&b).unwrap();
2409        db.upsert_remote_session(&c).unwrap();
2410        db.upsert_remote_session(&d).unwrap();
2411
2412        let dirs = db.list_working_directories().unwrap();
2413        assert_eq!(
2414            dirs,
2415            vec!["/tmp/repo-a".to_string(), "/tmp/repo-b".to_string()]
2416        );
2417    }
2418
2419    #[test]
2420    fn test_list_session_tools() {
2421        let db = test_db();
2422        seed_sessions(&db);
2423        let tools = db
2424            .list_session_tools(&LocalSessionFilter::default())
2425            .unwrap();
2426        assert_eq!(tools, vec!["claude-code".to_string(), "gemini".to_string()]);
2427    }
2428
2429    #[test]
2430    fn test_log_combined_filters() {
2431        let db = test_db();
2432        seed_sessions(&db);
2433        let filter = LogFilter {
2434            tool: Some("claude-code".to_string()),
2435            since: Some("2024-01-03T00:00:00Z".to_string()),
2436            limit: Some(1),
2437            ..Default::default()
2438        };
2439        let results = db.list_sessions_log(&filter).unwrap();
2440        assert_eq!(results.len(), 1);
2441        assert_eq!(results[0].id, "s5"); // most recent claude-code after Jan 3
2442    }
2443
2444    // ── Session offset/latest tests ────────────────────────────────────
2445
2446    #[test]
2447    fn test_get_session_by_offset() {
2448        let db = test_db();
2449        seed_sessions(&db);
2450        let row = db.get_session_by_offset(0).unwrap().unwrap();
2451        assert_eq!(row.id, "s5"); // most recent
2452        let row = db.get_session_by_offset(2).unwrap().unwrap();
2453        assert_eq!(row.id, "s3");
2454        assert!(db.get_session_by_offset(10).unwrap().is_none());
2455    }
2456
2457    #[test]
2458    fn test_get_session_by_tool_offset() {
2459        let db = test_db();
2460        seed_sessions(&db);
2461        let row = db
2462            .get_session_by_tool_offset("claude-code", 0)
2463            .unwrap()
2464            .unwrap();
2465        assert_eq!(row.id, "s5");
2466        let row = db
2467            .get_session_by_tool_offset("claude-code", 1)
2468            .unwrap()
2469            .unwrap();
2470        assert_eq!(row.id, "s4");
2471        let row = db.get_session_by_tool_offset("gemini", 0).unwrap().unwrap();
2472        assert_eq!(row.id, "s3");
2473        assert!(db
2474            .get_session_by_tool_offset("gemini", 1)
2475            .unwrap()
2476            .is_none());
2477    }
2478
2479    #[test]
2480    fn test_get_sessions_latest() {
2481        let db = test_db();
2482        seed_sessions(&db);
2483        let rows = db.get_sessions_latest(3).unwrap();
2484        assert_eq!(rows.len(), 3);
2485        assert_eq!(rows[0].id, "s5");
2486        assert_eq!(rows[1].id, "s4");
2487        assert_eq!(rows[2].id, "s3");
2488    }
2489
2490    #[test]
2491    fn test_get_sessions_by_tool_latest() {
2492        let db = test_db();
2493        seed_sessions(&db);
2494        let rows = db.get_sessions_by_tool_latest("claude-code", 2).unwrap();
2495        assert_eq!(rows.len(), 2);
2496        assert_eq!(rows[0].id, "s5");
2497        assert_eq!(rows[1].id, "s4");
2498    }
2499
2500    #[test]
2501    fn test_get_sessions_latest_more_than_available() {
2502        let db = test_db();
2503        seed_sessions(&db);
2504        let rows = db.get_sessions_by_tool_latest("gemini", 10).unwrap();
2505        assert_eq!(rows.len(), 1); // only 1 gemini session
2506    }
2507
2508    #[test]
2509    fn test_session_count() {
2510        let db = test_db();
2511        assert_eq!(db.session_count().unwrap(), 0);
2512        seed_sessions(&db);
2513        assert_eq!(db.session_count().unwrap(), 5);
2514    }
2515
2516    // ── Commit link tests ─────────────────────────────────────────────
2517
2518    #[test]
2519    fn test_link_commit_session() {
2520        let db = test_db();
2521        seed_sessions(&db);
2522        db.link_commit_session("abc123", "s1", Some("/tmp/repo"), Some("main"))
2523            .unwrap();
2524
2525        let commits = db.get_commits_by_session("s1").unwrap();
2526        assert_eq!(commits.len(), 1);
2527        assert_eq!(commits[0].commit_hash, "abc123");
2528        assert_eq!(commits[0].session_id, "s1");
2529        assert_eq!(commits[0].repo_path.as_deref(), Some("/tmp/repo"));
2530        assert_eq!(commits[0].branch.as_deref(), Some("main"));
2531
2532        let sessions = db.get_sessions_by_commit("abc123").unwrap();
2533        assert_eq!(sessions.len(), 1);
2534        assert_eq!(sessions[0].id, "s1");
2535    }
2536
2537    #[test]
2538    fn test_get_sessions_by_commit() {
2539        let db = test_db();
2540        seed_sessions(&db);
2541        // Link multiple sessions to the same commit
2542        db.link_commit_session("abc123", "s1", None, None).unwrap();
2543        db.link_commit_session("abc123", "s2", None, None).unwrap();
2544        db.link_commit_session("abc123", "s3", None, None).unwrap();
2545
2546        let sessions = db.get_sessions_by_commit("abc123").unwrap();
2547        assert_eq!(sessions.len(), 3);
2548        // Ordered by created_at DESC
2549        assert_eq!(sessions[0].id, "s3");
2550        assert_eq!(sessions[1].id, "s2");
2551        assert_eq!(sessions[2].id, "s1");
2552    }
2553
2554    #[test]
2555    fn test_get_commits_by_session() {
2556        let db = test_db();
2557        seed_sessions(&db);
2558        // Link multiple commits to the same session
2559        db.link_commit_session("aaa111", "s1", Some("/repo"), Some("main"))
2560            .unwrap();
2561        db.link_commit_session("bbb222", "s1", Some("/repo"), Some("main"))
2562            .unwrap();
2563        db.link_commit_session("ccc333", "s1", Some("/repo"), Some("feat"))
2564            .unwrap();
2565
2566        let commits = db.get_commits_by_session("s1").unwrap();
2567        assert_eq!(commits.len(), 3);
2568        // All linked to s1
2569        assert!(commits.iter().all(|c| c.session_id == "s1"));
2570    }
2571
2572    #[test]
2573    fn test_duplicate_link_ignored() {
2574        let db = test_db();
2575        seed_sessions(&db);
2576        db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2577            .unwrap();
2578        // Inserting the same link again should not error
2579        db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2580            .unwrap();
2581
2582        let commits = db.get_commits_by_session("s1").unwrap();
2583        assert_eq!(commits.len(), 1);
2584    }
2585
2586    #[test]
2587    fn test_log_filter_by_commit() {
2588        let db = test_db();
2589        seed_sessions(&db);
2590        db.link_commit_session("abc123", "s2", None, None).unwrap();
2591        db.link_commit_session("abc123", "s4", None, None).unwrap();
2592
2593        let filter = LogFilter {
2594            commit: Some("abc123".to_string()),
2595            ..Default::default()
2596        };
2597        let results = db.list_sessions_log(&filter).unwrap();
2598        assert_eq!(results.len(), 2);
2599        assert_eq!(results[0].id, "s4");
2600        assert_eq!(results[1].id, "s2");
2601
2602        // Non-existent commit returns nothing
2603        let filter = LogFilter {
2604            commit: Some("nonexistent".to_string()),
2605            ..Default::default()
2606        };
2607        let results = db.list_sessions_log(&filter).unwrap();
2608        assert_eq!(results.len(), 0);
2609    }
2610}