Skip to main content

opensession_local_db/
lib.rs

1pub mod git;
2
3use anyhow::{Context, Result};
4use opensession_core::trace::Session;
5use rusqlite::{params, Connection, OptionalExtension};
6use serde_json::Value;
7use std::collections::HashSet;
8use std::fs;
9use std::path::PathBuf;
10use std::sync::Mutex;
11
12use git::GitContext;
13
/// One bundled schema migration, stored as a `(name, SQL text)` pair.
type Migration = (&'static str, &'static str);
15
// Keep local cache schema aligned with remote/session schema migrations.
//
// Each entry pairs a migration name with the SQL file embedded at compile time
// through `include_str!`.
// NOTE(review): entries are presumably applied in the listed order — confirm
// against the migration runner, which is not part of this section.
const REMOTE_MIGRATIONS: &[Migration] = &[
    ("0001_schema", include_str!("../migrations/0001_schema.sql")),
    (
        "0002_team_invite_keys",
        include_str!("../migrations/0002_team_invite_keys.sql"),
    ),
    (
        "0003_max_active_agents",
        include_str!("../migrations/0003_max_active_agents.sql"),
    ),
    (
        "0004_oauth_states_provider",
        include_str!("../migrations/0004_oauth_states_provider.sql"),
    ),
    (
        "0005_sessions_body_url_backfill",
        include_str!("../migrations/0005_sessions_body_url_backfill.sql"),
    ),
    (
        "0006_sessions_remove_fk_constraints",
        include_str!("../migrations/0006_sessions_remove_fk_constraints.sql"),
    ),
    (
        "0007_sessions_list_perf_indexes",
        include_str!("../migrations/0007_sessions_list_perf_indexes.sql"),
    ),
    (
        "0008_teams_force_public",
        include_str!("../migrations/0008_teams_force_public.sql"),
    ),
    (
        "0009_session_score_plugin",
        include_str!("../migrations/0009_session_score_plugin.sql"),
    ),
];
52
// Migrations whose names carry the `local_` prefix; each SQL file is embedded
// at compile time through `include_str!`.
// NOTE(review): presumably these cover tables that exist only in the local
// cache and run after `REMOTE_MIGRATIONS` — confirm against the migration runner.
const LOCAL_MIGRATIONS: &[Migration] = &[
    (
        "local_0001_schema",
        include_str!("../migrations/local_0001_schema.sql"),
    ),
    (
        "local_0002_drop_unused_local_sessions",
        include_str!("../migrations/local_0002_drop_unused_local_sessions.sql"),
    ),
    (
        "local_0003_timeline_summary_cache",
        include_str!("../migrations/local_0003_timeline_summary_cache.sql"),
    ),
];
67
/// A local session row stored in the local SQLite database.
///
/// This is the row type returned by the listing queries in this file and the
/// input of the child/sub-agent session heuristics.
#[derive(Debug, Clone)]
pub struct LocalSessionRow {
    pub id: String,
    /// Path of the session file, when known. The sub-agent heuristics inspect
    /// this path, and the OpenCode parent-id lookup reads the file it names.
    pub source_path: Option<String>,
    /// Sync state. The upserts in this file write `local_only`, `remote_only`
    /// and `synced`.
    /// NOTE(review): the value used when no sync row exists is decided by the
    /// row mapper — confirm there.
    pub sync_status: String,
    pub last_synced_at: Option<String>,
    pub user_id: Option<String>,
    pub nickname: Option<String>,
    pub team_id: Option<String>,
    /// Tool identifier; this file compares it against `"opencode"` and
    /// `"claude-code"`.
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    pub title: Option<String>,
    pub description: Option<String>,
    /// Tags as a single string; locally parsed sessions store them joined
    /// with `,`.
    pub tags: Option<String>,
    /// Creation timestamp as text; locally parsed sessions store RFC 3339.
    pub created_at: String,
    pub uploaded_at: Option<String>,
    pub message_count: i64,
    pub user_message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    // NOTE(review): `files_modified` / `files_read` are presumably JSON-encoded
    // lists (the log filter matches a quoted path inside `files_modified`) —
    // confirm against the extractor that produces them.
    pub files_modified: Option<String>,
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
}
105
/// A link between a git commit and an AI session.
///
/// NOTE(review): presumably mirrors one row of the `commit_session_links`
/// table that the log query joins on — confirm against the code that builds it.
#[derive(Debug, Clone)]
pub struct CommitLink {
    /// Hash of the linked git commit.
    pub commit_hash: String,
    /// Id of the linked session.
    pub session_id: String,
    pub repo_path: Option<String>,
    pub branch: Option<String>,
    pub created_at: String,
}
115
/// A cached timeline summary row stored in local DB.
///
/// All fields are plain strings.
/// NOTE(review): presumably backed by the table created by the
/// `local_0003_timeline_summary_cache` migration — confirm; the read/write
/// code is not part of this section.
#[derive(Debug, Clone)]
pub struct TimelineSummaryCacheRow {
    pub lookup_key: String,
    pub namespace: String,
    pub compact: String,
    pub payload: String,
    pub raw: String,
    pub cached_at: String,
}
126
127/// Return true when a cached row corresponds to an OpenCode child session.
128pub fn is_opencode_child_session(row: &LocalSessionRow) -> bool {
129    if row.tool != "opencode" {
130        return false;
131    }
132
133    // Strong heuristic: some orchestration artifacts are generated as sessions
134    // that do not carry a user-visible request surface and only contain system
135    // or agent-only events.
136    if row.user_message_count <= 0
137        && row.message_count <= 4
138        && row.task_count <= 4
139        && row.event_count > 0
140        && row.event_count <= 16
141    {
142        return true;
143    }
144
145    if is_opencode_subagent_source(row.source_path.as_deref()) {
146        return true;
147    }
148
149    let source_path = match row.source_path.as_deref() {
150        Some(path) if !path.trim().is_empty() => path,
151        _ => return false,
152    };
153
154    parse_opencode_parent_session_id(source_path)
155        .is_some_and(|parent_id| !parent_id.trim().is_empty())
156}
157
158/// Parse `parentID` / `parentId` from an OpenCode session JSON file.
159pub fn parse_opencode_parent_session_id(source_path: &str) -> Option<String> {
160    let text = fs::read_to_string(source_path).ok()?;
161    let json: Value = serde_json::from_str(&text).ok()?;
162    lookup_parent_session_id(&json)
163}
164
165fn lookup_parent_session_id(value: &Value) -> Option<String> {
166    match value {
167        Value::Object(obj) => {
168            for (key, value) in obj {
169                if is_parent_id_key(key) {
170                    if let Some(parent_id) = value.as_str() {
171                        let parent_id = parent_id.trim();
172                        if !parent_id.is_empty() {
173                            return Some(parent_id.to_string());
174                        }
175                    }
176                }
177                if let Some(parent_id) = lookup_parent_session_id(value) {
178                    return Some(parent_id);
179                }
180            }
181            None
182        }
183        Value::Array(items) => items.iter().find_map(lookup_parent_session_id),
184        _ => None,
185    }
186}
187
/// Return true when a JSON key looks like it names a parent (session) id.
///
/// The key is normalised by dropping every character that is not ASCII
/// alphanumeric and lower-casing the rest, so `parentID`, `parent_id` and
/// `Parent-Session-UUID` are all recognised.
fn is_parent_id_key(key: &str) -> bool {
    let mut normalized = String::with_capacity(key.len());
    for ch in key.chars() {
        if ch.is_ascii_alphanumeric() {
            normalized.push(ch.to_ascii_lowercase());
        }
    }

    // Every accepted spelling ("parentid", "parentuuid", "parentsessionid",
    // "parentsessionuuid", "...parentsessionid", "...parent...uuid") contains
    // "parent" and ends in "id" — note that "uuid" itself ends in "id" — so a
    // single test covers them all.
    normalized.contains("parent") && normalized.ends_with("id")
}
203
204/// Remove OpenCode child sessions so only parent sessions remain visible.
205pub fn hide_opencode_child_sessions(mut rows: Vec<LocalSessionRow>) -> Vec<LocalSessionRow> {
206    rows.retain(|row| !is_opencode_child_session(row) && !is_claude_subagent_session(row));
207    rows
208}
209
/// Return true when `source_path` follows the sub-agent naming convention.
///
/// Thin alias for `is_subagent_source`; the tool check is left to the caller.
fn is_opencode_subagent_source(source_path: Option<&str>) -> bool {
    is_subagent_source(source_path)
}
213
214fn is_claude_subagent_session(row: &LocalSessionRow) -> bool {
215    if row.tool != "claude-code" {
216        return false;
217    }
218
219    is_subagent_source(row.source_path.as_deref())
220}
221
/// Return true when a session file path looks like it belongs to a sub-agent.
///
/// The comparison is ASCII case-insensitive. A path matches when it contains a
/// `subagents` directory component (with `/` or `\` separators) or when its
/// file name starts with `agent-`, `agent_`, `subagent-` or `subagent_`.
/// `None`, or a path without a file name, never matches.
fn is_subagent_source(source_path: Option<&str>) -> bool {
    const DIRECTORY_MARKERS: [&str; 2] = ["/subagents/", "\\subagents\\"];
    const FILENAME_PREFIXES: [&str; 4] = ["agent-", "agent_", "subagent-", "subagent_"];

    let lowered = match source_path {
        Some(raw) => raw.to_ascii_lowercase(),
        None => return false,
    };

    if DIRECTORY_MARKERS
        .iter()
        .any(|marker| lowered.contains(*marker))
    {
        return true;
    }

    std::path::Path::new(&lowered)
        .file_name()
        .map(|name| name.to_string_lossy())
        .is_some_and(|name| {
            FILENAME_PREFIXES
                .iter()
                .any(|prefix| name.starts_with(*prefix))
        })
}
241
/// Filter for listing sessions from the local DB.
///
/// Every `Option` field left as `None` adds no restriction.
#[derive(Debug, Clone)]
pub struct LocalSessionFilter {
    /// Exact match on the session's team id.
    pub team_id: Option<String>,
    /// Exact match on the sync status; sessions without a sync row compare as
    /// `unknown`.
    pub sync_status: Option<String>,
    /// Exact match on the git repository name.
    pub git_repo_name: Option<String>,
    /// Substring search (SQL `LIKE '%…%'`) over title, description and tags.
    pub search: Option<String>,
    /// Exact match on the tool name.
    pub tool: Option<String>,
    /// Result ordering used by `list_sessions`.
    pub sort: LocalSortOrder,
    /// Restricts results to sessions created within the given window.
    pub time_range: LocalTimeRange,
    /// Maximum number of rows fetched by `list_sessions`.
    pub limit: Option<u32>,
    /// Rows to skip; only honoured by `list_sessions` when `limit` is also set.
    pub offset: Option<u32>,
}
255
256impl Default for LocalSessionFilter {
257    fn default() -> Self {
258        Self {
259            team_id: None,
260            sync_status: None,
261            git_repo_name: None,
262            search: None,
263            tool: None,
264            sort: LocalSortOrder::Recent,
265            time_range: LocalTimeRange::All,
266            limit: None,
267            offset: None,
268        }
269    }
270}
271
/// Sort order for local session listing.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalSortOrder {
    /// Newest first (`created_at` descending). This is the default.
    #[default]
    Recent,
    /// Highest `message_count` first, ties broken by newest first.
    Popular,
    /// Highest `duration_seconds` first, ties broken by newest first.
    Longest,
}
280
/// Time range filter for local session listing.
///
/// Windows are measured back from the database's `now` against the session's
/// `created_at`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalTimeRange {
    /// Sessions created within the last day.
    Hours24,
    /// Sessions created within the last 7 days.
    Days7,
    /// Sessions created within the last 30 days.
    Days30,
    /// No time restriction. This is the default.
    #[default]
    All,
}
290
/// Minimal remote session payload needed for local cache upsert.
///
/// Consumed by `LocalDb::upsert_remote_session`, which stores every field
/// except `nickname` in the `sessions` table.
#[derive(Debug, Clone)]
pub struct RemoteSessionSummary {
    pub id: String,
    pub user_id: Option<String>,
    /// Not written by `upsert_remote_session`.
    /// NOTE(review): presumably resolved through the `users` table joined at
    /// read time — confirm.
    pub nickname: Option<String>,
    pub team_id: String,
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    pub title: Option<String>,
    pub description: Option<String>,
    pub tags: Option<String>,
    pub created_at: String,
    pub uploaded_at: String,
    pub message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    pub files_modified: Option<String>,
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
}
324
/// Extended filter for the `log` command.
///
/// Every field left as `None` adds no restriction; all active restrictions are
/// combined with `AND`.
#[derive(Debug, Default)]
pub struct LogFilter {
    /// Filter by tool name (exact match).
    pub tool: Option<String>,
    /// Filter by model (glob-like, uses LIKE); every `*` is translated to the
    /// SQL wildcard `%`.
    pub model: Option<String>,
    /// Filter sessions created at or after this ISO8601 timestamp (compared
    /// against the stored `created_at` text).
    pub since: Option<String>,
    /// Filter sessions created strictly before this ISO8601 timestamp.
    pub before: Option<String>,
    /// Filter sessions that touched this file path (searches files_modified
    /// JSON for the path wrapped in double quotes).
    pub touches: Option<String>,
    /// Free-text search in title, description, tags.
    pub grep: Option<String>,
    /// Only sessions with errors. Only `Some(true)` restricts the result;
    /// `Some(false)` behaves like `None`.
    pub has_errors: Option<bool>,
    /// Filter by working directory (prefix match).
    pub working_directory: Option<String>,
    /// Filter by git repo name.
    pub git_repo_name: Option<String>,
    /// Filter sessions linked to this git commit hash.
    pub commit: Option<String>,
    /// Maximum number of results.
    pub limit: Option<u32>,
    /// Offset for pagination; only applied when `limit` is also set.
    pub offset: Option<u32>,
}
353
/// Base FROM clause for session list queries.
///
/// Aliases: `s` = `sessions`, `ss` = `session_sync` (left-joined on the session
/// id), `u` = `users` (left-joined on the session's user id). Because both
/// joins are `LEFT JOIN`s, sessions without a sync row or user row are kept.
const FROM_CLAUSE: &str = "\
FROM sessions s \
LEFT JOIN session_sync ss ON ss.session_id = s.id \
LEFT JOIN users u ON u.id = s.user_id";
359
/// Local SQLite database shared by TUI and Daemon.
/// Thread-safe: wraps the connection in a Mutex so it can be shared via `Arc<LocalDb>`.
pub struct LocalDb {
    /// The single SQLite connection; every operation locks this mutex first.
    conn: Mutex<Connection>,
}
365
366impl LocalDb {
367    /// Open (or create) the local database at the default path.
368    /// `~/.local/share/opensession/local.db`
369    pub fn open() -> Result<Self> {
370        let path = default_db_path()?;
371        Self::open_path(&path)
372    }
373
374    /// Open (or create) the local database at a specific path.
375    pub fn open_path(path: &PathBuf) -> Result<Self> {
376        if let Some(parent) = path.parent() {
377            std::fs::create_dir_all(parent)
378                .with_context(|| format!("create dir for {}", path.display()))?;
379        }
380        match open_connection_with_latest_schema(path) {
381            Ok(conn) => Ok(Self {
382                conn: Mutex::new(conn),
383            }),
384            Err(err) => {
385                if !is_schema_compat_error(&err) {
386                    return Err(err);
387                }
388
389                // Local DB is a cache. If schema migration cannot safely reconcile
390                // a legacy/corrupted file, rotate it out and recreate with latest schema.
391                rotate_legacy_db(path)?;
392
393                let conn = open_connection_with_latest_schema(path)
394                    .with_context(|| format!("recreate db {}", path.display()))?;
395                Ok(Self {
396                    conn: Mutex::new(conn),
397                })
398            }
399        }
400    }
401
    /// Lock the connection mutex and return the guard.
    ///
    /// # Panics
    ///
    /// Panics with "local db mutex poisoned" if the mutex is poisoned.
    fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
        self.conn.lock().expect("local db mutex poisoned")
    }
405
406    // ── Upsert local session (parsed from file) ────────────────────────
407
    /// Insert or refresh the cache entry for a session parsed from a local file.
    ///
    /// Two statements are executed one after the other (no explicit
    /// transaction is opened here):
    ///
    /// 1. Upsert into `sessions`. New rows get `team_id = 'personal'` and an
    ///    empty `body_storage_key`. On an id conflict the descriptive, stats,
    ///    git and file columns are overwritten; `team_id`, `created_at` and
    ///    `body_storage_key` are left as stored.
    /// 2. Upsert into `session_sync`. New rows start as `local_only`; on
    ///    conflict only `source_path` is updated, so an existing sync status
    ///    is kept.
    ///
    /// # Errors
    ///
    /// Returns the error of whichever statement fails.
    ///
    /// # Panics
    ///
    /// Panics if the connection mutex is poisoned (see `conn`).
    pub fn upsert_local_session(
        &self,
        session: &Session,
        source_path: &str,
        git: &GitContext,
    ) -> Result<()> {
        let title = session.context.title.as_deref();
        let description = session.context.description.as_deref();
        // Tags are stored as one comma-joined string; an empty list is stored as NULL.
        let tags = if session.context.tags.is_empty() {
            None
        } else {
            Some(session.context.tags.join(","))
        };
        let created_at = session.context.created_at.to_rfc3339();
        // Working directory: prefer the "cwd" attribute, fall back to
        // "working_directory"; only string values are used.
        let cwd = session
            .context
            .attributes
            .get("cwd")
            .or_else(|| session.context.attributes.get("working_directory"))
            .and_then(|v| v.as_str().map(String::from));

        // Extract files_modified, files_read, and has_errors from events
        let (files_modified, files_read, has_errors) =
            opensession_core::extract::extract_file_metadata(session);
        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;

        let conn = self.conn();
        // Placeholders ?1..?24 map one-to-one, in order, onto the `params!`
        // list below; `team_id` and `body_storage_key` are SQL literals.
        conn.execute(
            "INSERT INTO sessions \
             (id, team_id, tool, agent_provider, agent_model, \
              title, description, tags, created_at, \
              message_count, user_message_count, task_count, event_count, duration_seconds, \
              total_input_tokens, total_output_tokens, body_storage_key, \
              git_remote, git_branch, git_commit, git_repo_name, working_directory, \
              files_modified, files_read, has_errors, max_active_agents) \
             VALUES (?1,'personal',?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,'',?16,?17,?18,?19,?20,?21,?22,?23,?24) \
             ON CONFLICT(id) DO UPDATE SET \
              tool=excluded.tool, agent_provider=excluded.agent_provider, \
              agent_model=excluded.agent_model, \
              title=excluded.title, description=excluded.description, \
              tags=excluded.tags, \
              message_count=excluded.message_count, user_message_count=excluded.user_message_count, \
              task_count=excluded.task_count, \
              event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
              total_input_tokens=excluded.total_input_tokens, \
              total_output_tokens=excluded.total_output_tokens, \
              git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
              git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
              working_directory=excluded.working_directory, \
              files_modified=excluded.files_modified, files_read=excluded.files_read, \
              has_errors=excluded.has_errors, \
              max_active_agents=excluded.max_active_agents",
            params![
                &session.session_id,
                &session.agent.tool,
                &session.agent.provider,
                &session.agent.model,
                title,
                description,
                &tags,
                &created_at,
                session.stats.message_count as i64,
                session.stats.user_message_count as i64,
                session.stats.task_count as i64,
                session.stats.event_count as i64,
                session.stats.duration_seconds as i64,
                session.stats.total_input_tokens as i64,
                session.stats.total_output_tokens as i64,
                &git.remote,
                &git.branch,
                &git.commit,
                &git.repo_name,
                &cwd,
                &files_modified,
                &files_read,
                has_errors,
                max_active_agents,
            ],
        )?;

        // Record where the session came from; an existing sync status is preserved.
        conn.execute(
            "INSERT INTO session_sync (session_id, source_path, sync_status) \
             VALUES (?1, ?2, 'local_only') \
             ON CONFLICT(session_id) DO UPDATE SET source_path=excluded.source_path",
            params![&session.session_id, source_path],
        )?;
        Ok(())
    }
496
497    // ── Upsert remote session (from server sync pull) ──────────────────
498
    /// Insert or refresh the cache entry for a session summary pulled from the
    /// server.
    ///
    /// Two statements are executed one after the other (no explicit
    /// transaction is opened here):
    ///
    /// 1. Upsert into `sessions` with an empty `body_storage_key`. On an id
    ///    conflict the descriptive, stats, git, PR and file columns are
    ///    overwritten; `user_id`, `team_id`, `tool`, `agent_provider`,
    ///    `agent_model` and `created_at` are left as stored.
    ///    `summary.nickname` is not written.
    /// 2. Upsert into `session_sync`. New rows start as `remote_only`; an
    ///    existing `local_only` row becomes `synced`, any other status is kept.
    ///
    /// # Errors
    ///
    /// Returns the error of whichever statement fails.
    ///
    /// # Panics
    ///
    /// Panics if the connection mutex is poisoned (see `conn`).
    pub fn upsert_remote_session(&self, summary: &RemoteSessionSummary) -> Result<()> {
        let conn = self.conn();
        // Placeholders ?1..?28 map one-to-one, in order, onto the `params!`
        // list below; `body_storage_key` is the SQL literal ''.
        conn.execute(
            "INSERT INTO sessions \
             (id, user_id, team_id, tool, agent_provider, agent_model, \
              title, description, tags, created_at, uploaded_at, \
              message_count, task_count, event_count, duration_seconds, \
              total_input_tokens, total_output_tokens, body_storage_key, \
              git_remote, git_branch, git_commit, git_repo_name, \
              pr_number, pr_url, working_directory, \
              files_modified, files_read, has_errors, max_active_agents) \
             VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,'',?18,?19,?20,?21,?22,?23,?24,?25,?26,?27,?28) \
             ON CONFLICT(id) DO UPDATE SET \
              title=excluded.title, description=excluded.description, \
              tags=excluded.tags, uploaded_at=excluded.uploaded_at, \
              message_count=excluded.message_count, task_count=excluded.task_count, \
              event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
              total_input_tokens=excluded.total_input_tokens, \
              total_output_tokens=excluded.total_output_tokens, \
              git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
              git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
              pr_number=excluded.pr_number, pr_url=excluded.pr_url, \
              working_directory=excluded.working_directory, \
              files_modified=excluded.files_modified, files_read=excluded.files_read, \
              has_errors=excluded.has_errors, \
              max_active_agents=excluded.max_active_agents",
            params![
                &summary.id,
                &summary.user_id,
                &summary.team_id,
                &summary.tool,
                &summary.agent_provider,
                &summary.agent_model,
                &summary.title,
                &summary.description,
                &summary.tags,
                &summary.created_at,
                &summary.uploaded_at,
                summary.message_count,
                summary.task_count,
                summary.event_count,
                summary.duration_seconds,
                summary.total_input_tokens,
                summary.total_output_tokens,
                &summary.git_remote,
                &summary.git_branch,
                &summary.git_commit,
                &summary.git_repo_name,
                summary.pr_number,
                &summary.pr_url,
                &summary.working_directory,
                &summary.files_modified,
                &summary.files_read,
                summary.has_errors,
                summary.max_active_agents,
            ],
        )?;

        // A session that was only known locally is now known on both sides.
        conn.execute(
            "INSERT INTO session_sync (session_id, sync_status) \
             VALUES (?1, 'remote_only') \
             ON CONFLICT(session_id) DO UPDATE SET \
              sync_status = CASE WHEN session_sync.sync_status = 'local_only' THEN 'synced' ELSE session_sync.sync_status END",
            params![&summary.id],
        )?;
        Ok(())
    }
566
567    // ── List sessions ──────────────────────────────────────────────────
568
569    fn build_local_session_where_clause(
570        filter: &LocalSessionFilter,
571    ) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
572        let mut where_clauses = vec![
573            "1=1".to_string(),
574            "NOT (s.tool = 'claude-code' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
575            "NOT (s.tool = 'opencode' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
576            "NOT (s.tool = 'opencode' AND COALESCE(s.user_message_count, 0) <= 0 AND COALESCE(s.message_count, 0) <= 4 AND COALESCE(s.task_count, 0) <= 4 AND COALESCE(s.event_count, 0) > 0 AND COALESCE(s.event_count, 0) <= 16)".to_string(),
577        ];
578        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
579        let mut idx = 1u32;
580
581        if let Some(ref team_id) = filter.team_id {
582            where_clauses.push(format!("s.team_id = ?{idx}"));
583            param_values.push(Box::new(team_id.clone()));
584            idx += 1;
585        }
586
587        if let Some(ref sync_status) = filter.sync_status {
588            where_clauses.push(format!("COALESCE(ss.sync_status, 'unknown') = ?{idx}"));
589            param_values.push(Box::new(sync_status.clone()));
590            idx += 1;
591        }
592
593        if let Some(ref repo) = filter.git_repo_name {
594            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
595            param_values.push(Box::new(repo.clone()));
596            idx += 1;
597        }
598
599        if let Some(ref tool) = filter.tool {
600            where_clauses.push(format!("s.tool = ?{idx}"));
601            param_values.push(Box::new(tool.clone()));
602            idx += 1;
603        }
604
605        if let Some(ref search) = filter.search {
606            let like = format!("%{search}%");
607            where_clauses.push(format!(
608                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
609                i1 = idx,
610                i2 = idx + 1,
611                i3 = idx + 2,
612            ));
613            param_values.push(Box::new(like.clone()));
614            param_values.push(Box::new(like.clone()));
615            param_values.push(Box::new(like));
616            idx += 3;
617        }
618
619        let interval = match filter.time_range {
620            LocalTimeRange::Hours24 => Some("-1 day"),
621            LocalTimeRange::Days7 => Some("-7 days"),
622            LocalTimeRange::Days30 => Some("-30 days"),
623            LocalTimeRange::All => None,
624        };
625        if let Some(interval) = interval {
626            where_clauses.push(format!("datetime(s.created_at) >= datetime('now', ?{idx})"));
627            param_values.push(Box::new(interval.to_string()));
628        }
629
630        (where_clauses.join(" AND "), param_values)
631    }
632
633    pub fn list_sessions(&self, filter: &LocalSessionFilter) -> Result<Vec<LocalSessionRow>> {
634        let (where_str, mut param_values) = Self::build_local_session_where_clause(filter);
635        let order_clause = match filter.sort {
636            LocalSortOrder::Popular => "s.message_count DESC, s.created_at DESC",
637            LocalSortOrder::Longest => "s.duration_seconds DESC, s.created_at DESC",
638            LocalSortOrder::Recent => "s.created_at DESC",
639        };
640
641        let mut sql = format!(
642            "SELECT {LOCAL_SESSION_COLUMNS} \
643             {FROM_CLAUSE} WHERE {where_str} \
644             ORDER BY {order_clause}"
645        );
646
647        if let Some(limit) = filter.limit {
648            sql.push_str(" LIMIT ?");
649            param_values.push(Box::new(limit));
650            if let Some(offset) = filter.offset {
651                sql.push_str(" OFFSET ?");
652                param_values.push(Box::new(offset));
653            }
654        }
655
656        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
657            param_values.iter().map(|p| p.as_ref()).collect();
658        let conn = self.conn();
659        let mut stmt = conn.prepare(&sql)?;
660        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
661
662        let mut result = Vec::new();
663        for row in rows {
664            result.push(row?);
665        }
666
667        Ok(hide_opencode_child_sessions(result))
668    }
669
670    /// Count sessions for a given list filter (before UI-level page slicing).
671    pub fn count_sessions_filtered(&self, filter: &LocalSessionFilter) -> Result<i64> {
672        let mut count_filter = filter.clone();
673        count_filter.limit = None;
674        count_filter.offset = None;
675        let (where_str, param_values) = Self::build_local_session_where_clause(&count_filter);
676        let sql = format!("SELECT COUNT(*) {FROM_CLAUSE} WHERE {where_str}");
677        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
678            param_values.iter().map(|p| p.as_ref()).collect();
679        let conn = self.conn();
680        let count = conn.query_row(&sql, param_refs.as_slice(), |row| row.get(0))?;
681        Ok(count)
682    }
683
684    /// List distinct tool names for the current list filter (ignores active tool filter).
685    pub fn list_session_tools(&self, filter: &LocalSessionFilter) -> Result<Vec<String>> {
686        let mut tool_filter = filter.clone();
687        tool_filter.tool = None;
688        tool_filter.limit = None;
689        tool_filter.offset = None;
690        let (where_str, param_values) = Self::build_local_session_where_clause(&tool_filter);
691        let sql = format!(
692            "SELECT DISTINCT s.tool \
693             {FROM_CLAUSE} WHERE {where_str} \
694             ORDER BY s.tool ASC"
695        );
696        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
697            param_values.iter().map(|p| p.as_ref()).collect();
698        let conn = self.conn();
699        let mut stmt = conn.prepare(&sql)?;
700        let rows = stmt.query_map(param_refs.as_slice(), |row| row.get::<_, String>(0))?;
701
702        let mut tools = Vec::new();
703        for row in rows {
704            let tool = row?;
705            if !tool.trim().is_empty() {
706                tools.push(tool);
707            }
708        }
709        Ok(tools)
710    }
711
712    // ── Log query ─────────────────────────────────────────────────────
713
714    /// Query sessions with extended filters for the `log` command.
715    pub fn list_sessions_log(&self, filter: &LogFilter) -> Result<Vec<LocalSessionRow>> {
716        let mut where_clauses = vec!["1=1".to_string()];
717        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
718        let mut idx = 1u32;
719
720        if let Some(ref tool) = filter.tool {
721            where_clauses.push(format!("s.tool = ?{idx}"));
722            param_values.push(Box::new(tool.clone()));
723            idx += 1;
724        }
725
726        if let Some(ref model) = filter.model {
727            let like = model.replace('*', "%");
728            where_clauses.push(format!("s.agent_model LIKE ?{idx}"));
729            param_values.push(Box::new(like));
730            idx += 1;
731        }
732
733        if let Some(ref since) = filter.since {
734            where_clauses.push(format!("s.created_at >= ?{idx}"));
735            param_values.push(Box::new(since.clone()));
736            idx += 1;
737        }
738
739        if let Some(ref before) = filter.before {
740            where_clauses.push(format!("s.created_at < ?{idx}"));
741            param_values.push(Box::new(before.clone()));
742            idx += 1;
743        }
744
745        if let Some(ref touches) = filter.touches {
746            let like = format!("%\"{touches}\"%");
747            where_clauses.push(format!("s.files_modified LIKE ?{idx}"));
748            param_values.push(Box::new(like));
749            idx += 1;
750        }
751
752        if let Some(ref grep) = filter.grep {
753            let like = format!("%{grep}%");
754            where_clauses.push(format!(
755                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
756                i1 = idx,
757                i2 = idx + 1,
758                i3 = idx + 2,
759            ));
760            param_values.push(Box::new(like.clone()));
761            param_values.push(Box::new(like.clone()));
762            param_values.push(Box::new(like));
763            idx += 3;
764        }
765
766        if let Some(true) = filter.has_errors {
767            where_clauses.push("s.has_errors = 1".to_string());
768        }
769
770        if let Some(ref wd) = filter.working_directory {
771            where_clauses.push(format!("s.working_directory LIKE ?{idx}"));
772            param_values.push(Box::new(format!("{wd}%")));
773            idx += 1;
774        }
775
776        if let Some(ref repo) = filter.git_repo_name {
777            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
778            param_values.push(Box::new(repo.clone()));
779            idx += 1;
780        }
781
782        // Optional JOIN for commit hash filter
783        let mut extra_join = String::new();
784        if let Some(ref commit) = filter.commit {
785            extra_join =
786                " INNER JOIN commit_session_links csl ON csl.session_id = s.id".to_string();
787            where_clauses.push(format!("csl.commit_hash = ?{idx}"));
788            param_values.push(Box::new(commit.clone()));
789            idx += 1;
790        }
791
792        let _ = idx; // suppress unused warning
793
794        let where_str = where_clauses.join(" AND ");
795        let mut sql = format!(
796            "SELECT {LOCAL_SESSION_COLUMNS} \
797             {FROM_CLAUSE}{extra_join} WHERE {where_str} \
798             ORDER BY s.created_at DESC"
799        );
800
801        if let Some(limit) = filter.limit {
802            sql.push_str(" LIMIT ?");
803            param_values.push(Box::new(limit));
804            if let Some(offset) = filter.offset {
805                sql.push_str(" OFFSET ?");
806                param_values.push(Box::new(offset));
807            }
808        }
809
810        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
811            param_values.iter().map(|p| p.as_ref()).collect();
812        let conn = self.conn();
813        let mut stmt = conn.prepare(&sql)?;
814        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
815
816        let mut result = Vec::new();
817        for row in rows {
818            result.push(row?);
819        }
820        Ok(hide_opencode_child_sessions(result))
821    }
822
823    /// Get the latest N sessions for a specific tool, ordered by created_at DESC.
824    pub fn get_sessions_by_tool_latest(
825        &self,
826        tool: &str,
827        count: u32,
828    ) -> Result<Vec<LocalSessionRow>> {
829        let sql = format!(
830            "SELECT {LOCAL_SESSION_COLUMNS} \
831             {FROM_CLAUSE} WHERE s.tool = ?1 \
832             ORDER BY s.created_at DESC"
833        );
834        let conn = self.conn();
835        let mut stmt = conn.prepare(&sql)?;
836        let rows = stmt.query_map(params![tool], row_to_local_session)?;
837        let mut result = Vec::new();
838        for row in rows {
839            result.push(row?);
840        }
841
842        let mut filtered = hide_opencode_child_sessions(result);
843        filtered.truncate(count as usize);
844        Ok(filtered)
845    }
846
847    /// Get the latest N sessions across all tools, ordered by created_at DESC.
848    pub fn get_sessions_latest(&self, count: u32) -> Result<Vec<LocalSessionRow>> {
849        let sql = format!(
850            "SELECT {LOCAL_SESSION_COLUMNS} \
851             {FROM_CLAUSE} \
852             ORDER BY s.created_at DESC"
853        );
854        let conn = self.conn();
855        let mut stmt = conn.prepare(&sql)?;
856        let rows = stmt.query_map([], row_to_local_session)?;
857        let mut result = Vec::new();
858        for row in rows {
859            result.push(row?);
860        }
861
862        let mut filtered = hide_opencode_child_sessions(result);
863        filtered.truncate(count as usize);
864        Ok(filtered)
865    }
866
867    /// Get the Nth most recent session for a specific tool (0 = HEAD, 1 = HEAD~1, etc.).
868    pub fn get_session_by_tool_offset(
869        &self,
870        tool: &str,
871        offset: u32,
872    ) -> Result<Option<LocalSessionRow>> {
873        let sql = format!(
874            "SELECT {LOCAL_SESSION_COLUMNS} \
875             {FROM_CLAUSE} WHERE s.tool = ?1 \
876             ORDER BY s.created_at DESC"
877        );
878        let conn = self.conn();
879        let mut stmt = conn.prepare(&sql)?;
880        let rows = stmt.query_map(params![tool], row_to_local_session)?;
881        let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
882        Ok(filtered.into_iter().nth(offset as usize))
883    }
884
885    /// Get the Nth most recent session across all tools (0 = HEAD, 1 = HEAD~1, etc.).
886    pub fn get_session_by_offset(&self, offset: u32) -> Result<Option<LocalSessionRow>> {
887        let sql = format!(
888            "SELECT {LOCAL_SESSION_COLUMNS} \
889             {FROM_CLAUSE} \
890             ORDER BY s.created_at DESC"
891        );
892        let conn = self.conn();
893        let mut stmt = conn.prepare(&sql)?;
894        let rows = stmt.query_map([], row_to_local_session)?;
895        let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
896        Ok(filtered.into_iter().nth(offset as usize))
897    }
898
899    /// Fetch the source path used when the session was last parsed/loaded.
900    pub fn get_session_source_path(&self, session_id: &str) -> Result<Option<String>> {
901        let conn = self.conn();
902        let result = conn
903            .query_row(
904                "SELECT source_path FROM session_sync WHERE session_id = ?1",
905                params![session_id],
906                |row| row.get(0),
907            )
908            .optional()?;
909
910        Ok(result)
911    }
912
913    /// Count total sessions in the local DB.
914    pub fn session_count(&self) -> Result<i64> {
915        let count = self
916            .conn()
917            .query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))?;
918        Ok(count)
919    }
920
921    // ── Delete session ─────────────────────────────────────────────────
922
923    pub fn delete_session(&self, session_id: &str) -> Result<()> {
924        let conn = self.conn();
925        conn.execute(
926            "DELETE FROM body_cache WHERE session_id = ?1",
927            params![session_id],
928        )?;
929        conn.execute(
930            "DELETE FROM session_sync WHERE session_id = ?1",
931            params![session_id],
932        )?;
933        conn.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
934        Ok(())
935    }
936
937    // ── Sync cursor ────────────────────────────────────────────────────
938
939    pub fn get_sync_cursor(&self, team_id: &str) -> Result<Option<String>> {
940        let cursor = self
941            .conn()
942            .query_row(
943                "SELECT cursor FROM sync_cursors WHERE team_id = ?1",
944                params![team_id],
945                |row| row.get(0),
946            )
947            .optional()?;
948        Ok(cursor)
949    }
950
951    pub fn set_sync_cursor(&self, team_id: &str, cursor: &str) -> Result<()> {
952        self.conn().execute(
953            "INSERT INTO sync_cursors (team_id, cursor, updated_at) \
954             VALUES (?1, ?2, datetime('now')) \
955             ON CONFLICT(team_id) DO UPDATE SET cursor=excluded.cursor, updated_at=datetime('now')",
956            params![team_id, cursor],
957        )?;
958        Ok(())
959    }
960
961    // ── Upload tracking ────────────────────────────────────────────────
962
963    /// Get sessions that are local_only and need to be uploaded.
964    pub fn pending_uploads(&self, team_id: &str) -> Result<Vec<LocalSessionRow>> {
965        let sql = format!(
966            "SELECT {LOCAL_SESSION_COLUMNS} \
967             FROM sessions s \
968             INNER JOIN session_sync ss ON ss.session_id = s.id \
969             LEFT JOIN users u ON u.id = s.user_id \
970             WHERE ss.sync_status = 'local_only' AND s.team_id = ?1 \
971             ORDER BY s.created_at ASC"
972        );
973        let conn = self.conn();
974        let mut stmt = conn.prepare(&sql)?;
975        let rows = stmt.query_map(params![team_id], row_to_local_session)?;
976        let mut result = Vec::new();
977        for row in rows {
978            result.push(row?);
979        }
980        Ok(result)
981    }
982
983    pub fn mark_synced(&self, session_id: &str) -> Result<()> {
984        self.conn().execute(
985            "UPDATE session_sync SET sync_status = 'synced', last_synced_at = datetime('now') \
986             WHERE session_id = ?1",
987            params![session_id],
988        )?;
989        Ok(())
990    }
991
992    /// Check if a session was already uploaded (synced or remote_only) since the given modification time.
993    pub fn was_uploaded_after(
994        &self,
995        source_path: &str,
996        modified: &chrono::DateTime<chrono::Utc>,
997    ) -> Result<bool> {
998        let result: Option<String> = self
999            .conn()
1000            .query_row(
1001                "SELECT last_synced_at FROM session_sync \
1002                 WHERE source_path = ?1 AND sync_status = 'synced' AND last_synced_at IS NOT NULL",
1003                params![source_path],
1004                |row| row.get(0),
1005            )
1006            .optional()?;
1007
1008        if let Some(synced_at) = result {
1009            if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&synced_at) {
1010                return Ok(dt >= *modified);
1011            }
1012        }
1013        Ok(false)
1014    }
1015
1016    // ── Body cache ─────────────────────────────────────────────────────
1017
1018    pub fn cache_body(&self, session_id: &str, body: &[u8]) -> Result<()> {
1019        self.conn().execute(
1020            "INSERT INTO body_cache (session_id, body, cached_at) \
1021             VALUES (?1, ?2, datetime('now')) \
1022             ON CONFLICT(session_id) DO UPDATE SET body=excluded.body, cached_at=datetime('now')",
1023            params![session_id, body],
1024        )?;
1025        Ok(())
1026    }
1027
1028    pub fn get_cached_body(&self, session_id: &str) -> Result<Option<Vec<u8>>> {
1029        let body = self
1030            .conn()
1031            .query_row(
1032                "SELECT body FROM body_cache WHERE session_id = ?1",
1033                params![session_id],
1034                |row| row.get(0),
1035            )
1036            .optional()?;
1037        Ok(body)
1038    }
1039
1040    // ── Timeline summary cache ────────────────────────────────────────
1041
1042    pub fn upsert_timeline_summary_cache(
1043        &self,
1044        lookup_key: &str,
1045        namespace: &str,
1046        compact: &str,
1047        payload: &str,
1048        raw: &str,
1049    ) -> Result<()> {
1050        self.conn().execute(
1051            "INSERT INTO timeline_summary_cache \
1052             (lookup_key, namespace, compact, payload, raw, cached_at) \
1053             VALUES (?1, ?2, ?3, ?4, ?5, datetime('now')) \
1054             ON CONFLICT(lookup_key) DO UPDATE SET \
1055               namespace = excluded.namespace, \
1056               compact = excluded.compact, \
1057               payload = excluded.payload, \
1058               raw = excluded.raw, \
1059               cached_at = datetime('now')",
1060            params![lookup_key, namespace, compact, payload, raw],
1061        )?;
1062        Ok(())
1063    }
1064
1065    pub fn list_timeline_summary_cache_by_namespace(
1066        &self,
1067        namespace: &str,
1068    ) -> Result<Vec<TimelineSummaryCacheRow>> {
1069        let conn = self.conn();
1070        let mut stmt = conn.prepare(
1071            "SELECT lookup_key, namespace, compact, payload, raw, cached_at \
1072             FROM timeline_summary_cache \
1073             WHERE namespace = ?1 \
1074             ORDER BY cached_at DESC",
1075        )?;
1076        let rows = stmt.query_map(params![namespace], |row| {
1077            Ok(TimelineSummaryCacheRow {
1078                lookup_key: row.get(0)?,
1079                namespace: row.get(1)?,
1080                compact: row.get(2)?,
1081                payload: row.get(3)?,
1082                raw: row.get(4)?,
1083                cached_at: row.get(5)?,
1084            })
1085        })?;
1086
1087        let mut out = Vec::new();
1088        for row in rows {
1089            out.push(row?);
1090        }
1091        Ok(out)
1092    }
1093
1094    pub fn clear_timeline_summary_cache(&self) -> Result<usize> {
1095        let affected = self
1096            .conn()
1097            .execute("DELETE FROM timeline_summary_cache", [])?;
1098        Ok(affected)
1099    }
1100
1101    // ── Migration helper ───────────────────────────────────────────────
1102
1103    /// Migrate entries from the old state.json UploadState into the local DB.
1104    /// Marks them as `synced` with no metadata (we only know the file path was uploaded).
1105    pub fn migrate_from_state_json(
1106        &self,
1107        uploaded: &std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
1108    ) -> Result<usize> {
1109        let mut count = 0;
1110        for (path, uploaded_at) in uploaded {
1111            let exists: bool = self
1112                .conn()
1113                .query_row(
1114                    "SELECT COUNT(*) > 0 FROM session_sync WHERE source_path = ?1",
1115                    params![path],
1116                    |row| row.get(0),
1117                )
1118                .unwrap_or(false);
1119
1120            if exists {
1121                self.conn().execute(
1122                    "UPDATE session_sync SET sync_status = 'synced', last_synced_at = ?1 \
1123                     WHERE source_path = ?2 AND sync_status = 'local_only'",
1124                    params![uploaded_at.to_rfc3339(), path],
1125                )?;
1126                count += 1;
1127            }
1128        }
1129        Ok(count)
1130    }
1131
1132    // ── Commit ↔ session linking ────────────────────────────────────
1133
1134    /// Link a git commit to an AI session.
1135    pub fn link_commit_session(
1136        &self,
1137        commit_hash: &str,
1138        session_id: &str,
1139        repo_path: Option<&str>,
1140        branch: Option<&str>,
1141    ) -> Result<()> {
1142        self.conn().execute(
1143            "INSERT INTO commit_session_links (commit_hash, session_id, repo_path, branch) \
1144             VALUES (?1, ?2, ?3, ?4) \
1145             ON CONFLICT(commit_hash, session_id) DO NOTHING",
1146            params![commit_hash, session_id, repo_path, branch],
1147        )?;
1148        Ok(())
1149    }
1150
1151    /// Get all sessions linked to a git commit.
1152    pub fn get_sessions_by_commit(&self, commit_hash: &str) -> Result<Vec<LocalSessionRow>> {
1153        let sql = format!(
1154            "SELECT {LOCAL_SESSION_COLUMNS} \
1155             {FROM_CLAUSE} \
1156             INNER JOIN commit_session_links csl ON csl.session_id = s.id \
1157             WHERE csl.commit_hash = ?1 \
1158             ORDER BY s.created_at DESC"
1159        );
1160        let conn = self.conn();
1161        let mut stmt = conn.prepare(&sql)?;
1162        let rows = stmt.query_map(params![commit_hash], row_to_local_session)?;
1163        let mut result = Vec::new();
1164        for row in rows {
1165            result.push(row?);
1166        }
1167        Ok(result)
1168    }
1169
1170    /// Get all commits linked to a session.
1171    pub fn get_commits_by_session(&self, session_id: &str) -> Result<Vec<CommitLink>> {
1172        let conn = self.conn();
1173        let mut stmt = conn.prepare(
1174            "SELECT commit_hash, session_id, repo_path, branch, created_at \
1175             FROM commit_session_links WHERE session_id = ?1 \
1176             ORDER BY created_at DESC",
1177        )?;
1178        let rows = stmt.query_map(params![session_id], |row| {
1179            Ok(CommitLink {
1180                commit_hash: row.get(0)?,
1181                session_id: row.get(1)?,
1182                repo_path: row.get(2)?,
1183                branch: row.get(3)?,
1184                created_at: row.get(4)?,
1185            })
1186        })?;
1187        let mut result = Vec::new();
1188        for row in rows {
1189            result.push(row?);
1190        }
1191        Ok(result)
1192    }
1193
1194    /// Find the most recently active session for a given repo path.
1195    /// "Active" means the session's working_directory matches the repo path
1196    /// and was created within the last `since_minutes` minutes.
1197    pub fn find_active_session_for_repo(
1198        &self,
1199        repo_path: &str,
1200        since_minutes: u32,
1201    ) -> Result<Option<LocalSessionRow>> {
1202        let sql = format!(
1203            "SELECT {LOCAL_SESSION_COLUMNS} \
1204             {FROM_CLAUSE} \
1205             WHERE s.working_directory LIKE ?1 \
1206             AND s.created_at >= datetime('now', ?2) \
1207             ORDER BY s.created_at DESC LIMIT 1"
1208        );
1209        let since = format!("-{since_minutes} minutes");
1210        let like = format!("{repo_path}%");
1211        let conn = self.conn();
1212        let mut stmt = conn.prepare(&sql)?;
1213        let row = stmt
1214            .query_map(params![like, since], row_to_local_session)?
1215            .next()
1216            .transpose()?;
1217        Ok(row)
1218    }
1219
1220    /// Get all session IDs currently in the local DB.
1221    pub fn existing_session_ids(&self) -> std::collections::HashSet<String> {
1222        let conn = self.conn();
1223        let mut stmt = conn
1224            .prepare("SELECT id FROM sessions")
1225            .unwrap_or_else(|_| panic!("failed to prepare existing_session_ids query"));
1226        let rows = stmt.query_map([], |row| row.get::<_, String>(0));
1227        let mut set = std::collections::HashSet::new();
1228        if let Ok(rows) = rows {
1229            for row in rows.flatten() {
1230                set.insert(row);
1231            }
1232        }
1233        set
1234    }
1235
1236    /// Update only stats fields for an existing session (no git context re-extraction).
1237    pub fn update_session_stats(&self, session: &Session) -> Result<()> {
1238        let title = session.context.title.as_deref();
1239        let description = session.context.description.as_deref();
1240        let (files_modified, files_read, has_errors) =
1241            opensession_core::extract::extract_file_metadata(session);
1242        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;
1243
1244        self.conn().execute(
1245            "UPDATE sessions SET \
1246             title=?2, description=?3, \
1247             message_count=?4, user_message_count=?5, task_count=?6, \
1248             event_count=?7, duration_seconds=?8, \
1249             total_input_tokens=?9, total_output_tokens=?10, \
1250             files_modified=?11, files_read=?12, has_errors=?13, \
1251             max_active_agents=?14 \
1252             WHERE id=?1",
1253            params![
1254                &session.session_id,
1255                title,
1256                description,
1257                session.stats.message_count as i64,
1258                session.stats.user_message_count as i64,
1259                session.stats.task_count as i64,
1260                session.stats.event_count as i64,
1261                session.stats.duration_seconds as i64,
1262                session.stats.total_input_tokens as i64,
1263                session.stats.total_output_tokens as i64,
1264                &files_modified,
1265                &files_read,
1266                has_errors,
1267                max_active_agents,
1268            ],
1269        )?;
1270        Ok(())
1271    }
1272
1273    /// Update only sync metadata path for an existing session.
1274    pub fn set_session_sync_path(&self, session_id: &str, source_path: &str) -> Result<()> {
1275        self.conn().execute(
1276            "INSERT INTO session_sync (session_id, source_path) \
1277             VALUES (?1, ?2) \
1278             ON CONFLICT(session_id) DO UPDATE SET source_path = excluded.source_path",
1279            params![session_id, source_path],
1280        )?;
1281        Ok(())
1282    }
1283
1284    /// Get a list of distinct git repo names present in the DB.
1285    pub fn list_repos(&self) -> Result<Vec<String>> {
1286        let conn = self.conn();
1287        let mut stmt = conn.prepare(
1288            "SELECT DISTINCT git_repo_name FROM sessions \
1289             WHERE git_repo_name IS NOT NULL ORDER BY git_repo_name ASC",
1290        )?;
1291        let rows = stmt.query_map([], |row| row.get(0))?;
1292        let mut result = Vec::new();
1293        for row in rows {
1294            result.push(row?);
1295        }
1296        Ok(result)
1297    }
1298
1299    /// Get a list of distinct, non-empty working directories present in the DB.
1300    pub fn list_working_directories(&self) -> Result<Vec<String>> {
1301        let conn = self.conn();
1302        let mut stmt = conn.prepare(
1303            "SELECT DISTINCT working_directory FROM sessions \
1304             WHERE working_directory IS NOT NULL AND TRIM(working_directory) <> '' \
1305             ORDER BY working_directory ASC",
1306        )?;
1307        let rows = stmt.query_map([], |row| row.get(0))?;
1308        let mut result = Vec::new();
1309        for row in rows {
1310            result.push(row?);
1311        }
1312        Ok(result)
1313    }
1314}
1315
1316// ── Legacy schema backfill ─────────────────────────────────────────────
1317
1318fn open_connection_with_latest_schema(path: &PathBuf) -> Result<Connection> {
1319    let conn = Connection::open(path).with_context(|| format!("open db {}", path.display()))?;
1320    conn.execute_batch("PRAGMA journal_mode=WAL;")?;
1321
1322    // Disable FK constraints for local DB (it's a cache, not source of truth)
1323    conn.execute_batch("PRAGMA foreign_keys=OFF;")?;
1324
1325    apply_local_migrations(&conn)?;
1326
1327    // Backfill missing columns for legacy local DBs where `sessions` already
1328    // existed before newer fields were introduced.
1329    ensure_sessions_columns(&conn)?;
1330    validate_local_schema(&conn)?;
1331
1332    Ok(conn)
1333}
1334
1335fn apply_local_migrations(conn: &Connection) -> Result<()> {
1336    conn.execute_batch(
1337        "CREATE TABLE IF NOT EXISTS _migrations (
1338            id INTEGER PRIMARY KEY,
1339            name TEXT NOT NULL UNIQUE,
1340            applied_at TEXT NOT NULL DEFAULT (datetime('now'))
1341        );",
1342    )
1343    .context("create _migrations table for local db")?;
1344
1345    for (name, sql) in REMOTE_MIGRATIONS.iter().chain(LOCAL_MIGRATIONS.iter()) {
1346        let already_applied: bool = conn
1347            .query_row(
1348                "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
1349                [name],
1350                |row| row.get(0),
1351            )
1352            .unwrap_or(false);
1353
1354        if already_applied {
1355            continue;
1356        }
1357
1358        if let Err(e) = conn.execute_batch(sql) {
1359            let msg = e.to_string().to_ascii_lowercase();
1360            if !is_local_migration_compat_error(&msg) {
1361                return Err(e).with_context(|| format!("apply local migration {name}"));
1362            }
1363        }
1364
1365        conn.execute(
1366            "INSERT OR IGNORE INTO _migrations (name) VALUES (?1)",
1367            [name],
1368        )
1369        .with_context(|| format!("record local migration {name}"))?;
1370    }
1371
1372    Ok(())
1373}
1374
/// Report whether a lower-cased migration error message is one of the
/// tolerated compatibility failures.
///
/// The match is a case-sensitive substring test, so callers are expected to
/// pass an already lower-cased message.
fn is_local_migration_compat_error(msg: &str) -> bool {
    const TOLERATED: [&str; 3] = [
        "duplicate column name",
        "no such column",
        "already exists",
    ];
    TOLERATED.iter().any(|needle| msg.contains(*needle))
}
1380
1381fn validate_local_schema(conn: &Connection) -> Result<()> {
1382    let sql = format!("SELECT {LOCAL_SESSION_COLUMNS} {FROM_CLAUSE} WHERE 1=0");
1383    conn.prepare(&sql)
1384        .map(|_| ())
1385        .context("validate local session schema")
1386}
1387
1388fn is_schema_compat_error(err: &anyhow::Error) -> bool {
1389    let msg = format!("{err:#}").to_ascii_lowercase();
1390    msg.contains("no such column")
1391        || msg.contains("no such table")
1392        || msg.contains("cannot add a column")
1393        || msg.contains("already exists")
1394        || msg.contains("views may not be indexed")
1395        || msg.contains("malformed database schema")
1396        || msg.contains("duplicate column name")
1397}
1398
1399fn rotate_legacy_db(path: &PathBuf) -> Result<()> {
1400    if !path.exists() {
1401        return Ok(());
1402    }
1403
1404    let ts = chrono::Utc::now().format("%Y%m%d%H%M%S");
1405    let backup_name = format!(
1406        "{}.legacy-{}.bak",
1407        path.file_name()
1408            .and_then(|n| n.to_str())
1409            .unwrap_or("local.db"),
1410        ts
1411    );
1412    let backup_path = path.with_file_name(backup_name);
1413    std::fs::rename(path, &backup_path).with_context(|| {
1414        format!(
1415            "rotate legacy db {} -> {}",
1416            path.display(),
1417            backup_path.display()
1418        )
1419    })?;
1420
1421    let wal = PathBuf::from(format!("{}-wal", path.display()));
1422    let shm = PathBuf::from(format!("{}-shm", path.display()));
1423    let _ = std::fs::remove_file(wal);
1424    let _ = std::fs::remove_file(shm);
1425    Ok(())
1426}
1427
/// Columns the `sessions` table must have, as `(column name, SQL declaration)`
/// pairs.
///
/// `ensure_sessions_columns` adds each missing column with
/// `ALTER TABLE sessions ADD COLUMN <name> <declaration>`.
const REQUIRED_SESSION_COLUMNS: &[(&str, &str)] = &[
    ("user_id", "TEXT"),
    ("team_id", "TEXT DEFAULT 'personal'"),
    ("tool", "TEXT DEFAULT ''"),
    ("agent_provider", "TEXT"),
    ("agent_model", "TEXT"),
    ("title", "TEXT"),
    ("description", "TEXT"),
    ("tags", "TEXT"),
    ("created_at", "TEXT DEFAULT ''"),
    ("uploaded_at", "TEXT DEFAULT ''"),
    ("message_count", "INTEGER DEFAULT 0"),
    ("user_message_count", "INTEGER DEFAULT 0"),
    ("task_count", "INTEGER DEFAULT 0"),
    ("event_count", "INTEGER DEFAULT 0"),
    ("duration_seconds", "INTEGER DEFAULT 0"),
    ("total_input_tokens", "INTEGER DEFAULT 0"),
    ("total_output_tokens", "INTEGER DEFAULT 0"),
    ("body_storage_key", "TEXT DEFAULT ''"),
    ("body_url", "TEXT"),
    ("git_remote", "TEXT"),
    ("git_branch", "TEXT"),
    ("git_commit", "TEXT"),
    ("git_repo_name", "TEXT"),
    ("pr_number", "INTEGER"),
    ("pr_url", "TEXT"),
    ("working_directory", "TEXT"),
    ("files_modified", "TEXT"),
    ("files_read", "TEXT"),
    ("has_errors", "BOOLEAN DEFAULT 0"),
    ("max_active_agents", "INTEGER DEFAULT 1"),
];
1460
1461fn ensure_sessions_columns(conn: &Connection) -> Result<()> {
1462    let mut existing = HashSet::new();
1463    let mut stmt = conn.prepare("PRAGMA table_info(sessions)")?;
1464    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
1465    for row in rows {
1466        existing.insert(row?);
1467    }
1468
1469    for (name, decl) in REQUIRED_SESSION_COLUMNS {
1470        if existing.contains(*name) {
1471            continue;
1472        }
1473        let sql = format!("ALTER TABLE sessions ADD COLUMN {name} {decl};");
1474        conn.execute_batch(&sql)
1475            .with_context(|| format!("add legacy sessions column '{name}'"))?;
1476    }
1477
1478    Ok(())
1479}
1480
/// Column list for SELECT queries against sessions + session_sync + users.
///
/// The queries in this file alias those tables as `s`, `ss` and `u`.
/// `row_to_local_session` reads the result by position, so the order of the
/// 33 expressions below must stay in step with the indices used there.
/// A missing sync status is reported as `'unknown'`, a NULL
/// `user_message_count` as `0` and a NULL `max_active_agents` as `1`.
pub const LOCAL_SESSION_COLUMNS: &str = "\
s.id, ss.source_path, COALESCE(ss.sync_status, 'unknown') AS sync_status, ss.last_synced_at, \
s.user_id, u.nickname, s.team_id, s.tool, s.agent_provider, s.agent_model, \
s.title, s.description, s.tags, s.created_at, s.uploaded_at, \
s.message_count, COALESCE(s.user_message_count, 0), s.task_count, s.event_count, s.duration_seconds, \
s.total_input_tokens, s.total_output_tokens, \
s.git_remote, s.git_branch, s.git_commit, s.git_repo_name, \
s.pr_number, s.pr_url, s.working_directory, \
s.files_modified, s.files_read, s.has_errors, COALESCE(s.max_active_agents, 1)";
1491
1492fn row_to_local_session(row: &rusqlite::Row) -> rusqlite::Result<LocalSessionRow> {
1493    Ok(LocalSessionRow {
1494        id: row.get(0)?,
1495        source_path: row.get(1)?,
1496        sync_status: row.get(2)?,
1497        last_synced_at: row.get(3)?,
1498        user_id: row.get(4)?,
1499        nickname: row.get(5)?,
1500        team_id: row.get(6)?,
1501        tool: row.get(7)?,
1502        agent_provider: row.get(8)?,
1503        agent_model: row.get(9)?,
1504        title: row.get(10)?,
1505        description: row.get(11)?,
1506        tags: row.get(12)?,
1507        created_at: row.get(13)?,
1508        uploaded_at: row.get(14)?,
1509        message_count: row.get(15)?,
1510        user_message_count: row.get(16)?,
1511        task_count: row.get(17)?,
1512        event_count: row.get(18)?,
1513        duration_seconds: row.get(19)?,
1514        total_input_tokens: row.get(20)?,
1515        total_output_tokens: row.get(21)?,
1516        git_remote: row.get(22)?,
1517        git_branch: row.get(23)?,
1518        git_commit: row.get(24)?,
1519        git_repo_name: row.get(25)?,
1520        pr_number: row.get(26)?,
1521        pr_url: row.get(27)?,
1522        working_directory: row.get(28)?,
1523        files_modified: row.get(29)?,
1524        files_read: row.get(30)?,
1525        has_errors: row.get::<_, i64>(31).unwrap_or(0) != 0,
1526        max_active_agents: row.get(32).unwrap_or(1),
1527    })
1528}
1529
1530fn default_db_path() -> Result<PathBuf> {
1531    let home = std::env::var("HOME")
1532        .or_else(|_| std::env::var("USERPROFILE"))
1533        .context("Could not determine home directory")?;
1534    Ok(PathBuf::from(home)
1535        .join(".local")
1536        .join("share")
1537        .join("opensession")
1538        .join("local.db"))
1539}
1540
1541#[cfg(test)]
1542mod tests {
1543    use super::*;
1544
1545    use std::collections::BTreeSet;
1546    use std::fs::{create_dir_all, write};
1547    use tempfile::tempdir;
1548
1549    fn test_db() -> LocalDb {
1550        let dir = tempdir().unwrap();
1551        let path = dir.keep().join("test.db");
1552        LocalDb::open_path(&path).unwrap()
1553    }
1554
1555    fn temp_root() -> tempfile::TempDir {
1556        tempdir().unwrap()
1557    }
1558
1559    fn make_row(id: &str, tool: &str, source_path: Option<&str>) -> LocalSessionRow {
1560        LocalSessionRow {
1561            id: id.to_string(),
1562            source_path: source_path.map(String::from),
1563            sync_status: "local_only".to_string(),
1564            last_synced_at: None,
1565            user_id: None,
1566            nickname: None,
1567            team_id: None,
1568            tool: tool.to_string(),
1569            agent_provider: None,
1570            agent_model: None,
1571            title: Some("test".to_string()),
1572            description: None,
1573            tags: None,
1574            created_at: "2024-01-01T00:00:00Z".to_string(),
1575            uploaded_at: None,
1576            message_count: 0,
1577            user_message_count: 0,
1578            task_count: 0,
1579            event_count: 0,
1580            duration_seconds: 0,
1581            total_input_tokens: 0,
1582            total_output_tokens: 0,
1583            git_remote: None,
1584            git_branch: None,
1585            git_commit: None,
1586            git_repo_name: None,
1587            pr_number: None,
1588            pr_url: None,
1589            working_directory: None,
1590            files_modified: None,
1591            files_read: None,
1592            has_errors: false,
1593            max_active_agents: 1,
1594        }
1595    }
1596
1597    #[test]
1598    fn test_open_and_schema() {
1599        let _db = test_db();
1600    }
1601
1602    #[test]
1603    fn test_open_backfills_legacy_sessions_columns() {
1604        let dir = tempfile::tempdir().unwrap();
1605        let path = dir.path().join("legacy.db");
1606        {
1607            let conn = Connection::open(&path).unwrap();
1608            conn.execute_batch(
1609                "CREATE TABLE sessions (id TEXT PRIMARY KEY);
1610                 INSERT INTO sessions (id) VALUES ('legacy-1');",
1611            )
1612            .unwrap();
1613        }
1614
1615        let db = LocalDb::open_path(&path).unwrap();
1616        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1617        assert_eq!(rows.len(), 1);
1618        assert_eq!(rows[0].id, "legacy-1");
1619        assert_eq!(rows[0].user_message_count, 0);
1620    }
1621
1622    #[test]
1623    fn test_open_rotates_incompatible_legacy_schema() {
1624        let dir = tempfile::tempdir().unwrap();
1625        let path = dir.path().join("broken.db");
1626        {
1627            let conn = Connection::open(&path).unwrap();
1628            conn.execute_batch("CREATE VIEW sessions AS SELECT 'x' AS id;")
1629                .unwrap();
1630        }
1631
1632        let db = LocalDb::open_path(&path).unwrap();
1633        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1634        assert!(rows.is_empty());
1635
1636        let rotated = std::fs::read_dir(dir.path())
1637            .unwrap()
1638            .filter_map(Result::ok)
1639            .any(|entry| {
1640                let name = entry.file_name();
1641                let name = name.to_string_lossy();
1642                name.starts_with("broken.db.legacy-") && name.ends_with(".bak")
1643            });
1644        assert!(rotated, "expected rotated legacy backup file");
1645    }
1646
1647    #[test]
1648    fn test_is_opencode_child_session() {
1649        let root = temp_root();
1650        let dir = root.path().join("sessions");
1651        create_dir_all(&dir).unwrap();
1652        let parent_session = dir.join("parent.json");
1653        write(
1654            &parent_session,
1655            r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1656        )
1657        .unwrap();
1658        let child_session = dir.join("child.json");
1659        write(
1660            &child_session,
1661            r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1662        )
1663        .unwrap();
1664
1665        let parent = make_row(
1666            "ses_parent",
1667            "opencode",
1668            Some(parent_session.to_str().unwrap()),
1669        );
1670        let child = make_row(
1671            "ses_child",
1672            "opencode",
1673            Some(child_session.to_str().unwrap()),
1674        );
1675        let codex = make_row("ses_other", "codex", Some(child_session.to_str().unwrap()));
1676
1677        assert!(!is_opencode_child_session(&parent));
1678        assert!(is_opencode_child_session(&child));
1679        assert!(!is_opencode_child_session(&codex));
1680    }
1681
1682    #[test]
1683    fn test_is_opencode_child_session_uses_event_shape_heuristic() {
1684        let child = LocalSessionRow {
1685            id: "sess_child".to_string(),
1686            source_path: None,
1687            sync_status: "local_only".to_string(),
1688            last_synced_at: None,
1689            user_id: None,
1690            nickname: None,
1691            team_id: None,
1692            tool: "opencode".to_string(),
1693            agent_provider: None,
1694            agent_model: None,
1695            title: None,
1696            description: None,
1697            tags: None,
1698            created_at: "2024-01-01T00:00:00Z".to_string(),
1699            uploaded_at: None,
1700            message_count: 1,
1701            user_message_count: 0,
1702            task_count: 4,
1703            event_count: 4,
1704            duration_seconds: 0,
1705            total_input_tokens: 0,
1706            total_output_tokens: 0,
1707            git_remote: None,
1708            git_branch: None,
1709            git_commit: None,
1710            git_repo_name: None,
1711            pr_number: None,
1712            pr_url: None,
1713            working_directory: None,
1714            files_modified: None,
1715            files_read: None,
1716            has_errors: false,
1717            max_active_agents: 1,
1718        };
1719
1720        let parent = LocalSessionRow {
1721            id: "sess_parent".to_string(),
1722            source_path: None,
1723            sync_status: "local_only".to_string(),
1724            last_synced_at: None,
1725            user_id: None,
1726            nickname: None,
1727            team_id: None,
1728            tool: "opencode".to_string(),
1729            agent_provider: None,
1730            agent_model: None,
1731            title: Some("regular".to_string()),
1732            description: None,
1733            tags: None,
1734            created_at: "2024-01-01T00:00:00Z".to_string(),
1735            uploaded_at: None,
1736            message_count: 1,
1737            user_message_count: 1,
1738            task_count: 2,
1739            event_count: 20,
1740            duration_seconds: 0,
1741            total_input_tokens: 0,
1742            total_output_tokens: 0,
1743            git_remote: None,
1744            git_branch: None,
1745            git_commit: None,
1746            git_repo_name: None,
1747            pr_number: None,
1748            pr_url: None,
1749            working_directory: None,
1750            files_modified: None,
1751            files_read: None,
1752            has_errors: false,
1753            max_active_agents: 1,
1754        };
1755
1756        assert!(is_opencode_child_session(&child));
1757        assert!(!is_opencode_child_session(&parent));
1758    }
1759
1760    #[test]
1761    fn test_is_opencode_child_session_with_more_messages_is_hidden_if_task_count_small() {
1762        let child = LocalSessionRow {
1763            id: "sess_child_2".to_string(),
1764            source_path: None,
1765            sync_status: "local_only".to_string(),
1766            last_synced_at: None,
1767            user_id: None,
1768            nickname: None,
1769            team_id: None,
1770            tool: "opencode".to_string(),
1771            agent_provider: None,
1772            agent_model: None,
1773            title: None,
1774            description: None,
1775            tags: None,
1776            created_at: "2024-01-01T00:00:00Z".to_string(),
1777            uploaded_at: None,
1778            message_count: 2,
1779            user_message_count: 0,
1780            task_count: 4,
1781            event_count: 4,
1782            duration_seconds: 0,
1783            total_input_tokens: 0,
1784            total_output_tokens: 0,
1785            git_remote: None,
1786            git_branch: None,
1787            git_commit: None,
1788            git_repo_name: None,
1789            pr_number: None,
1790            pr_url: None,
1791            working_directory: None,
1792            files_modified: None,
1793            files_read: None,
1794            has_errors: false,
1795            max_active_agents: 1,
1796        };
1797
1798        let parent = LocalSessionRow {
1799            id: "sess_parent".to_string(),
1800            source_path: None,
1801            sync_status: "local_only".to_string(),
1802            last_synced_at: None,
1803            user_id: None,
1804            nickname: None,
1805            team_id: None,
1806            tool: "opencode".to_string(),
1807            agent_provider: None,
1808            agent_model: None,
1809            title: Some("regular".to_string()),
1810            description: None,
1811            tags: None,
1812            created_at: "2024-01-01T00:00:00Z".to_string(),
1813            uploaded_at: None,
1814            message_count: 2,
1815            user_message_count: 1,
1816            task_count: 5,
1817            event_count: 20,
1818            duration_seconds: 0,
1819            total_input_tokens: 0,
1820            total_output_tokens: 0,
1821            git_remote: None,
1822            git_branch: None,
1823            git_commit: None,
1824            git_repo_name: None,
1825            pr_number: None,
1826            pr_url: None,
1827            working_directory: None,
1828            files_modified: None,
1829            files_read: None,
1830            has_errors: false,
1831            max_active_agents: 1,
1832        };
1833
1834        assert!(is_opencode_child_session(&child));
1835        assert!(!is_opencode_child_session(&parent));
1836    }
1837
1838    #[test]
1839    fn test_is_opencode_child_session_with_more_messages_but_few_tasks() {
1840        let child = LocalSessionRow {
1841            id: "sess_child_3".to_string(),
1842            source_path: None,
1843            sync_status: "local_only".to_string(),
1844            last_synced_at: None,
1845            user_id: None,
1846            nickname: None,
1847            team_id: None,
1848            tool: "opencode".to_string(),
1849            agent_provider: None,
1850            agent_model: None,
1851            title: None,
1852            description: None,
1853            tags: None,
1854            created_at: "2024-01-01T00:00:00Z".to_string(),
1855            uploaded_at: None,
1856            message_count: 3,
1857            user_message_count: 0,
1858            task_count: 2,
1859            event_count: 6,
1860            duration_seconds: 0,
1861            total_input_tokens: 0,
1862            total_output_tokens: 0,
1863            git_remote: None,
1864            git_branch: None,
1865            git_commit: None,
1866            git_repo_name: None,
1867            pr_number: None,
1868            pr_url: None,
1869            working_directory: None,
1870            files_modified: None,
1871            files_read: None,
1872            has_errors: false,
1873            max_active_agents: 1,
1874        };
1875
1876        assert!(is_opencode_child_session(&child));
1877    }
1878
1879    #[test]
1880    fn test_parse_opencode_parent_session_id_aliases() {
1881        let root = temp_root();
1882        let dir = root.path().join("session-aliases");
1883        create_dir_all(&dir).unwrap();
1884        let child_session = dir.join("child.json");
1885        write(
1886            &child_session,
1887            r#"{"id":"ses_child","parentUUID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1888        )
1889        .unwrap();
1890        assert_eq!(
1891            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
1892            Some("ses_parent")
1893        );
1894    }
1895
1896    #[test]
1897    fn test_parse_opencode_parent_session_id_nested_metadata() {
1898        let root = temp_root();
1899        let dir = root.path().join("session-nested");
1900        create_dir_all(&dir).unwrap();
1901        let child_session = dir.join("child.json");
1902        write(
1903            &child_session,
1904            r#"{"id":"ses_child","metadata":{"links":{"parentSessionId":"ses_parent","trace":"x"}}}"#,
1905        )
1906        .unwrap();
1907        assert_eq!(
1908            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
1909            Some("ses_parent")
1910        );
1911    }
1912
1913    #[test]
1914    fn test_is_claude_subagent_session() {
1915        let row = make_row(
1916            "ses_parent",
1917            "claude-code",
1918            Some("/Users/test/.claude/projects/foo/subagents/agent-abc.jsonl"),
1919        );
1920        assert!(!is_opencode_child_session(&row));
1921        assert!(is_claude_subagent_session(&row));
1922        assert!(hide_opencode_child_sessions(vec![row]).is_empty());
1923    }
1924
1925    #[test]
1926    fn test_hide_opencode_child_sessions() {
1927        let root = temp_root();
1928        let dir = root.path().join("sessions");
1929        create_dir_all(&dir).unwrap();
1930        let parent_session = dir.join("parent.json");
1931        let child_session = dir.join("child.json");
1932        let orphan_session = dir.join("orphan.json");
1933
1934        write(
1935            &parent_session,
1936            r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1937        )
1938        .unwrap();
1939        write(
1940            &child_session,
1941            r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1942        )
1943        .unwrap();
1944        write(
1945            &orphan_session,
1946            r#"{"id":"ses_orphan","time":{"created":1000,"updated":1000}}"#,
1947        )
1948        .unwrap();
1949
1950        let rows = vec![
1951            make_row(
1952                "ses_child",
1953                "opencode",
1954                Some(child_session.to_str().unwrap()),
1955            ),
1956            make_row(
1957                "ses_parent",
1958                "opencode",
1959                Some(parent_session.to_str().unwrap()),
1960            ),
1961            {
1962                let mut row = make_row("ses_other", "codex", None);
1963                row.user_message_count = 1;
1964                row
1965            },
1966            make_row(
1967                "ses_orphan",
1968                "opencode",
1969                Some(orphan_session.to_str().unwrap()),
1970            ),
1971        ];
1972
1973        let filtered = hide_opencode_child_sessions(rows);
1974        assert_eq!(filtered.len(), 3);
1975        assert!(filtered.iter().all(|r| r.id != "ses_child"));
1976    }
1977
1978    #[test]
1979    fn test_sync_cursor() {
1980        let db = test_db();
1981        assert_eq!(db.get_sync_cursor("team1").unwrap(), None);
1982        db.set_sync_cursor("team1", "2024-01-01T00:00:00Z").unwrap();
1983        assert_eq!(
1984            db.get_sync_cursor("team1").unwrap(),
1985            Some("2024-01-01T00:00:00Z".to_string())
1986        );
1987        // Update
1988        db.set_sync_cursor("team1", "2024-06-01T00:00:00Z").unwrap();
1989        assert_eq!(
1990            db.get_sync_cursor("team1").unwrap(),
1991            Some("2024-06-01T00:00:00Z".to_string())
1992        );
1993    }
1994
1995    #[test]
1996    fn test_body_cache() {
1997        let db = test_db();
1998        assert_eq!(db.get_cached_body("s1").unwrap(), None);
1999        db.cache_body("s1", b"hello world").unwrap();
2000        assert_eq!(
2001            db.get_cached_body("s1").unwrap(),
2002            Some(b"hello world".to_vec())
2003        );
2004    }
2005
2006    #[test]
2007    fn test_timeline_summary_cache_roundtrip() {
2008        let db = test_db();
2009        db.upsert_timeline_summary_cache(
2010            "k1",
2011            "timeline:v1",
2012            "compact text",
2013            "{\"kind\":\"turn-summary\"}",
2014            "raw text",
2015        )
2016        .unwrap();
2017
2018        let rows = db
2019            .list_timeline_summary_cache_by_namespace("timeline:v1")
2020            .unwrap();
2021        assert_eq!(rows.len(), 1);
2022        assert_eq!(rows[0].lookup_key, "k1");
2023        assert_eq!(rows[0].namespace, "timeline:v1");
2024        assert_eq!(rows[0].compact, "compact text");
2025        assert_eq!(rows[0].payload, "{\"kind\":\"turn-summary\"}");
2026        assert_eq!(rows[0].raw, "raw text");
2027
2028        let cleared = db.clear_timeline_summary_cache().unwrap();
2029        assert_eq!(cleared, 1);
2030        let rows_after = db
2031            .list_timeline_summary_cache_by_namespace("timeline:v1")
2032            .unwrap();
2033        assert!(rows_after.is_empty());
2034    }
2035
2036    #[test]
2037    fn test_local_migrations_include_timeline_summary_cache() {
2038        let db = test_db();
2039        let conn = db.conn();
2040        let applied: bool = conn
2041            .query_row(
2042                "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
2043                params!["local_0003_timeline_summary_cache"],
2044                |row| row.get(0),
2045            )
2046            .unwrap();
2047        assert!(applied);
2048    }
2049
2050    #[test]
2051    fn test_local_migration_files_match_api_local_migrations() {
2052        fn collect_local_sql(dir: PathBuf) -> BTreeSet<String> {
2053            std::fs::read_dir(dir)
2054                .expect("read migrations directory")
2055                .filter_map(Result::ok)
2056                .map(|entry| entry.file_name().to_string_lossy().to_string())
2057                .filter(|name| name.starts_with("local_") && name.ends_with(".sql"))
2058                .collect()
2059        }
2060
2061        let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
2062        let local_files = collect_local_sql(manifest_dir.join("migrations"));
2063        let api_files = collect_local_sql(manifest_dir.join("../api/migrations"));
2064
2065        assert_eq!(
2066            local_files, api_files,
2067            "local-db local migrations must stay in parity with api local migrations"
2068        );
2069    }
2070
2071    #[test]
2072    fn test_upsert_remote_session() {
2073        let db = test_db();
2074        let summary = RemoteSessionSummary {
2075            id: "remote-1".to_string(),
2076            user_id: Some("u1".to_string()),
2077            nickname: Some("alice".to_string()),
2078            team_id: "t1".to_string(),
2079            tool: "claude-code".to_string(),
2080            agent_provider: None,
2081            agent_model: None,
2082            title: Some("Test session".to_string()),
2083            description: None,
2084            tags: None,
2085            created_at: "2024-01-01T00:00:00Z".to_string(),
2086            uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2087            message_count: 10,
2088            task_count: 2,
2089            event_count: 20,
2090            duration_seconds: 300,
2091            total_input_tokens: 1000,
2092            total_output_tokens: 500,
2093            git_remote: None,
2094            git_branch: None,
2095            git_commit: None,
2096            git_repo_name: None,
2097            pr_number: None,
2098            pr_url: None,
2099            working_directory: None,
2100            files_modified: None,
2101            files_read: None,
2102            has_errors: false,
2103            max_active_agents: 1,
2104        };
2105        db.upsert_remote_session(&summary).unwrap();
2106
2107        let sessions = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2108        assert_eq!(sessions.len(), 1);
2109        assert_eq!(sessions[0].id, "remote-1");
2110        assert_eq!(sessions[0].sync_status, "remote_only");
2111        assert_eq!(sessions[0].nickname, None); // no user in local users table
2112    }
2113
2114    #[test]
2115    fn test_list_filter_by_repo() {
2116        let db = test_db();
2117        // Insert a remote session with team_id
2118        let summary1 = RemoteSessionSummary {
2119            id: "s1".to_string(),
2120            user_id: None,
2121            nickname: None,
2122            team_id: "t1".to_string(),
2123            tool: "claude-code".to_string(),
2124            agent_provider: None,
2125            agent_model: None,
2126            title: Some("Session 1".to_string()),
2127            description: None,
2128            tags: None,
2129            created_at: "2024-01-01T00:00:00Z".to_string(),
2130            uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2131            message_count: 5,
2132            task_count: 0,
2133            event_count: 10,
2134            duration_seconds: 60,
2135            total_input_tokens: 100,
2136            total_output_tokens: 50,
2137            git_remote: None,
2138            git_branch: None,
2139            git_commit: None,
2140            git_repo_name: None,
2141            pr_number: None,
2142            pr_url: None,
2143            working_directory: None,
2144            files_modified: None,
2145            files_read: None,
2146            has_errors: false,
2147            max_active_agents: 1,
2148        };
2149        db.upsert_remote_session(&summary1).unwrap();
2150
2151        // Filter by team
2152        let filter = LocalSessionFilter {
2153            team_id: Some("t1".to_string()),
2154            ..Default::default()
2155        };
2156        assert_eq!(db.list_sessions(&filter).unwrap().len(), 1);
2157
2158        let filter = LocalSessionFilter {
2159            team_id: Some("t999".to_string()),
2160            ..Default::default()
2161        };
2162        assert_eq!(db.list_sessions(&filter).unwrap().len(), 0);
2163    }
2164
2165    // ── Helpers for inserting test sessions ────────────────────────────
2166
2167    fn make_summary(id: &str, tool: &str, title: &str, created_at: &str) -> RemoteSessionSummary {
2168        RemoteSessionSummary {
2169            id: id.to_string(),
2170            user_id: None,
2171            nickname: None,
2172            team_id: "t1".to_string(),
2173            tool: tool.to_string(),
2174            agent_provider: Some("anthropic".to_string()),
2175            agent_model: Some("claude-opus-4-6".to_string()),
2176            title: Some(title.to_string()),
2177            description: None,
2178            tags: None,
2179            created_at: created_at.to_string(),
2180            uploaded_at: created_at.to_string(),
2181            message_count: 5,
2182            task_count: 1,
2183            event_count: 10,
2184            duration_seconds: 300,
2185            total_input_tokens: 1000,
2186            total_output_tokens: 500,
2187            git_remote: None,
2188            git_branch: None,
2189            git_commit: None,
2190            git_repo_name: None,
2191            pr_number: None,
2192            pr_url: None,
2193            working_directory: None,
2194            files_modified: None,
2195            files_read: None,
2196            has_errors: false,
2197            max_active_agents: 1,
2198        }
2199    }
2200
2201    fn seed_sessions(db: &LocalDb) {
2202        // Insert 5 sessions across two tools, ordered by created_at
2203        db.upsert_remote_session(&make_summary(
2204            "s1",
2205            "claude-code",
2206            "First session",
2207            "2024-01-01T00:00:00Z",
2208        ))
2209        .unwrap();
2210        db.upsert_remote_session(&make_summary(
2211            "s2",
2212            "claude-code",
2213            "JWT auth work",
2214            "2024-01-02T00:00:00Z",
2215        ))
2216        .unwrap();
2217        db.upsert_remote_session(&make_summary(
2218            "s3",
2219            "gemini",
2220            "Gemini test",
2221            "2024-01-03T00:00:00Z",
2222        ))
2223        .unwrap();
2224        db.upsert_remote_session(&make_summary(
2225            "s4",
2226            "claude-code",
2227            "Error handling",
2228            "2024-01-04T00:00:00Z",
2229        ))
2230        .unwrap();
2231        db.upsert_remote_session(&make_summary(
2232            "s5",
2233            "claude-code",
2234            "Final polish",
2235            "2024-01-05T00:00:00Z",
2236        ))
2237        .unwrap();
2238    }
2239
2240    // ── list_sessions_log tests ────────────────────────────────────────
2241
2242    #[test]
2243    fn test_log_no_filters() {
2244        let db = test_db();
2245        seed_sessions(&db);
2246        let filter = LogFilter::default();
2247        let results = db.list_sessions_log(&filter).unwrap();
2248        assert_eq!(results.len(), 5);
2249        // Should be ordered by created_at DESC
2250        assert_eq!(results[0].id, "s5");
2251        assert_eq!(results[4].id, "s1");
2252    }
2253
2254    #[test]
2255    fn test_log_filter_by_tool() {
2256        let db = test_db();
2257        seed_sessions(&db);
2258        let filter = LogFilter {
2259            tool: Some("claude-code".to_string()),
2260            ..Default::default()
2261        };
2262        let results = db.list_sessions_log(&filter).unwrap();
2263        assert_eq!(results.len(), 4);
2264        assert!(results.iter().all(|s| s.tool == "claude-code"));
2265    }
2266
2267    #[test]
2268    fn test_log_filter_by_model_wildcard() {
2269        let db = test_db();
2270        seed_sessions(&db);
2271        let filter = LogFilter {
2272            model: Some("claude*".to_string()),
2273            ..Default::default()
2274        };
2275        let results = db.list_sessions_log(&filter).unwrap();
2276        assert_eq!(results.len(), 5); // all have claude-opus model
2277    }
2278
2279    #[test]
2280    fn test_log_filter_since() {
2281        let db = test_db();
2282        seed_sessions(&db);
2283        let filter = LogFilter {
2284            since: Some("2024-01-03T00:00:00Z".to_string()),
2285            ..Default::default()
2286        };
2287        let results = db.list_sessions_log(&filter).unwrap();
2288        assert_eq!(results.len(), 3); // s3, s4, s5
2289    }
2290
2291    #[test]
2292    fn test_log_filter_before() {
2293        let db = test_db();
2294        seed_sessions(&db);
2295        let filter = LogFilter {
2296            before: Some("2024-01-03T00:00:00Z".to_string()),
2297            ..Default::default()
2298        };
2299        let results = db.list_sessions_log(&filter).unwrap();
2300        assert_eq!(results.len(), 2); // s1, s2
2301    }
2302
2303    #[test]
2304    fn test_log_filter_since_and_before() {
2305        let db = test_db();
2306        seed_sessions(&db);
2307        let filter = LogFilter {
2308            since: Some("2024-01-02T00:00:00Z".to_string()),
2309            before: Some("2024-01-04T00:00:00Z".to_string()),
2310            ..Default::default()
2311        };
2312        let results = db.list_sessions_log(&filter).unwrap();
2313        assert_eq!(results.len(), 2); // s2, s3
2314    }
2315
2316    #[test]
2317    fn test_log_filter_grep() {
2318        let db = test_db();
2319        seed_sessions(&db);
2320        let filter = LogFilter {
2321            grep: Some("JWT".to_string()),
2322            ..Default::default()
2323        };
2324        let results = db.list_sessions_log(&filter).unwrap();
2325        assert_eq!(results.len(), 1);
2326        assert_eq!(results[0].id, "s2");
2327    }
2328
2329    #[test]
2330    fn test_log_limit_and_offset() {
2331        let db = test_db();
2332        seed_sessions(&db);
2333        let filter = LogFilter {
2334            limit: Some(2),
2335            offset: Some(1),
2336            ..Default::default()
2337        };
2338        let results = db.list_sessions_log(&filter).unwrap();
2339        assert_eq!(results.len(), 2);
2340        assert_eq!(results[0].id, "s4"); // second most recent
2341        assert_eq!(results[1].id, "s3");
2342    }
2343
2344    #[test]
2345    fn test_log_limit_only() {
2346        let db = test_db();
2347        seed_sessions(&db);
2348        let filter = LogFilter {
2349            limit: Some(3),
2350            ..Default::default()
2351        };
2352        let results = db.list_sessions_log(&filter).unwrap();
2353        assert_eq!(results.len(), 3);
2354    }
2355
2356    #[test]
2357    fn test_list_sessions_limit_offset() {
2358        let db = test_db();
2359        seed_sessions(&db);
2360        let filter = LocalSessionFilter {
2361            limit: Some(2),
2362            offset: Some(1),
2363            ..Default::default()
2364        };
2365        let results = db.list_sessions(&filter).unwrap();
2366        assert_eq!(results.len(), 2);
2367        assert_eq!(results[0].id, "s4");
2368        assert_eq!(results[1].id, "s3");
2369    }
2370
2371    #[test]
2372    fn test_count_sessions_filtered() {
2373        let db = test_db();
2374        seed_sessions(&db);
2375        let count = db
2376            .count_sessions_filtered(&LocalSessionFilter::default())
2377            .unwrap();
2378        assert_eq!(count, 5);
2379    }
2380
2381    #[test]
2382    fn test_list_working_directories_distinct_non_empty() {
2383        let db = test_db();
2384
2385        let mut a = make_summary("wd-1", "claude-code", "One", "2024-01-01T00:00:00Z");
2386        a.working_directory = Some("/tmp/repo-a".to_string());
2387        let mut b = make_summary("wd-2", "claude-code", "Two", "2024-01-02T00:00:00Z");
2388        b.working_directory = Some("/tmp/repo-a".to_string());
2389        let mut c = make_summary("wd-3", "claude-code", "Three", "2024-01-03T00:00:00Z");
2390        c.working_directory = Some("/tmp/repo-b".to_string());
2391        let mut d = make_summary("wd-4", "claude-code", "Four", "2024-01-04T00:00:00Z");
2392        d.working_directory = Some("".to_string());
2393
2394        db.upsert_remote_session(&a).unwrap();
2395        db.upsert_remote_session(&b).unwrap();
2396        db.upsert_remote_session(&c).unwrap();
2397        db.upsert_remote_session(&d).unwrap();
2398
2399        let dirs = db.list_working_directories().unwrap();
2400        assert_eq!(
2401            dirs,
2402            vec!["/tmp/repo-a".to_string(), "/tmp/repo-b".to_string()]
2403        );
2404    }
2405
2406    #[test]
2407    fn test_list_session_tools() {
2408        let db = test_db();
2409        seed_sessions(&db);
2410        let tools = db
2411            .list_session_tools(&LocalSessionFilter::default())
2412            .unwrap();
2413        assert_eq!(tools, vec!["claude-code".to_string(), "gemini".to_string()]);
2414    }
2415
2416    #[test]
2417    fn test_log_combined_filters() {
2418        let db = test_db();
2419        seed_sessions(&db);
2420        let filter = LogFilter {
2421            tool: Some("claude-code".to_string()),
2422            since: Some("2024-01-03T00:00:00Z".to_string()),
2423            limit: Some(1),
2424            ..Default::default()
2425        };
2426        let results = db.list_sessions_log(&filter).unwrap();
2427        assert_eq!(results.len(), 1);
2428        assert_eq!(results[0].id, "s5"); // most recent claude-code after Jan 3
2429    }
2430
2431    // ── Session offset/latest tests ────────────────────────────────────
2432
2433    #[test]
2434    fn test_get_session_by_offset() {
2435        let db = test_db();
2436        seed_sessions(&db);
2437        let row = db.get_session_by_offset(0).unwrap().unwrap();
2438        assert_eq!(row.id, "s5"); // most recent
2439        let row = db.get_session_by_offset(2).unwrap().unwrap();
2440        assert_eq!(row.id, "s3");
2441        assert!(db.get_session_by_offset(10).unwrap().is_none());
2442    }
2443
2444    #[test]
2445    fn test_get_session_by_tool_offset() {
2446        let db = test_db();
2447        seed_sessions(&db);
2448        let row = db
2449            .get_session_by_tool_offset("claude-code", 0)
2450            .unwrap()
2451            .unwrap();
2452        assert_eq!(row.id, "s5");
2453        let row = db
2454            .get_session_by_tool_offset("claude-code", 1)
2455            .unwrap()
2456            .unwrap();
2457        assert_eq!(row.id, "s4");
2458        let row = db.get_session_by_tool_offset("gemini", 0).unwrap().unwrap();
2459        assert_eq!(row.id, "s3");
2460        assert!(db
2461            .get_session_by_tool_offset("gemini", 1)
2462            .unwrap()
2463            .is_none());
2464    }
2465
2466    #[test]
2467    fn test_get_sessions_latest() {
2468        let db = test_db();
2469        seed_sessions(&db);
2470        let rows = db.get_sessions_latest(3).unwrap();
2471        assert_eq!(rows.len(), 3);
2472        assert_eq!(rows[0].id, "s5");
2473        assert_eq!(rows[1].id, "s4");
2474        assert_eq!(rows[2].id, "s3");
2475    }
2476
2477    #[test]
2478    fn test_get_sessions_by_tool_latest() {
2479        let db = test_db();
2480        seed_sessions(&db);
2481        let rows = db.get_sessions_by_tool_latest("claude-code", 2).unwrap();
2482        assert_eq!(rows.len(), 2);
2483        assert_eq!(rows[0].id, "s5");
2484        assert_eq!(rows[1].id, "s4");
2485    }
2486
2487    #[test]
2488    fn test_get_sessions_latest_more_than_available() {
2489        let db = test_db();
2490        seed_sessions(&db);
2491        let rows = db.get_sessions_by_tool_latest("gemini", 10).unwrap();
2492        assert_eq!(rows.len(), 1); // only 1 gemini session
2493    }
2494
2495    #[test]
2496    fn test_session_count() {
2497        let db = test_db();
2498        assert_eq!(db.session_count().unwrap(), 0);
2499        seed_sessions(&db);
2500        assert_eq!(db.session_count().unwrap(), 5);
2501    }
2502
2503    // ── Commit link tests ─────────────────────────────────────────────
2504
2505    #[test]
2506    fn test_link_commit_session() {
2507        let db = test_db();
2508        seed_sessions(&db);
2509        db.link_commit_session("abc123", "s1", Some("/tmp/repo"), Some("main"))
2510            .unwrap();
2511
2512        let commits = db.get_commits_by_session("s1").unwrap();
2513        assert_eq!(commits.len(), 1);
2514        assert_eq!(commits[0].commit_hash, "abc123");
2515        assert_eq!(commits[0].session_id, "s1");
2516        assert_eq!(commits[0].repo_path.as_deref(), Some("/tmp/repo"));
2517        assert_eq!(commits[0].branch.as_deref(), Some("main"));
2518
2519        let sessions = db.get_sessions_by_commit("abc123").unwrap();
2520        assert_eq!(sessions.len(), 1);
2521        assert_eq!(sessions[0].id, "s1");
2522    }
2523
2524    #[test]
2525    fn test_get_sessions_by_commit() {
2526        let db = test_db();
2527        seed_sessions(&db);
2528        // Link multiple sessions to the same commit
2529        db.link_commit_session("abc123", "s1", None, None).unwrap();
2530        db.link_commit_session("abc123", "s2", None, None).unwrap();
2531        db.link_commit_session("abc123", "s3", None, None).unwrap();
2532
2533        let sessions = db.get_sessions_by_commit("abc123").unwrap();
2534        assert_eq!(sessions.len(), 3);
2535        // Ordered by created_at DESC
2536        assert_eq!(sessions[0].id, "s3");
2537        assert_eq!(sessions[1].id, "s2");
2538        assert_eq!(sessions[2].id, "s1");
2539    }
2540
2541    #[test]
2542    fn test_get_commits_by_session() {
2543        let db = test_db();
2544        seed_sessions(&db);
2545        // Link multiple commits to the same session
2546        db.link_commit_session("aaa111", "s1", Some("/repo"), Some("main"))
2547            .unwrap();
2548        db.link_commit_session("bbb222", "s1", Some("/repo"), Some("main"))
2549            .unwrap();
2550        db.link_commit_session("ccc333", "s1", Some("/repo"), Some("feat"))
2551            .unwrap();
2552
2553        let commits = db.get_commits_by_session("s1").unwrap();
2554        assert_eq!(commits.len(), 3);
2555        // All linked to s1
2556        assert!(commits.iter().all(|c| c.session_id == "s1"));
2557    }
2558
2559    #[test]
2560    fn test_duplicate_link_ignored() {
2561        let db = test_db();
2562        seed_sessions(&db);
2563        db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2564            .unwrap();
2565        // Inserting the same link again should not error
2566        db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2567            .unwrap();
2568
2569        let commits = db.get_commits_by_session("s1").unwrap();
2570        assert_eq!(commits.len(), 1);
2571    }
2572
2573    #[test]
2574    fn test_log_filter_by_commit() {
2575        let db = test_db();
2576        seed_sessions(&db);
2577        db.link_commit_session("abc123", "s2", None, None).unwrap();
2578        db.link_commit_session("abc123", "s4", None, None).unwrap();
2579
2580        let filter = LogFilter {
2581            commit: Some("abc123".to_string()),
2582            ..Default::default()
2583        };
2584        let results = db.list_sessions_log(&filter).unwrap();
2585        assert_eq!(results.len(), 2);
2586        assert_eq!(results[0].id, "s4");
2587        assert_eq!(results[1].id, "s2");
2588
2589        // Non-existent commit returns nothing
2590        let filter = LogFilter {
2591            commit: Some("nonexistent".to_string()),
2592            ..Default::default()
2593        };
2594        let results = db.list_sessions_log(&filter).unwrap();
2595        assert_eq!(results.len(), 0);
2596    }
2597}