Skip to main content

opensession_local_db/
lib.rs

1pub mod git;
2
3use anyhow::{Context, Result};
4use opensession_core::trace::Session;
5use rusqlite::{params, Connection, OptionalExtension};
6use serde_json::Value;
7use std::collections::HashSet;
8use std::fs;
9use std::path::PathBuf;
10use std::sync::Mutex;
11
12use git::GitContext;
13
14type Migration = (&'static str, &'static str);
15
16// Keep local index/cache schema aligned with remote/session schema migrations.
17const REMOTE_MIGRATIONS: &[Migration] = &[
18    ("0001_schema", include_str!("../migrations/0001_schema.sql")),
19    (
20        "0002_team_invite_keys",
21        include_str!("../migrations/0002_team_invite_keys.sql"),
22    ),
23    (
24        "0003_max_active_agents",
25        include_str!("../migrations/0003_max_active_agents.sql"),
26    ),
27    (
28        "0004_oauth_states_provider",
29        include_str!("../migrations/0004_oauth_states_provider.sql"),
30    ),
31    (
32        "0005_sessions_body_url_backfill",
33        include_str!("../migrations/0005_sessions_body_url_backfill.sql"),
34    ),
35    (
36        "0006_sessions_remove_fk_constraints",
37        include_str!("../migrations/0006_sessions_remove_fk_constraints.sql"),
38    ),
39    (
40        "0007_sessions_list_perf_indexes",
41        include_str!("../migrations/0007_sessions_list_perf_indexes.sql"),
42    ),
43    (
44        "0008_teams_force_public",
45        include_str!("../migrations/0008_teams_force_public.sql"),
46    ),
47    (
48        "0009_session_score_plugin",
49        include_str!("../migrations/0009_session_score_plugin.sql"),
50    ),
51];
52
53const LOCAL_MIGRATIONS: &[Migration] = &[
54    (
55        "local_0001_schema",
56        include_str!("../migrations/local_0001_schema.sql"),
57    ),
58    (
59        "local_0002_drop_unused_local_sessions",
60        include_str!("../migrations/local_0002_drop_unused_local_sessions.sql"),
61    ),
62    (
63        "local_0003_timeline_summary_cache",
64        include_str!("../migrations/local_0003_timeline_summary_cache.sql"),
65    ),
66];
67
68/// A local session row stored in the local SQLite index/cache database.
69#[derive(Debug, Clone)]
70pub struct LocalSessionRow {
71    pub id: String,
72    pub source_path: Option<String>,
73    pub sync_status: String,
74    pub last_synced_at: Option<String>,
75    pub user_id: Option<String>,
76    pub nickname: Option<String>,
77    pub team_id: Option<String>,
78    pub tool: String,
79    pub agent_provider: Option<String>,
80    pub agent_model: Option<String>,
81    pub title: Option<String>,
82    pub description: Option<String>,
83    pub tags: Option<String>,
84    pub created_at: String,
85    pub uploaded_at: Option<String>,
86    pub message_count: i64,
87    pub user_message_count: i64,
88    pub task_count: i64,
89    pub event_count: i64,
90    pub duration_seconds: i64,
91    pub total_input_tokens: i64,
92    pub total_output_tokens: i64,
93    pub git_remote: Option<String>,
94    pub git_branch: Option<String>,
95    pub git_commit: Option<String>,
96    pub git_repo_name: Option<String>,
97    pub pr_number: Option<i64>,
98    pub pr_url: Option<String>,
99    pub working_directory: Option<String>,
100    pub files_modified: Option<String>,
101    pub files_read: Option<String>,
102    pub has_errors: bool,
103    pub max_active_agents: i64,
104}
105
106/// A link between a git commit and an AI session.
107#[derive(Debug, Clone)]
108pub struct CommitLink {
109    pub commit_hash: String,
110    pub session_id: String,
111    pub repo_path: Option<String>,
112    pub branch: Option<String>,
113    pub created_at: String,
114}
115
116/// A cached timeline summary row stored in the local index/cache DB.
117#[derive(Debug, Clone)]
118pub struct TimelineSummaryCacheRow {
119    pub lookup_key: String,
120    pub namespace: String,
121    pub compact: String,
122    pub payload: String,
123    pub raw: String,
124    pub cached_at: String,
125}
126
127/// Return true when a cached row corresponds to an OpenCode child session.
128pub fn is_opencode_child_session(row: &LocalSessionRow) -> bool {
129    if row.tool != "opencode" {
130        return false;
131    }
132
133    // Strong heuristic: some orchestration artifacts are generated as sessions
134    // that do not carry a user-visible request surface and only contain system
135    // or agent-only events.
136    if row.user_message_count <= 0
137        && row.message_count <= 4
138        && row.task_count <= 4
139        && row.event_count > 0
140        && row.event_count <= 16
141    {
142        return true;
143    }
144
145    if is_opencode_subagent_source(row.source_path.as_deref()) {
146        return true;
147    }
148
149    let source_path = match row.source_path.as_deref() {
150        Some(path) if !path.trim().is_empty() => path,
151        _ => return false,
152    };
153
154    parse_opencode_parent_session_id(source_path)
155        .is_some_and(|parent_id| !parent_id.trim().is_empty())
156}
157
158/// Parse `parentID` / `parentId` from an OpenCode session JSON file.
159pub fn parse_opencode_parent_session_id(source_path: &str) -> Option<String> {
160    let text = fs::read_to_string(source_path).ok()?;
161    let json: Value = serde_json::from_str(&text).ok()?;
162    lookup_parent_session_id(&json)
163}
164
165fn lookup_parent_session_id(value: &Value) -> Option<String> {
166    match value {
167        Value::Object(obj) => {
168            for (key, value) in obj {
169                if is_parent_id_key(key) {
170                    if let Some(parent_id) = value.as_str() {
171                        let parent_id = parent_id.trim();
172                        if !parent_id.is_empty() {
173                            return Some(parent_id.to_string());
174                        }
175                    }
176                }
177                if let Some(parent_id) = lookup_parent_session_id(value) {
178                    return Some(parent_id);
179                }
180            }
181            None
182        }
183        Value::Array(items) => items.iter().find_map(lookup_parent_session_id),
184        _ => None,
185    }
186}
187
188fn is_parent_id_key(key: &str) -> bool {
189    let flat = key
190        .chars()
191        .filter(|c| c.is_ascii_alphanumeric())
192        .map(|c| c.to_ascii_lowercase())
193        .collect::<String>();
194
195    flat == "parentid"
196        || flat == "parentuuid"
197        || flat == "parentsessionid"
198        || flat == "parentsessionuuid"
199        || flat.ends_with("parentsessionid")
200        || (flat.contains("parent") && flat.ends_with("id"))
201        || (flat.contains("parent") && flat.ends_with("uuid"))
202}
203
204/// Remove OpenCode child sessions so only parent sessions remain visible.
205pub fn hide_opencode_child_sessions(mut rows: Vec<LocalSessionRow>) -> Vec<LocalSessionRow> {
206    rows.retain(|row| !is_opencode_child_session(row) && !is_claude_subagent_session(row));
207    rows
208}
209
210fn is_opencode_subagent_source(source_path: Option<&str>) -> bool {
211    is_subagent_source(source_path)
212}
213
214fn is_claude_subagent_session(row: &LocalSessionRow) -> bool {
215    if row.tool != "claude-code" {
216        return false;
217    }
218
219    is_subagent_source(row.source_path.as_deref())
220}
221
222fn is_subagent_source(source_path: Option<&str>) -> bool {
223    let Some(source_path) = source_path.map(|path| path.to_ascii_lowercase()) else {
224        return false;
225    };
226
227    if source_path.contains("/subagents/") || source_path.contains("\\subagents\\") {
228        return true;
229    }
230
231    let filename = match std::path::Path::new(&source_path).file_name() {
232        Some(name) => name.to_string_lossy(),
233        None => return false,
234    };
235
236    filename.starts_with("agent-")
237        || filename.starts_with("agent_")
238        || filename.starts_with("subagent-")
239        || filename.starts_with("subagent_")
240}
241
242/// Filter for listing sessions from the local DB.
243#[derive(Debug, Clone)]
244pub struct LocalSessionFilter {
245    pub team_id: Option<String>,
246    pub sync_status: Option<String>,
247    pub git_repo_name: Option<String>,
248    pub search: Option<String>,
249    pub tool: Option<String>,
250    pub sort: LocalSortOrder,
251    pub time_range: LocalTimeRange,
252    pub limit: Option<u32>,
253    pub offset: Option<u32>,
254}
255
256impl Default for LocalSessionFilter {
257    fn default() -> Self {
258        Self {
259            team_id: None,
260            sync_status: None,
261            git_repo_name: None,
262            search: None,
263            tool: None,
264            sort: LocalSortOrder::Recent,
265            time_range: LocalTimeRange::All,
266            limit: None,
267            offset: None,
268        }
269    }
270}
271
272/// Sort order for local session listing.
273#[derive(Debug, Clone, Default, PartialEq, Eq)]
274pub enum LocalSortOrder {
275    #[default]
276    Recent,
277    Popular,
278    Longest,
279}
280
281/// Time range filter for local session listing.
282#[derive(Debug, Clone, Default, PartialEq, Eq)]
283pub enum LocalTimeRange {
284    Hours24,
285    Days7,
286    Days30,
287    #[default]
288    All,
289}
290
291/// Minimal remote session payload needed for local index/cache upsert.
292#[derive(Debug, Clone)]
293pub struct RemoteSessionSummary {
294    pub id: String,
295    pub user_id: Option<String>,
296    pub nickname: Option<String>,
297    pub team_id: String,
298    pub tool: String,
299    pub agent_provider: Option<String>,
300    pub agent_model: Option<String>,
301    pub title: Option<String>,
302    pub description: Option<String>,
303    pub tags: Option<String>,
304    pub created_at: String,
305    pub uploaded_at: String,
306    pub message_count: i64,
307    pub task_count: i64,
308    pub event_count: i64,
309    pub duration_seconds: i64,
310    pub total_input_tokens: i64,
311    pub total_output_tokens: i64,
312    pub git_remote: Option<String>,
313    pub git_branch: Option<String>,
314    pub git_commit: Option<String>,
315    pub git_repo_name: Option<String>,
316    pub pr_number: Option<i64>,
317    pub pr_url: Option<String>,
318    pub working_directory: Option<String>,
319    pub files_modified: Option<String>,
320    pub files_read: Option<String>,
321    pub has_errors: bool,
322    pub max_active_agents: i64,
323}
324
325/// Extended filter for the `log` command.
326#[derive(Debug, Default)]
327pub struct LogFilter {
328    /// Filter by tool name (exact match).
329    pub tool: Option<String>,
330    /// Filter by model (glob-like, uses LIKE).
331    pub model: Option<String>,
332    /// Filter sessions created after this ISO8601 timestamp.
333    pub since: Option<String>,
334    /// Filter sessions created before this ISO8601 timestamp.
335    pub before: Option<String>,
336    /// Filter sessions that touched this file path (searches files_modified JSON).
337    pub touches: Option<String>,
338    /// Free-text search in title, description, tags.
339    pub grep: Option<String>,
340    /// Only sessions with errors.
341    pub has_errors: Option<bool>,
342    /// Filter by working directory (prefix match).
343    pub working_directory: Option<String>,
344    /// Filter by git repo name.
345    pub git_repo_name: Option<String>,
346    /// Filter sessions linked to this git commit hash.
347    pub commit: Option<String>,
348    /// Maximum number of results.
349    pub limit: Option<u32>,
350    /// Offset for pagination.
351    pub offset: Option<u32>,
352}
353
354/// Base FROM clause for session list queries.
355const FROM_CLAUSE: &str = "\
356FROM sessions s \
357LEFT JOIN session_sync ss ON ss.session_id = s.id \
358LEFT JOIN users u ON u.id = s.user_id";
359
360/// Local SQLite index/cache shared by TUI and Daemon.
361/// This is not the source of truth for canonical session bodies.
362/// Thread-safe: wraps the connection in a Mutex so it can be shared via `Arc<LocalDb>`.
363pub struct LocalDb {
364    conn: Mutex<Connection>,
365}
366
367impl LocalDb {
368    /// Open (or create) the local database at the default path.
369    /// `~/.local/share/opensession/local.db`
370    pub fn open() -> Result<Self> {
371        let path = default_db_path()?;
372        Self::open_path(&path)
373    }
374
375    /// Open (or create) the local database at a specific path.
376    pub fn open_path(path: &PathBuf) -> Result<Self> {
377        if let Some(parent) = path.parent() {
378            std::fs::create_dir_all(parent)
379                .with_context(|| format!("create dir for {}", path.display()))?;
380        }
381        match open_connection_with_latest_schema(path) {
382            Ok(conn) => Ok(Self {
383                conn: Mutex::new(conn),
384            }),
385            Err(err) => {
386                if !is_schema_compat_error(&err) {
387                    return Err(err);
388                }
389
390                // Local DB is a cache. If schema migration cannot safely reconcile
391                // a legacy/corrupted file, rotate it out and recreate with latest schema.
392                rotate_legacy_db(path)?;
393
394                let conn = open_connection_with_latest_schema(path)
395                    .with_context(|| format!("recreate db {}", path.display()))?;
396                Ok(Self {
397                    conn: Mutex::new(conn),
398                })
399            }
400        }
401    }
402
403    fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
404        self.conn.lock().expect("local db mutex poisoned")
405    }
406
407    // ── Upsert local session (parsed from file) ────────────────────────
408
409    pub fn upsert_local_session(
410        &self,
411        session: &Session,
412        source_path: &str,
413        git: &GitContext,
414    ) -> Result<()> {
415        let title = session.context.title.as_deref();
416        let description = session.context.description.as_deref();
417        let tags = if session.context.tags.is_empty() {
418            None
419        } else {
420            Some(session.context.tags.join(","))
421        };
422        let created_at = session.context.created_at.to_rfc3339();
423        let cwd = session
424            .context
425            .attributes
426            .get("cwd")
427            .or_else(|| session.context.attributes.get("working_directory"))
428            .and_then(|v| v.as_str().map(String::from));
429
430        // Extract files_modified, files_read, and has_errors from events
431        let (files_modified, files_read, has_errors) =
432            opensession_core::extract::extract_file_metadata(session);
433        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;
434
435        let conn = self.conn();
436        // NOTE: `body_storage_key` is a deprecated compatibility column kept
437        // for legacy schema parity during migration.
438        conn.execute(
439            "INSERT INTO sessions \
440             (id, team_id, tool, agent_provider, agent_model, \
441              title, description, tags, created_at, \
442              message_count, user_message_count, task_count, event_count, duration_seconds, \
443              total_input_tokens, total_output_tokens, body_storage_key, \
444              git_remote, git_branch, git_commit, git_repo_name, working_directory, \
445              files_modified, files_read, has_errors, max_active_agents) \
446             VALUES (?1,'personal',?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,'',?16,?17,?18,?19,?20,?21,?22,?23,?24) \
447             ON CONFLICT(id) DO UPDATE SET \
448              tool=excluded.tool, agent_provider=excluded.agent_provider, \
449              agent_model=excluded.agent_model, \
450              title=excluded.title, description=excluded.description, \
451              tags=excluded.tags, \
452              message_count=excluded.message_count, user_message_count=excluded.user_message_count, \
453              task_count=excluded.task_count, \
454              event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
455              total_input_tokens=excluded.total_input_tokens, \
456              total_output_tokens=excluded.total_output_tokens, \
457              git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
458              git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
459              working_directory=excluded.working_directory, \
460              files_modified=excluded.files_modified, files_read=excluded.files_read, \
461              has_errors=excluded.has_errors, \
462              max_active_agents=excluded.max_active_agents",
463            params![
464                &session.session_id,
465                &session.agent.tool,
466                &session.agent.provider,
467                &session.agent.model,
468                title,
469                description,
470                &tags,
471                &created_at,
472                session.stats.message_count as i64,
473                session.stats.user_message_count as i64,
474                session.stats.task_count as i64,
475                session.stats.event_count as i64,
476                session.stats.duration_seconds as i64,
477                session.stats.total_input_tokens as i64,
478                session.stats.total_output_tokens as i64,
479                &git.remote,
480                &git.branch,
481                &git.commit,
482                &git.repo_name,
483                &cwd,
484                &files_modified,
485                &files_read,
486                has_errors,
487                max_active_agents,
488            ],
489        )?;
490
491        conn.execute(
492            "INSERT INTO session_sync (session_id, source_path, sync_status) \
493             VALUES (?1, ?2, 'local_only') \
494             ON CONFLICT(session_id) DO UPDATE SET source_path=excluded.source_path",
495            params![&session.session_id, source_path],
496        )?;
497        Ok(())
498    }
499
500    // ── Upsert remote session (from server sync pull) ──────────────────
501
502    pub fn upsert_remote_session(&self, summary: &RemoteSessionSummary) -> Result<()> {
503        let conn = self.conn();
504        // NOTE: `body_storage_key` is a deprecated compatibility column kept
505        // for legacy schema parity during migration.
506        conn.execute(
507            "INSERT INTO sessions \
508             (id, user_id, team_id, tool, agent_provider, agent_model, \
509              title, description, tags, created_at, uploaded_at, \
510              message_count, task_count, event_count, duration_seconds, \
511              total_input_tokens, total_output_tokens, body_storage_key, \
512              git_remote, git_branch, git_commit, git_repo_name, \
513              pr_number, pr_url, working_directory, \
514              files_modified, files_read, has_errors, max_active_agents) \
515             VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,'',?18,?19,?20,?21,?22,?23,?24,?25,?26,?27,?28) \
516             ON CONFLICT(id) DO UPDATE SET \
517              title=excluded.title, description=excluded.description, \
518              tags=excluded.tags, uploaded_at=excluded.uploaded_at, \
519              message_count=excluded.message_count, task_count=excluded.task_count, \
520              event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
521              total_input_tokens=excluded.total_input_tokens, \
522              total_output_tokens=excluded.total_output_tokens, \
523              git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
524              git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
525              pr_number=excluded.pr_number, pr_url=excluded.pr_url, \
526              working_directory=excluded.working_directory, \
527              files_modified=excluded.files_modified, files_read=excluded.files_read, \
528              has_errors=excluded.has_errors, \
529              max_active_agents=excluded.max_active_agents",
530            params![
531                &summary.id,
532                &summary.user_id,
533                &summary.team_id,
534                &summary.tool,
535                &summary.agent_provider,
536                &summary.agent_model,
537                &summary.title,
538                &summary.description,
539                &summary.tags,
540                &summary.created_at,
541                &summary.uploaded_at,
542                summary.message_count,
543                summary.task_count,
544                summary.event_count,
545                summary.duration_seconds,
546                summary.total_input_tokens,
547                summary.total_output_tokens,
548                &summary.git_remote,
549                &summary.git_branch,
550                &summary.git_commit,
551                &summary.git_repo_name,
552                summary.pr_number,
553                &summary.pr_url,
554                &summary.working_directory,
555                &summary.files_modified,
556                &summary.files_read,
557                summary.has_errors,
558                summary.max_active_agents,
559            ],
560        )?;
561
562        conn.execute(
563            "INSERT INTO session_sync (session_id, sync_status) \
564             VALUES (?1, 'remote_only') \
565             ON CONFLICT(session_id) DO UPDATE SET \
566              sync_status = CASE WHEN session_sync.sync_status = 'local_only' THEN 'synced' ELSE session_sync.sync_status END",
567            params![&summary.id],
568        )?;
569        Ok(())
570    }
571
572    // ── List sessions ──────────────────────────────────────────────────
573
574    fn build_local_session_where_clause(
575        filter: &LocalSessionFilter,
576    ) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
577        let mut where_clauses = vec![
578            "1=1".to_string(),
579            "NOT (s.tool = 'claude-code' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
580            "NOT (s.tool = 'opencode' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
581            "NOT (s.tool = 'opencode' AND COALESCE(s.user_message_count, 0) <= 0 AND COALESCE(s.message_count, 0) <= 4 AND COALESCE(s.task_count, 0) <= 4 AND COALESCE(s.event_count, 0) > 0 AND COALESCE(s.event_count, 0) <= 16)".to_string(),
582        ];
583        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
584        let mut idx = 1u32;
585
586        if let Some(ref team_id) = filter.team_id {
587            where_clauses.push(format!("s.team_id = ?{idx}"));
588            param_values.push(Box::new(team_id.clone()));
589            idx += 1;
590        }
591
592        if let Some(ref sync_status) = filter.sync_status {
593            where_clauses.push(format!("COALESCE(ss.sync_status, 'unknown') = ?{idx}"));
594            param_values.push(Box::new(sync_status.clone()));
595            idx += 1;
596        }
597
598        if let Some(ref repo) = filter.git_repo_name {
599            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
600            param_values.push(Box::new(repo.clone()));
601            idx += 1;
602        }
603
604        if let Some(ref tool) = filter.tool {
605            where_clauses.push(format!("s.tool = ?{idx}"));
606            param_values.push(Box::new(tool.clone()));
607            idx += 1;
608        }
609
610        if let Some(ref search) = filter.search {
611            let like = format!("%{search}%");
612            where_clauses.push(format!(
613                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
614                i1 = idx,
615                i2 = idx + 1,
616                i3 = idx + 2,
617            ));
618            param_values.push(Box::new(like.clone()));
619            param_values.push(Box::new(like.clone()));
620            param_values.push(Box::new(like));
621            idx += 3;
622        }
623
624        let interval = match filter.time_range {
625            LocalTimeRange::Hours24 => Some("-1 day"),
626            LocalTimeRange::Days7 => Some("-7 days"),
627            LocalTimeRange::Days30 => Some("-30 days"),
628            LocalTimeRange::All => None,
629        };
630        if let Some(interval) = interval {
631            where_clauses.push(format!("datetime(s.created_at) >= datetime('now', ?{idx})"));
632            param_values.push(Box::new(interval.to_string()));
633        }
634
635        (where_clauses.join(" AND "), param_values)
636    }
637
638    pub fn list_sessions(&self, filter: &LocalSessionFilter) -> Result<Vec<LocalSessionRow>> {
639        let (where_str, mut param_values) = Self::build_local_session_where_clause(filter);
640        let order_clause = match filter.sort {
641            LocalSortOrder::Popular => "s.message_count DESC, s.created_at DESC",
642            LocalSortOrder::Longest => "s.duration_seconds DESC, s.created_at DESC",
643            LocalSortOrder::Recent => "s.created_at DESC",
644        };
645
646        let mut sql = format!(
647            "SELECT {LOCAL_SESSION_COLUMNS} \
648             {FROM_CLAUSE} WHERE {where_str} \
649             ORDER BY {order_clause}"
650        );
651
652        if let Some(limit) = filter.limit {
653            sql.push_str(" LIMIT ?");
654            param_values.push(Box::new(limit));
655            if let Some(offset) = filter.offset {
656                sql.push_str(" OFFSET ?");
657                param_values.push(Box::new(offset));
658            }
659        }
660
661        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
662            param_values.iter().map(|p| p.as_ref()).collect();
663        let conn = self.conn();
664        let mut stmt = conn.prepare(&sql)?;
665        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
666
667        let mut result = Vec::new();
668        for row in rows {
669            result.push(row?);
670        }
671
672        Ok(hide_opencode_child_sessions(result))
673    }
674
675    /// Count sessions for a given list filter (before UI-level page slicing).
676    pub fn count_sessions_filtered(&self, filter: &LocalSessionFilter) -> Result<i64> {
677        let mut count_filter = filter.clone();
678        count_filter.limit = None;
679        count_filter.offset = None;
680        let (where_str, param_values) = Self::build_local_session_where_clause(&count_filter);
681        let sql = format!("SELECT COUNT(*) {FROM_CLAUSE} WHERE {where_str}");
682        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
683            param_values.iter().map(|p| p.as_ref()).collect();
684        let conn = self.conn();
685        let count = conn.query_row(&sql, param_refs.as_slice(), |row| row.get(0))?;
686        Ok(count)
687    }
688
689    /// List distinct tool names for the current list filter (ignores active tool filter).
690    pub fn list_session_tools(&self, filter: &LocalSessionFilter) -> Result<Vec<String>> {
691        let mut tool_filter = filter.clone();
692        tool_filter.tool = None;
693        tool_filter.limit = None;
694        tool_filter.offset = None;
695        let (where_str, param_values) = Self::build_local_session_where_clause(&tool_filter);
696        let sql = format!(
697            "SELECT DISTINCT s.tool \
698             {FROM_CLAUSE} WHERE {where_str} \
699             ORDER BY s.tool ASC"
700        );
701        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
702            param_values.iter().map(|p| p.as_ref()).collect();
703        let conn = self.conn();
704        let mut stmt = conn.prepare(&sql)?;
705        let rows = stmt.query_map(param_refs.as_slice(), |row| row.get::<_, String>(0))?;
706
707        let mut tools = Vec::new();
708        for row in rows {
709            let tool = row?;
710            if !tool.trim().is_empty() {
711                tools.push(tool);
712            }
713        }
714        Ok(tools)
715    }
716
717    // ── Log query ─────────────────────────────────────────────────────
718
719    /// Query sessions with extended filters for the `log` command.
720    pub fn list_sessions_log(&self, filter: &LogFilter) -> Result<Vec<LocalSessionRow>> {
721        let mut where_clauses = vec!["1=1".to_string()];
722        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
723        let mut idx = 1u32;
724
725        if let Some(ref tool) = filter.tool {
726            where_clauses.push(format!("s.tool = ?{idx}"));
727            param_values.push(Box::new(tool.clone()));
728            idx += 1;
729        }
730
731        if let Some(ref model) = filter.model {
732            let like = model.replace('*', "%");
733            where_clauses.push(format!("s.agent_model LIKE ?{idx}"));
734            param_values.push(Box::new(like));
735            idx += 1;
736        }
737
738        if let Some(ref since) = filter.since {
739            where_clauses.push(format!("s.created_at >= ?{idx}"));
740            param_values.push(Box::new(since.clone()));
741            idx += 1;
742        }
743
744        if let Some(ref before) = filter.before {
745            where_clauses.push(format!("s.created_at < ?{idx}"));
746            param_values.push(Box::new(before.clone()));
747            idx += 1;
748        }
749
750        if let Some(ref touches) = filter.touches {
751            let like = format!("%\"{touches}\"%");
752            where_clauses.push(format!("s.files_modified LIKE ?{idx}"));
753            param_values.push(Box::new(like));
754            idx += 1;
755        }
756
757        if let Some(ref grep) = filter.grep {
758            let like = format!("%{grep}%");
759            where_clauses.push(format!(
760                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
761                i1 = idx,
762                i2 = idx + 1,
763                i3 = idx + 2,
764            ));
765            param_values.push(Box::new(like.clone()));
766            param_values.push(Box::new(like.clone()));
767            param_values.push(Box::new(like));
768            idx += 3;
769        }
770
771        if let Some(true) = filter.has_errors {
772            where_clauses.push("s.has_errors = 1".to_string());
773        }
774
775        if let Some(ref wd) = filter.working_directory {
776            where_clauses.push(format!("s.working_directory LIKE ?{idx}"));
777            param_values.push(Box::new(format!("{wd}%")));
778            idx += 1;
779        }
780
781        if let Some(ref repo) = filter.git_repo_name {
782            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
783            param_values.push(Box::new(repo.clone()));
784            idx += 1;
785        }
786
787        // Optional JOIN for commit hash filter
788        let mut extra_join = String::new();
789        if let Some(ref commit) = filter.commit {
790            extra_join =
791                " INNER JOIN commit_session_links csl ON csl.session_id = s.id".to_string();
792            where_clauses.push(format!("csl.commit_hash = ?{idx}"));
793            param_values.push(Box::new(commit.clone()));
794            idx += 1;
795        }
796
797        let _ = idx; // suppress unused warning
798
799        let where_str = where_clauses.join(" AND ");
800        let mut sql = format!(
801            "SELECT {LOCAL_SESSION_COLUMNS} \
802             {FROM_CLAUSE}{extra_join} WHERE {where_str} \
803             ORDER BY s.created_at DESC"
804        );
805
806        if let Some(limit) = filter.limit {
807            sql.push_str(" LIMIT ?");
808            param_values.push(Box::new(limit));
809            if let Some(offset) = filter.offset {
810                sql.push_str(" OFFSET ?");
811                param_values.push(Box::new(offset));
812            }
813        }
814
815        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
816            param_values.iter().map(|p| p.as_ref()).collect();
817        let conn = self.conn();
818        let mut stmt = conn.prepare(&sql)?;
819        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
820
821        let mut result = Vec::new();
822        for row in rows {
823            result.push(row?);
824        }
825        Ok(hide_opencode_child_sessions(result))
826    }
827
828    /// Get the latest N sessions for a specific tool, ordered by created_at DESC.
829    pub fn get_sessions_by_tool_latest(
830        &self,
831        tool: &str,
832        count: u32,
833    ) -> Result<Vec<LocalSessionRow>> {
834        let sql = format!(
835            "SELECT {LOCAL_SESSION_COLUMNS} \
836             {FROM_CLAUSE} WHERE s.tool = ?1 \
837             ORDER BY s.created_at DESC"
838        );
839        let conn = self.conn();
840        let mut stmt = conn.prepare(&sql)?;
841        let rows = stmt.query_map(params![tool], row_to_local_session)?;
842        let mut result = Vec::new();
843        for row in rows {
844            result.push(row?);
845        }
846
847        let mut filtered = hide_opencode_child_sessions(result);
848        filtered.truncate(count as usize);
849        Ok(filtered)
850    }
851
852    /// Get the latest N sessions across all tools, ordered by created_at DESC.
853    pub fn get_sessions_latest(&self, count: u32) -> Result<Vec<LocalSessionRow>> {
854        let sql = format!(
855            "SELECT {LOCAL_SESSION_COLUMNS} \
856             {FROM_CLAUSE} \
857             ORDER BY s.created_at DESC"
858        );
859        let conn = self.conn();
860        let mut stmt = conn.prepare(&sql)?;
861        let rows = stmt.query_map([], row_to_local_session)?;
862        let mut result = Vec::new();
863        for row in rows {
864            result.push(row?);
865        }
866
867        let mut filtered = hide_opencode_child_sessions(result);
868        filtered.truncate(count as usize);
869        Ok(filtered)
870    }
871
872    /// Get the Nth most recent session for a specific tool (0 = HEAD, 1 = HEAD~1, etc.).
873    pub fn get_session_by_tool_offset(
874        &self,
875        tool: &str,
876        offset: u32,
877    ) -> Result<Option<LocalSessionRow>> {
878        let sql = format!(
879            "SELECT {LOCAL_SESSION_COLUMNS} \
880             {FROM_CLAUSE} WHERE s.tool = ?1 \
881             ORDER BY s.created_at DESC"
882        );
883        let conn = self.conn();
884        let mut stmt = conn.prepare(&sql)?;
885        let rows = stmt.query_map(params![tool], row_to_local_session)?;
886        let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
887        Ok(filtered.into_iter().nth(offset as usize))
888    }
889
890    /// Get the Nth most recent session across all tools (0 = HEAD, 1 = HEAD~1, etc.).
891    pub fn get_session_by_offset(&self, offset: u32) -> Result<Option<LocalSessionRow>> {
892        let sql = format!(
893            "SELECT {LOCAL_SESSION_COLUMNS} \
894             {FROM_CLAUSE} \
895             ORDER BY s.created_at DESC"
896        );
897        let conn = self.conn();
898        let mut stmt = conn.prepare(&sql)?;
899        let rows = stmt.query_map([], row_to_local_session)?;
900        let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
901        Ok(filtered.into_iter().nth(offset as usize))
902    }
903
904    /// Fetch the source path used when the session was last parsed/loaded.
905    pub fn get_session_source_path(&self, session_id: &str) -> Result<Option<String>> {
906        let conn = self.conn();
907        let result = conn
908            .query_row(
909                "SELECT source_path FROM session_sync WHERE session_id = ?1",
910                params![session_id],
911                |row| row.get(0),
912            )
913            .optional()?;
914
915        Ok(result)
916    }
917
918    /// Count total sessions in the local DB.
919    pub fn session_count(&self) -> Result<i64> {
920        let count = self
921            .conn()
922            .query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))?;
923        Ok(count)
924    }
925
926    // ── Delete session ─────────────────────────────────────────────────
927
928    pub fn delete_session(&self, session_id: &str) -> Result<()> {
929        let conn = self.conn();
930        conn.execute(
931            "DELETE FROM body_cache WHERE session_id = ?1",
932            params![session_id],
933        )?;
934        conn.execute(
935            "DELETE FROM session_sync WHERE session_id = ?1",
936            params![session_id],
937        )?;
938        conn.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
939        Ok(())
940    }
941
942    // ── Sync cursor ────────────────────────────────────────────────────
943
944    pub fn get_sync_cursor(&self, team_id: &str) -> Result<Option<String>> {
945        let cursor = self
946            .conn()
947            .query_row(
948                "SELECT cursor FROM sync_cursors WHERE team_id = ?1",
949                params![team_id],
950                |row| row.get(0),
951            )
952            .optional()?;
953        Ok(cursor)
954    }
955
956    pub fn set_sync_cursor(&self, team_id: &str, cursor: &str) -> Result<()> {
957        self.conn().execute(
958            "INSERT INTO sync_cursors (team_id, cursor, updated_at) \
959             VALUES (?1, ?2, datetime('now')) \
960             ON CONFLICT(team_id) DO UPDATE SET cursor=excluded.cursor, updated_at=datetime('now')",
961            params![team_id, cursor],
962        )?;
963        Ok(())
964    }
965
966    // ── Upload tracking ────────────────────────────────────────────────
967
968    /// Get sessions that are local_only and need to be uploaded.
969    pub fn pending_uploads(&self, team_id: &str) -> Result<Vec<LocalSessionRow>> {
970        let sql = format!(
971            "SELECT {LOCAL_SESSION_COLUMNS} \
972             FROM sessions s \
973             INNER JOIN session_sync ss ON ss.session_id = s.id \
974             LEFT JOIN users u ON u.id = s.user_id \
975             WHERE ss.sync_status = 'local_only' AND s.team_id = ?1 \
976             ORDER BY s.created_at ASC"
977        );
978        let conn = self.conn();
979        let mut stmt = conn.prepare(&sql)?;
980        let rows = stmt.query_map(params![team_id], row_to_local_session)?;
981        let mut result = Vec::new();
982        for row in rows {
983            result.push(row?);
984        }
985        Ok(result)
986    }
987
988    pub fn mark_synced(&self, session_id: &str) -> Result<()> {
989        self.conn().execute(
990            "UPDATE session_sync SET sync_status = 'synced', last_synced_at = datetime('now') \
991             WHERE session_id = ?1",
992            params![session_id],
993        )?;
994        Ok(())
995    }
996
997    /// Check if a session was already uploaded (synced or remote_only) since the given modification time.
998    pub fn was_uploaded_after(
999        &self,
1000        source_path: &str,
1001        modified: &chrono::DateTime<chrono::Utc>,
1002    ) -> Result<bool> {
1003        let result: Option<String> = self
1004            .conn()
1005            .query_row(
1006                "SELECT last_synced_at FROM session_sync \
1007                 WHERE source_path = ?1 AND sync_status = 'synced' AND last_synced_at IS NOT NULL",
1008                params![source_path],
1009                |row| row.get(0),
1010            )
1011            .optional()?;
1012
1013        if let Some(synced_at) = result {
1014            if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&synced_at) {
1015                return Ok(dt >= *modified);
1016            }
1017        }
1018        Ok(false)
1019    }
1020
1021    // ── Body cache (legacy compatibility) ──────────────────────────────
1022
1023    pub fn cache_body(&self, session_id: &str, body: &[u8]) -> Result<()> {
1024        self.conn().execute(
1025            "INSERT INTO body_cache (session_id, body, cached_at) \
1026             VALUES (?1, ?2, datetime('now')) \
1027             ON CONFLICT(session_id) DO UPDATE SET body=excluded.body, cached_at=datetime('now')",
1028            params![session_id, body],
1029        )?;
1030        Ok(())
1031    }
1032
1033    pub fn get_cached_body(&self, session_id: &str) -> Result<Option<Vec<u8>>> {
1034        let body = self
1035            .conn()
1036            .query_row(
1037                "SELECT body FROM body_cache WHERE session_id = ?1",
1038                params![session_id],
1039                |row| row.get(0),
1040            )
1041            .optional()?;
1042        Ok(body)
1043    }
1044
1045    // ── Timeline summary cache ────────────────────────────────────────
1046
1047    pub fn upsert_timeline_summary_cache(
1048        &self,
1049        lookup_key: &str,
1050        namespace: &str,
1051        compact: &str,
1052        payload: &str,
1053        raw: &str,
1054    ) -> Result<()> {
1055        self.conn().execute(
1056            "INSERT INTO timeline_summary_cache \
1057             (lookup_key, namespace, compact, payload, raw, cached_at) \
1058             VALUES (?1, ?2, ?3, ?4, ?5, datetime('now')) \
1059             ON CONFLICT(lookup_key) DO UPDATE SET \
1060               namespace = excluded.namespace, \
1061               compact = excluded.compact, \
1062               payload = excluded.payload, \
1063               raw = excluded.raw, \
1064               cached_at = datetime('now')",
1065            params![lookup_key, namespace, compact, payload, raw],
1066        )?;
1067        Ok(())
1068    }
1069
1070    pub fn list_timeline_summary_cache_by_namespace(
1071        &self,
1072        namespace: &str,
1073    ) -> Result<Vec<TimelineSummaryCacheRow>> {
1074        let conn = self.conn();
1075        let mut stmt = conn.prepare(
1076            "SELECT lookup_key, namespace, compact, payload, raw, cached_at \
1077             FROM timeline_summary_cache \
1078             WHERE namespace = ?1 \
1079             ORDER BY cached_at DESC",
1080        )?;
1081        let rows = stmt.query_map(params![namespace], |row| {
1082            Ok(TimelineSummaryCacheRow {
1083                lookup_key: row.get(0)?,
1084                namespace: row.get(1)?,
1085                compact: row.get(2)?,
1086                payload: row.get(3)?,
1087                raw: row.get(4)?,
1088                cached_at: row.get(5)?,
1089            })
1090        })?;
1091
1092        let mut out = Vec::new();
1093        for row in rows {
1094            out.push(row?);
1095        }
1096        Ok(out)
1097    }
1098
1099    pub fn clear_timeline_summary_cache(&self) -> Result<usize> {
1100        let affected = self
1101            .conn()
1102            .execute("DELETE FROM timeline_summary_cache", [])?;
1103        Ok(affected)
1104    }
1105
1106    // ── Migration helper ───────────────────────────────────────────────
1107
1108    /// Migrate entries from the old state.json UploadState into the local DB.
1109    /// Marks them as `synced` with no metadata (we only know the file path was uploaded).
1110    pub fn migrate_from_state_json(
1111        &self,
1112        uploaded: &std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
1113    ) -> Result<usize> {
1114        let mut count = 0;
1115        for (path, uploaded_at) in uploaded {
1116            let exists: bool = self
1117                .conn()
1118                .query_row(
1119                    "SELECT COUNT(*) > 0 FROM session_sync WHERE source_path = ?1",
1120                    params![path],
1121                    |row| row.get(0),
1122                )
1123                .unwrap_or(false);
1124
1125            if exists {
1126                self.conn().execute(
1127                    "UPDATE session_sync SET sync_status = 'synced', last_synced_at = ?1 \
1128                     WHERE source_path = ?2 AND sync_status = 'local_only'",
1129                    params![uploaded_at.to_rfc3339(), path],
1130                )?;
1131                count += 1;
1132            }
1133        }
1134        Ok(count)
1135    }
1136
1137    // ── Commit ↔ session linking ────────────────────────────────────
1138
1139    /// Link a git commit to an AI session.
1140    pub fn link_commit_session(
1141        &self,
1142        commit_hash: &str,
1143        session_id: &str,
1144        repo_path: Option<&str>,
1145        branch: Option<&str>,
1146    ) -> Result<()> {
1147        self.conn().execute(
1148            "INSERT INTO commit_session_links (commit_hash, session_id, repo_path, branch) \
1149             VALUES (?1, ?2, ?3, ?4) \
1150             ON CONFLICT(commit_hash, session_id) DO NOTHING",
1151            params![commit_hash, session_id, repo_path, branch],
1152        )?;
1153        Ok(())
1154    }
1155
1156    /// Get all sessions linked to a git commit.
1157    pub fn get_sessions_by_commit(&self, commit_hash: &str) -> Result<Vec<LocalSessionRow>> {
1158        let sql = format!(
1159            "SELECT {LOCAL_SESSION_COLUMNS} \
1160             {FROM_CLAUSE} \
1161             INNER JOIN commit_session_links csl ON csl.session_id = s.id \
1162             WHERE csl.commit_hash = ?1 \
1163             ORDER BY s.created_at DESC"
1164        );
1165        let conn = self.conn();
1166        let mut stmt = conn.prepare(&sql)?;
1167        let rows = stmt.query_map(params![commit_hash], row_to_local_session)?;
1168        let mut result = Vec::new();
1169        for row in rows {
1170            result.push(row?);
1171        }
1172        Ok(result)
1173    }
1174
1175    /// Get all commits linked to a session.
1176    pub fn get_commits_by_session(&self, session_id: &str) -> Result<Vec<CommitLink>> {
1177        let conn = self.conn();
1178        let mut stmt = conn.prepare(
1179            "SELECT commit_hash, session_id, repo_path, branch, created_at \
1180             FROM commit_session_links WHERE session_id = ?1 \
1181             ORDER BY created_at DESC",
1182        )?;
1183        let rows = stmt.query_map(params![session_id], |row| {
1184            Ok(CommitLink {
1185                commit_hash: row.get(0)?,
1186                session_id: row.get(1)?,
1187                repo_path: row.get(2)?,
1188                branch: row.get(3)?,
1189                created_at: row.get(4)?,
1190            })
1191        })?;
1192        let mut result = Vec::new();
1193        for row in rows {
1194            result.push(row?);
1195        }
1196        Ok(result)
1197    }
1198
1199    /// Find the most recently active session for a given repo path.
1200    /// "Active" means the session's working_directory matches the repo path
1201    /// and was created within the last `since_minutes` minutes.
1202    pub fn find_active_session_for_repo(
1203        &self,
1204        repo_path: &str,
1205        since_minutes: u32,
1206    ) -> Result<Option<LocalSessionRow>> {
1207        let sql = format!(
1208            "SELECT {LOCAL_SESSION_COLUMNS} \
1209             {FROM_CLAUSE} \
1210             WHERE s.working_directory LIKE ?1 \
1211             AND s.created_at >= datetime('now', ?2) \
1212             ORDER BY s.created_at DESC LIMIT 1"
1213        );
1214        let since = format!("-{since_minutes} minutes");
1215        let like = format!("{repo_path}%");
1216        let conn = self.conn();
1217        let mut stmt = conn.prepare(&sql)?;
1218        let row = stmt
1219            .query_map(params![like, since], row_to_local_session)?
1220            .next()
1221            .transpose()?;
1222        Ok(row)
1223    }
1224
1225    /// Get all session IDs currently in the local DB.
1226    pub fn existing_session_ids(&self) -> std::collections::HashSet<String> {
1227        let conn = self.conn();
1228        let mut stmt = conn
1229            .prepare("SELECT id FROM sessions")
1230            .unwrap_or_else(|_| panic!("failed to prepare existing_session_ids query"));
1231        let rows = stmt.query_map([], |row| row.get::<_, String>(0));
1232        let mut set = std::collections::HashSet::new();
1233        if let Ok(rows) = rows {
1234            for row in rows.flatten() {
1235                set.insert(row);
1236            }
1237        }
1238        set
1239    }
1240
1241    /// Update only stats fields for an existing session (no git context re-extraction).
1242    pub fn update_session_stats(&self, session: &Session) -> Result<()> {
1243        let title = session.context.title.as_deref();
1244        let description = session.context.description.as_deref();
1245        let (files_modified, files_read, has_errors) =
1246            opensession_core::extract::extract_file_metadata(session);
1247        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;
1248
1249        self.conn().execute(
1250            "UPDATE sessions SET \
1251             title=?2, description=?3, \
1252             message_count=?4, user_message_count=?5, task_count=?6, \
1253             event_count=?7, duration_seconds=?8, \
1254             total_input_tokens=?9, total_output_tokens=?10, \
1255             files_modified=?11, files_read=?12, has_errors=?13, \
1256             max_active_agents=?14 \
1257             WHERE id=?1",
1258            params![
1259                &session.session_id,
1260                title,
1261                description,
1262                session.stats.message_count as i64,
1263                session.stats.user_message_count as i64,
1264                session.stats.task_count as i64,
1265                session.stats.event_count as i64,
1266                session.stats.duration_seconds as i64,
1267                session.stats.total_input_tokens as i64,
1268                session.stats.total_output_tokens as i64,
1269                &files_modified,
1270                &files_read,
1271                has_errors,
1272                max_active_agents,
1273            ],
1274        )?;
1275        Ok(())
1276    }
1277
1278    /// Update only sync metadata path for an existing session.
1279    pub fn set_session_sync_path(&self, session_id: &str, source_path: &str) -> Result<()> {
1280        self.conn().execute(
1281            "INSERT INTO session_sync (session_id, source_path) \
1282             VALUES (?1, ?2) \
1283             ON CONFLICT(session_id) DO UPDATE SET source_path = excluded.source_path",
1284            params![session_id, source_path],
1285        )?;
1286        Ok(())
1287    }
1288
1289    /// Get a list of distinct git repo names present in the DB.
1290    pub fn list_repos(&self) -> Result<Vec<String>> {
1291        let conn = self.conn();
1292        let mut stmt = conn.prepare(
1293            "SELECT DISTINCT git_repo_name FROM sessions \
1294             WHERE git_repo_name IS NOT NULL ORDER BY git_repo_name ASC",
1295        )?;
1296        let rows = stmt.query_map([], |row| row.get(0))?;
1297        let mut result = Vec::new();
1298        for row in rows {
1299            result.push(row?);
1300        }
1301        Ok(result)
1302    }
1303
1304    /// Get a list of distinct, non-empty working directories present in the DB.
1305    pub fn list_working_directories(&self) -> Result<Vec<String>> {
1306        let conn = self.conn();
1307        let mut stmt = conn.prepare(
1308            "SELECT DISTINCT working_directory FROM sessions \
1309             WHERE working_directory IS NOT NULL AND TRIM(working_directory) <> '' \
1310             ORDER BY working_directory ASC",
1311        )?;
1312        let rows = stmt.query_map([], |row| row.get(0))?;
1313        let mut result = Vec::new();
1314        for row in rows {
1315            result.push(row?);
1316        }
1317        Ok(result)
1318    }
1319}
1320
1321// ── Legacy schema backfill ─────────────────────────────────────────────
1322
1323fn open_connection_with_latest_schema(path: &PathBuf) -> Result<Connection> {
1324    let conn = Connection::open(path).with_context(|| format!("open db {}", path.display()))?;
1325    conn.execute_batch("PRAGMA journal_mode=WAL;")?;
1326
1327    // Disable FK constraints for local DB (index/cache, not source of truth)
1328    conn.execute_batch("PRAGMA foreign_keys=OFF;")?;
1329
1330    apply_local_migrations(&conn)?;
1331
1332    // Backfill missing columns for legacy local DBs where `sessions` already
1333    // existed before newer fields were introduced.
1334    ensure_sessions_columns(&conn)?;
1335    validate_local_schema(&conn)?;
1336
1337    Ok(conn)
1338}
1339
1340fn apply_local_migrations(conn: &Connection) -> Result<()> {
1341    conn.execute_batch(
1342        "CREATE TABLE IF NOT EXISTS _migrations (
1343            id INTEGER PRIMARY KEY,
1344            name TEXT NOT NULL UNIQUE,
1345            applied_at TEXT NOT NULL DEFAULT (datetime('now'))
1346        );",
1347    )
1348    .context("create _migrations table for local db")?;
1349
1350    for (name, sql) in REMOTE_MIGRATIONS.iter().chain(LOCAL_MIGRATIONS.iter()) {
1351        let already_applied: bool = conn
1352            .query_row(
1353                "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
1354                [name],
1355                |row| row.get(0),
1356            )
1357            .unwrap_or(false);
1358
1359        if already_applied {
1360            continue;
1361        }
1362
1363        if let Err(e) = conn.execute_batch(sql) {
1364            let msg = e.to_string().to_ascii_lowercase();
1365            if !is_local_migration_compat_error(&msg) {
1366                return Err(e).with_context(|| format!("apply local migration {name}"));
1367            }
1368        }
1369
1370        conn.execute(
1371            "INSERT OR IGNORE INTO _migrations (name) VALUES (?1)",
1372            [name],
1373        )
1374        .with_context(|| format!("record local migration {name}"))?;
1375    }
1376
1377    Ok(())
1378}
1379
1380fn is_local_migration_compat_error(msg: &str) -> bool {
1381    msg.contains("duplicate column name")
1382        || msg.contains("no such column")
1383        || msg.contains("already exists")
1384}
1385
1386fn validate_local_schema(conn: &Connection) -> Result<()> {
1387    let sql = format!("SELECT {LOCAL_SESSION_COLUMNS} {FROM_CLAUSE} WHERE 1=0");
1388    conn.prepare(&sql)
1389        .map(|_| ())
1390        .context("validate local session schema")
1391}
1392
1393fn is_schema_compat_error(err: &anyhow::Error) -> bool {
1394    let msg = format!("{err:#}").to_ascii_lowercase();
1395    msg.contains("no such column")
1396        || msg.contains("no such table")
1397        || msg.contains("cannot add a column")
1398        || msg.contains("already exists")
1399        || msg.contains("views may not be indexed")
1400        || msg.contains("malformed database schema")
1401        || msg.contains("duplicate column name")
1402}
1403
1404fn rotate_legacy_db(path: &PathBuf) -> Result<()> {
1405    if !path.exists() {
1406        return Ok(());
1407    }
1408
1409    let ts = chrono::Utc::now().format("%Y%m%d%H%M%S");
1410    let backup_name = format!(
1411        "{}.legacy-{}.bak",
1412        path.file_name()
1413            .and_then(|n| n.to_str())
1414            .unwrap_or("local.db"),
1415        ts
1416    );
1417    let backup_path = path.with_file_name(backup_name);
1418    std::fs::rename(path, &backup_path).with_context(|| {
1419        format!(
1420            "rotate legacy db {} -> {}",
1421            path.display(),
1422            backup_path.display()
1423        )
1424    })?;
1425
1426    let wal = PathBuf::from(format!("{}-wal", path.display()));
1427    let shm = PathBuf::from(format!("{}-shm", path.display()));
1428    let _ = std::fs::remove_file(wal);
1429    let _ = std::fs::remove_file(shm);
1430    Ok(())
1431}
1432
1433const REQUIRED_SESSION_COLUMNS: &[(&str, &str)] = &[
1434    ("user_id", "TEXT"),
1435    ("team_id", "TEXT DEFAULT 'personal'"),
1436    ("tool", "TEXT DEFAULT ''"),
1437    ("agent_provider", "TEXT"),
1438    ("agent_model", "TEXT"),
1439    ("title", "TEXT"),
1440    ("description", "TEXT"),
1441    ("tags", "TEXT"),
1442    ("created_at", "TEXT DEFAULT ''"),
1443    ("uploaded_at", "TEXT DEFAULT ''"),
1444    ("message_count", "INTEGER DEFAULT 0"),
1445    ("user_message_count", "INTEGER DEFAULT 0"),
1446    ("task_count", "INTEGER DEFAULT 0"),
1447    ("event_count", "INTEGER DEFAULT 0"),
1448    ("duration_seconds", "INTEGER DEFAULT 0"),
1449    ("total_input_tokens", "INTEGER DEFAULT 0"),
1450    ("total_output_tokens", "INTEGER DEFAULT 0"),
1451    // Deprecated compatibility column from pre git-native body storage.
1452    ("body_storage_key", "TEXT DEFAULT ''"),
1453    ("body_url", "TEXT"),
1454    ("git_remote", "TEXT"),
1455    ("git_branch", "TEXT"),
1456    ("git_commit", "TEXT"),
1457    ("git_repo_name", "TEXT"),
1458    ("pr_number", "INTEGER"),
1459    ("pr_url", "TEXT"),
1460    ("working_directory", "TEXT"),
1461    ("files_modified", "TEXT"),
1462    ("files_read", "TEXT"),
1463    ("has_errors", "BOOLEAN DEFAULT 0"),
1464    ("max_active_agents", "INTEGER DEFAULT 1"),
1465];
1466
1467fn ensure_sessions_columns(conn: &Connection) -> Result<()> {
1468    let mut existing = HashSet::new();
1469    let mut stmt = conn.prepare("PRAGMA table_info(sessions)")?;
1470    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
1471    for row in rows {
1472        existing.insert(row?);
1473    }
1474
1475    for (name, decl) in REQUIRED_SESSION_COLUMNS {
1476        if existing.contains(*name) {
1477            continue;
1478        }
1479        let sql = format!("ALTER TABLE sessions ADD COLUMN {name} {decl};");
1480        conn.execute_batch(&sql)
1481            .with_context(|| format!("add legacy sessions column '{name}'"))?;
1482    }
1483
1484    Ok(())
1485}
1486
1487/// Column list for SELECT queries against sessions + session_sync + users.
1488pub const LOCAL_SESSION_COLUMNS: &str = "\
1489s.id, ss.source_path, COALESCE(ss.sync_status, 'unknown') AS sync_status, ss.last_synced_at, \
1490s.user_id, u.nickname, s.team_id, s.tool, s.agent_provider, s.agent_model, \
1491s.title, s.description, s.tags, s.created_at, s.uploaded_at, \
1492s.message_count, COALESCE(s.user_message_count, 0), s.task_count, s.event_count, s.duration_seconds, \
1493s.total_input_tokens, s.total_output_tokens, \
1494s.git_remote, s.git_branch, s.git_commit, s.git_repo_name, \
1495s.pr_number, s.pr_url, s.working_directory, \
1496s.files_modified, s.files_read, s.has_errors, COALESCE(s.max_active_agents, 1)";
1497
1498fn row_to_local_session(row: &rusqlite::Row) -> rusqlite::Result<LocalSessionRow> {
1499    Ok(LocalSessionRow {
1500        id: row.get(0)?,
1501        source_path: row.get(1)?,
1502        sync_status: row.get(2)?,
1503        last_synced_at: row.get(3)?,
1504        user_id: row.get(4)?,
1505        nickname: row.get(5)?,
1506        team_id: row.get(6)?,
1507        tool: row.get(7)?,
1508        agent_provider: row.get(8)?,
1509        agent_model: row.get(9)?,
1510        title: row.get(10)?,
1511        description: row.get(11)?,
1512        tags: row.get(12)?,
1513        created_at: row.get(13)?,
1514        uploaded_at: row.get(14)?,
1515        message_count: row.get(15)?,
1516        user_message_count: row.get(16)?,
1517        task_count: row.get(17)?,
1518        event_count: row.get(18)?,
1519        duration_seconds: row.get(19)?,
1520        total_input_tokens: row.get(20)?,
1521        total_output_tokens: row.get(21)?,
1522        git_remote: row.get(22)?,
1523        git_branch: row.get(23)?,
1524        git_commit: row.get(24)?,
1525        git_repo_name: row.get(25)?,
1526        pr_number: row.get(26)?,
1527        pr_url: row.get(27)?,
1528        working_directory: row.get(28)?,
1529        files_modified: row.get(29)?,
1530        files_read: row.get(30)?,
1531        has_errors: row.get::<_, i64>(31).unwrap_or(0) != 0,
1532        max_active_agents: row.get(32).unwrap_or(1),
1533    })
1534}
1535
1536fn default_db_path() -> Result<PathBuf> {
1537    let home = std::env::var("HOME")
1538        .or_else(|_| std::env::var("USERPROFILE"))
1539        .context("Could not determine home directory")?;
1540    Ok(PathBuf::from(home)
1541        .join(".local")
1542        .join("share")
1543        .join("opensession")
1544        .join("local.db"))
1545}
1546
1547#[cfg(test)]
1548mod tests {
1549    use super::*;
1550
1551    use std::collections::BTreeSet;
1552    use std::fs::{create_dir_all, write};
1553    use tempfile::tempdir;
1554
1555    fn test_db() -> LocalDb {
1556        let dir = tempdir().unwrap();
1557        let path = dir.keep().join("test.db");
1558        LocalDb::open_path(&path).unwrap()
1559    }
1560
1561    fn temp_root() -> tempfile::TempDir {
1562        tempdir().unwrap()
1563    }
1564
1565    fn make_row(id: &str, tool: &str, source_path: Option<&str>) -> LocalSessionRow {
1566        LocalSessionRow {
1567            id: id.to_string(),
1568            source_path: source_path.map(String::from),
1569            sync_status: "local_only".to_string(),
1570            last_synced_at: None,
1571            user_id: None,
1572            nickname: None,
1573            team_id: None,
1574            tool: tool.to_string(),
1575            agent_provider: None,
1576            agent_model: None,
1577            title: Some("test".to_string()),
1578            description: None,
1579            tags: None,
1580            created_at: "2024-01-01T00:00:00Z".to_string(),
1581            uploaded_at: None,
1582            message_count: 0,
1583            user_message_count: 0,
1584            task_count: 0,
1585            event_count: 0,
1586            duration_seconds: 0,
1587            total_input_tokens: 0,
1588            total_output_tokens: 0,
1589            git_remote: None,
1590            git_branch: None,
1591            git_commit: None,
1592            git_repo_name: None,
1593            pr_number: None,
1594            pr_url: None,
1595            working_directory: None,
1596            files_modified: None,
1597            files_read: None,
1598            has_errors: false,
1599            max_active_agents: 1,
1600        }
1601    }
1602
1603    #[test]
1604    fn test_open_and_schema() {
1605        let _db = test_db();
1606    }
1607
1608    #[test]
1609    fn test_open_backfills_legacy_sessions_columns() {
1610        let dir = tempfile::tempdir().unwrap();
1611        let path = dir.path().join("legacy.db");
1612        {
1613            let conn = Connection::open(&path).unwrap();
1614            conn.execute_batch(
1615                "CREATE TABLE sessions (id TEXT PRIMARY KEY);
1616                 INSERT INTO sessions (id) VALUES ('legacy-1');",
1617            )
1618            .unwrap();
1619        }
1620
1621        let db = LocalDb::open_path(&path).unwrap();
1622        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1623        assert_eq!(rows.len(), 1);
1624        assert_eq!(rows[0].id, "legacy-1");
1625        assert_eq!(rows[0].user_message_count, 0);
1626    }
1627
1628    #[test]
1629    fn test_open_rotates_incompatible_legacy_schema() {
1630        let dir = tempfile::tempdir().unwrap();
1631        let path = dir.path().join("broken.db");
1632        {
1633            let conn = Connection::open(&path).unwrap();
1634            conn.execute_batch("CREATE VIEW sessions AS SELECT 'x' AS id;")
1635                .unwrap();
1636        }
1637
1638        let db = LocalDb::open_path(&path).unwrap();
1639        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1640        assert!(rows.is_empty());
1641
1642        let rotated = std::fs::read_dir(dir.path())
1643            .unwrap()
1644            .filter_map(Result::ok)
1645            .any(|entry| {
1646                let name = entry.file_name();
1647                let name = name.to_string_lossy();
1648                name.starts_with("broken.db.legacy-") && name.ends_with(".bak")
1649            });
1650        assert!(rotated, "expected rotated legacy backup file");
1651    }
1652
1653    #[test]
1654    fn test_is_opencode_child_session() {
1655        let root = temp_root();
1656        let dir = root.path().join("sessions");
1657        create_dir_all(&dir).unwrap();
1658        let parent_session = dir.join("parent.json");
1659        write(
1660            &parent_session,
1661            r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1662        )
1663        .unwrap();
1664        let child_session = dir.join("child.json");
1665        write(
1666            &child_session,
1667            r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1668        )
1669        .unwrap();
1670
1671        let parent = make_row(
1672            "ses_parent",
1673            "opencode",
1674            Some(parent_session.to_str().unwrap()),
1675        );
1676        let child = make_row(
1677            "ses_child",
1678            "opencode",
1679            Some(child_session.to_str().unwrap()),
1680        );
1681        let codex = make_row("ses_other", "codex", Some(child_session.to_str().unwrap()));
1682
1683        assert!(!is_opencode_child_session(&parent));
1684        assert!(is_opencode_child_session(&child));
1685        assert!(!is_opencode_child_session(&codex));
1686    }
1687
1688    #[test]
1689    fn test_is_opencode_child_session_uses_event_shape_heuristic() {
1690        let child = LocalSessionRow {
1691            id: "sess_child".to_string(),
1692            source_path: None,
1693            sync_status: "local_only".to_string(),
1694            last_synced_at: None,
1695            user_id: None,
1696            nickname: None,
1697            team_id: None,
1698            tool: "opencode".to_string(),
1699            agent_provider: None,
1700            agent_model: None,
1701            title: None,
1702            description: None,
1703            tags: None,
1704            created_at: "2024-01-01T00:00:00Z".to_string(),
1705            uploaded_at: None,
1706            message_count: 1,
1707            user_message_count: 0,
1708            task_count: 4,
1709            event_count: 4,
1710            duration_seconds: 0,
1711            total_input_tokens: 0,
1712            total_output_tokens: 0,
1713            git_remote: None,
1714            git_branch: None,
1715            git_commit: None,
1716            git_repo_name: None,
1717            pr_number: None,
1718            pr_url: None,
1719            working_directory: None,
1720            files_modified: None,
1721            files_read: None,
1722            has_errors: false,
1723            max_active_agents: 1,
1724        };
1725
1726        let parent = LocalSessionRow {
1727            id: "sess_parent".to_string(),
1728            source_path: None,
1729            sync_status: "local_only".to_string(),
1730            last_synced_at: None,
1731            user_id: None,
1732            nickname: None,
1733            team_id: None,
1734            tool: "opencode".to_string(),
1735            agent_provider: None,
1736            agent_model: None,
1737            title: Some("regular".to_string()),
1738            description: None,
1739            tags: None,
1740            created_at: "2024-01-01T00:00:00Z".to_string(),
1741            uploaded_at: None,
1742            message_count: 1,
1743            user_message_count: 1,
1744            task_count: 2,
1745            event_count: 20,
1746            duration_seconds: 0,
1747            total_input_tokens: 0,
1748            total_output_tokens: 0,
1749            git_remote: None,
1750            git_branch: None,
1751            git_commit: None,
1752            git_repo_name: None,
1753            pr_number: None,
1754            pr_url: None,
1755            working_directory: None,
1756            files_modified: None,
1757            files_read: None,
1758            has_errors: false,
1759            max_active_agents: 1,
1760        };
1761
1762        assert!(is_opencode_child_session(&child));
1763        assert!(!is_opencode_child_session(&parent));
1764    }
1765
1766    #[test]
1767    fn test_is_opencode_child_session_with_more_messages_is_hidden_if_task_count_small() {
1768        let child = LocalSessionRow {
1769            id: "sess_child_2".to_string(),
1770            source_path: None,
1771            sync_status: "local_only".to_string(),
1772            last_synced_at: None,
1773            user_id: None,
1774            nickname: None,
1775            team_id: None,
1776            tool: "opencode".to_string(),
1777            agent_provider: None,
1778            agent_model: None,
1779            title: None,
1780            description: None,
1781            tags: None,
1782            created_at: "2024-01-01T00:00:00Z".to_string(),
1783            uploaded_at: None,
1784            message_count: 2,
1785            user_message_count: 0,
1786            task_count: 4,
1787            event_count: 4,
1788            duration_seconds: 0,
1789            total_input_tokens: 0,
1790            total_output_tokens: 0,
1791            git_remote: None,
1792            git_branch: None,
1793            git_commit: None,
1794            git_repo_name: None,
1795            pr_number: None,
1796            pr_url: None,
1797            working_directory: None,
1798            files_modified: None,
1799            files_read: None,
1800            has_errors: false,
1801            max_active_agents: 1,
1802        };
1803
1804        let parent = LocalSessionRow {
1805            id: "sess_parent".to_string(),
1806            source_path: None,
1807            sync_status: "local_only".to_string(),
1808            last_synced_at: None,
1809            user_id: None,
1810            nickname: None,
1811            team_id: None,
1812            tool: "opencode".to_string(),
1813            agent_provider: None,
1814            agent_model: None,
1815            title: Some("regular".to_string()),
1816            description: None,
1817            tags: None,
1818            created_at: "2024-01-01T00:00:00Z".to_string(),
1819            uploaded_at: None,
1820            message_count: 2,
1821            user_message_count: 1,
1822            task_count: 5,
1823            event_count: 20,
1824            duration_seconds: 0,
1825            total_input_tokens: 0,
1826            total_output_tokens: 0,
1827            git_remote: None,
1828            git_branch: None,
1829            git_commit: None,
1830            git_repo_name: None,
1831            pr_number: None,
1832            pr_url: None,
1833            working_directory: None,
1834            files_modified: None,
1835            files_read: None,
1836            has_errors: false,
1837            max_active_agents: 1,
1838        };
1839
1840        assert!(is_opencode_child_session(&child));
1841        assert!(!is_opencode_child_session(&parent));
1842    }
1843
1844    #[test]
1845    fn test_is_opencode_child_session_with_more_messages_but_few_tasks() {
1846        let child = LocalSessionRow {
1847            id: "sess_child_3".to_string(),
1848            source_path: None,
1849            sync_status: "local_only".to_string(),
1850            last_synced_at: None,
1851            user_id: None,
1852            nickname: None,
1853            team_id: None,
1854            tool: "opencode".to_string(),
1855            agent_provider: None,
1856            agent_model: None,
1857            title: None,
1858            description: None,
1859            tags: None,
1860            created_at: "2024-01-01T00:00:00Z".to_string(),
1861            uploaded_at: None,
1862            message_count: 3,
1863            user_message_count: 0,
1864            task_count: 2,
1865            event_count: 6,
1866            duration_seconds: 0,
1867            total_input_tokens: 0,
1868            total_output_tokens: 0,
1869            git_remote: None,
1870            git_branch: None,
1871            git_commit: None,
1872            git_repo_name: None,
1873            pr_number: None,
1874            pr_url: None,
1875            working_directory: None,
1876            files_modified: None,
1877            files_read: None,
1878            has_errors: false,
1879            max_active_agents: 1,
1880        };
1881
1882        assert!(is_opencode_child_session(&child));
1883    }
1884
1885    #[test]
1886    fn test_parse_opencode_parent_session_id_aliases() {
1887        let root = temp_root();
1888        let dir = root.path().join("session-aliases");
1889        create_dir_all(&dir).unwrap();
1890        let child_session = dir.join("child.json");
1891        write(
1892            &child_session,
1893            r#"{"id":"ses_child","parentUUID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1894        )
1895        .unwrap();
1896        assert_eq!(
1897            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
1898            Some("ses_parent")
1899        );
1900    }
1901
1902    #[test]
1903    fn test_parse_opencode_parent_session_id_nested_metadata() {
1904        let root = temp_root();
1905        let dir = root.path().join("session-nested");
1906        create_dir_all(&dir).unwrap();
1907        let child_session = dir.join("child.json");
1908        write(
1909            &child_session,
1910            r#"{"id":"ses_child","metadata":{"links":{"parentSessionId":"ses_parent","trace":"x"}}}"#,
1911        )
1912        .unwrap();
1913        assert_eq!(
1914            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
1915            Some("ses_parent")
1916        );
1917    }
1918
1919    #[test]
1920    fn test_is_claude_subagent_session() {
1921        let row = make_row(
1922            "ses_parent",
1923            "claude-code",
1924            Some("/Users/test/.claude/projects/foo/subagents/agent-abc.jsonl"),
1925        );
1926        assert!(!is_opencode_child_session(&row));
1927        assert!(is_claude_subagent_session(&row));
1928        assert!(hide_opencode_child_sessions(vec![row]).is_empty());
1929    }
1930
1931    #[test]
1932    fn test_hide_opencode_child_sessions() {
1933        let root = temp_root();
1934        let dir = root.path().join("sessions");
1935        create_dir_all(&dir).unwrap();
1936        let parent_session = dir.join("parent.json");
1937        let child_session = dir.join("child.json");
1938        let orphan_session = dir.join("orphan.json");
1939
1940        write(
1941            &parent_session,
1942            r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1943        )
1944        .unwrap();
1945        write(
1946            &child_session,
1947            r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1948        )
1949        .unwrap();
1950        write(
1951            &orphan_session,
1952            r#"{"id":"ses_orphan","time":{"created":1000,"updated":1000}}"#,
1953        )
1954        .unwrap();
1955
1956        let rows = vec![
1957            make_row(
1958                "ses_child",
1959                "opencode",
1960                Some(child_session.to_str().unwrap()),
1961            ),
1962            make_row(
1963                "ses_parent",
1964                "opencode",
1965                Some(parent_session.to_str().unwrap()),
1966            ),
1967            {
1968                let mut row = make_row("ses_other", "codex", None);
1969                row.user_message_count = 1;
1970                row
1971            },
1972            make_row(
1973                "ses_orphan",
1974                "opencode",
1975                Some(orphan_session.to_str().unwrap()),
1976            ),
1977        ];
1978
1979        let filtered = hide_opencode_child_sessions(rows);
1980        assert_eq!(filtered.len(), 3);
1981        assert!(filtered.iter().all(|r| r.id != "ses_child"));
1982    }
1983
1984    #[test]
1985    fn test_sync_cursor() {
1986        let db = test_db();
1987        assert_eq!(db.get_sync_cursor("team1").unwrap(), None);
1988        db.set_sync_cursor("team1", "2024-01-01T00:00:00Z").unwrap();
1989        assert_eq!(
1990            db.get_sync_cursor("team1").unwrap(),
1991            Some("2024-01-01T00:00:00Z".to_string())
1992        );
1993        // Update
1994        db.set_sync_cursor("team1", "2024-06-01T00:00:00Z").unwrap();
1995        assert_eq!(
1996            db.get_sync_cursor("team1").unwrap(),
1997            Some("2024-06-01T00:00:00Z".to_string())
1998        );
1999    }
2000
2001    #[test]
2002    fn test_body_cache() {
2003        let db = test_db();
2004        assert_eq!(db.get_cached_body("s1").unwrap(), None);
2005        db.cache_body("s1", b"hello world").unwrap();
2006        assert_eq!(
2007            db.get_cached_body("s1").unwrap(),
2008            Some(b"hello world".to_vec())
2009        );
2010    }
2011
2012    #[test]
2013    fn test_timeline_summary_cache_roundtrip() {
2014        let db = test_db();
2015        db.upsert_timeline_summary_cache(
2016            "k1",
2017            "timeline:v1",
2018            "compact text",
2019            "{\"kind\":\"turn-summary\"}",
2020            "raw text",
2021        )
2022        .unwrap();
2023
2024        let rows = db
2025            .list_timeline_summary_cache_by_namespace("timeline:v1")
2026            .unwrap();
2027        assert_eq!(rows.len(), 1);
2028        assert_eq!(rows[0].lookup_key, "k1");
2029        assert_eq!(rows[0].namespace, "timeline:v1");
2030        assert_eq!(rows[0].compact, "compact text");
2031        assert_eq!(rows[0].payload, "{\"kind\":\"turn-summary\"}");
2032        assert_eq!(rows[0].raw, "raw text");
2033
2034        let cleared = db.clear_timeline_summary_cache().unwrap();
2035        assert_eq!(cleared, 1);
2036        let rows_after = db
2037            .list_timeline_summary_cache_by_namespace("timeline:v1")
2038            .unwrap();
2039        assert!(rows_after.is_empty());
2040    }
2041
2042    #[test]
2043    fn test_local_migrations_include_timeline_summary_cache() {
2044        let db = test_db();
2045        let conn = db.conn();
2046        let applied: bool = conn
2047            .query_row(
2048                "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
2049                params!["local_0003_timeline_summary_cache"],
2050                |row| row.get(0),
2051            )
2052            .unwrap();
2053        assert!(applied);
2054    }
2055
2056    #[test]
2057    fn test_local_migration_files_match_api_local_migrations() {
2058        fn collect_local_sql(dir: PathBuf) -> BTreeSet<String> {
2059            std::fs::read_dir(dir)
2060                .expect("read migrations directory")
2061                .filter_map(Result::ok)
2062                .map(|entry| entry.file_name().to_string_lossy().to_string())
2063                .filter(|name| name.starts_with("local_") && name.ends_with(".sql"))
2064                .collect()
2065        }
2066
2067        let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
2068        let local_files = collect_local_sql(manifest_dir.join("migrations"));
2069        let api_files = collect_local_sql(manifest_dir.join("../api/migrations"));
2070
2071        assert_eq!(
2072            local_files, api_files,
2073            "local-db local migrations must stay in parity with api local migrations"
2074        );
2075    }
2076
2077    #[test]
2078    fn test_upsert_remote_session() {
2079        let db = test_db();
2080        let summary = RemoteSessionSummary {
2081            id: "remote-1".to_string(),
2082            user_id: Some("u1".to_string()),
2083            nickname: Some("alice".to_string()),
2084            team_id: "t1".to_string(),
2085            tool: "claude-code".to_string(),
2086            agent_provider: None,
2087            agent_model: None,
2088            title: Some("Test session".to_string()),
2089            description: None,
2090            tags: None,
2091            created_at: "2024-01-01T00:00:00Z".to_string(),
2092            uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2093            message_count: 10,
2094            task_count: 2,
2095            event_count: 20,
2096            duration_seconds: 300,
2097            total_input_tokens: 1000,
2098            total_output_tokens: 500,
2099            git_remote: None,
2100            git_branch: None,
2101            git_commit: None,
2102            git_repo_name: None,
2103            pr_number: None,
2104            pr_url: None,
2105            working_directory: None,
2106            files_modified: None,
2107            files_read: None,
2108            has_errors: false,
2109            max_active_agents: 1,
2110        };
2111        db.upsert_remote_session(&summary).unwrap();
2112
2113        let sessions = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2114        assert_eq!(sessions.len(), 1);
2115        assert_eq!(sessions[0].id, "remote-1");
2116        assert_eq!(sessions[0].sync_status, "remote_only");
2117        assert_eq!(sessions[0].nickname, None); // no user in local users table
2118    }
2119
2120    #[test]
2121    fn test_list_filter_by_repo() {
2122        let db = test_db();
2123        // Insert a remote session with team_id
2124        let summary1 = RemoteSessionSummary {
2125            id: "s1".to_string(),
2126            user_id: None,
2127            nickname: None,
2128            team_id: "t1".to_string(),
2129            tool: "claude-code".to_string(),
2130            agent_provider: None,
2131            agent_model: None,
2132            title: Some("Session 1".to_string()),
2133            description: None,
2134            tags: None,
2135            created_at: "2024-01-01T00:00:00Z".to_string(),
2136            uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2137            message_count: 5,
2138            task_count: 0,
2139            event_count: 10,
2140            duration_seconds: 60,
2141            total_input_tokens: 100,
2142            total_output_tokens: 50,
2143            git_remote: None,
2144            git_branch: None,
2145            git_commit: None,
2146            git_repo_name: None,
2147            pr_number: None,
2148            pr_url: None,
2149            working_directory: None,
2150            files_modified: None,
2151            files_read: None,
2152            has_errors: false,
2153            max_active_agents: 1,
2154        };
2155        db.upsert_remote_session(&summary1).unwrap();
2156
2157        // Filter by team
2158        let filter = LocalSessionFilter {
2159            team_id: Some("t1".to_string()),
2160            ..Default::default()
2161        };
2162        assert_eq!(db.list_sessions(&filter).unwrap().len(), 1);
2163
2164        let filter = LocalSessionFilter {
2165            team_id: Some("t999".to_string()),
2166            ..Default::default()
2167        };
2168        assert_eq!(db.list_sessions(&filter).unwrap().len(), 0);
2169    }
2170
2171    // ── Helpers for inserting test sessions ────────────────────────────
2172
2173    fn make_summary(id: &str, tool: &str, title: &str, created_at: &str) -> RemoteSessionSummary {
2174        RemoteSessionSummary {
2175            id: id.to_string(),
2176            user_id: None,
2177            nickname: None,
2178            team_id: "t1".to_string(),
2179            tool: tool.to_string(),
2180            agent_provider: Some("anthropic".to_string()),
2181            agent_model: Some("claude-opus-4-6".to_string()),
2182            title: Some(title.to_string()),
2183            description: None,
2184            tags: None,
2185            created_at: created_at.to_string(),
2186            uploaded_at: created_at.to_string(),
2187            message_count: 5,
2188            task_count: 1,
2189            event_count: 10,
2190            duration_seconds: 300,
2191            total_input_tokens: 1000,
2192            total_output_tokens: 500,
2193            git_remote: None,
2194            git_branch: None,
2195            git_commit: None,
2196            git_repo_name: None,
2197            pr_number: None,
2198            pr_url: None,
2199            working_directory: None,
2200            files_modified: None,
2201            files_read: None,
2202            has_errors: false,
2203            max_active_agents: 1,
2204        }
2205    }
2206
2207    fn seed_sessions(db: &LocalDb) {
2208        // Insert 5 sessions across two tools, ordered by created_at
2209        db.upsert_remote_session(&make_summary(
2210            "s1",
2211            "claude-code",
2212            "First session",
2213            "2024-01-01T00:00:00Z",
2214        ))
2215        .unwrap();
2216        db.upsert_remote_session(&make_summary(
2217            "s2",
2218            "claude-code",
2219            "JWT auth work",
2220            "2024-01-02T00:00:00Z",
2221        ))
2222        .unwrap();
2223        db.upsert_remote_session(&make_summary(
2224            "s3",
2225            "gemini",
2226            "Gemini test",
2227            "2024-01-03T00:00:00Z",
2228        ))
2229        .unwrap();
2230        db.upsert_remote_session(&make_summary(
2231            "s4",
2232            "claude-code",
2233            "Error handling",
2234            "2024-01-04T00:00:00Z",
2235        ))
2236        .unwrap();
2237        db.upsert_remote_session(&make_summary(
2238            "s5",
2239            "claude-code",
2240            "Final polish",
2241            "2024-01-05T00:00:00Z",
2242        ))
2243        .unwrap();
2244    }
2245
2246    // ── list_sessions_log tests ────────────────────────────────────────
2247
2248    #[test]
2249    fn test_log_no_filters() {
2250        let db = test_db();
2251        seed_sessions(&db);
2252        let filter = LogFilter::default();
2253        let results = db.list_sessions_log(&filter).unwrap();
2254        assert_eq!(results.len(), 5);
2255        // Should be ordered by created_at DESC
2256        assert_eq!(results[0].id, "s5");
2257        assert_eq!(results[4].id, "s1");
2258    }
2259
2260    #[test]
2261    fn test_log_filter_by_tool() {
2262        let db = test_db();
2263        seed_sessions(&db);
2264        let filter = LogFilter {
2265            tool: Some("claude-code".to_string()),
2266            ..Default::default()
2267        };
2268        let results = db.list_sessions_log(&filter).unwrap();
2269        assert_eq!(results.len(), 4);
2270        assert!(results.iter().all(|s| s.tool == "claude-code"));
2271    }
2272
2273    #[test]
2274    fn test_log_filter_by_model_wildcard() {
2275        let db = test_db();
2276        seed_sessions(&db);
2277        let filter = LogFilter {
2278            model: Some("claude*".to_string()),
2279            ..Default::default()
2280        };
2281        let results = db.list_sessions_log(&filter).unwrap();
2282        assert_eq!(results.len(), 5); // all have claude-opus model
2283    }
2284
2285    #[test]
2286    fn test_log_filter_since() {
2287        let db = test_db();
2288        seed_sessions(&db);
2289        let filter = LogFilter {
2290            since: Some("2024-01-03T00:00:00Z".to_string()),
2291            ..Default::default()
2292        };
2293        let results = db.list_sessions_log(&filter).unwrap();
2294        assert_eq!(results.len(), 3); // s3, s4, s5
2295    }
2296
2297    #[test]
2298    fn test_log_filter_before() {
2299        let db = test_db();
2300        seed_sessions(&db);
2301        let filter = LogFilter {
2302            before: Some("2024-01-03T00:00:00Z".to_string()),
2303            ..Default::default()
2304        };
2305        let results = db.list_sessions_log(&filter).unwrap();
2306        assert_eq!(results.len(), 2); // s1, s2
2307    }
2308
2309    #[test]
2310    fn test_log_filter_since_and_before() {
2311        let db = test_db();
2312        seed_sessions(&db);
2313        let filter = LogFilter {
2314            since: Some("2024-01-02T00:00:00Z".to_string()),
2315            before: Some("2024-01-04T00:00:00Z".to_string()),
2316            ..Default::default()
2317        };
2318        let results = db.list_sessions_log(&filter).unwrap();
2319        assert_eq!(results.len(), 2); // s2, s3
2320    }
2321
2322    #[test]
2323    fn test_log_filter_grep() {
2324        let db = test_db();
2325        seed_sessions(&db);
2326        let filter = LogFilter {
2327            grep: Some("JWT".to_string()),
2328            ..Default::default()
2329        };
2330        let results = db.list_sessions_log(&filter).unwrap();
2331        assert_eq!(results.len(), 1);
2332        assert_eq!(results[0].id, "s2");
2333    }
2334
2335    #[test]
2336    fn test_log_limit_and_offset() {
2337        let db = test_db();
2338        seed_sessions(&db);
2339        let filter = LogFilter {
2340            limit: Some(2),
2341            offset: Some(1),
2342            ..Default::default()
2343        };
2344        let results = db.list_sessions_log(&filter).unwrap();
2345        assert_eq!(results.len(), 2);
2346        assert_eq!(results[0].id, "s4"); // second most recent
2347        assert_eq!(results[1].id, "s3");
2348    }
2349
2350    #[test]
2351    fn test_log_limit_only() {
2352        let db = test_db();
2353        seed_sessions(&db);
2354        let filter = LogFilter {
2355            limit: Some(3),
2356            ..Default::default()
2357        };
2358        let results = db.list_sessions_log(&filter).unwrap();
2359        assert_eq!(results.len(), 3);
2360    }
2361
2362    #[test]
2363    fn test_list_sessions_limit_offset() {
2364        let db = test_db();
2365        seed_sessions(&db);
2366        let filter = LocalSessionFilter {
2367            limit: Some(2),
2368            offset: Some(1),
2369            ..Default::default()
2370        };
2371        let results = db.list_sessions(&filter).unwrap();
2372        assert_eq!(results.len(), 2);
2373        assert_eq!(results[0].id, "s4");
2374        assert_eq!(results[1].id, "s3");
2375    }
2376
2377    #[test]
2378    fn test_count_sessions_filtered() {
2379        let db = test_db();
2380        seed_sessions(&db);
2381        let count = db
2382            .count_sessions_filtered(&LocalSessionFilter::default())
2383            .unwrap();
2384        assert_eq!(count, 5);
2385    }
2386
2387    #[test]
2388    fn test_list_working_directories_distinct_non_empty() {
2389        let db = test_db();
2390
2391        let mut a = make_summary("wd-1", "claude-code", "One", "2024-01-01T00:00:00Z");
2392        a.working_directory = Some("/tmp/repo-a".to_string());
2393        let mut b = make_summary("wd-2", "claude-code", "Two", "2024-01-02T00:00:00Z");
2394        b.working_directory = Some("/tmp/repo-a".to_string());
2395        let mut c = make_summary("wd-3", "claude-code", "Three", "2024-01-03T00:00:00Z");
2396        c.working_directory = Some("/tmp/repo-b".to_string());
2397        let mut d = make_summary("wd-4", "claude-code", "Four", "2024-01-04T00:00:00Z");
2398        d.working_directory = Some("".to_string());
2399
2400        db.upsert_remote_session(&a).unwrap();
2401        db.upsert_remote_session(&b).unwrap();
2402        db.upsert_remote_session(&c).unwrap();
2403        db.upsert_remote_session(&d).unwrap();
2404
2405        let dirs = db.list_working_directories().unwrap();
2406        assert_eq!(
2407            dirs,
2408            vec!["/tmp/repo-a".to_string(), "/tmp/repo-b".to_string()]
2409        );
2410    }
2411
2412    #[test]
2413    fn test_list_session_tools() {
2414        let db = test_db();
2415        seed_sessions(&db);
2416        let tools = db
2417            .list_session_tools(&LocalSessionFilter::default())
2418            .unwrap();
2419        assert_eq!(tools, vec!["claude-code".to_string(), "gemini".to_string()]);
2420    }
2421
2422    #[test]
2423    fn test_log_combined_filters() {
2424        let db = test_db();
2425        seed_sessions(&db);
2426        let filter = LogFilter {
2427            tool: Some("claude-code".to_string()),
2428            since: Some("2024-01-03T00:00:00Z".to_string()),
2429            limit: Some(1),
2430            ..Default::default()
2431        };
2432        let results = db.list_sessions_log(&filter).unwrap();
2433        assert_eq!(results.len(), 1);
2434        assert_eq!(results[0].id, "s5"); // most recent claude-code after Jan 3
2435    }
2436
2437    // ── Session offset/latest tests ────────────────────────────────────
2438
2439    #[test]
2440    fn test_get_session_by_offset() {
2441        let db = test_db();
2442        seed_sessions(&db);
2443        let row = db.get_session_by_offset(0).unwrap().unwrap();
2444        assert_eq!(row.id, "s5"); // most recent
2445        let row = db.get_session_by_offset(2).unwrap().unwrap();
2446        assert_eq!(row.id, "s3");
2447        assert!(db.get_session_by_offset(10).unwrap().is_none());
2448    }
2449
2450    #[test]
2451    fn test_get_session_by_tool_offset() {
2452        let db = test_db();
2453        seed_sessions(&db);
2454        let row = db
2455            .get_session_by_tool_offset("claude-code", 0)
2456            .unwrap()
2457            .unwrap();
2458        assert_eq!(row.id, "s5");
2459        let row = db
2460            .get_session_by_tool_offset("claude-code", 1)
2461            .unwrap()
2462            .unwrap();
2463        assert_eq!(row.id, "s4");
2464        let row = db.get_session_by_tool_offset("gemini", 0).unwrap().unwrap();
2465        assert_eq!(row.id, "s3");
2466        assert!(db
2467            .get_session_by_tool_offset("gemini", 1)
2468            .unwrap()
2469            .is_none());
2470    }
2471
2472    #[test]
2473    fn test_get_sessions_latest() {
2474        let db = test_db();
2475        seed_sessions(&db);
2476        let rows = db.get_sessions_latest(3).unwrap();
2477        assert_eq!(rows.len(), 3);
2478        assert_eq!(rows[0].id, "s5");
2479        assert_eq!(rows[1].id, "s4");
2480        assert_eq!(rows[2].id, "s3");
2481    }
2482
2483    #[test]
2484    fn test_get_sessions_by_tool_latest() {
2485        let db = test_db();
2486        seed_sessions(&db);
2487        let rows = db.get_sessions_by_tool_latest("claude-code", 2).unwrap();
2488        assert_eq!(rows.len(), 2);
2489        assert_eq!(rows[0].id, "s5");
2490        assert_eq!(rows[1].id, "s4");
2491    }
2492
2493    #[test]
2494    fn test_get_sessions_latest_more_than_available() {
2495        let db = test_db();
2496        seed_sessions(&db);
2497        let rows = db.get_sessions_by_tool_latest("gemini", 10).unwrap();
2498        assert_eq!(rows.len(), 1); // only 1 gemini session
2499    }
2500
2501    #[test]
2502    fn test_session_count() {
2503        let db = test_db();
2504        assert_eq!(db.session_count().unwrap(), 0);
2505        seed_sessions(&db);
2506        assert_eq!(db.session_count().unwrap(), 5);
2507    }
2508
2509    // ── Commit link tests ─────────────────────────────────────────────
2510
2511    #[test]
2512    fn test_link_commit_session() {
2513        let db = test_db();
2514        seed_sessions(&db);
2515        db.link_commit_session("abc123", "s1", Some("/tmp/repo"), Some("main"))
2516            .unwrap();
2517
2518        let commits = db.get_commits_by_session("s1").unwrap();
2519        assert_eq!(commits.len(), 1);
2520        assert_eq!(commits[0].commit_hash, "abc123");
2521        assert_eq!(commits[0].session_id, "s1");
2522        assert_eq!(commits[0].repo_path.as_deref(), Some("/tmp/repo"));
2523        assert_eq!(commits[0].branch.as_deref(), Some("main"));
2524
2525        let sessions = db.get_sessions_by_commit("abc123").unwrap();
2526        assert_eq!(sessions.len(), 1);
2527        assert_eq!(sessions[0].id, "s1");
2528    }
2529
2530    #[test]
2531    fn test_get_sessions_by_commit() {
2532        let db = test_db();
2533        seed_sessions(&db);
2534        // Link multiple sessions to the same commit
2535        db.link_commit_session("abc123", "s1", None, None).unwrap();
2536        db.link_commit_session("abc123", "s2", None, None).unwrap();
2537        db.link_commit_session("abc123", "s3", None, None).unwrap();
2538
2539        let sessions = db.get_sessions_by_commit("abc123").unwrap();
2540        assert_eq!(sessions.len(), 3);
2541        // Ordered by created_at DESC
2542        assert_eq!(sessions[0].id, "s3");
2543        assert_eq!(sessions[1].id, "s2");
2544        assert_eq!(sessions[2].id, "s1");
2545    }
2546
2547    #[test]
2548    fn test_get_commits_by_session() {
2549        let db = test_db();
2550        seed_sessions(&db);
2551        // Link multiple commits to the same session
2552        db.link_commit_session("aaa111", "s1", Some("/repo"), Some("main"))
2553            .unwrap();
2554        db.link_commit_session("bbb222", "s1", Some("/repo"), Some("main"))
2555            .unwrap();
2556        db.link_commit_session("ccc333", "s1", Some("/repo"), Some("feat"))
2557            .unwrap();
2558
2559        let commits = db.get_commits_by_session("s1").unwrap();
2560        assert_eq!(commits.len(), 3);
2561        // All linked to s1
2562        assert!(commits.iter().all(|c| c.session_id == "s1"));
2563    }
2564
2565    #[test]
2566    fn test_duplicate_link_ignored() {
2567        let db = test_db();
2568        seed_sessions(&db);
2569        db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2570            .unwrap();
2571        // Inserting the same link again should not error
2572        db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2573            .unwrap();
2574
2575        let commits = db.get_commits_by_session("s1").unwrap();
2576        assert_eq!(commits.len(), 1);
2577    }
2578
2579    #[test]
2580    fn test_log_filter_by_commit() {
2581        let db = test_db();
2582        seed_sessions(&db);
2583        db.link_commit_session("abc123", "s2", None, None).unwrap();
2584        db.link_commit_session("abc123", "s4", None, None).unwrap();
2585
2586        let filter = LogFilter {
2587            commit: Some("abc123".to_string()),
2588            ..Default::default()
2589        };
2590        let results = db.list_sessions_log(&filter).unwrap();
2591        assert_eq!(results.len(), 2);
2592        assert_eq!(results[0].id, "s4");
2593        assert_eq!(results[1].id, "s2");
2594
2595        // Non-existent commit returns nothing
2596        let filter = LogFilter {
2597            commit: Some("nonexistent".to_string()),
2598            ..Default::default()
2599        };
2600        let results = db.list_sessions_log(&filter).unwrap();
2601        assert_eq!(results.len(), 0);
2602    }
2603}