Skip to main content

opensession_local_db/
lib.rs

1pub mod git;
2
3use anyhow::{Context, Result};
4use opensession_core::trace::Session;
5use rusqlite::{params, Connection, OptionalExtension};
6use serde_json::Value;
7use std::collections::HashSet;
8use std::fs;
9use std::path::PathBuf;
10use std::sync::Mutex;
11
12use git::GitContext;
13
/// A schema migration: `(unique migration name, SQL text to execute)`.
type Migration = (&'static str, &'static str);
15
// Keep local cache schema aligned with remote/session schema migrations.
/// Migrations mirrored from the remote/server schema, embedded at compile
/// time via `include_str!` and listed in application order.
const REMOTE_MIGRATIONS: &[Migration] = &[
    ("0001_schema", include_str!("../migrations/0001_schema.sql")),
    (
        "0002_team_invite_keys",
        include_str!("../migrations/0002_team_invite_keys.sql"),
    ),
    (
        "0003_max_active_agents",
        include_str!("../migrations/0003_max_active_agents.sql"),
    ),
    (
        "0004_oauth_states_provider",
        include_str!("../migrations/0004_oauth_states_provider.sql"),
    ),
    (
        "0005_sessions_body_url_backfill",
        include_str!("../migrations/0005_sessions_body_url_backfill.sql"),
    ),
    (
        "0006_sessions_remove_fk_constraints",
        include_str!("../migrations/0006_sessions_remove_fk_constraints.sql"),
    ),
    (
        "0007_sessions_list_perf_indexes",
        include_str!("../migrations/0007_sessions_list_perf_indexes.sql"),
    ),
    (
        "0008_teams_force_public",
        include_str!("../migrations/0008_teams_force_public.sql"),
    ),
];
48
/// Migrations specific to the local cache database; same `(name, SQL)` shape
/// as `REMOTE_MIGRATIONS`, embedded at compile time.
const LOCAL_MIGRATIONS: &[Migration] = &[
    (
        "local_0001_schema",
        include_str!("../migrations/local_0001_schema.sql"),
    ),
    (
        "local_0002_drop_unused_local_sessions",
        include_str!("../migrations/local_0002_drop_unused_local_sessions.sql"),
    ),
    (
        "local_0003_timeline_summary_cache",
        include_str!("../migrations/local_0003_timeline_summary_cache.sql"),
    ),
];
63
/// A local session row stored in the local SQLite database.
///
/// Mirrors one row of `sessions` joined with its `session_sync` entry
/// (sync metadata) and the uploader's `users` row (nickname) — see
/// `FROM_CLAUSE`.
#[derive(Debug, Clone)]
pub struct LocalSessionRow {
    pub id: String,
    // Sync metadata (from `session_sync`; absent for rows never scanned locally).
    pub source_path: Option<String>,
    pub sync_status: String,
    pub last_synced_at: Option<String>,
    // Ownership / attribution.
    pub user_id: Option<String>,
    pub nickname: Option<String>,
    pub team_id: Option<String>,
    // Agent/tool identity.
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    // Descriptive metadata; `tags` is a comma-joined list (see upsert_local_session).
    pub title: Option<String>,
    pub description: Option<String>,
    pub tags: Option<String>,
    pub created_at: String,
    pub uploaded_at: Option<String>,
    // Aggregate statistics extracted from the parsed session.
    pub message_count: i64,
    pub user_message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    // Git / PR context captured at parse or sync time.
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    // JSON-encoded file lists plus derived flags.
    pub files_modified: Option<String>,
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
}
101
/// A link between a git commit and an AI session.
#[derive(Debug, Clone)]
pub struct CommitLink {
    /// Commit hash on one side of the link.
    pub commit_hash: String,
    /// Session id on the other side of the link.
    pub session_id: String,
    /// Repository path the commit was observed in, when known.
    pub repo_path: Option<String>,
    /// Branch name at link time, when known.
    pub branch: Option<String>,
    /// Creation timestamp of the link row (stored as text).
    pub created_at: String,
}
111
/// A cached timeline summary row stored in local DB.
#[derive(Debug, Clone)]
pub struct TimelineSummaryCacheRow {
    /// Key the cached entry is looked up by.
    pub lookup_key: String,
    /// Namespace the lookup key lives in.
    pub namespace: String,
    /// Compact form of the cached summary.
    pub compact: String,
    /// Full payload form of the cached summary.
    pub payload: String,
    /// Raw (unprocessed) summary text.
    pub raw: String,
    /// When the entry was cached (stored as text).
    pub cached_at: String,
}
122
123/// Return true when a cached row corresponds to an OpenCode child session.
124pub fn is_opencode_child_session(row: &LocalSessionRow) -> bool {
125    if row.tool != "opencode" {
126        return false;
127    }
128
129    // Strong heuristic: some orchestration artifacts are generated as sessions
130    // that do not carry a user-visible request surface and only contain system
131    // or agent-only events.
132    if row.user_message_count <= 0
133        && row.message_count <= 4
134        && row.task_count <= 4
135        && row.event_count > 0
136        && row.event_count <= 16
137    {
138        return true;
139    }
140
141    if is_opencode_subagent_source(row.source_path.as_deref()) {
142        return true;
143    }
144
145    let source_path = match row.source_path.as_deref() {
146        Some(path) if !path.trim().is_empty() => path,
147        _ => return false,
148    };
149
150    parse_opencode_parent_session_id(source_path)
151        .is_some_and(|parent_id| !parent_id.trim().is_empty())
152}
153
154/// Parse `parentID` / `parentId` from an OpenCode session JSON file.
155pub fn parse_opencode_parent_session_id(source_path: &str) -> Option<String> {
156    let text = fs::read_to_string(source_path).ok()?;
157    let json: Value = serde_json::from_str(&text).ok()?;
158    lookup_parent_session_id(&json)
159}
160
161fn lookup_parent_session_id(value: &Value) -> Option<String> {
162    match value {
163        Value::Object(obj) => {
164            for (key, value) in obj {
165                if is_parent_id_key(key) {
166                    if let Some(parent_id) = value.as_str() {
167                        let parent_id = parent_id.trim();
168                        if !parent_id.is_empty() {
169                            return Some(parent_id.to_string());
170                        }
171                    }
172                }
173                if let Some(parent_id) = lookup_parent_session_id(value) {
174                    return Some(parent_id);
175                }
176            }
177            None
178        }
179        Value::Array(items) => items.iter().find_map(lookup_parent_session_id),
180        _ => None,
181    }
182}
183
/// Heuristic match for JSON keys that name a parent-session id.
///
/// The key is normalized by dropping every non-alphanumeric character and
/// lowercasing ("parent_Session_ID" -> "parentsessionid"); it matches when
/// the normalized form mentions "parent" and ends in "id" or "uuid". The
/// explicit spellings the original listed (`parentid`, `parentuuid`,
/// `parentsessionid`, `parentsessionuuid`, …) are all subsumed by that pair
/// of checks.
fn is_parent_id_key(key: &str) -> bool {
    let mut flat = String::with_capacity(key.len());
    for c in key.chars() {
        if c.is_ascii_alphanumeric() {
            flat.push(c.to_ascii_lowercase());
        }
    }

    flat.contains("parent") && (flat.ends_with("id") || flat.ends_with("uuid"))
}
199
200/// Remove OpenCode child sessions so only parent sessions remain visible.
201pub fn hide_opencode_child_sessions(mut rows: Vec<LocalSessionRow>) -> Vec<LocalSessionRow> {
202    rows.retain(|row| !is_opencode_child_session(row) && !is_claude_subagent_session(row));
203    rows
204}
205
/// OpenCode-specific alias for the shared subagent source-path check.
fn is_opencode_subagent_source(source_path: Option<&str>) -> bool {
    is_subagent_source(source_path)
}
209
210fn is_claude_subagent_session(row: &LocalSessionRow) -> bool {
211    if row.tool != "claude-code" {
212        return false;
213    }
214
215    is_subagent_source(row.source_path.as_deref())
216}
217
/// True when `source_path` points at a subagent artifact.
///
/// Matching is case-insensitive: either the path contains a `subagents`
/// directory segment (Unix or Windows separators), or the file name carries a
/// well-known subagent prefix.
fn is_subagent_source(source_path: Option<&str>) -> bool {
    let lowered = match source_path {
        Some(path) => path.to_ascii_lowercase(),
        None => return false,
    };

    // A "subagents" path segment in either separator style is decisive.
    if lowered.contains("/subagents/") || lowered.contains("\\subagents\\") {
        return true;
    }

    // Otherwise inspect the file name alone for subagent prefixes.
    std::path::Path::new(&lowered)
        .file_name()
        .map(|name| name.to_string_lossy())
        .is_some_and(|filename| {
            ["agent-", "agent_", "subagent-", "subagent_"]
                .iter()
                .any(|prefix| filename.starts_with(prefix))
        })
}
237
/// Filter for listing sessions from the local DB.
///
/// All set criteria are combined with AND; `None` means "don't filter".
#[derive(Debug, Clone)]
pub struct LocalSessionFilter {
    /// Exact match on `sessions.team_id`.
    pub team_id: Option<String>,
    /// Exact match on sync status (rows without a sync entry count as 'unknown').
    pub sync_status: Option<String>,
    /// Exact match on the git repository name.
    pub git_repo_name: Option<String>,
    /// Substring search across title, description, and tags.
    pub search: Option<String>,
    /// Exact match on the recording tool name.
    pub tool: Option<String>,
    /// Sort order applied to the result set.
    pub sort: LocalSortOrder,
    /// Creation-time window restriction.
    pub time_range: LocalTimeRange,
    /// Maximum number of rows (applied in SQL).
    pub limit: Option<u32>,
    /// Row offset for pagination (only honored together with `limit`).
    pub offset: Option<u32>,
}
251
252impl Default for LocalSessionFilter {
253    fn default() -> Self {
254        Self {
255            team_id: None,
256            sync_status: None,
257            git_repo_name: None,
258            search: None,
259            tool: None,
260            sort: LocalSortOrder::Recent,
261            time_range: LocalTimeRange::All,
262            limit: None,
263            offset: None,
264        }
265    }
266}
267
/// Sort order for local session listing.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalSortOrder {
    /// Newest first (`created_at DESC`).
    #[default]
    Recent,
    /// Most messages first, ties broken by recency.
    Popular,
    /// Longest duration first, ties broken by recency.
    Longest,
}
276
/// Time range filter for local session listing (on `created_at`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalTimeRange {
    /// Created within the last 24 hours.
    Hours24,
    /// Created within the last 7 days.
    Days7,
    /// Created within the last 30 days.
    Days30,
    /// No time restriction.
    #[default]
    All,
}
286
/// Minimal remote session payload needed for local cache upsert.
///
/// See [`LocalDb::upsert_remote_session`] for how these fields are written
/// and which ones are refreshed on conflict.
/// NOTE(review): unlike `LocalSessionRow` there is no `user_message_count`
/// here, so remote upserts never touch that column — confirm intended.
#[derive(Debug, Clone)]
pub struct RemoteSessionSummary {
    pub id: String,
    pub user_id: Option<String>,
    pub nickname: Option<String>,
    /// Remote sessions always carry a concrete team id (not optional).
    pub team_id: String,
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    pub title: Option<String>,
    pub description: Option<String>,
    pub tags: Option<String>,
    pub created_at: String,
    /// Upload time is mandatory for remote rows.
    pub uploaded_at: String,
    // Aggregate statistics.
    pub message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    // Git / PR context.
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    // JSON-encoded file lists plus derived flags.
    pub files_modified: Option<String>,
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
}
320
/// Extended filter for the `log` command.
///
/// All set fields are combined with AND by [`LocalDb::list_sessions_log`].
#[derive(Debug, Default)]
pub struct LogFilter {
    /// Filter by tool name (exact match).
    pub tool: Option<String>,
    /// Filter by model (glob-like: `*` is translated to SQL LIKE `%`).
    pub model: Option<String>,
    /// Filter sessions created after this ISO8601 timestamp.
    pub since: Option<String>,
    /// Filter sessions created before this ISO8601 timestamp.
    pub before: Option<String>,
    /// Filter sessions that touched this file path (searches files_modified JSON).
    pub touches: Option<String>,
    /// Free-text search in title, description, tags.
    pub grep: Option<String>,
    /// Only sessions with errors (only `Some(true)` has any effect).
    pub has_errors: Option<bool>,
    /// Filter by working directory (prefix match).
    pub working_directory: Option<String>,
    /// Filter by git repo name.
    pub git_repo_name: Option<String>,
    /// Filter sessions linked to this git commit hash.
    pub commit: Option<String>,
    /// Maximum number of results.
    pub limit: Option<u32>,
    /// Offset for pagination (only honored together with `limit`).
    pub offset: Option<u32>,
}
349
/// Base FROM clause for session list queries: `sessions` (alias `s`) plus
/// optional sync metadata (`session_sync` as `ss`) and uploader info
/// (`users` as `u`).
const FROM_CLAUSE: &str = "\
FROM sessions s \
LEFT JOIN session_sync ss ON ss.session_id = s.id \
LEFT JOIN users u ON u.id = s.user_id";
355
/// Local SQLite database shared by TUI and Daemon.
/// Thread-safe: wraps the connection in a Mutex so it can be shared via `Arc<LocalDb>`;
/// each operation holds the lock for its full duration.
pub struct LocalDb {
    conn: Mutex<Connection>,
}
361
362impl LocalDb {
363    /// Open (or create) the local database at the default path.
364    /// `~/.local/share/opensession/local.db`
365    pub fn open() -> Result<Self> {
366        let path = default_db_path()?;
367        Self::open_path(&path)
368    }
369
    /// Open (or create) the local database at a specific path.
    ///
    /// Creates the parent directory if needed. If opening fails with a
    /// schema-compatibility error, the existing file is rotated aside and a
    /// fresh database with the latest schema is created — the local DB is a
    /// cache, so discarding an unreconcilable file is acceptable. Any other
    /// open error is returned as-is.
    pub fn open_path(path: &PathBuf) -> Result<Self> {
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .with_context(|| format!("create dir for {}", path.display()))?;
        }
        match open_connection_with_latest_schema(path) {
            Ok(conn) => Ok(Self {
                conn: Mutex::new(conn),
            }),
            Err(err) => {
                // Only schema-compat failures are recoverable by recreation.
                if !is_schema_compat_error(&err) {
                    return Err(err);
                }

                // Local DB is a cache. If schema migration cannot safely reconcile
                // a legacy/corrupted file, rotate it out and recreate with latest schema.
                rotate_legacy_db(path)?;

                let conn = open_connection_with_latest_schema(path)
                    .with_context(|| format!("recreate db {}", path.display()))?;
                Ok(Self {
                    conn: Mutex::new(conn),
                })
            }
        }
    }
397
    /// Acquire the shared connection guard.
    ///
    /// Panics if the mutex is poisoned (a previous holder panicked mid-operation).
    fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
        self.conn.lock().expect("local db mutex poisoned")
    }
401
402    // ── Upsert local session (parsed from file) ────────────────────────
403
    /// Insert or refresh a locally-parsed session (read from a file on disk).
    ///
    /// The row is written into `sessions` under the fixed `'personal'` team id
    /// with an empty `body_storage_key`. On conflict, all parsed fields are
    /// refreshed but `id`, `team_id`, and `created_at` keep their stored
    /// values. A companion `session_sync` row records `source_path` with
    /// status `'local_only'`; for an existing sync row only `source_path` is
    /// updated, preserving its current `sync_status`.
    pub fn upsert_local_session(
        &self,
        session: &Session,
        source_path: &str,
        git: &GitContext,
    ) -> Result<()> {
        let title = session.context.title.as_deref();
        let description = session.context.description.as_deref();
        // Tags are stored comma-joined; an empty tag list becomes NULL.
        let tags = if session.context.tags.is_empty() {
            None
        } else {
            Some(session.context.tags.join(","))
        };
        let created_at = session.context.created_at.to_rfc3339();
        // The working directory may appear under either attribute key.
        let cwd = session
            .context
            .attributes
            .get("cwd")
            .or_else(|| session.context.attributes.get("working_directory"))
            .and_then(|v| v.as_str().map(String::from));

        // Extract files_modified, files_read, and has_errors from events
        let (files_modified, files_read, has_errors) =
            opensession_core::extract::extract_file_metadata(session);
        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;

        let conn = self.conn();
        conn.execute(
            "INSERT INTO sessions \
             (id, team_id, tool, agent_provider, agent_model, \
              title, description, tags, created_at, \
              message_count, user_message_count, task_count, event_count, duration_seconds, \
              total_input_tokens, total_output_tokens, body_storage_key, \
              git_remote, git_branch, git_commit, git_repo_name, working_directory, \
              files_modified, files_read, has_errors, max_active_agents) \
             VALUES (?1,'personal',?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,'',?16,?17,?18,?19,?20,?21,?22,?23,?24) \
             ON CONFLICT(id) DO UPDATE SET \
              tool=excluded.tool, agent_provider=excluded.agent_provider, \
              agent_model=excluded.agent_model, \
              title=excluded.title, description=excluded.description, \
              tags=excluded.tags, \
              message_count=excluded.message_count, user_message_count=excluded.user_message_count, \
              task_count=excluded.task_count, \
              event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
              total_input_tokens=excluded.total_input_tokens, \
              total_output_tokens=excluded.total_output_tokens, \
              git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
              git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
              working_directory=excluded.working_directory, \
              files_modified=excluded.files_modified, files_read=excluded.files_read, \
              has_errors=excluded.has_errors, \
              max_active_agents=excluded.max_active_agents",
            params![
                &session.session_id,
                &session.agent.tool,
                &session.agent.provider,
                &session.agent.model,
                title,
                description,
                &tags,
                &created_at,
                session.stats.message_count as i64,
                session.stats.user_message_count as i64,
                session.stats.task_count as i64,
                session.stats.event_count as i64,
                session.stats.duration_seconds as i64,
                session.stats.total_input_tokens as i64,
                session.stats.total_output_tokens as i64,
                &git.remote,
                &git.branch,
                &git.commit,
                &git.repo_name,
                &cwd,
                &files_modified,
                &files_read,
                has_errors,
                max_active_agents,
            ],
        )?;

        conn.execute(
            "INSERT INTO session_sync (session_id, source_path, sync_status) \
             VALUES (?1, ?2, 'local_only') \
             ON CONFLICT(session_id) DO UPDATE SET source_path=excluded.source_path",
            params![&session.session_id, source_path],
        )?;
        Ok(())
    }
492
493    // ── Upsert remote session (from server sync pull) ──────────────────
494
    /// Insert or refresh a session summary pulled from the remote server.
    ///
    /// On conflict the remote-owned fields are refreshed while `id`,
    /// `user_id`, `team_id`, `tool`, agent fields, and `created_at` keep
    /// their stored values. The companion `session_sync` row is created as
    /// `'remote_only'`; if one already exists, a `'local_only'` status is
    /// promoted to `'synced'` and any other status is left untouched.
    pub fn upsert_remote_session(&self, summary: &RemoteSessionSummary) -> Result<()> {
        let conn = self.conn();
        conn.execute(
            "INSERT INTO sessions \
             (id, user_id, team_id, tool, agent_provider, agent_model, \
              title, description, tags, created_at, uploaded_at, \
              message_count, task_count, event_count, duration_seconds, \
              total_input_tokens, total_output_tokens, body_storage_key, \
              git_remote, git_branch, git_commit, git_repo_name, \
              pr_number, pr_url, working_directory, \
              files_modified, files_read, has_errors, max_active_agents) \
             VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,'',?18,?19,?20,?21,?22,?23,?24,?25,?26,?27,?28) \
             ON CONFLICT(id) DO UPDATE SET \
              title=excluded.title, description=excluded.description, \
              tags=excluded.tags, uploaded_at=excluded.uploaded_at, \
              message_count=excluded.message_count, task_count=excluded.task_count, \
              event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
              total_input_tokens=excluded.total_input_tokens, \
              total_output_tokens=excluded.total_output_tokens, \
              git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
              git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
              pr_number=excluded.pr_number, pr_url=excluded.pr_url, \
              working_directory=excluded.working_directory, \
              files_modified=excluded.files_modified, files_read=excluded.files_read, \
              has_errors=excluded.has_errors, \
              max_active_agents=excluded.max_active_agents",
            params![
                &summary.id,
                &summary.user_id,
                &summary.team_id,
                &summary.tool,
                &summary.agent_provider,
                &summary.agent_model,
                &summary.title,
                &summary.description,
                &summary.tags,
                &summary.created_at,
                &summary.uploaded_at,
                summary.message_count,
                summary.task_count,
                summary.event_count,
                summary.duration_seconds,
                summary.total_input_tokens,
                summary.total_output_tokens,
                &summary.git_remote,
                &summary.git_branch,
                &summary.git_commit,
                &summary.git_repo_name,
                summary.pr_number,
                &summary.pr_url,
                &summary.working_directory,
                &summary.files_modified,
                &summary.files_read,
                summary.has_errors,
                summary.max_active_agents,
            ],
        )?;

        conn.execute(
            "INSERT INTO session_sync (session_id, sync_status) \
             VALUES (?1, 'remote_only') \
             ON CONFLICT(session_id) DO UPDATE SET \
              sync_status = CASE WHEN session_sync.sync_status = 'local_only' THEN 'synced' ELSE session_sync.sync_status END",
            params![&summary.id],
        )?;
        Ok(())
    }
562
563    // ── List sessions ──────────────────────────────────────────────────
564
565    fn build_local_session_where_clause(
566        filter: &LocalSessionFilter,
567    ) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
568        let mut where_clauses = vec![
569            "1=1".to_string(),
570            "NOT (s.tool = 'claude-code' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
571            "NOT (s.tool = 'opencode' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
572            "NOT (s.tool = 'opencode' AND COALESCE(s.user_message_count, 0) <= 0 AND COALESCE(s.message_count, 0) <= 4 AND COALESCE(s.task_count, 0) <= 4 AND COALESCE(s.event_count, 0) > 0 AND COALESCE(s.event_count, 0) <= 16)".to_string(),
573        ];
574        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
575        let mut idx = 1u32;
576
577        if let Some(ref team_id) = filter.team_id {
578            where_clauses.push(format!("s.team_id = ?{idx}"));
579            param_values.push(Box::new(team_id.clone()));
580            idx += 1;
581        }
582
583        if let Some(ref sync_status) = filter.sync_status {
584            where_clauses.push(format!("COALESCE(ss.sync_status, 'unknown') = ?{idx}"));
585            param_values.push(Box::new(sync_status.clone()));
586            idx += 1;
587        }
588
589        if let Some(ref repo) = filter.git_repo_name {
590            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
591            param_values.push(Box::new(repo.clone()));
592            idx += 1;
593        }
594
595        if let Some(ref tool) = filter.tool {
596            where_clauses.push(format!("s.tool = ?{idx}"));
597            param_values.push(Box::new(tool.clone()));
598            idx += 1;
599        }
600
601        if let Some(ref search) = filter.search {
602            let like = format!("%{search}%");
603            where_clauses.push(format!(
604                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
605                i1 = idx,
606                i2 = idx + 1,
607                i3 = idx + 2,
608            ));
609            param_values.push(Box::new(like.clone()));
610            param_values.push(Box::new(like.clone()));
611            param_values.push(Box::new(like));
612            idx += 3;
613        }
614
615        let interval = match filter.time_range {
616            LocalTimeRange::Hours24 => Some("-1 day"),
617            LocalTimeRange::Days7 => Some("-7 days"),
618            LocalTimeRange::Days30 => Some("-30 days"),
619            LocalTimeRange::All => None,
620        };
621        if let Some(interval) = interval {
622            where_clauses.push(format!("datetime(s.created_at) >= datetime('now', ?{idx})"));
623            param_values.push(Box::new(interval.to_string()));
624        }
625
626        (where_clauses.join(" AND "), param_values)
627    }
628
    /// List sessions matching `filter`, with child/subagent sessions hidden.
    ///
    /// NOTE(review): SQL LIMIT/OFFSET run before the Rust-side
    /// `hide_opencode_child_sessions` pass, so a page can come back short
    /// when that pass drops rows the SQL baseline filter did not catch.
    pub fn list_sessions(&self, filter: &LocalSessionFilter) -> Result<Vec<LocalSessionRow>> {
        let (where_str, mut param_values) = Self::build_local_session_where_clause(filter);
        let order_clause = match filter.sort {
            LocalSortOrder::Popular => "s.message_count DESC, s.created_at DESC",
            LocalSortOrder::Longest => "s.duration_seconds DESC, s.created_at DESC",
            LocalSortOrder::Recent => "s.created_at DESC",
        };

        let mut sql = format!(
            "SELECT {LOCAL_SESSION_COLUMNS} \
             {FROM_CLAUSE} WHERE {where_str} \
             ORDER BY {order_clause}"
        );

        // Bare `?` placeholders bind positionally after the numbered `?N`
        // parameters already used in the WHERE clause.
        if let Some(limit) = filter.limit {
            sql.push_str(" LIMIT ?");
            param_values.push(Box::new(limit));
            if let Some(offset) = filter.offset {
                sql.push_str(" OFFSET ?");
                param_values.push(Box::new(offset));
            }
        }

        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
            param_values.iter().map(|p| p.as_ref()).collect();
        let conn = self.conn();
        let mut stmt = conn.prepare(&sql)?;
        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;

        let mut result = Vec::new();
        for row in rows {
            result.push(row?);
        }

        Ok(hide_opencode_child_sessions(result))
    }
665
    /// Count sessions for a given list filter (before UI-level page slicing).
    ///
    /// Paging fields are cleared so the count covers the whole result set.
    /// NOTE(review): the Rust-side child-session pass used by `list_sessions`
    /// is not applied here, so this count can slightly exceed the number of
    /// rows listing ultimately returns.
    pub fn count_sessions_filtered(&self, filter: &LocalSessionFilter) -> Result<i64> {
        let mut count_filter = filter.clone();
        count_filter.limit = None;
        count_filter.offset = None;
        let (where_str, param_values) = Self::build_local_session_where_clause(&count_filter);
        let sql = format!("SELECT COUNT(*) {FROM_CLAUSE} WHERE {where_str}");
        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
            param_values.iter().map(|p| p.as_ref()).collect();
        let conn = self.conn();
        let count = conn.query_row(&sql, param_refs.as_slice(), |row| row.get(0))?;
        Ok(count)
    }
679
680    /// List distinct tool names for the current list filter (ignores active tool filter).
681    pub fn list_session_tools(&self, filter: &LocalSessionFilter) -> Result<Vec<String>> {
682        let mut tool_filter = filter.clone();
683        tool_filter.tool = None;
684        tool_filter.limit = None;
685        tool_filter.offset = None;
686        let (where_str, param_values) = Self::build_local_session_where_clause(&tool_filter);
687        let sql = format!(
688            "SELECT DISTINCT s.tool \
689             {FROM_CLAUSE} WHERE {where_str} \
690             ORDER BY s.tool ASC"
691        );
692        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
693            param_values.iter().map(|p| p.as_ref()).collect();
694        let conn = self.conn();
695        let mut stmt = conn.prepare(&sql)?;
696        let rows = stmt.query_map(param_refs.as_slice(), |row| row.get::<_, String>(0))?;
697
698        let mut tools = Vec::new();
699        for row in rows {
700            let tool = row?;
701            if !tool.trim().is_empty() {
702                tools.push(tool);
703            }
704        }
705        Ok(tools)
706    }
707
708    // ── Log query ─────────────────────────────────────────────────────
709
    /// Query sessions with extended filters for the `log` command.
    ///
    /// All set filters are ANDed. `model` treats `*` as a wildcard; `touches`
    /// matches the JSON-quoted path inside `files_modified`; `commit` joins
    /// through `commit_session_links`. Results are newest-first, and
    /// child/subagent sessions are removed in Rust before returning.
    pub fn list_sessions_log(&self, filter: &LogFilter) -> Result<Vec<LocalSessionRow>> {
        let mut where_clauses = vec!["1=1".to_string()];
        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
        let mut idx = 1u32;

        if let Some(ref tool) = filter.tool {
            where_clauses.push(format!("s.tool = ?{idx}"));
            param_values.push(Box::new(tool.clone()));
            idx += 1;
        }

        if let Some(ref model) = filter.model {
            // Translate glob-style `*` into SQL LIKE's `%`.
            let like = model.replace('*', "%");
            where_clauses.push(format!("s.agent_model LIKE ?{idx}"));
            param_values.push(Box::new(like));
            idx += 1;
        }

        if let Some(ref since) = filter.since {
            where_clauses.push(format!("s.created_at >= ?{idx}"));
            param_values.push(Box::new(since.clone()));
            idx += 1;
        }

        if let Some(ref before) = filter.before {
            where_clauses.push(format!("s.created_at < ?{idx}"));
            param_values.push(Box::new(before.clone()));
            idx += 1;
        }

        if let Some(ref touches) = filter.touches {
            // files_modified holds JSON; match the quoted path inside it.
            let like = format!("%\"{touches}\"%");
            where_clauses.push(format!("s.files_modified LIKE ?{idx}"));
            param_values.push(Box::new(like));
            idx += 1;
        }

        if let Some(ref grep) = filter.grep {
            // One LIKE pattern bound three times (title/description/tags).
            let like = format!("%{grep}%");
            where_clauses.push(format!(
                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
                i1 = idx,
                i2 = idx + 1,
                i3 = idx + 2,
            ));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like));
            idx += 3;
        }

        // `Some(false)` and `None` both mean "don't filter on errors".
        if let Some(true) = filter.has_errors {
            where_clauses.push("s.has_errors = 1".to_string());
        }

        if let Some(ref wd) = filter.working_directory {
            where_clauses.push(format!("s.working_directory LIKE ?{idx}"));
            param_values.push(Box::new(format!("{wd}%")));
            idx += 1;
        }

        if let Some(ref repo) = filter.git_repo_name {
            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
            param_values.push(Box::new(repo.clone()));
            idx += 1;
        }

        // Optional JOIN for commit hash filter
        let mut extra_join = String::new();
        if let Some(ref commit) = filter.commit {
            extra_join =
                " INNER JOIN commit_session_links csl ON csl.session_id = s.id".to_string();
            where_clauses.push(format!("csl.commit_hash = ?{idx}"));
            param_values.push(Box::new(commit.clone()));
            idx += 1;
        }

        let _ = idx; // suppress unused warning

        let where_str = where_clauses.join(" AND ");
        let mut sql = format!(
            "SELECT {LOCAL_SESSION_COLUMNS} \
             {FROM_CLAUSE}{extra_join} WHERE {where_str} \
             ORDER BY s.created_at DESC"
        );

        // Bare `?` placeholders bind positionally after the numbered `?N`
        // parameters above; OFFSET is only emitted when LIMIT is present.
        if let Some(limit) = filter.limit {
            sql.push_str(" LIMIT ?");
            param_values.push(Box::new(limit));
            if let Some(offset) = filter.offset {
                sql.push_str(" OFFSET ?");
                param_values.push(Box::new(offset));
            }
        }

        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
            param_values.iter().map(|p| p.as_ref()).collect();
        let conn = self.conn();
        let mut stmt = conn.prepare(&sql)?;
        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;

        let mut result = Vec::new();
        for row in rows {
            result.push(row?);
        }
        Ok(hide_opencode_child_sessions(result))
    }
818
819    /// Get the latest N sessions for a specific tool, ordered by created_at DESC.
820    pub fn get_sessions_by_tool_latest(
821        &self,
822        tool: &str,
823        count: u32,
824    ) -> Result<Vec<LocalSessionRow>> {
825        let sql = format!(
826            "SELECT {LOCAL_SESSION_COLUMNS} \
827             {FROM_CLAUSE} WHERE s.tool = ?1 \
828             ORDER BY s.created_at DESC"
829        );
830        let conn = self.conn();
831        let mut stmt = conn.prepare(&sql)?;
832        let rows = stmt.query_map(params![tool], row_to_local_session)?;
833        let mut result = Vec::new();
834        for row in rows {
835            result.push(row?);
836        }
837
838        let mut filtered = hide_opencode_child_sessions(result);
839        filtered.truncate(count as usize);
840        Ok(filtered)
841    }
842
843    /// Get the latest N sessions across all tools, ordered by created_at DESC.
844    pub fn get_sessions_latest(&self, count: u32) -> Result<Vec<LocalSessionRow>> {
845        let sql = format!(
846            "SELECT {LOCAL_SESSION_COLUMNS} \
847             {FROM_CLAUSE} \
848             ORDER BY s.created_at DESC"
849        );
850        let conn = self.conn();
851        let mut stmt = conn.prepare(&sql)?;
852        let rows = stmt.query_map([], row_to_local_session)?;
853        let mut result = Vec::new();
854        for row in rows {
855            result.push(row?);
856        }
857
858        let mut filtered = hide_opencode_child_sessions(result);
859        filtered.truncate(count as usize);
860        Ok(filtered)
861    }
862
863    /// Get the Nth most recent session for a specific tool (0 = HEAD, 1 = HEAD~1, etc.).
864    pub fn get_session_by_tool_offset(
865        &self,
866        tool: &str,
867        offset: u32,
868    ) -> Result<Option<LocalSessionRow>> {
869        let sql = format!(
870            "SELECT {LOCAL_SESSION_COLUMNS} \
871             {FROM_CLAUSE} WHERE s.tool = ?1 \
872             ORDER BY s.created_at DESC"
873        );
874        let conn = self.conn();
875        let mut stmt = conn.prepare(&sql)?;
876        let rows = stmt.query_map(params![tool], row_to_local_session)?;
877        let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
878        Ok(filtered.into_iter().nth(offset as usize))
879    }
880
881    /// Get the Nth most recent session across all tools (0 = HEAD, 1 = HEAD~1, etc.).
882    pub fn get_session_by_offset(&self, offset: u32) -> Result<Option<LocalSessionRow>> {
883        let sql = format!(
884            "SELECT {LOCAL_SESSION_COLUMNS} \
885             {FROM_CLAUSE} \
886             ORDER BY s.created_at DESC"
887        );
888        let conn = self.conn();
889        let mut stmt = conn.prepare(&sql)?;
890        let rows = stmt.query_map([], row_to_local_session)?;
891        let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
892        Ok(filtered.into_iter().nth(offset as usize))
893    }
894
895    /// Fetch the source path used when the session was last parsed/loaded.
896    pub fn get_session_source_path(&self, session_id: &str) -> Result<Option<String>> {
897        let conn = self.conn();
898        let result = conn
899            .query_row(
900                "SELECT source_path FROM session_sync WHERE session_id = ?1",
901                params![session_id],
902                |row| row.get(0),
903            )
904            .optional()?;
905
906        Ok(result)
907    }
908
909    /// Count total sessions in the local DB.
910    pub fn session_count(&self) -> Result<i64> {
911        let count = self
912            .conn()
913            .query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))?;
914        Ok(count)
915    }
916
917    // ── Delete session ─────────────────────────────────────────────────
918
919    pub fn delete_session(&self, session_id: &str) -> Result<()> {
920        let conn = self.conn();
921        conn.execute(
922            "DELETE FROM body_cache WHERE session_id = ?1",
923            params![session_id],
924        )?;
925        conn.execute(
926            "DELETE FROM session_sync WHERE session_id = ?1",
927            params![session_id],
928        )?;
929        conn.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
930        Ok(())
931    }
932
933    // ── Sync cursor ────────────────────────────────────────────────────
934
935    pub fn get_sync_cursor(&self, team_id: &str) -> Result<Option<String>> {
936        let cursor = self
937            .conn()
938            .query_row(
939                "SELECT cursor FROM sync_cursors WHERE team_id = ?1",
940                params![team_id],
941                |row| row.get(0),
942            )
943            .optional()?;
944        Ok(cursor)
945    }
946
947    pub fn set_sync_cursor(&self, team_id: &str, cursor: &str) -> Result<()> {
948        self.conn().execute(
949            "INSERT INTO sync_cursors (team_id, cursor, updated_at) \
950             VALUES (?1, ?2, datetime('now')) \
951             ON CONFLICT(team_id) DO UPDATE SET cursor=excluded.cursor, updated_at=datetime('now')",
952            params![team_id, cursor],
953        )?;
954        Ok(())
955    }
956
957    // ── Upload tracking ────────────────────────────────────────────────
958
959    /// Get sessions that are local_only and need to be uploaded.
960    pub fn pending_uploads(&self, team_id: &str) -> Result<Vec<LocalSessionRow>> {
961        let sql = format!(
962            "SELECT {LOCAL_SESSION_COLUMNS} \
963             FROM sessions s \
964             INNER JOIN session_sync ss ON ss.session_id = s.id \
965             LEFT JOIN users u ON u.id = s.user_id \
966             WHERE ss.sync_status = 'local_only' AND s.team_id = ?1 \
967             ORDER BY s.created_at ASC"
968        );
969        let conn = self.conn();
970        let mut stmt = conn.prepare(&sql)?;
971        let rows = stmt.query_map(params![team_id], row_to_local_session)?;
972        let mut result = Vec::new();
973        for row in rows {
974            result.push(row?);
975        }
976        Ok(result)
977    }
978
979    pub fn mark_synced(&self, session_id: &str) -> Result<()> {
980        self.conn().execute(
981            "UPDATE session_sync SET sync_status = 'synced', last_synced_at = datetime('now') \
982             WHERE session_id = ?1",
983            params![session_id],
984        )?;
985        Ok(())
986    }
987
988    /// Check if a session was already uploaded (synced or remote_only) since the given modification time.
989    pub fn was_uploaded_after(
990        &self,
991        source_path: &str,
992        modified: &chrono::DateTime<chrono::Utc>,
993    ) -> Result<bool> {
994        let result: Option<String> = self
995            .conn()
996            .query_row(
997                "SELECT last_synced_at FROM session_sync \
998                 WHERE source_path = ?1 AND sync_status = 'synced' AND last_synced_at IS NOT NULL",
999                params![source_path],
1000                |row| row.get(0),
1001            )
1002            .optional()?;
1003
1004        if let Some(synced_at) = result {
1005            if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&synced_at) {
1006                return Ok(dt >= *modified);
1007            }
1008        }
1009        Ok(false)
1010    }
1011
1012    // ── Body cache ─────────────────────────────────────────────────────
1013
1014    pub fn cache_body(&self, session_id: &str, body: &[u8]) -> Result<()> {
1015        self.conn().execute(
1016            "INSERT INTO body_cache (session_id, body, cached_at) \
1017             VALUES (?1, ?2, datetime('now')) \
1018             ON CONFLICT(session_id) DO UPDATE SET body=excluded.body, cached_at=datetime('now')",
1019            params![session_id, body],
1020        )?;
1021        Ok(())
1022    }
1023
1024    pub fn get_cached_body(&self, session_id: &str) -> Result<Option<Vec<u8>>> {
1025        let body = self
1026            .conn()
1027            .query_row(
1028                "SELECT body FROM body_cache WHERE session_id = ?1",
1029                params![session_id],
1030                |row| row.get(0),
1031            )
1032            .optional()?;
1033        Ok(body)
1034    }
1035
1036    // ── Timeline summary cache ────────────────────────────────────────
1037
    /// Insert or refresh a cached timeline summary keyed by `lookup_key`.
    ///
    /// On conflict the existing row is fully overwritten and `cached_at`
    /// is reset to the current time.
    pub fn upsert_timeline_summary_cache(
        &self,
        lookup_key: &str,
        namespace: &str,
        compact: &str,
        payload: &str,
        raw: &str,
    ) -> Result<()> {
        self.conn().execute(
            "INSERT INTO timeline_summary_cache \
             (lookup_key, namespace, compact, payload, raw, cached_at) \
             VALUES (?1, ?2, ?3, ?4, ?5, datetime('now')) \
             ON CONFLICT(lookup_key) DO UPDATE SET \
               namespace = excluded.namespace, \
               compact = excluded.compact, \
               payload = excluded.payload, \
               raw = excluded.raw, \
               cached_at = datetime('now')",
            params![lookup_key, namespace, compact, payload, raw],
        )?;
        Ok(())
    }
1060
1061    pub fn list_timeline_summary_cache_by_namespace(
1062        &self,
1063        namespace: &str,
1064    ) -> Result<Vec<TimelineSummaryCacheRow>> {
1065        let conn = self.conn();
1066        let mut stmt = conn.prepare(
1067            "SELECT lookup_key, namespace, compact, payload, raw, cached_at \
1068             FROM timeline_summary_cache \
1069             WHERE namespace = ?1 \
1070             ORDER BY cached_at DESC",
1071        )?;
1072        let rows = stmt.query_map(params![namespace], |row| {
1073            Ok(TimelineSummaryCacheRow {
1074                lookup_key: row.get(0)?,
1075                namespace: row.get(1)?,
1076                compact: row.get(2)?,
1077                payload: row.get(3)?,
1078                raw: row.get(4)?,
1079                cached_at: row.get(5)?,
1080            })
1081        })?;
1082
1083        let mut out = Vec::new();
1084        for row in rows {
1085            out.push(row?);
1086        }
1087        Ok(out)
1088    }
1089
1090    pub fn clear_timeline_summary_cache(&self) -> Result<usize> {
1091        let affected = self
1092            .conn()
1093            .execute("DELETE FROM timeline_summary_cache", [])?;
1094        Ok(affected)
1095    }
1096
1097    // ── Migration helper ───────────────────────────────────────────────
1098
1099    /// Migrate entries from the old state.json UploadState into the local DB.
1100    /// Marks them as `synced` with no metadata (we only know the file path was uploaded).
1101    pub fn migrate_from_state_json(
1102        &self,
1103        uploaded: &std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
1104    ) -> Result<usize> {
1105        let mut count = 0;
1106        for (path, uploaded_at) in uploaded {
1107            let exists: bool = self
1108                .conn()
1109                .query_row(
1110                    "SELECT COUNT(*) > 0 FROM session_sync WHERE source_path = ?1",
1111                    params![path],
1112                    |row| row.get(0),
1113                )
1114                .unwrap_or(false);
1115
1116            if exists {
1117                self.conn().execute(
1118                    "UPDATE session_sync SET sync_status = 'synced', last_synced_at = ?1 \
1119                     WHERE source_path = ?2 AND sync_status = 'local_only'",
1120                    params![uploaded_at.to_rfc3339(), path],
1121                )?;
1122                count += 1;
1123            }
1124        }
1125        Ok(count)
1126    }
1127
1128    // ── Commit ↔ session linking ────────────────────────────────────
1129
1130    /// Link a git commit to an AI session.
1131    pub fn link_commit_session(
1132        &self,
1133        commit_hash: &str,
1134        session_id: &str,
1135        repo_path: Option<&str>,
1136        branch: Option<&str>,
1137    ) -> Result<()> {
1138        self.conn().execute(
1139            "INSERT INTO commit_session_links (commit_hash, session_id, repo_path, branch) \
1140             VALUES (?1, ?2, ?3, ?4) \
1141             ON CONFLICT(commit_hash, session_id) DO NOTHING",
1142            params![commit_hash, session_id, repo_path, branch],
1143        )?;
1144        Ok(())
1145    }
1146
1147    /// Get all sessions linked to a git commit.
1148    pub fn get_sessions_by_commit(&self, commit_hash: &str) -> Result<Vec<LocalSessionRow>> {
1149        let sql = format!(
1150            "SELECT {LOCAL_SESSION_COLUMNS} \
1151             {FROM_CLAUSE} \
1152             INNER JOIN commit_session_links csl ON csl.session_id = s.id \
1153             WHERE csl.commit_hash = ?1 \
1154             ORDER BY s.created_at DESC"
1155        );
1156        let conn = self.conn();
1157        let mut stmt = conn.prepare(&sql)?;
1158        let rows = stmt.query_map(params![commit_hash], row_to_local_session)?;
1159        let mut result = Vec::new();
1160        for row in rows {
1161            result.push(row?);
1162        }
1163        Ok(result)
1164    }
1165
1166    /// Get all commits linked to a session.
1167    pub fn get_commits_by_session(&self, session_id: &str) -> Result<Vec<CommitLink>> {
1168        let conn = self.conn();
1169        let mut stmt = conn.prepare(
1170            "SELECT commit_hash, session_id, repo_path, branch, created_at \
1171             FROM commit_session_links WHERE session_id = ?1 \
1172             ORDER BY created_at DESC",
1173        )?;
1174        let rows = stmt.query_map(params![session_id], |row| {
1175            Ok(CommitLink {
1176                commit_hash: row.get(0)?,
1177                session_id: row.get(1)?,
1178                repo_path: row.get(2)?,
1179                branch: row.get(3)?,
1180                created_at: row.get(4)?,
1181            })
1182        })?;
1183        let mut result = Vec::new();
1184        for row in rows {
1185            result.push(row?);
1186        }
1187        Ok(result)
1188    }
1189
1190    /// Find the most recently active session for a given repo path.
1191    /// "Active" means the session's working_directory matches the repo path
1192    /// and was created within the last `since_minutes` minutes.
1193    pub fn find_active_session_for_repo(
1194        &self,
1195        repo_path: &str,
1196        since_minutes: u32,
1197    ) -> Result<Option<LocalSessionRow>> {
1198        let sql = format!(
1199            "SELECT {LOCAL_SESSION_COLUMNS} \
1200             {FROM_CLAUSE} \
1201             WHERE s.working_directory LIKE ?1 \
1202             AND s.created_at >= datetime('now', ?2) \
1203             ORDER BY s.created_at DESC LIMIT 1"
1204        );
1205        let since = format!("-{since_minutes} minutes");
1206        let like = format!("{repo_path}%");
1207        let conn = self.conn();
1208        let mut stmt = conn.prepare(&sql)?;
1209        let row = stmt
1210            .query_map(params![like, since], row_to_local_session)?
1211            .next()
1212            .transpose()?;
1213        Ok(row)
1214    }
1215
1216    /// Get all session IDs currently in the local DB.
1217    pub fn existing_session_ids(&self) -> std::collections::HashSet<String> {
1218        let conn = self.conn();
1219        let mut stmt = conn
1220            .prepare("SELECT id FROM sessions")
1221            .unwrap_or_else(|_| panic!("failed to prepare existing_session_ids query"));
1222        let rows = stmt.query_map([], |row| row.get::<_, String>(0));
1223        let mut set = std::collections::HashSet::new();
1224        if let Ok(rows) = rows {
1225            for row in rows.flatten() {
1226                set.insert(row);
1227            }
1228        }
1229        set
1230    }
1231
    /// Update only stats fields for an existing session (no git context re-extraction).
    ///
    /// Rewrites title/description, message/task/event counters, duration,
    /// token totals, file metadata and the derived max concurrent agents.
    /// Git fields and sync metadata are left untouched.
    pub fn update_session_stats(&self, session: &Session) -> Result<()> {
        let title = session.context.title.as_deref();
        let description = session.context.description.as_deref();
        let (files_modified, files_read, has_errors) =
            opensession_core::extract::extract_file_metadata(session);
        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;

        // ?1 is the session id; the remaining params follow the SET clause
        // order exactly — keep both lists in sync.
        self.conn().execute(
            "UPDATE sessions SET \
             title=?2, description=?3, \
             message_count=?4, user_message_count=?5, task_count=?6, \
             event_count=?7, duration_seconds=?8, \
             total_input_tokens=?9, total_output_tokens=?10, \
             files_modified=?11, files_read=?12, has_errors=?13, \
             max_active_agents=?14 \
             WHERE id=?1",
            params![
                &session.session_id,
                title,
                description,
                session.stats.message_count as i64,
                session.stats.user_message_count as i64,
                session.stats.task_count as i64,
                session.stats.event_count as i64,
                session.stats.duration_seconds as i64,
                session.stats.total_input_tokens as i64,
                session.stats.total_output_tokens as i64,
                &files_modified,
                &files_read,
                has_errors,
                max_active_agents,
            ],
        )?;
        Ok(())
    }
1268
1269    /// Update only sync metadata path for an existing session.
1270    pub fn set_session_sync_path(&self, session_id: &str, source_path: &str) -> Result<()> {
1271        self.conn().execute(
1272            "INSERT INTO session_sync (session_id, source_path) \
1273             VALUES (?1, ?2) \
1274             ON CONFLICT(session_id) DO UPDATE SET source_path = excluded.source_path",
1275            params![session_id, source_path],
1276        )?;
1277        Ok(())
1278    }
1279
1280    /// Get a list of distinct git repo names present in the DB.
1281    pub fn list_repos(&self) -> Result<Vec<String>> {
1282        let conn = self.conn();
1283        let mut stmt = conn.prepare(
1284            "SELECT DISTINCT git_repo_name FROM sessions \
1285             WHERE git_repo_name IS NOT NULL ORDER BY git_repo_name ASC",
1286        )?;
1287        let rows = stmt.query_map([], |row| row.get(0))?;
1288        let mut result = Vec::new();
1289        for row in rows {
1290            result.push(row?);
1291        }
1292        Ok(result)
1293    }
1294}
1295
1296// ── Legacy schema backfill ─────────────────────────────────────────────
1297
/// Open the SQLite file at `path`, apply all migrations, backfill columns
/// missing from legacy DBs, and validate that the canonical session SELECT
/// prepares cleanly. Call order is load-bearing: migrate → backfill → validate.
fn open_connection_with_latest_schema(path: &PathBuf) -> Result<Connection> {
    let conn = Connection::open(path).with_context(|| format!("open db {}", path.display()))?;
    // WAL lets a writer coexist with concurrent readers.
    conn.execute_batch("PRAGMA journal_mode=WAL;")?;

    // Disable FK constraints for local DB (it's a cache, not source of truth)
    conn.execute_batch("PRAGMA foreign_keys=OFF;")?;

    apply_local_migrations(&conn)?;

    // Backfill missing columns for legacy local DBs where `sessions` already
    // existed before newer fields were introduced.
    ensure_sessions_columns(&conn)?;
    validate_local_schema(&conn)?;

    Ok(conn)
}
1314
1315fn apply_local_migrations(conn: &Connection) -> Result<()> {
1316    conn.execute_batch(
1317        "CREATE TABLE IF NOT EXISTS _migrations (
1318            id INTEGER PRIMARY KEY,
1319            name TEXT NOT NULL UNIQUE,
1320            applied_at TEXT NOT NULL DEFAULT (datetime('now'))
1321        );",
1322    )
1323    .context("create _migrations table for local db")?;
1324
1325    for (name, sql) in REMOTE_MIGRATIONS.iter().chain(LOCAL_MIGRATIONS.iter()) {
1326        let already_applied: bool = conn
1327            .query_row(
1328                "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
1329                [name],
1330                |row| row.get(0),
1331            )
1332            .unwrap_or(false);
1333
1334        if already_applied {
1335            continue;
1336        }
1337
1338        if let Err(e) = conn.execute_batch(sql) {
1339            let msg = e.to_string().to_ascii_lowercase();
1340            if !is_local_migration_compat_error(&msg) {
1341                return Err(e).with_context(|| format!("apply local migration {name}"));
1342            }
1343        }
1344
1345        conn.execute(
1346            "INSERT OR IGNORE INTO _migrations (name) VALUES (?1)",
1347            [name],
1348        )
1349        .with_context(|| format!("record local migration {name}"))?;
1350    }
1351
1352    Ok(())
1353}
1354
/// Returns true for SQLite error messages that indicate a migration is
/// already (partially) applied and can safely be skipped.
/// `msg` is expected to be lowercased by the caller.
fn is_local_migration_compat_error(msg: &str) -> bool {
    ["duplicate column name", "no such column", "already exists"]
        .iter()
        .any(|needle| msg.contains(needle))
}
1360
1361fn validate_local_schema(conn: &Connection) -> Result<()> {
1362    let sql = format!("SELECT {LOCAL_SESSION_COLUMNS} {FROM_CLAUSE} WHERE 1=0");
1363    conn.prepare(&sql)
1364        .map(|_| ())
1365        .context("validate local session schema")
1366}
1367
1368fn is_schema_compat_error(err: &anyhow::Error) -> bool {
1369    let msg = format!("{err:#}").to_ascii_lowercase();
1370    msg.contains("no such column")
1371        || msg.contains("no such table")
1372        || msg.contains("cannot add a column")
1373        || msg.contains("already exists")
1374        || msg.contains("views may not be indexed")
1375        || msg.contains("malformed database schema")
1376        || msg.contains("duplicate column name")
1377}
1378
1379fn rotate_legacy_db(path: &PathBuf) -> Result<()> {
1380    if !path.exists() {
1381        return Ok(());
1382    }
1383
1384    let ts = chrono::Utc::now().format("%Y%m%d%H%M%S");
1385    let backup_name = format!(
1386        "{}.legacy-{}.bak",
1387        path.file_name()
1388            .and_then(|n| n.to_str())
1389            .unwrap_or("local.db"),
1390        ts
1391    );
1392    let backup_path = path.with_file_name(backup_name);
1393    std::fs::rename(path, &backup_path).with_context(|| {
1394        format!(
1395            "rotate legacy db {} -> {}",
1396            path.display(),
1397            backup_path.display()
1398        )
1399    })?;
1400
1401    let wal = PathBuf::from(format!("{}-wal", path.display()));
1402    let shm = PathBuf::from(format!("{}-shm", path.display()));
1403    let _ = std::fs::remove_file(wal);
1404    let _ = std::fs::remove_file(shm);
1405    Ok(())
1406}
1407
/// (column name, SQL type/default) pairs that `sessions` must contain.
/// `ensure_sessions_columns` backfills any that are missing on legacy DBs via
/// `ALTER TABLE ... ADD COLUMN`; keep in sync with `LOCAL_SESSION_COLUMNS`.
const REQUIRED_SESSION_COLUMNS: &[(&str, &str)] = &[
    ("user_id", "TEXT"),
    ("team_id", "TEXT DEFAULT 'personal'"),
    ("tool", "TEXT DEFAULT ''"),
    ("agent_provider", "TEXT"),
    ("agent_model", "TEXT"),
    ("title", "TEXT"),
    ("description", "TEXT"),
    ("tags", "TEXT"),
    ("created_at", "TEXT DEFAULT ''"),
    ("uploaded_at", "TEXT DEFAULT ''"),
    ("message_count", "INTEGER DEFAULT 0"),
    ("user_message_count", "INTEGER DEFAULT 0"),
    ("task_count", "INTEGER DEFAULT 0"),
    ("event_count", "INTEGER DEFAULT 0"),
    ("duration_seconds", "INTEGER DEFAULT 0"),
    ("total_input_tokens", "INTEGER DEFAULT 0"),
    ("total_output_tokens", "INTEGER DEFAULT 0"),
    ("body_storage_key", "TEXT DEFAULT ''"),
    ("body_url", "TEXT"),
    ("git_remote", "TEXT"),
    ("git_branch", "TEXT"),
    ("git_commit", "TEXT"),
    ("git_repo_name", "TEXT"),
    ("pr_number", "INTEGER"),
    ("pr_url", "TEXT"),
    ("working_directory", "TEXT"),
    ("files_modified", "TEXT"),
    ("files_read", "TEXT"),
    ("has_errors", "BOOLEAN DEFAULT 0"),
    ("max_active_agents", "INTEGER DEFAULT 1"),
];
1440
1441fn ensure_sessions_columns(conn: &Connection) -> Result<()> {
1442    let mut existing = HashSet::new();
1443    let mut stmt = conn.prepare("PRAGMA table_info(sessions)")?;
1444    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
1445    for row in rows {
1446        existing.insert(row?);
1447    }
1448
1449    for (name, decl) in REQUIRED_SESSION_COLUMNS {
1450        if existing.contains(*name) {
1451            continue;
1452        }
1453        let sql = format!("ALTER TABLE sessions ADD COLUMN {name} {decl};");
1454        conn.execute_batch(&sql)
1455            .with_context(|| format!("add legacy sessions column '{name}'"))?;
1456    }
1457
1458    Ok(())
1459}
1460
/// Column list for SELECT queries against sessions + session_sync + users.
///
/// Positional order is load-bearing: `row_to_local_session` reads these
/// columns by index (0..=32). COALESCE guards cover columns that may be NULL
/// on legacy rows.
pub const LOCAL_SESSION_COLUMNS: &str = "\
s.id, ss.source_path, COALESCE(ss.sync_status, 'unknown') AS sync_status, ss.last_synced_at, \
s.user_id, u.nickname, s.team_id, s.tool, s.agent_provider, s.agent_model, \
s.title, s.description, s.tags, s.created_at, s.uploaded_at, \
s.message_count, COALESCE(s.user_message_count, 0), s.task_count, s.event_count, s.duration_seconds, \
s.total_input_tokens, s.total_output_tokens, \
s.git_remote, s.git_branch, s.git_commit, s.git_repo_name, \
s.pr_number, s.pr_url, s.working_directory, \
s.files_modified, s.files_read, s.has_errors, COALESCE(s.max_active_agents, 1)";
1471
/// Map one result row produced by `LOCAL_SESSION_COLUMNS` to a `LocalSessionRow`.
/// The get() indices must match the column order of `LOCAL_SESSION_COLUMNS`
/// exactly — change both together.
fn row_to_local_session(row: &rusqlite::Row) -> rusqlite::Result<LocalSessionRow> {
    Ok(LocalSessionRow {
        id: row.get(0)?,
        source_path: row.get(1)?,
        sync_status: row.get(2)?,
        last_synced_at: row.get(3)?,
        user_id: row.get(4)?,
        nickname: row.get(5)?,
        team_id: row.get(6)?,
        tool: row.get(7)?,
        agent_provider: row.get(8)?,
        agent_model: row.get(9)?,
        title: row.get(10)?,
        description: row.get(11)?,
        tags: row.get(12)?,
        created_at: row.get(13)?,
        uploaded_at: row.get(14)?,
        message_count: row.get(15)?,
        user_message_count: row.get(16)?,
        task_count: row.get(17)?,
        event_count: row.get(18)?,
        duration_seconds: row.get(19)?,
        total_input_tokens: row.get(20)?,
        total_output_tokens: row.get(21)?,
        git_remote: row.get(22)?,
        git_branch: row.get(23)?,
        git_commit: row.get(24)?,
        git_repo_name: row.get(25)?,
        pr_number: row.get(26)?,
        pr_url: row.get(27)?,
        working_directory: row.get(28)?,
        files_modified: row.get(29)?,
        files_read: row.get(30)?,
        // Tolerate NULL/unreadable values on legacy rows instead of failing
        // the whole row.
        has_errors: row.get::<_, i64>(31).unwrap_or(0) != 0,
        max_active_agents: row.get(32).unwrap_or(1),
    })
}
1509
1510fn default_db_path() -> Result<PathBuf> {
1511    let home = std::env::var("HOME")
1512        .or_else(|_| std::env::var("USERPROFILE"))
1513        .context("Could not determine home directory")?;
1514    Ok(PathBuf::from(home)
1515        .join(".local")
1516        .join("share")
1517        .join("opensession")
1518        .join("local.db"))
1519}
1520
1521#[cfg(test)]
1522mod tests {
1523    use super::*;
1524
1525    use std::collections::BTreeSet;
1526    use std::fs::{create_dir_all, write};
1527    use tempfile::tempdir;
1528
    // Open a fresh LocalDb in a temp directory. `keep()` disables the
    // TempDir's auto-cleanup so the DB file outlives the guard for the
    // duration of the test.
    fn test_db() -> LocalDb {
        let dir = tempdir().unwrap();
        let path = dir.keep().join("test.db");
        LocalDb::open_path(&path).unwrap()
    }
1534
    // Fresh temp directory; cleaned up automatically when the guard drops.
    fn temp_root() -> tempfile::TempDir {
        tempdir().unwrap()
    }
1538
    // Minimal LocalSessionRow fixture: only id/tool/source_path vary, every
    // other field is a neutral default.
    fn make_row(id: &str, tool: &str, source_path: Option<&str>) -> LocalSessionRow {
        LocalSessionRow {
            id: id.to_string(),
            source_path: source_path.map(String::from),
            sync_status: "local_only".to_string(),
            last_synced_at: None,
            user_id: None,
            nickname: None,
            team_id: None,
            tool: tool.to_string(),
            agent_provider: None,
            agent_model: None,
            title: Some("test".to_string()),
            description: None,
            tags: None,
            created_at: "2024-01-01T00:00:00Z".to_string(),
            uploaded_at: None,
            message_count: 0,
            user_message_count: 0,
            task_count: 0,
            event_count: 0,
            duration_seconds: 0,
            total_input_tokens: 0,
            total_output_tokens: 0,
            git_remote: None,
            git_branch: None,
            git_commit: None,
            git_repo_name: None,
            pr_number: None,
            pr_url: None,
            working_directory: None,
            files_modified: None,
            files_read: None,
            has_errors: false,
            max_active_agents: 1,
        }
    }
1576
    #[test]
    fn test_open_and_schema() {
        // Smoke test: opening a fresh DB applies migrations without error.
        let _db = test_db();
    }
1581
    #[test]
    fn test_open_backfills_legacy_sessions_columns() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("legacy.db");
        {
            // Simulate a legacy DB whose `sessions` table predates every
            // newer column.
            let conn = Connection::open(&path).unwrap();
            conn.execute_batch(
                "CREATE TABLE sessions (id TEXT PRIMARY KEY);
                 INSERT INTO sessions (id) VALUES ('legacy-1');",
            )
            .unwrap();
        }

        // Opening must backfill the missing columns so the canonical SELECT
        // works and defaults apply.
        let db = LocalDb::open_path(&path).unwrap();
        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].id, "legacy-1");
        assert_eq!(rows[0].user_message_count, 0);
    }
1601
    #[test]
    fn test_open_rotates_incompatible_legacy_schema() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("broken.db");
        {
            // A VIEW named `sessions` cannot be migrated/ALTERed — this must
            // trigger the legacy-rotation path instead of a hard error.
            let conn = Connection::open(&path).unwrap();
            conn.execute_batch("CREATE VIEW sessions AS SELECT 'x' AS id;")
                .unwrap();
        }

        let db = LocalDb::open_path(&path).unwrap();
        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
        assert!(rows.is_empty());

        // The broken file should have been renamed to *.legacy-<ts>.bak.
        let rotated = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(Result::ok)
            .any(|entry| {
                let name = entry.file_name();
                let name = name.to_string_lossy();
                name.starts_with("broken.db.legacy-") && name.ends_with(".bak")
            });
        assert!(rotated, "expected rotated legacy backup file");
    }
1626
    #[test]
    fn test_is_opencode_child_session() {
        let root = temp_root();
        let dir = root.path().join("sessions");
        create_dir_all(&dir).unwrap();
        // Parent fixture: no parentID field in the source JSON.
        let parent_session = dir.join("parent.json");
        write(
            &parent_session,
            r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();
        // Child fixture: parentID present — should be classified as a child.
        let child_session = dir.join("child.json");
        write(
            &child_session,
            r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
        )
        .unwrap();

        let parent = make_row(
            "ses_parent",
            "opencode",
            Some(parent_session.to_str().unwrap()),
        );
        let child = make_row(
            "ses_child",
            "opencode",
            Some(child_session.to_str().unwrap()),
        );
        // Same source file but tool != "opencode": must never be a child.
        let codex = make_row("ses_other", "codex", Some(child_session.to_str().unwrap()));

        assert!(!is_opencode_child_session(&parent));
        assert!(is_opencode_child_session(&child));
        assert!(!is_opencode_child_session(&codex));
    }
1661
1662    #[test]
1663    fn test_is_opencode_child_session_uses_event_shape_heuristic() {
1664        let child = LocalSessionRow {
1665            id: "sess_child".to_string(),
1666            source_path: None,
1667            sync_status: "local_only".to_string(),
1668            last_synced_at: None,
1669            user_id: None,
1670            nickname: None,
1671            team_id: None,
1672            tool: "opencode".to_string(),
1673            agent_provider: None,
1674            agent_model: None,
1675            title: None,
1676            description: None,
1677            tags: None,
1678            created_at: "2024-01-01T00:00:00Z".to_string(),
1679            uploaded_at: None,
1680            message_count: 1,
1681            user_message_count: 0,
1682            task_count: 4,
1683            event_count: 4,
1684            duration_seconds: 0,
1685            total_input_tokens: 0,
1686            total_output_tokens: 0,
1687            git_remote: None,
1688            git_branch: None,
1689            git_commit: None,
1690            git_repo_name: None,
1691            pr_number: None,
1692            pr_url: None,
1693            working_directory: None,
1694            files_modified: None,
1695            files_read: None,
1696            has_errors: false,
1697            max_active_agents: 1,
1698        };
1699
1700        let parent = LocalSessionRow {
1701            id: "sess_parent".to_string(),
1702            source_path: None,
1703            sync_status: "local_only".to_string(),
1704            last_synced_at: None,
1705            user_id: None,
1706            nickname: None,
1707            team_id: None,
1708            tool: "opencode".to_string(),
1709            agent_provider: None,
1710            agent_model: None,
1711            title: Some("regular".to_string()),
1712            description: None,
1713            tags: None,
1714            created_at: "2024-01-01T00:00:00Z".to_string(),
1715            uploaded_at: None,
1716            message_count: 1,
1717            user_message_count: 1,
1718            task_count: 2,
1719            event_count: 20,
1720            duration_seconds: 0,
1721            total_input_tokens: 0,
1722            total_output_tokens: 0,
1723            git_remote: None,
1724            git_branch: None,
1725            git_commit: None,
1726            git_repo_name: None,
1727            pr_number: None,
1728            pr_url: None,
1729            working_directory: None,
1730            files_modified: None,
1731            files_read: None,
1732            has_errors: false,
1733            max_active_agents: 1,
1734        };
1735
1736        assert!(is_opencode_child_session(&child));
1737        assert!(!is_opencode_child_session(&parent));
1738    }
1739
1740    #[test]
1741    fn test_is_opencode_child_session_with_more_messages_is_hidden_if_task_count_small() {
1742        let child = LocalSessionRow {
1743            id: "sess_child_2".to_string(),
1744            source_path: None,
1745            sync_status: "local_only".to_string(),
1746            last_synced_at: None,
1747            user_id: None,
1748            nickname: None,
1749            team_id: None,
1750            tool: "opencode".to_string(),
1751            agent_provider: None,
1752            agent_model: None,
1753            title: None,
1754            description: None,
1755            tags: None,
1756            created_at: "2024-01-01T00:00:00Z".to_string(),
1757            uploaded_at: None,
1758            message_count: 2,
1759            user_message_count: 0,
1760            task_count: 4,
1761            event_count: 4,
1762            duration_seconds: 0,
1763            total_input_tokens: 0,
1764            total_output_tokens: 0,
1765            git_remote: None,
1766            git_branch: None,
1767            git_commit: None,
1768            git_repo_name: None,
1769            pr_number: None,
1770            pr_url: None,
1771            working_directory: None,
1772            files_modified: None,
1773            files_read: None,
1774            has_errors: false,
1775            max_active_agents: 1,
1776        };
1777
1778        let parent = LocalSessionRow {
1779            id: "sess_parent".to_string(),
1780            source_path: None,
1781            sync_status: "local_only".to_string(),
1782            last_synced_at: None,
1783            user_id: None,
1784            nickname: None,
1785            team_id: None,
1786            tool: "opencode".to_string(),
1787            agent_provider: None,
1788            agent_model: None,
1789            title: Some("regular".to_string()),
1790            description: None,
1791            tags: None,
1792            created_at: "2024-01-01T00:00:00Z".to_string(),
1793            uploaded_at: None,
1794            message_count: 2,
1795            user_message_count: 1,
1796            task_count: 5,
1797            event_count: 20,
1798            duration_seconds: 0,
1799            total_input_tokens: 0,
1800            total_output_tokens: 0,
1801            git_remote: None,
1802            git_branch: None,
1803            git_commit: None,
1804            git_repo_name: None,
1805            pr_number: None,
1806            pr_url: None,
1807            working_directory: None,
1808            files_modified: None,
1809            files_read: None,
1810            has_errors: false,
1811            max_active_agents: 1,
1812        };
1813
1814        assert!(is_opencode_child_session(&child));
1815        assert!(!is_opencode_child_session(&parent));
1816    }
1817
1818    #[test]
1819    fn test_is_opencode_child_session_with_more_messages_but_few_tasks() {
1820        let child = LocalSessionRow {
1821            id: "sess_child_3".to_string(),
1822            source_path: None,
1823            sync_status: "local_only".to_string(),
1824            last_synced_at: None,
1825            user_id: None,
1826            nickname: None,
1827            team_id: None,
1828            tool: "opencode".to_string(),
1829            agent_provider: None,
1830            agent_model: None,
1831            title: None,
1832            description: None,
1833            tags: None,
1834            created_at: "2024-01-01T00:00:00Z".to_string(),
1835            uploaded_at: None,
1836            message_count: 3,
1837            user_message_count: 0,
1838            task_count: 2,
1839            event_count: 6,
1840            duration_seconds: 0,
1841            total_input_tokens: 0,
1842            total_output_tokens: 0,
1843            git_remote: None,
1844            git_branch: None,
1845            git_commit: None,
1846            git_repo_name: None,
1847            pr_number: None,
1848            pr_url: None,
1849            working_directory: None,
1850            files_modified: None,
1851            files_read: None,
1852            has_errors: false,
1853            max_active_agents: 1,
1854        };
1855
1856        assert!(is_opencode_child_session(&child));
1857    }
1858
1859    #[test]
1860    fn test_parse_opencode_parent_session_id_aliases() {
1861        let root = temp_root();
1862        let dir = root.path().join("session-aliases");
1863        create_dir_all(&dir).unwrap();
1864        let child_session = dir.join("child.json");
1865        write(
1866            &child_session,
1867            r#"{"id":"ses_child","parentUUID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1868        )
1869        .unwrap();
1870        assert_eq!(
1871            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
1872            Some("ses_parent")
1873        );
1874    }
1875
1876    #[test]
1877    fn test_parse_opencode_parent_session_id_nested_metadata() {
1878        let root = temp_root();
1879        let dir = root.path().join("session-nested");
1880        create_dir_all(&dir).unwrap();
1881        let child_session = dir.join("child.json");
1882        write(
1883            &child_session,
1884            r#"{"id":"ses_child","metadata":{"links":{"parentSessionId":"ses_parent","trace":"x"}}}"#,
1885        )
1886        .unwrap();
1887        assert_eq!(
1888            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
1889            Some("ses_parent")
1890        );
1891    }
1892
1893    #[test]
1894    fn test_is_claude_subagent_session() {
1895        let row = make_row(
1896            "ses_parent",
1897            "claude-code",
1898            Some("/Users/test/.claude/projects/foo/subagents/agent-abc.jsonl"),
1899        );
1900        assert!(!is_opencode_child_session(&row));
1901        assert!(is_claude_subagent_session(&row));
1902        assert!(hide_opencode_child_sessions(vec![row]).is_empty());
1903    }
1904
1905    #[test]
1906    fn test_hide_opencode_child_sessions() {
1907        let root = temp_root();
1908        let dir = root.path().join("sessions");
1909        create_dir_all(&dir).unwrap();
1910        let parent_session = dir.join("parent.json");
1911        let child_session = dir.join("child.json");
1912        let orphan_session = dir.join("orphan.json");
1913
1914        write(
1915            &parent_session,
1916            r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1917        )
1918        .unwrap();
1919        write(
1920            &child_session,
1921            r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1922        )
1923        .unwrap();
1924        write(
1925            &orphan_session,
1926            r#"{"id":"ses_orphan","time":{"created":1000,"updated":1000}}"#,
1927        )
1928        .unwrap();
1929
1930        let rows = vec![
1931            make_row(
1932                "ses_child",
1933                "opencode",
1934                Some(child_session.to_str().unwrap()),
1935            ),
1936            make_row(
1937                "ses_parent",
1938                "opencode",
1939                Some(parent_session.to_str().unwrap()),
1940            ),
1941            {
1942                let mut row = make_row("ses_other", "codex", None);
1943                row.user_message_count = 1;
1944                row
1945            },
1946            make_row(
1947                "ses_orphan",
1948                "opencode",
1949                Some(orphan_session.to_str().unwrap()),
1950            ),
1951        ];
1952
1953        let filtered = hide_opencode_child_sessions(rows);
1954        assert_eq!(filtered.len(), 3);
1955        assert!(filtered.iter().all(|r| r.id != "ses_child"));
1956    }
1957
1958    #[test]
1959    fn test_sync_cursor() {
1960        let db = test_db();
1961        assert_eq!(db.get_sync_cursor("team1").unwrap(), None);
1962        db.set_sync_cursor("team1", "2024-01-01T00:00:00Z").unwrap();
1963        assert_eq!(
1964            db.get_sync_cursor("team1").unwrap(),
1965            Some("2024-01-01T00:00:00Z".to_string())
1966        );
1967        // Update
1968        db.set_sync_cursor("team1", "2024-06-01T00:00:00Z").unwrap();
1969        assert_eq!(
1970            db.get_sync_cursor("team1").unwrap(),
1971            Some("2024-06-01T00:00:00Z".to_string())
1972        );
1973    }
1974
1975    #[test]
1976    fn test_body_cache() {
1977        let db = test_db();
1978        assert_eq!(db.get_cached_body("s1").unwrap(), None);
1979        db.cache_body("s1", b"hello world").unwrap();
1980        assert_eq!(
1981            db.get_cached_body("s1").unwrap(),
1982            Some(b"hello world".to_vec())
1983        );
1984    }
1985
1986    #[test]
1987    fn test_timeline_summary_cache_roundtrip() {
1988        let db = test_db();
1989        db.upsert_timeline_summary_cache(
1990            "k1",
1991            "timeline:v1",
1992            "compact text",
1993            "{\"kind\":\"turn-summary\"}",
1994            "raw text",
1995        )
1996        .unwrap();
1997
1998        let rows = db
1999            .list_timeline_summary_cache_by_namespace("timeline:v1")
2000            .unwrap();
2001        assert_eq!(rows.len(), 1);
2002        assert_eq!(rows[0].lookup_key, "k1");
2003        assert_eq!(rows[0].namespace, "timeline:v1");
2004        assert_eq!(rows[0].compact, "compact text");
2005        assert_eq!(rows[0].payload, "{\"kind\":\"turn-summary\"}");
2006        assert_eq!(rows[0].raw, "raw text");
2007
2008        let cleared = db.clear_timeline_summary_cache().unwrap();
2009        assert_eq!(cleared, 1);
2010        let rows_after = db
2011            .list_timeline_summary_cache_by_namespace("timeline:v1")
2012            .unwrap();
2013        assert!(rows_after.is_empty());
2014    }
2015
2016    #[test]
2017    fn test_local_migrations_include_timeline_summary_cache() {
2018        let db = test_db();
2019        let conn = db.conn();
2020        let applied: bool = conn
2021            .query_row(
2022                "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
2023                params!["local_0003_timeline_summary_cache"],
2024                |row| row.get(0),
2025            )
2026            .unwrap();
2027        assert!(applied);
2028    }
2029
2030    #[test]
2031    fn test_local_migration_files_match_api_local_migrations() {
2032        fn collect_local_sql(dir: PathBuf) -> BTreeSet<String> {
2033            std::fs::read_dir(dir)
2034                .expect("read migrations directory")
2035                .filter_map(Result::ok)
2036                .map(|entry| entry.file_name().to_string_lossy().to_string())
2037                .filter(|name| name.starts_with("local_") && name.ends_with(".sql"))
2038                .collect()
2039        }
2040
2041        let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
2042        let local_files = collect_local_sql(manifest_dir.join("migrations"));
2043        let api_files = collect_local_sql(manifest_dir.join("../api/migrations"));
2044
2045        assert_eq!(
2046            local_files, api_files,
2047            "local-db local migrations must stay in parity with api local migrations"
2048        );
2049    }
2050
2051    #[test]
2052    fn test_upsert_remote_session() {
2053        let db = test_db();
2054        let summary = RemoteSessionSummary {
2055            id: "remote-1".to_string(),
2056            user_id: Some("u1".to_string()),
2057            nickname: Some("alice".to_string()),
2058            team_id: "t1".to_string(),
2059            tool: "claude-code".to_string(),
2060            agent_provider: None,
2061            agent_model: None,
2062            title: Some("Test session".to_string()),
2063            description: None,
2064            tags: None,
2065            created_at: "2024-01-01T00:00:00Z".to_string(),
2066            uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2067            message_count: 10,
2068            task_count: 2,
2069            event_count: 20,
2070            duration_seconds: 300,
2071            total_input_tokens: 1000,
2072            total_output_tokens: 500,
2073            git_remote: None,
2074            git_branch: None,
2075            git_commit: None,
2076            git_repo_name: None,
2077            pr_number: None,
2078            pr_url: None,
2079            working_directory: None,
2080            files_modified: None,
2081            files_read: None,
2082            has_errors: false,
2083            max_active_agents: 1,
2084        };
2085        db.upsert_remote_session(&summary).unwrap();
2086
2087        let sessions = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2088        assert_eq!(sessions.len(), 1);
2089        assert_eq!(sessions[0].id, "remote-1");
2090        assert_eq!(sessions[0].sync_status, "remote_only");
2091        assert_eq!(sessions[0].nickname, None); // no user in local users table
2092    }
2093
2094    #[test]
2095    fn test_list_filter_by_repo() {
2096        let db = test_db();
2097        // Insert a remote session with team_id
2098        let summary1 = RemoteSessionSummary {
2099            id: "s1".to_string(),
2100            user_id: None,
2101            nickname: None,
2102            team_id: "t1".to_string(),
2103            tool: "claude-code".to_string(),
2104            agent_provider: None,
2105            agent_model: None,
2106            title: Some("Session 1".to_string()),
2107            description: None,
2108            tags: None,
2109            created_at: "2024-01-01T00:00:00Z".to_string(),
2110            uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2111            message_count: 5,
2112            task_count: 0,
2113            event_count: 10,
2114            duration_seconds: 60,
2115            total_input_tokens: 100,
2116            total_output_tokens: 50,
2117            git_remote: None,
2118            git_branch: None,
2119            git_commit: None,
2120            git_repo_name: None,
2121            pr_number: None,
2122            pr_url: None,
2123            working_directory: None,
2124            files_modified: None,
2125            files_read: None,
2126            has_errors: false,
2127            max_active_agents: 1,
2128        };
2129        db.upsert_remote_session(&summary1).unwrap();
2130
2131        // Filter by team
2132        let filter = LocalSessionFilter {
2133            team_id: Some("t1".to_string()),
2134            ..Default::default()
2135        };
2136        assert_eq!(db.list_sessions(&filter).unwrap().len(), 1);
2137
2138        let filter = LocalSessionFilter {
2139            team_id: Some("t999".to_string()),
2140            ..Default::default()
2141        };
2142        assert_eq!(db.list_sessions(&filter).unwrap().len(), 0);
2143    }
2144
2145    // ── Helpers for inserting test sessions ────────────────────────────
2146
2147    fn make_summary(id: &str, tool: &str, title: &str, created_at: &str) -> RemoteSessionSummary {
2148        RemoteSessionSummary {
2149            id: id.to_string(),
2150            user_id: None,
2151            nickname: None,
2152            team_id: "t1".to_string(),
2153            tool: tool.to_string(),
2154            agent_provider: Some("anthropic".to_string()),
2155            agent_model: Some("claude-opus-4-6".to_string()),
2156            title: Some(title.to_string()),
2157            description: None,
2158            tags: None,
2159            created_at: created_at.to_string(),
2160            uploaded_at: created_at.to_string(),
2161            message_count: 5,
2162            task_count: 1,
2163            event_count: 10,
2164            duration_seconds: 300,
2165            total_input_tokens: 1000,
2166            total_output_tokens: 500,
2167            git_remote: None,
2168            git_branch: None,
2169            git_commit: None,
2170            git_repo_name: None,
2171            pr_number: None,
2172            pr_url: None,
2173            working_directory: None,
2174            files_modified: None,
2175            files_read: None,
2176            has_errors: false,
2177            max_active_agents: 1,
2178        }
2179    }
2180
2181    fn seed_sessions(db: &LocalDb) {
2182        // Insert 5 sessions across two tools, ordered by created_at
2183        db.upsert_remote_session(&make_summary(
2184            "s1",
2185            "claude-code",
2186            "First session",
2187            "2024-01-01T00:00:00Z",
2188        ))
2189        .unwrap();
2190        db.upsert_remote_session(&make_summary(
2191            "s2",
2192            "claude-code",
2193            "JWT auth work",
2194            "2024-01-02T00:00:00Z",
2195        ))
2196        .unwrap();
2197        db.upsert_remote_session(&make_summary(
2198            "s3",
2199            "gemini",
2200            "Gemini test",
2201            "2024-01-03T00:00:00Z",
2202        ))
2203        .unwrap();
2204        db.upsert_remote_session(&make_summary(
2205            "s4",
2206            "claude-code",
2207            "Error handling",
2208            "2024-01-04T00:00:00Z",
2209        ))
2210        .unwrap();
2211        db.upsert_remote_session(&make_summary(
2212            "s5",
2213            "claude-code",
2214            "Final polish",
2215            "2024-01-05T00:00:00Z",
2216        ))
2217        .unwrap();
2218    }
2219
2220    // ── list_sessions_log tests ────────────────────────────────────────
2221
2222    #[test]
2223    fn test_log_no_filters() {
2224        let db = test_db();
2225        seed_sessions(&db);
2226        let filter = LogFilter::default();
2227        let results = db.list_sessions_log(&filter).unwrap();
2228        assert_eq!(results.len(), 5);
2229        // Should be ordered by created_at DESC
2230        assert_eq!(results[0].id, "s5");
2231        assert_eq!(results[4].id, "s1");
2232    }
2233
2234    #[test]
2235    fn test_log_filter_by_tool() {
2236        let db = test_db();
2237        seed_sessions(&db);
2238        let filter = LogFilter {
2239            tool: Some("claude-code".to_string()),
2240            ..Default::default()
2241        };
2242        let results = db.list_sessions_log(&filter).unwrap();
2243        assert_eq!(results.len(), 4);
2244        assert!(results.iter().all(|s| s.tool == "claude-code"));
2245    }
2246
2247    #[test]
2248    fn test_log_filter_by_model_wildcard() {
2249        let db = test_db();
2250        seed_sessions(&db);
2251        let filter = LogFilter {
2252            model: Some("claude*".to_string()),
2253            ..Default::default()
2254        };
2255        let results = db.list_sessions_log(&filter).unwrap();
2256        assert_eq!(results.len(), 5); // all have claude-opus model
2257    }
2258
2259    #[test]
2260    fn test_log_filter_since() {
2261        let db = test_db();
2262        seed_sessions(&db);
2263        let filter = LogFilter {
2264            since: Some("2024-01-03T00:00:00Z".to_string()),
2265            ..Default::default()
2266        };
2267        let results = db.list_sessions_log(&filter).unwrap();
2268        assert_eq!(results.len(), 3); // s3, s4, s5
2269    }
2270
2271    #[test]
2272    fn test_log_filter_before() {
2273        let db = test_db();
2274        seed_sessions(&db);
2275        let filter = LogFilter {
2276            before: Some("2024-01-03T00:00:00Z".to_string()),
2277            ..Default::default()
2278        };
2279        let results = db.list_sessions_log(&filter).unwrap();
2280        assert_eq!(results.len(), 2); // s1, s2
2281    }
2282
2283    #[test]
2284    fn test_log_filter_since_and_before() {
2285        let db = test_db();
2286        seed_sessions(&db);
2287        let filter = LogFilter {
2288            since: Some("2024-01-02T00:00:00Z".to_string()),
2289            before: Some("2024-01-04T00:00:00Z".to_string()),
2290            ..Default::default()
2291        };
2292        let results = db.list_sessions_log(&filter).unwrap();
2293        assert_eq!(results.len(), 2); // s2, s3
2294    }
2295
2296    #[test]
2297    fn test_log_filter_grep() {
2298        let db = test_db();
2299        seed_sessions(&db);
2300        let filter = LogFilter {
2301            grep: Some("JWT".to_string()),
2302            ..Default::default()
2303        };
2304        let results = db.list_sessions_log(&filter).unwrap();
2305        assert_eq!(results.len(), 1);
2306        assert_eq!(results[0].id, "s2");
2307    }
2308
2309    #[test]
2310    fn test_log_limit_and_offset() {
2311        let db = test_db();
2312        seed_sessions(&db);
2313        let filter = LogFilter {
2314            limit: Some(2),
2315            offset: Some(1),
2316            ..Default::default()
2317        };
2318        let results = db.list_sessions_log(&filter).unwrap();
2319        assert_eq!(results.len(), 2);
2320        assert_eq!(results[0].id, "s4"); // second most recent
2321        assert_eq!(results[1].id, "s3");
2322    }
2323
2324    #[test]
2325    fn test_log_limit_only() {
2326        let db = test_db();
2327        seed_sessions(&db);
2328        let filter = LogFilter {
2329            limit: Some(3),
2330            ..Default::default()
2331        };
2332        let results = db.list_sessions_log(&filter).unwrap();
2333        assert_eq!(results.len(), 3);
2334    }
2335
2336    #[test]
2337    fn test_list_sessions_limit_offset() {
2338        let db = test_db();
2339        seed_sessions(&db);
2340        let filter = LocalSessionFilter {
2341            limit: Some(2),
2342            offset: Some(1),
2343            ..Default::default()
2344        };
2345        let results = db.list_sessions(&filter).unwrap();
2346        assert_eq!(results.len(), 2);
2347        assert_eq!(results[0].id, "s4");
2348        assert_eq!(results[1].id, "s3");
2349    }
2350
2351    #[test]
2352    fn test_count_sessions_filtered() {
2353        let db = test_db();
2354        seed_sessions(&db);
2355        let count = db
2356            .count_sessions_filtered(&LocalSessionFilter::default())
2357            .unwrap();
2358        assert_eq!(count, 5);
2359    }
2360
2361    #[test]
2362    fn test_list_session_tools() {
2363        let db = test_db();
2364        seed_sessions(&db);
2365        let tools = db
2366            .list_session_tools(&LocalSessionFilter::default())
2367            .unwrap();
2368        assert_eq!(tools, vec!["claude-code".to_string(), "gemini".to_string()]);
2369    }
2370
2371    #[test]
2372    fn test_log_combined_filters() {
2373        let db = test_db();
2374        seed_sessions(&db);
2375        let filter = LogFilter {
2376            tool: Some("claude-code".to_string()),
2377            since: Some("2024-01-03T00:00:00Z".to_string()),
2378            limit: Some(1),
2379            ..Default::default()
2380        };
2381        let results = db.list_sessions_log(&filter).unwrap();
2382        assert_eq!(results.len(), 1);
2383        assert_eq!(results[0].id, "s5"); // most recent claude-code after Jan 3
2384    }
2385
2386    // ── Session offset/latest tests ────────────────────────────────────
2387
2388    #[test]
2389    fn test_get_session_by_offset() {
2390        let db = test_db();
2391        seed_sessions(&db);
2392        let row = db.get_session_by_offset(0).unwrap().unwrap();
2393        assert_eq!(row.id, "s5"); // most recent
2394        let row = db.get_session_by_offset(2).unwrap().unwrap();
2395        assert_eq!(row.id, "s3");
2396        assert!(db.get_session_by_offset(10).unwrap().is_none());
2397    }
2398
2399    #[test]
2400    fn test_get_session_by_tool_offset() {
2401        let db = test_db();
2402        seed_sessions(&db);
2403        let row = db
2404            .get_session_by_tool_offset("claude-code", 0)
2405            .unwrap()
2406            .unwrap();
2407        assert_eq!(row.id, "s5");
2408        let row = db
2409            .get_session_by_tool_offset("claude-code", 1)
2410            .unwrap()
2411            .unwrap();
2412        assert_eq!(row.id, "s4");
2413        let row = db.get_session_by_tool_offset("gemini", 0).unwrap().unwrap();
2414        assert_eq!(row.id, "s3");
2415        assert!(db
2416            .get_session_by_tool_offset("gemini", 1)
2417            .unwrap()
2418            .is_none());
2419    }
2420
2421    #[test]
2422    fn test_get_sessions_latest() {
2423        let db = test_db();
2424        seed_sessions(&db);
2425        let rows = db.get_sessions_latest(3).unwrap();
2426        assert_eq!(rows.len(), 3);
2427        assert_eq!(rows[0].id, "s5");
2428        assert_eq!(rows[1].id, "s4");
2429        assert_eq!(rows[2].id, "s3");
2430    }
2431
2432    #[test]
2433    fn test_get_sessions_by_tool_latest() {
2434        let db = test_db();
2435        seed_sessions(&db);
2436        let rows = db.get_sessions_by_tool_latest("claude-code", 2).unwrap();
2437        assert_eq!(rows.len(), 2);
2438        assert_eq!(rows[0].id, "s5");
2439        assert_eq!(rows[1].id, "s4");
2440    }
2441
2442    #[test]
2443    fn test_get_sessions_latest_more_than_available() {
2444        let db = test_db();
2445        seed_sessions(&db);
2446        let rows = db.get_sessions_by_tool_latest("gemini", 10).unwrap();
2447        assert_eq!(rows.len(), 1); // only 1 gemini session
2448    }
2449
2450    #[test]
2451    fn test_session_count() {
2452        let db = test_db();
2453        assert_eq!(db.session_count().unwrap(), 0);
2454        seed_sessions(&db);
2455        assert_eq!(db.session_count().unwrap(), 5);
2456    }
2457
2458    // ── Commit link tests ─────────────────────────────────────────────
2459
2460    #[test]
2461    fn test_link_commit_session() {
2462        let db = test_db();
2463        seed_sessions(&db);
2464        db.link_commit_session("abc123", "s1", Some("/tmp/repo"), Some("main"))
2465            .unwrap();
2466
2467        let commits = db.get_commits_by_session("s1").unwrap();
2468        assert_eq!(commits.len(), 1);
2469        assert_eq!(commits[0].commit_hash, "abc123");
2470        assert_eq!(commits[0].session_id, "s1");
2471        assert_eq!(commits[0].repo_path.as_deref(), Some("/tmp/repo"));
2472        assert_eq!(commits[0].branch.as_deref(), Some("main"));
2473
2474        let sessions = db.get_sessions_by_commit("abc123").unwrap();
2475        assert_eq!(sessions.len(), 1);
2476        assert_eq!(sessions[0].id, "s1");
2477    }
2478
2479    #[test]
2480    fn test_get_sessions_by_commit() {
2481        let db = test_db();
2482        seed_sessions(&db);
2483        // Link multiple sessions to the same commit
2484        db.link_commit_session("abc123", "s1", None, None).unwrap();
2485        db.link_commit_session("abc123", "s2", None, None).unwrap();
2486        db.link_commit_session("abc123", "s3", None, None).unwrap();
2487
2488        let sessions = db.get_sessions_by_commit("abc123").unwrap();
2489        assert_eq!(sessions.len(), 3);
2490        // Ordered by created_at DESC
2491        assert_eq!(sessions[0].id, "s3");
2492        assert_eq!(sessions[1].id, "s2");
2493        assert_eq!(sessions[2].id, "s1");
2494    }
2495
2496    #[test]
2497    fn test_get_commits_by_session() {
2498        let db = test_db();
2499        seed_sessions(&db);
2500        // Link multiple commits to the same session
2501        db.link_commit_session("aaa111", "s1", Some("/repo"), Some("main"))
2502            .unwrap();
2503        db.link_commit_session("bbb222", "s1", Some("/repo"), Some("main"))
2504            .unwrap();
2505        db.link_commit_session("ccc333", "s1", Some("/repo"), Some("feat"))
2506            .unwrap();
2507
2508        let commits = db.get_commits_by_session("s1").unwrap();
2509        assert_eq!(commits.len(), 3);
2510        // All linked to s1
2511        assert!(commits.iter().all(|c| c.session_id == "s1"));
2512    }
2513
2514    #[test]
2515    fn test_duplicate_link_ignored() {
2516        let db = test_db();
2517        seed_sessions(&db);
2518        db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2519            .unwrap();
2520        // Inserting the same link again should not error
2521        db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2522            .unwrap();
2523
2524        let commits = db.get_commits_by_session("s1").unwrap();
2525        assert_eq!(commits.len(), 1);
2526    }
2527
2528    #[test]
2529    fn test_log_filter_by_commit() {
2530        let db = test_db();
2531        seed_sessions(&db);
2532        db.link_commit_session("abc123", "s2", None, None).unwrap();
2533        db.link_commit_session("abc123", "s4", None, None).unwrap();
2534
2535        let filter = LogFilter {
2536            commit: Some("abc123".to_string()),
2537            ..Default::default()
2538        };
2539        let results = db.list_sessions_log(&filter).unwrap();
2540        assert_eq!(results.len(), 2);
2541        assert_eq!(results[0].id, "s4");
2542        assert_eq!(results[1].id, "s2");
2543
2544        // Non-existent commit returns nothing
2545        let filter = LogFilter {
2546            commit: Some("nonexistent".to_string()),
2547            ..Default::default()
2548        };
2549        let results = db.list_sessions_log(&filter).unwrap();
2550        assert_eq!(results.len(), 0);
2551    }
2552}