//! opensession_local_db/lib.rs — local SQLite cache shared by the TUI and daemon.
1pub mod git;
2
3use anyhow::{Context, Result};
4use opensession_core::trace::Session;
5use rusqlite::{params, Connection, OptionalExtension};
6use serde_json::Value;
7use std::collections::HashSet;
8use std::fs;
9use std::path::PathBuf;
10use std::sync::Mutex;
11
12use git::GitContext;
13
/// A named SQL migration: `(migration id, migration SQL text)`.
type Migration = (&'static str, &'static str);
15
// Keep local cache schema aligned with remote/session schema migrations.
// Each entry pairs a migration id with SQL embedded at compile time via
// include_str!. The ids look sequentially numbered — presumably applied in
// order by the migration runner (not visible here), so append-only.
const REMOTE_MIGRATIONS: &[Migration] = &[
    ("0001_schema", include_str!("../migrations/0001_schema.sql")),
    (
        "0002_team_invite_keys",
        include_str!("../migrations/0002_team_invite_keys.sql"),
    ),
    (
        "0003_max_active_agents",
        include_str!("../migrations/0003_max_active_agents.sql"),
    ),
    (
        "0004_oauth_states_provider",
        include_str!("../migrations/0004_oauth_states_provider.sql"),
    ),
    (
        "0005_sessions_body_url_backfill",
        include_str!("../migrations/0005_sessions_body_url_backfill.sql"),
    ),
    (
        "0006_sessions_remove_fk_constraints",
        include_str!("../migrations/0006_sessions_remove_fk_constraints.sql"),
    ),
    (
        "0007_sessions_list_perf_indexes",
        include_str!("../migrations/0007_sessions_list_perf_indexes.sql"),
    ),
    (
        "0008_teams_force_public",
        include_str!("../migrations/0008_teams_force_public.sql"),
    ),
];
48
// Migrations specific to the local cache database (sync bookkeeping,
// timeline summary cache) — prefixed `local_` to keep their ids distinct
// from the remote schema migrations above.
const LOCAL_MIGRATIONS: &[Migration] = &[
    (
        "local_0001_schema",
        include_str!("../migrations/local_0001_schema.sql"),
    ),
    (
        "local_0002_drop_unused_local_sessions",
        include_str!("../migrations/local_0002_drop_unused_local_sessions.sql"),
    ),
    (
        "local_0003_timeline_summary_cache",
        include_str!("../migrations/local_0003_timeline_summary_cache.sql"),
    ),
];
63
/// A local session row stored in the local SQLite database.
///
/// Mirrors the `sessions` table plus sync metadata from `session_sync`
/// (see `FROM_CLAUSE` for the join shape).
#[derive(Debug, Clone)]
pub struct LocalSessionRow {
    pub id: String,
    /// On-disk transcript path recorded in `session_sync.source_path`
    /// (absent for rows that only exist remotely).
    pub source_path: Option<String>,
    /// Sync state; the upserts in this file write `local_only`,
    /// `remote_only`, and `synced`.
    pub sync_status: String,
    pub last_synced_at: Option<String>,
    pub user_id: Option<String>,
    pub nickname: Option<String>,
    pub team_id: Option<String>,
    /// Producing tool, e.g. `opencode` or `claude-code`.
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    pub title: Option<String>,
    pub description: Option<String>,
    /// Comma-joined tag list (see `upsert_local_session`).
    pub tags: Option<String>,
    pub created_at: String,
    pub uploaded_at: Option<String>,
    pub message_count: i64,
    pub user_message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    /// JSON-serialized file list (the `touches` log filter matches quoted
    /// paths inside this text).
    pub files_modified: Option<String>,
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
}
101
/// A link between a git commit and an AI session.
///
/// Corresponds to a row of `commit_session_links` (used by the `commit`
/// filter in `list_sessions_log`).
#[derive(Debug, Clone)]
pub struct CommitLink {
    pub commit_hash: String,
    pub session_id: String,
    // Presumably the local checkout the commit was observed in — confirm
    // against the writer of this table.
    pub repo_path: Option<String>,
    pub branch: Option<String>,
    pub created_at: String,
}
111
/// A cached timeline summary row stored in local DB.
///
/// Backed by the `local_0003_timeline_summary_cache` migration. The exact
/// formats of `compact`/`payload`/`raw` are defined by the cache writer,
/// which is not visible in this file.
#[derive(Debug, Clone)]
pub struct TimelineSummaryCacheRow {
    /// Lookup key, scoped by `namespace`.
    pub lookup_key: String,
    pub namespace: String,
    pub compact: String,
    pub payload: String,
    pub raw: String,
    /// Timestamp the entry was written.
    pub cached_at: String,
}
122
/// Return true when a cached row corresponds to an OpenCode child session.
///
/// Checks run in order of increasing cost:
/// 1. a purely statistical heuristic over cached counters (no I/O),
/// 2. a path-shape heuristic on the recorded source path,
/// 3. reading the source file and looking for a non-empty parent id.
pub fn is_opencode_child_session(row: &LocalSessionRow) -> bool {
    if row.tool != "opencode" {
        return false;
    }

    // Strong heuristic: some orchestration artifacts are generated as sessions
    // that do not carry a user-visible request surface and only contain system
    // or agent-only events.
    // NOTE(review): the 4/4/16 thresholds look empirically tuned — confirm
    // against real OpenCode dumps before changing them, and keep them in sync
    // with the SQL mirror in `build_local_session_where_clause`.
    if row.user_message_count <= 0
        && row.message_count <= 4
        && row.task_count <= 4
        && row.event_count > 0
        && row.event_count <= 16
    {
        return true;
    }

    // Cheap path-shape check — avoids reading the file at all.
    if is_opencode_subagent_source(row.source_path.as_deref()) {
        return true;
    }

    // Fall through to file inspection; blank/missing paths cannot be children.
    let source_path = match row.source_path.as_deref() {
        Some(path) if !path.trim().is_empty() => path,
        _ => return false,
    };

    // A child session records a non-empty parent session id in its JSON.
    parse_opencode_parent_session_id(source_path)
        .is_some_and(|parent_id| !parent_id.trim().is_empty())
}
153
154/// Parse `parentID` / `parentId` from an OpenCode session JSON file.
155pub fn parse_opencode_parent_session_id(source_path: &str) -> Option<String> {
156    let text = fs::read_to_string(source_path).ok()?;
157    let json: Value = serde_json::from_str(&text).ok()?;
158    lookup_parent_session_id(&json)
159}
160
161fn lookup_parent_session_id(value: &Value) -> Option<String> {
162    match value {
163        Value::Object(obj) => {
164            for (key, value) in obj {
165                if is_parent_id_key(key) {
166                    if let Some(parent_id) = value.as_str() {
167                        let parent_id = parent_id.trim();
168                        if !parent_id.is_empty() {
169                            return Some(parent_id.to_string());
170                        }
171                    }
172                }
173                if let Some(parent_id) = lookup_parent_session_id(value) {
174                    return Some(parent_id);
175                }
176            }
177            None
178        }
179        Value::Array(items) => items.iter().find_map(lookup_parent_session_id),
180        _ => None,
181    }
182}
183
/// Return true when a JSON key names a parent-session identifier.
///
/// The key is flattened first — separators dropped, casing normalized — so
/// `parentID`, `parent_id`, `Parent-Session-Id`, `parentSessionUuid`, … all
/// compare equal.
fn is_parent_id_key(key: &str) -> bool {
    let flat = key
        .chars()
        .filter(|c| c.is_ascii_alphanumeric())
        .map(|c| c.to_ascii_lowercase())
        .collect::<String>();

    // A single predicate replaces the previous redundant exact-match list:
    // every enumerated form ("parentid", "parentuuid", "parentsessionid",
    // "…parentsessionuuid", …) both mentions "parent" and ends in "id"
    // (note "uuid" itself ends in "id"), so this is exactly equivalent.
    flat.contains("parent") && flat.ends_with("id")
}
199
200/// Remove OpenCode child sessions so only parent sessions remain visible.
201pub fn hide_opencode_child_sessions(mut rows: Vec<LocalSessionRow>) -> Vec<LocalSessionRow> {
202    rows.retain(|row| !is_opencode_child_session(row) && !is_claude_subagent_session(row));
203    rows
204}
205
/// OpenCode-flavored entry point for the shared sub-agent path heuristic.
fn is_opencode_subagent_source(source_path: Option<&str>) -> bool {
    is_subagent_source(source_path)
}
209
210fn is_claude_subagent_session(row: &LocalSessionRow) -> bool {
211    if row.tool != "claude-code" {
212        return false;
213    }
214
215    is_subagent_source(row.source_path.as_deref())
216}
217
/// Heuristic: does this source path look like a sub-agent transcript?
///
/// Matches (case-insensitively) either a `subagents` path component, or a
/// file name starting with `agent-`/`agent_`/`subagent-`/`subagent_`.
fn is_subagent_source(source_path: Option<&str>) -> bool {
    let lowered = match source_path {
        Some(path) => path.to_ascii_lowercase(),
        None => return false,
    };

    // Directory component check covers both Unix and Windows separators.
    if ["/subagents/", "\\subagents\\"]
        .iter()
        .any(|needle| lowered.contains(needle))
    {
        return true;
    }

    std::path::Path::new(&lowered)
        .file_name()
        .map(|name| name.to_string_lossy())
        .is_some_and(|file| {
            ["agent-", "agent_", "subagent-", "subagent_"]
                .iter()
                .any(|prefix| file.starts_with(*prefix))
        })
}
237
/// Filter for listing sessions from the local DB.
///
/// All set fields are ANDed together. `offset` only takes effect when
/// `limit` is also set (SQLite requires LIMIT before OFFSET).
#[derive(Debug, Clone)]
pub struct LocalSessionFilter {
    /// Exact team id match.
    pub team_id: Option<String>,
    /// Exact sync status match (rows without sync state compare as 'unknown').
    pub sync_status: Option<String>,
    /// Exact git repository name match.
    pub git_repo_name: Option<String>,
    /// Substring search over title, description, and tags.
    pub search: Option<String>,
    /// Exact tool name match.
    pub tool: Option<String>,
    pub sort: LocalSortOrder,
    pub time_range: LocalTimeRange,
    pub limit: Option<u32>,
    /// Row offset for pagination; ignored unless `limit` is set.
    pub offset: Option<u32>,
}
251
252impl Default for LocalSessionFilter {
253    fn default() -> Self {
254        Self {
255            team_id: None,
256            sync_status: None,
257            git_repo_name: None,
258            search: None,
259            tool: None,
260            sort: LocalSortOrder::Recent,
261            time_range: LocalTimeRange::All,
262            limit: None,
263            offset: None,
264        }
265    }
266}
267
/// Sort order for local session listing.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalSortOrder {
    /// Newest first (`created_at DESC`).
    #[default]
    Recent,
    /// Most messages first, ties broken by recency.
    Popular,
    /// Longest duration first, ties broken by recency.
    Longest,
}
276
/// Time range filter for local session listing (relative to `now`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalTimeRange {
    /// Sessions created within the last 24 hours.
    Hours24,
    /// Sessions created within the last 7 days.
    Days7,
    /// Sessions created within the last 30 days.
    Days30,
    /// No time restriction.
    #[default]
    All,
}
286
/// Minimal remote session payload needed for local cache upsert.
///
/// Unlike `LocalSessionRow`, `team_id` and `uploaded_at` are required and
/// there is no sync/source-path state — that is derived when the summary is
/// written via `upsert_remote_session`. There is also no
/// `user_message_count`; the remote payload does not carry it.
#[derive(Debug, Clone)]
pub struct RemoteSessionSummary {
    pub id: String,
    pub user_id: Option<String>,
    pub nickname: Option<String>,
    pub team_id: String,
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    pub title: Option<String>,
    pub description: Option<String>,
    pub tags: Option<String>,
    pub created_at: String,
    pub uploaded_at: String,
    pub message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    pub files_modified: Option<String>,
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
}
320
/// Extended filter for the `log` command.
///
/// All set filters are ANDed together; see `list_sessions_log` for the SQL
/// each field maps to. `offset` only applies when `limit` is set.
#[derive(Debug, Default)]
pub struct LogFilter {
    /// Filter by tool name (exact match).
    pub tool: Option<String>,
    /// Filter by model (glob-like, uses LIKE; `*` becomes `%`).
    pub model: Option<String>,
    /// Filter sessions created after this ISO8601 timestamp.
    pub since: Option<String>,
    /// Filter sessions created before this ISO8601 timestamp.
    pub before: Option<String>,
    /// Filter sessions that touched this file path (searches files_modified JSON).
    pub touches: Option<String>,
    /// Free-text search in title, description, tags.
    pub grep: Option<String>,
    /// Only sessions with errors (`Some(false)` filters nothing, like `None`).
    pub has_errors: Option<bool>,
    /// Filter by working directory (prefix match).
    pub working_directory: Option<String>,
    /// Filter by git repo name.
    pub git_repo_name: Option<String>,
    /// Filter sessions linked to this git commit hash.
    pub commit: Option<String>,
    /// Maximum number of results.
    pub limit: Option<u32>,
    /// Offset for pagination.
    pub offset: Option<u32>,
}
349
/// Base FROM clause for session list queries.
///
/// `session_sync` supplies sync state and source path, `users` supplies
/// per-user columns; both are LEFT JOINs so sessions without either row
/// still appear in listings.
const FROM_CLAUSE: &str = "\
FROM sessions s \
LEFT JOIN session_sync ss ON ss.session_id = s.id \
LEFT JOIN users u ON u.id = s.user_id";
355
/// Local SQLite database shared by TUI and Daemon.
/// Thread-safe: wraps the connection in a Mutex so it can be shared via `Arc<LocalDb>`.
pub struct LocalDb {
    // Single shared connection; every query serializes through this lock.
    conn: Mutex<Connection>,
}
361
362impl LocalDb {
363    /// Open (or create) the local database at the default path.
364    /// `~/.local/share/opensession/local.db`
365    pub fn open() -> Result<Self> {
366        let path = default_db_path()?;
367        Self::open_path(&path)
368    }
369
370    /// Open (or create) the local database at a specific path.
371    pub fn open_path(path: &PathBuf) -> Result<Self> {
372        if let Some(parent) = path.parent() {
373            std::fs::create_dir_all(parent)
374                .with_context(|| format!("create dir for {}", path.display()))?;
375        }
376        match open_connection_with_latest_schema(path) {
377            Ok(conn) => Ok(Self {
378                conn: Mutex::new(conn),
379            }),
380            Err(err) => {
381                if !is_schema_compat_error(&err) {
382                    return Err(err);
383                }
384
385                // Local DB is a cache. If schema migration cannot safely reconcile
386                // a legacy/corrupted file, rotate it out and recreate with latest schema.
387                rotate_legacy_db(path)?;
388
389                let conn = open_connection_with_latest_schema(path)
390                    .with_context(|| format!("recreate db {}", path.display()))?;
391                Ok(Self {
392                    conn: Mutex::new(conn),
393                })
394            }
395        }
396    }
397
    /// Lock and return the shared connection guard.
    ///
    /// Panics if the mutex is poisoned (another thread panicked while holding
    /// the connection) — the cache cannot be assumed consistent at that point.
    fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
        self.conn.lock().expect("local db mutex poisoned")
    }
401
402    // ── Upsert local session (parsed from file) ────────────────────────
403
    /// Insert or update a session parsed from a local transcript file.
    ///
    /// New rows are written with `team_id = 'personal'` and an empty
    /// `body_storage_key`. On conflict the metadata columns are refreshed
    /// while `id`, `team_id`, and `created_at` keep their existing values.
    /// A companion `session_sync` row is upserted: new rows start as
    /// `local_only`; for existing rows only `source_path` is refreshed, the
    /// sync status is never downgraded here.
    pub fn upsert_local_session(
        &self,
        session: &Session,
        source_path: &str,
        git: &GitContext,
    ) -> Result<()> {
        let title = session.context.title.as_deref();
        let description = session.context.description.as_deref();
        // Tags are stored as one comma-joined TEXT column (NULL when empty).
        let tags = if session.context.tags.is_empty() {
            None
        } else {
            Some(session.context.tags.join(","))
        };
        let created_at = session.context.created_at.to_rfc3339();
        // Tools disagree on the attribute name for the working directory, so
        // accept either `cwd` or `working_directory`.
        let cwd = session
            .context
            .attributes
            .get("cwd")
            .or_else(|| session.context.attributes.get("working_directory"))
            .and_then(|v| v.as_str().map(String::from));

        // Extract files_modified, files_read, and has_errors from events
        let (files_modified, files_read, has_errors) =
            opensession_core::extract::extract_file_metadata(session);
        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;

        let conn = self.conn();
        // 26 insert columns: ?1-?24 plus the 'personal' and '' literals.
        conn.execute(
            "INSERT INTO sessions \
             (id, team_id, tool, agent_provider, agent_model, \
              title, description, tags, created_at, \
              message_count, user_message_count, task_count, event_count, duration_seconds, \
              total_input_tokens, total_output_tokens, body_storage_key, \
              git_remote, git_branch, git_commit, git_repo_name, working_directory, \
              files_modified, files_read, has_errors, max_active_agents) \
             VALUES (?1,'personal',?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,'',?16,?17,?18,?19,?20,?21,?22,?23,?24) \
             ON CONFLICT(id) DO UPDATE SET \
              tool=excluded.tool, agent_provider=excluded.agent_provider, \
              agent_model=excluded.agent_model, \
              title=excluded.title, description=excluded.description, \
              tags=excluded.tags, \
              message_count=excluded.message_count, user_message_count=excluded.user_message_count, \
              task_count=excluded.task_count, \
              event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
              total_input_tokens=excluded.total_input_tokens, \
              total_output_tokens=excluded.total_output_tokens, \
              git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
              git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
              working_directory=excluded.working_directory, \
              files_modified=excluded.files_modified, files_read=excluded.files_read, \
              has_errors=excluded.has_errors, \
              max_active_agents=excluded.max_active_agents",
            params![
                &session.session_id,
                &session.agent.tool,
                &session.agent.provider,
                &session.agent.model,
                title,
                description,
                &tags,
                &created_at,
                session.stats.message_count as i64,
                session.stats.user_message_count as i64,
                session.stats.task_count as i64,
                session.stats.event_count as i64,
                session.stats.duration_seconds as i64,
                session.stats.total_input_tokens as i64,
                session.stats.total_output_tokens as i64,
                &git.remote,
                &git.branch,
                &git.commit,
                &git.repo_name,
                &cwd,
                &files_modified,
                &files_read,
                has_errors,
                max_active_agents,
            ],
        )?;

        // Sync bookkeeping: keep the source path fresh, leave any existing
        // sync status untouched.
        conn.execute(
            "INSERT INTO session_sync (session_id, source_path, sync_status) \
             VALUES (?1, ?2, 'local_only') \
             ON CONFLICT(session_id) DO UPDATE SET source_path=excluded.source_path",
            params![&session.session_id, source_path],
        )?;
        Ok(())
    }
492
493    // ── Upsert remote session (from server sync pull) ──────────────────
494
    /// Insert or update a session summary pulled from the remote server.
    ///
    /// On conflict only remote-owned metadata is refreshed; identity columns
    /// (`user_id`, `team_id`, `tool`, `created_at`) keep their existing
    /// values. The companion `session_sync` row is created as `remote_only`;
    /// a session that already existed locally as `local_only` is promoted to
    /// `synced`, any other existing status is left untouched.
    pub fn upsert_remote_session(&self, summary: &RemoteSessionSummary) -> Result<()> {
        let conn = self.conn();
        // 29 insert columns: ?1-?28 plus the '' literal for body_storage_key.
        conn.execute(
            "INSERT INTO sessions \
             (id, user_id, team_id, tool, agent_provider, agent_model, \
              title, description, tags, created_at, uploaded_at, \
              message_count, task_count, event_count, duration_seconds, \
              total_input_tokens, total_output_tokens, body_storage_key, \
              git_remote, git_branch, git_commit, git_repo_name, \
              pr_number, pr_url, working_directory, \
              files_modified, files_read, has_errors, max_active_agents) \
             VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,'',?18,?19,?20,?21,?22,?23,?24,?25,?26,?27,?28) \
             ON CONFLICT(id) DO UPDATE SET \
              title=excluded.title, description=excluded.description, \
              tags=excluded.tags, uploaded_at=excluded.uploaded_at, \
              message_count=excluded.message_count, task_count=excluded.task_count, \
              event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
              total_input_tokens=excluded.total_input_tokens, \
              total_output_tokens=excluded.total_output_tokens, \
              git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
              git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
              pr_number=excluded.pr_number, pr_url=excluded.pr_url, \
              working_directory=excluded.working_directory, \
              files_modified=excluded.files_modified, files_read=excluded.files_read, \
              has_errors=excluded.has_errors, \
              max_active_agents=excluded.max_active_agents",
            params![
                &summary.id,
                &summary.user_id,
                &summary.team_id,
                &summary.tool,
                &summary.agent_provider,
                &summary.agent_model,
                &summary.title,
                &summary.description,
                &summary.tags,
                &summary.created_at,
                &summary.uploaded_at,
                summary.message_count,
                summary.task_count,
                summary.event_count,
                summary.duration_seconds,
                summary.total_input_tokens,
                summary.total_output_tokens,
                &summary.git_remote,
                &summary.git_branch,
                &summary.git_commit,
                &summary.git_repo_name,
                summary.pr_number,
                &summary.pr_url,
                &summary.working_directory,
                &summary.files_modified,
                &summary.files_read,
                summary.has_errors,
                summary.max_active_agents,
            ],
        )?;

        // Sync bookkeeping: local_only → synced when the remote copy shows
        // up; all other statuses are preserved as-is.
        conn.execute(
            "INSERT INTO session_sync (session_id, sync_status) \
             VALUES (?1, 'remote_only') \
             ON CONFLICT(session_id) DO UPDATE SET \
              sync_status = CASE WHEN session_sync.sync_status = 'local_only' THEN 'synced' ELSE session_sync.sync_status END",
            params![&summary.id],
        )?;
        Ok(())
    }
562
563    // ── List sessions ──────────────────────────────────────────────────
564
565    fn build_local_session_where_clause(
566        filter: &LocalSessionFilter,
567    ) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
568        let mut where_clauses = vec![
569            "1=1".to_string(),
570            "NOT (s.tool = 'claude-code' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
571            "NOT (s.tool = 'opencode' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
572            "NOT (s.tool = 'opencode' AND COALESCE(s.user_message_count, 0) <= 0 AND COALESCE(s.message_count, 0) <= 4 AND COALESCE(s.task_count, 0) <= 4 AND COALESCE(s.event_count, 0) > 0 AND COALESCE(s.event_count, 0) <= 16)".to_string(),
573        ];
574        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
575        let mut idx = 1u32;
576
577        if let Some(ref team_id) = filter.team_id {
578            where_clauses.push(format!("s.team_id = ?{idx}"));
579            param_values.push(Box::new(team_id.clone()));
580            idx += 1;
581        }
582
583        if let Some(ref sync_status) = filter.sync_status {
584            where_clauses.push(format!("COALESCE(ss.sync_status, 'unknown') = ?{idx}"));
585            param_values.push(Box::new(sync_status.clone()));
586            idx += 1;
587        }
588
589        if let Some(ref repo) = filter.git_repo_name {
590            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
591            param_values.push(Box::new(repo.clone()));
592            idx += 1;
593        }
594
595        if let Some(ref tool) = filter.tool {
596            where_clauses.push(format!("s.tool = ?{idx}"));
597            param_values.push(Box::new(tool.clone()));
598            idx += 1;
599        }
600
601        if let Some(ref search) = filter.search {
602            let like = format!("%{search}%");
603            where_clauses.push(format!(
604                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
605                i1 = idx,
606                i2 = idx + 1,
607                i3 = idx + 2,
608            ));
609            param_values.push(Box::new(like.clone()));
610            param_values.push(Box::new(like.clone()));
611            param_values.push(Box::new(like));
612            idx += 3;
613        }
614
615        let interval = match filter.time_range {
616            LocalTimeRange::Hours24 => Some("-1 day"),
617            LocalTimeRange::Days7 => Some("-7 days"),
618            LocalTimeRange::Days30 => Some("-30 days"),
619            LocalTimeRange::All => None,
620        };
621        if let Some(interval) = interval {
622            where_clauses.push(format!("datetime(s.created_at) >= datetime('now', ?{idx})"));
623            param_values.push(Box::new(interval.to_string()));
624        }
625
626        (where_clauses.join(" AND "), param_values)
627    }
628
629    pub fn list_sessions(&self, filter: &LocalSessionFilter) -> Result<Vec<LocalSessionRow>> {
630        let (where_str, mut param_values) = Self::build_local_session_where_clause(filter);
631        let order_clause = match filter.sort {
632            LocalSortOrder::Popular => "s.message_count DESC, s.created_at DESC",
633            LocalSortOrder::Longest => "s.duration_seconds DESC, s.created_at DESC",
634            LocalSortOrder::Recent => "s.created_at DESC",
635        };
636
637        let mut sql = format!(
638            "SELECT {LOCAL_SESSION_COLUMNS} \
639             {FROM_CLAUSE} WHERE {where_str} \
640             ORDER BY {order_clause}"
641        );
642
643        if let Some(limit) = filter.limit {
644            sql.push_str(" LIMIT ?");
645            param_values.push(Box::new(limit));
646            if let Some(offset) = filter.offset {
647                sql.push_str(" OFFSET ?");
648                param_values.push(Box::new(offset));
649            }
650        }
651
652        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
653            param_values.iter().map(|p| p.as_ref()).collect();
654        let conn = self.conn();
655        let mut stmt = conn.prepare(&sql)?;
656        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
657
658        let mut result = Vec::new();
659        for row in rows {
660            result.push(row?);
661        }
662
663        Ok(hide_opencode_child_sessions(result))
664    }
665
666    /// Count sessions for a given list filter (before UI-level page slicing).
667    pub fn count_sessions_filtered(&self, filter: &LocalSessionFilter) -> Result<i64> {
668        let mut count_filter = filter.clone();
669        count_filter.limit = None;
670        count_filter.offset = None;
671        let (where_str, param_values) = Self::build_local_session_where_clause(&count_filter);
672        let sql = format!("SELECT COUNT(*) {FROM_CLAUSE} WHERE {where_str}");
673        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
674            param_values.iter().map(|p| p.as_ref()).collect();
675        let conn = self.conn();
676        let count = conn.query_row(&sql, param_refs.as_slice(), |row| row.get(0))?;
677        Ok(count)
678    }
679
680    /// List distinct tool names for the current list filter (ignores active tool filter).
681    pub fn list_session_tools(&self, filter: &LocalSessionFilter) -> Result<Vec<String>> {
682        let mut tool_filter = filter.clone();
683        tool_filter.tool = None;
684        tool_filter.limit = None;
685        tool_filter.offset = None;
686        let (where_str, param_values) = Self::build_local_session_where_clause(&tool_filter);
687        let sql = format!(
688            "SELECT DISTINCT s.tool \
689             {FROM_CLAUSE} WHERE {where_str} \
690             ORDER BY s.tool ASC"
691        );
692        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
693            param_values.iter().map(|p| p.as_ref()).collect();
694        let conn = self.conn();
695        let mut stmt = conn.prepare(&sql)?;
696        let rows = stmt.query_map(param_refs.as_slice(), |row| row.get::<_, String>(0))?;
697
698        let mut tools = Vec::new();
699        for row in rows {
700            let tool = row?;
701            if !tool.trim().is_empty() {
702                tools.push(tool);
703            }
704        }
705        Ok(tools)
706    }
707
708    // ── Log query ─────────────────────────────────────────────────────
709
    /// Query sessions with extended filters for the `log` command.
    ///
    /// All supplied filters are ANDed together. `?N` placeholder numbers are
    /// assigned in clause order so they match `param_values` positionally;
    /// the trailing LIMIT/OFFSET use bare `?`, which SQLite numbers after
    /// the highest explicit index. OFFSET is only emitted when LIMIT is set.
    /// Child/sub-agent sessions are pruned from the materialized rows.
    pub fn list_sessions_log(&self, filter: &LogFilter) -> Result<Vec<LocalSessionRow>> {
        let mut where_clauses = vec!["1=1".to_string()];
        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
        let mut idx = 1u32;

        if let Some(ref tool) = filter.tool {
            where_clauses.push(format!("s.tool = ?{idx}"));
            param_values.push(Box::new(tool.clone()));
            idx += 1;
        }

        if let Some(ref model) = filter.model {
            // Glob-style input: translate `*` wildcards into SQL LIKE `%`.
            let like = model.replace('*', "%");
            where_clauses.push(format!("s.agent_model LIKE ?{idx}"));
            param_values.push(Box::new(like));
            idx += 1;
        }

        if let Some(ref since) = filter.since {
            where_clauses.push(format!("s.created_at >= ?{idx}"));
            param_values.push(Box::new(since.clone()));
            idx += 1;
        }

        if let Some(ref before) = filter.before {
            where_clauses.push(format!("s.created_at < ?{idx}"));
            param_values.push(Box::new(before.clone()));
            idx += 1;
        }

        if let Some(ref touches) = filter.touches {
            // files_modified stores serialized JSON, so match the quoted
            // path substring inside that text.
            let like = format!("%\"{touches}\"%");
            where_clauses.push(format!("s.files_modified LIKE ?{idx}"));
            param_values.push(Box::new(like));
            idx += 1;
        }

        if let Some(ref grep) = filter.grep {
            // One pattern bound three times: title, description, tags.
            let like = format!("%{grep}%");
            where_clauses.push(format!(
                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
                i1 = idx,
                i2 = idx + 1,
                i3 = idx + 2,
            ));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like.clone()));
            param_values.push(Box::new(like));
            idx += 3;
        }

        // `Some(false)` intentionally filters nothing, same as `None`.
        if let Some(true) = filter.has_errors {
            where_clauses.push("s.has_errors = 1".to_string());
        }

        if let Some(ref wd) = filter.working_directory {
            // Prefix match on the working directory.
            where_clauses.push(format!("s.working_directory LIKE ?{idx}"));
            param_values.push(Box::new(format!("{wd}%")));
            idx += 1;
        }

        if let Some(ref repo) = filter.git_repo_name {
            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
            param_values.push(Box::new(repo.clone()));
            idx += 1;
        }

        // Optional JOIN for commit hash filter
        let mut extra_join = String::new();
        if let Some(ref commit) = filter.commit {
            extra_join =
                " INNER JOIN commit_session_links csl ON csl.session_id = s.id".to_string();
            where_clauses.push(format!("csl.commit_hash = ?{idx}"));
            param_values.push(Box::new(commit.clone()));
            idx += 1;
        }

        let _ = idx; // suppress unused warning

        let where_str = where_clauses.join(" AND ");
        let mut sql = format!(
            "SELECT {LOCAL_SESSION_COLUMNS} \
             {FROM_CLAUSE}{extra_join} WHERE {where_str} \
             ORDER BY s.created_at DESC"
        );

        if let Some(limit) = filter.limit {
            sql.push_str(" LIMIT ?");
            param_values.push(Box::new(limit));
            if let Some(offset) = filter.offset {
                sql.push_str(" OFFSET ?");
                param_values.push(Box::new(offset));
            }
        }

        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
            param_values.iter().map(|p| p.as_ref()).collect();
        let conn = self.conn();
        let mut stmt = conn.prepare(&sql)?;
        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;

        let mut result = Vec::new();
        for row in rows {
            result.push(row?);
        }
        Ok(hide_opencode_child_sessions(result))
    }
818
819    /// Get the latest N sessions for a specific tool, ordered by created_at DESC.
820    pub fn get_sessions_by_tool_latest(
821        &self,
822        tool: &str,
823        count: u32,
824    ) -> Result<Vec<LocalSessionRow>> {
825        let sql = format!(
826            "SELECT {LOCAL_SESSION_COLUMNS} \
827             {FROM_CLAUSE} WHERE s.tool = ?1 \
828             ORDER BY s.created_at DESC"
829        );
830        let conn = self.conn();
831        let mut stmt = conn.prepare(&sql)?;
832        let rows = stmt.query_map(params![tool], row_to_local_session)?;
833        let mut result = Vec::new();
834        for row in rows {
835            result.push(row?);
836        }
837
838        let mut filtered = hide_opencode_child_sessions(result);
839        filtered.truncate(count as usize);
840        Ok(filtered)
841    }
842
843    /// Get the latest N sessions across all tools, ordered by created_at DESC.
844    pub fn get_sessions_latest(&self, count: u32) -> Result<Vec<LocalSessionRow>> {
845        let sql = format!(
846            "SELECT {LOCAL_SESSION_COLUMNS} \
847             {FROM_CLAUSE} \
848             ORDER BY s.created_at DESC"
849        );
850        let conn = self.conn();
851        let mut stmt = conn.prepare(&sql)?;
852        let rows = stmt.query_map([], row_to_local_session)?;
853        let mut result = Vec::new();
854        for row in rows {
855            result.push(row?);
856        }
857
858        let mut filtered = hide_opencode_child_sessions(result);
859        filtered.truncate(count as usize);
860        Ok(filtered)
861    }
862
863    /// Get the Nth most recent session for a specific tool (0 = HEAD, 1 = HEAD~1, etc.).
864    pub fn get_session_by_tool_offset(
865        &self,
866        tool: &str,
867        offset: u32,
868    ) -> Result<Option<LocalSessionRow>> {
869        let sql = format!(
870            "SELECT {LOCAL_SESSION_COLUMNS} \
871             {FROM_CLAUSE} WHERE s.tool = ?1 \
872             ORDER BY s.created_at DESC"
873        );
874        let conn = self.conn();
875        let mut stmt = conn.prepare(&sql)?;
876        let rows = stmt.query_map(params![tool], row_to_local_session)?;
877        let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
878        Ok(filtered.into_iter().nth(offset as usize))
879    }
880
881    /// Get the Nth most recent session across all tools (0 = HEAD, 1 = HEAD~1, etc.).
882    pub fn get_session_by_offset(&self, offset: u32) -> Result<Option<LocalSessionRow>> {
883        let sql = format!(
884            "SELECT {LOCAL_SESSION_COLUMNS} \
885             {FROM_CLAUSE} \
886             ORDER BY s.created_at DESC"
887        );
888        let conn = self.conn();
889        let mut stmt = conn.prepare(&sql)?;
890        let rows = stmt.query_map([], row_to_local_session)?;
891        let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
892        Ok(filtered.into_iter().nth(offset as usize))
893    }
894
895    /// Fetch the source path used when the session was last parsed/loaded.
896    pub fn get_session_source_path(&self, session_id: &str) -> Result<Option<String>> {
897        let conn = self.conn();
898        let result = conn
899            .query_row(
900                "SELECT source_path FROM session_sync WHERE session_id = ?1",
901                params![session_id],
902                |row| row.get(0),
903            )
904            .optional()?;
905
906        Ok(result)
907    }
908
909    /// Count total sessions in the local DB.
910    pub fn session_count(&self) -> Result<i64> {
911        let count = self
912            .conn()
913            .query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))?;
914        Ok(count)
915    }
916
917    // ── Delete session ─────────────────────────────────────────────────
918
919    pub fn delete_session(&self, session_id: &str) -> Result<()> {
920        let conn = self.conn();
921        conn.execute(
922            "DELETE FROM body_cache WHERE session_id = ?1",
923            params![session_id],
924        )?;
925        conn.execute(
926            "DELETE FROM session_sync WHERE session_id = ?1",
927            params![session_id],
928        )?;
929        conn.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
930        Ok(())
931    }
932
933    // ── Sync cursor ────────────────────────────────────────────────────
934
935    pub fn get_sync_cursor(&self, team_id: &str) -> Result<Option<String>> {
936        let cursor = self
937            .conn()
938            .query_row(
939                "SELECT cursor FROM sync_cursors WHERE team_id = ?1",
940                params![team_id],
941                |row| row.get(0),
942            )
943            .optional()?;
944        Ok(cursor)
945    }
946
947    pub fn set_sync_cursor(&self, team_id: &str, cursor: &str) -> Result<()> {
948        self.conn().execute(
949            "INSERT INTO sync_cursors (team_id, cursor, updated_at) \
950             VALUES (?1, ?2, datetime('now')) \
951             ON CONFLICT(team_id) DO UPDATE SET cursor=excluded.cursor, updated_at=datetime('now')",
952            params![team_id, cursor],
953        )?;
954        Ok(())
955    }
956
957    // ── Upload tracking ────────────────────────────────────────────────
958
959    /// Get sessions that are local_only and need to be uploaded.
960    pub fn pending_uploads(&self, team_id: &str) -> Result<Vec<LocalSessionRow>> {
961        let sql = format!(
962            "SELECT {LOCAL_SESSION_COLUMNS} \
963             FROM sessions s \
964             INNER JOIN session_sync ss ON ss.session_id = s.id \
965             LEFT JOIN users u ON u.id = s.user_id \
966             WHERE ss.sync_status = 'local_only' AND s.team_id = ?1 \
967             ORDER BY s.created_at ASC"
968        );
969        let conn = self.conn();
970        let mut stmt = conn.prepare(&sql)?;
971        let rows = stmt.query_map(params![team_id], row_to_local_session)?;
972        let mut result = Vec::new();
973        for row in rows {
974            result.push(row?);
975        }
976        Ok(result)
977    }
978
979    pub fn mark_synced(&self, session_id: &str) -> Result<()> {
980        self.conn().execute(
981            "UPDATE session_sync SET sync_status = 'synced', last_synced_at = datetime('now') \
982             WHERE session_id = ?1",
983            params![session_id],
984        )?;
985        Ok(())
986    }
987
988    /// Check if a session was already uploaded (synced or remote_only) since the given modification time.
989    pub fn was_uploaded_after(
990        &self,
991        source_path: &str,
992        modified: &chrono::DateTime<chrono::Utc>,
993    ) -> Result<bool> {
994        let result: Option<String> = self
995            .conn()
996            .query_row(
997                "SELECT last_synced_at FROM session_sync \
998                 WHERE source_path = ?1 AND sync_status = 'synced' AND last_synced_at IS NOT NULL",
999                params![source_path],
1000                |row| row.get(0),
1001            )
1002            .optional()?;
1003
1004        if let Some(synced_at) = result {
1005            if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&synced_at) {
1006                return Ok(dt >= *modified);
1007            }
1008        }
1009        Ok(false)
1010    }
1011
1012    // ── Body cache ─────────────────────────────────────────────────────
1013
1014    pub fn cache_body(&self, session_id: &str, body: &[u8]) -> Result<()> {
1015        self.conn().execute(
1016            "INSERT INTO body_cache (session_id, body, cached_at) \
1017             VALUES (?1, ?2, datetime('now')) \
1018             ON CONFLICT(session_id) DO UPDATE SET body=excluded.body, cached_at=datetime('now')",
1019            params![session_id, body],
1020        )?;
1021        Ok(())
1022    }
1023
1024    pub fn get_cached_body(&self, session_id: &str) -> Result<Option<Vec<u8>>> {
1025        let body = self
1026            .conn()
1027            .query_row(
1028                "SELECT body FROM body_cache WHERE session_id = ?1",
1029                params![session_id],
1030                |row| row.get(0),
1031            )
1032            .optional()?;
1033        Ok(body)
1034    }
1035
1036    // ── Timeline summary cache ────────────────────────────────────────
1037
1038    pub fn upsert_timeline_summary_cache(
1039        &self,
1040        lookup_key: &str,
1041        namespace: &str,
1042        compact: &str,
1043        payload: &str,
1044        raw: &str,
1045    ) -> Result<()> {
1046        self.conn().execute(
1047            "INSERT INTO timeline_summary_cache \
1048             (lookup_key, namespace, compact, payload, raw, cached_at) \
1049             VALUES (?1, ?2, ?3, ?4, ?5, datetime('now')) \
1050             ON CONFLICT(lookup_key) DO UPDATE SET \
1051               namespace = excluded.namespace, \
1052               compact = excluded.compact, \
1053               payload = excluded.payload, \
1054               raw = excluded.raw, \
1055               cached_at = datetime('now')",
1056            params![lookup_key, namespace, compact, payload, raw],
1057        )?;
1058        Ok(())
1059    }
1060
1061    pub fn list_timeline_summary_cache_by_namespace(
1062        &self,
1063        namespace: &str,
1064    ) -> Result<Vec<TimelineSummaryCacheRow>> {
1065        let conn = self.conn();
1066        let mut stmt = conn.prepare(
1067            "SELECT lookup_key, namespace, compact, payload, raw, cached_at \
1068             FROM timeline_summary_cache \
1069             WHERE namespace = ?1 \
1070             ORDER BY cached_at DESC",
1071        )?;
1072        let rows = stmt.query_map(params![namespace], |row| {
1073            Ok(TimelineSummaryCacheRow {
1074                lookup_key: row.get(0)?,
1075                namespace: row.get(1)?,
1076                compact: row.get(2)?,
1077                payload: row.get(3)?,
1078                raw: row.get(4)?,
1079                cached_at: row.get(5)?,
1080            })
1081        })?;
1082
1083        let mut out = Vec::new();
1084        for row in rows {
1085            out.push(row?);
1086        }
1087        Ok(out)
1088    }
1089
1090    pub fn clear_timeline_summary_cache(&self) -> Result<usize> {
1091        let affected = self
1092            .conn()
1093            .execute("DELETE FROM timeline_summary_cache", [])?;
1094        Ok(affected)
1095    }
1096
1097    // ── Migration helper ───────────────────────────────────────────────
1098
1099    /// Migrate entries from the old state.json UploadState into the local DB.
1100    /// Marks them as `synced` with no metadata (we only know the file path was uploaded).
1101    pub fn migrate_from_state_json(
1102        &self,
1103        uploaded: &std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
1104    ) -> Result<usize> {
1105        let mut count = 0;
1106        for (path, uploaded_at) in uploaded {
1107            let exists: bool = self
1108                .conn()
1109                .query_row(
1110                    "SELECT COUNT(*) > 0 FROM session_sync WHERE source_path = ?1",
1111                    params![path],
1112                    |row| row.get(0),
1113                )
1114                .unwrap_or(false);
1115
1116            if exists {
1117                self.conn().execute(
1118                    "UPDATE session_sync SET sync_status = 'synced', last_synced_at = ?1 \
1119                     WHERE source_path = ?2 AND sync_status = 'local_only'",
1120                    params![uploaded_at.to_rfc3339(), path],
1121                )?;
1122                count += 1;
1123            }
1124        }
1125        Ok(count)
1126    }
1127
1128    // ── Commit ↔ session linking ────────────────────────────────────
1129
1130    /// Link a git commit to an AI session.
1131    pub fn link_commit_session(
1132        &self,
1133        commit_hash: &str,
1134        session_id: &str,
1135        repo_path: Option<&str>,
1136        branch: Option<&str>,
1137    ) -> Result<()> {
1138        self.conn().execute(
1139            "INSERT INTO commit_session_links (commit_hash, session_id, repo_path, branch) \
1140             VALUES (?1, ?2, ?3, ?4) \
1141             ON CONFLICT(commit_hash, session_id) DO NOTHING",
1142            params![commit_hash, session_id, repo_path, branch],
1143        )?;
1144        Ok(())
1145    }
1146
1147    /// Get all sessions linked to a git commit.
1148    pub fn get_sessions_by_commit(&self, commit_hash: &str) -> Result<Vec<LocalSessionRow>> {
1149        let sql = format!(
1150            "SELECT {LOCAL_SESSION_COLUMNS} \
1151             {FROM_CLAUSE} \
1152             INNER JOIN commit_session_links csl ON csl.session_id = s.id \
1153             WHERE csl.commit_hash = ?1 \
1154             ORDER BY s.created_at DESC"
1155        );
1156        let conn = self.conn();
1157        let mut stmt = conn.prepare(&sql)?;
1158        let rows = stmt.query_map(params![commit_hash], row_to_local_session)?;
1159        let mut result = Vec::new();
1160        for row in rows {
1161            result.push(row?);
1162        }
1163        Ok(result)
1164    }
1165
1166    /// Get all commits linked to a session.
1167    pub fn get_commits_by_session(&self, session_id: &str) -> Result<Vec<CommitLink>> {
1168        let conn = self.conn();
1169        let mut stmt = conn.prepare(
1170            "SELECT commit_hash, session_id, repo_path, branch, created_at \
1171             FROM commit_session_links WHERE session_id = ?1 \
1172             ORDER BY created_at DESC",
1173        )?;
1174        let rows = stmt.query_map(params![session_id], |row| {
1175            Ok(CommitLink {
1176                commit_hash: row.get(0)?,
1177                session_id: row.get(1)?,
1178                repo_path: row.get(2)?,
1179                branch: row.get(3)?,
1180                created_at: row.get(4)?,
1181            })
1182        })?;
1183        let mut result = Vec::new();
1184        for row in rows {
1185            result.push(row?);
1186        }
1187        Ok(result)
1188    }
1189
1190    /// Find the most recently active session for a given repo path.
1191    /// "Active" means the session's working_directory matches the repo path
1192    /// and was created within the last `since_minutes` minutes.
1193    pub fn find_active_session_for_repo(
1194        &self,
1195        repo_path: &str,
1196        since_minutes: u32,
1197    ) -> Result<Option<LocalSessionRow>> {
1198        let sql = format!(
1199            "SELECT {LOCAL_SESSION_COLUMNS} \
1200             {FROM_CLAUSE} \
1201             WHERE s.working_directory LIKE ?1 \
1202             AND s.created_at >= datetime('now', ?2) \
1203             ORDER BY s.created_at DESC LIMIT 1"
1204        );
1205        let since = format!("-{since_minutes} minutes");
1206        let like = format!("{repo_path}%");
1207        let conn = self.conn();
1208        let mut stmt = conn.prepare(&sql)?;
1209        let row = stmt
1210            .query_map(params![like, since], row_to_local_session)?
1211            .next()
1212            .transpose()?;
1213        Ok(row)
1214    }
1215
1216    /// Get all session IDs currently in the local DB.
1217    pub fn existing_session_ids(&self) -> std::collections::HashSet<String> {
1218        let conn = self.conn();
1219        let mut stmt = conn
1220            .prepare("SELECT id FROM sessions")
1221            .unwrap_or_else(|_| panic!("failed to prepare existing_session_ids query"));
1222        let rows = stmt.query_map([], |row| row.get::<_, String>(0));
1223        let mut set = std::collections::HashSet::new();
1224        if let Ok(rows) = rows {
1225            for row in rows.flatten() {
1226                set.insert(row);
1227            }
1228        }
1229        set
1230    }
1231
    /// Update only stats fields for an existing session (no git context re-extraction).
    ///
    /// Recomputes title/description, message/task/event counts, duration,
    /// token totals, file metadata and the max-active-agents metric from the
    /// parsed `Session`, then overwrites those columns on the row matching
    /// `session.session_id`. A missing row is a silent no-op (UPDATE matches
    /// nothing).
    pub fn update_session_stats(&self, session: &Session) -> Result<()> {
        let title = session.context.title.as_deref();
        let description = session.context.description.as_deref();
        let (files_modified, files_read, has_errors) =
            opensession_core::extract::extract_file_metadata(session);
        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;

        // NOTE: bindings are positional — ?1 is the id in WHERE, ?2..?14
        // follow the SET list in order. Keep the SQL and the params! list in
        // sync when adding fields.
        self.conn().execute(
            "UPDATE sessions SET \
             title=?2, description=?3, \
             message_count=?4, user_message_count=?5, task_count=?6, \
             event_count=?7, duration_seconds=?8, \
             total_input_tokens=?9, total_output_tokens=?10, \
             files_modified=?11, files_read=?12, has_errors=?13, \
             max_active_agents=?14 \
             WHERE id=?1",
            params![
                &session.session_id,
                title,
                description,
                session.stats.message_count as i64,
                session.stats.user_message_count as i64,
                session.stats.task_count as i64,
                session.stats.event_count as i64,
                session.stats.duration_seconds as i64,
                session.stats.total_input_tokens as i64,
                session.stats.total_output_tokens as i64,
                &files_modified,
                &files_read,
                has_errors,
                max_active_agents,
            ],
        )?;
        Ok(())
    }
1268
1269    /// Update only sync metadata path for an existing session.
1270    pub fn set_session_sync_path(&self, session_id: &str, source_path: &str) -> Result<()> {
1271        self.conn().execute(
1272            "INSERT INTO session_sync (session_id, source_path) \
1273             VALUES (?1, ?2) \
1274             ON CONFLICT(session_id) DO UPDATE SET source_path = excluded.source_path",
1275            params![session_id, source_path],
1276        )?;
1277        Ok(())
1278    }
1279
1280    /// Get a list of distinct git repo names present in the DB.
1281    pub fn list_repos(&self) -> Result<Vec<String>> {
1282        let conn = self.conn();
1283        let mut stmt = conn.prepare(
1284            "SELECT DISTINCT git_repo_name FROM sessions \
1285             WHERE git_repo_name IS NOT NULL ORDER BY git_repo_name ASC",
1286        )?;
1287        let rows = stmt.query_map([], |row| row.get(0))?;
1288        let mut result = Vec::new();
1289        for row in rows {
1290            result.push(row?);
1291        }
1292        Ok(result)
1293    }
1294
1295    /// Get a list of distinct, non-empty working directories present in the DB.
1296    pub fn list_working_directories(&self) -> Result<Vec<String>> {
1297        let conn = self.conn();
1298        let mut stmt = conn.prepare(
1299            "SELECT DISTINCT working_directory FROM sessions \
1300             WHERE working_directory IS NOT NULL AND TRIM(working_directory) <> '' \
1301             ORDER BY working_directory ASC",
1302        )?;
1303        let rows = stmt.query_map([], |row| row.get(0))?;
1304        let mut result = Vec::new();
1305        for row in rows {
1306            result.push(row?);
1307        }
1308        Ok(result)
1309    }
1310}
1311
1312// ── Legacy schema backfill ─────────────────────────────────────────────
1313
1314fn open_connection_with_latest_schema(path: &PathBuf) -> Result<Connection> {
1315    let conn = Connection::open(path).with_context(|| format!("open db {}", path.display()))?;
1316    conn.execute_batch("PRAGMA journal_mode=WAL;")?;
1317
1318    // Disable FK constraints for local DB (it's a cache, not source of truth)
1319    conn.execute_batch("PRAGMA foreign_keys=OFF;")?;
1320
1321    apply_local_migrations(&conn)?;
1322
1323    // Backfill missing columns for legacy local DBs where `sessions` already
1324    // existed before newer fields were introduced.
1325    ensure_sessions_columns(&conn)?;
1326    validate_local_schema(&conn)?;
1327
1328    Ok(conn)
1329}
1330
/// Apply remote + local migrations to the local cache DB, recording applied
/// names in a `_migrations` table so each runs at most once per DB.
fn apply_local_migrations(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        "CREATE TABLE IF NOT EXISTS _migrations (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL UNIQUE,
            applied_at TEXT NOT NULL DEFAULT (datetime('now'))
        );",
    )
    .context("create _migrations table for local db")?;

    // Remote migrations run first so the local ones can build on that schema.
    for (name, sql) in REMOTE_MIGRATIONS.iter().chain(LOCAL_MIGRATIONS.iter()) {
        // A failed lookup is treated as "not applied" so the migration is
        // retried rather than silently skipped.
        let already_applied: bool = conn
            .query_row(
                "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
                [name],
                |row| row.get(0),
            )
            .unwrap_or(false);

        if already_applied {
            continue;
        }

        // Deliberately tolerate "duplicate column"/"already exists"-style
        // failures: legacy local DBs may already contain pieces of later
        // migrations, and re-applying those must not abort the open.
        if let Err(e) = conn.execute_batch(sql) {
            let msg = e.to_string().to_ascii_lowercase();
            if !is_local_migration_compat_error(&msg) {
                return Err(e).with_context(|| format!("apply local migration {name}"));
            }
        }

        // Record the migration even when it was a compat no-op, so it is not
        // retried on every subsequent open.
        conn.execute(
            "INSERT OR IGNORE INTO _migrations (name) VALUES (?1)",
            [name],
        )
        .with_context(|| format!("record local migration {name}"))?;
    }

    Ok(())
}
1370
/// Returns true for SQLite error messages (already lowercased by the caller)
/// that indicate a migration was effectively applied already, e.g. re-adding
/// an existing column or table.
fn is_local_migration_compat_error(msg: &str) -> bool {
    ["duplicate column name", "no such column", "already exists"]
        .iter()
        .any(|needle| msg.contains(needle))
}
1376
1377fn validate_local_schema(conn: &Connection) -> Result<()> {
1378    let sql = format!("SELECT {LOCAL_SESSION_COLUMNS} {FROM_CLAUSE} WHERE 1=0");
1379    conn.prepare(&sql)
1380        .map(|_| ())
1381        .context("validate local session schema")
1382}
1383
1384fn is_schema_compat_error(err: &anyhow::Error) -> bool {
1385    let msg = format!("{err:#}").to_ascii_lowercase();
1386    msg.contains("no such column")
1387        || msg.contains("no such table")
1388        || msg.contains("cannot add a column")
1389        || msg.contains("already exists")
1390        || msg.contains("views may not be indexed")
1391        || msg.contains("malformed database schema")
1392        || msg.contains("duplicate column name")
1393}
1394
1395fn rotate_legacy_db(path: &PathBuf) -> Result<()> {
1396    if !path.exists() {
1397        return Ok(());
1398    }
1399
1400    let ts = chrono::Utc::now().format("%Y%m%d%H%M%S");
1401    let backup_name = format!(
1402        "{}.legacy-{}.bak",
1403        path.file_name()
1404            .and_then(|n| n.to_str())
1405            .unwrap_or("local.db"),
1406        ts
1407    );
1408    let backup_path = path.with_file_name(backup_name);
1409    std::fs::rename(path, &backup_path).with_context(|| {
1410        format!(
1411            "rotate legacy db {} -> {}",
1412            path.display(),
1413            backup_path.display()
1414        )
1415    })?;
1416
1417    let wal = PathBuf::from(format!("{}-wal", path.display()));
1418    let shm = PathBuf::from(format!("{}-shm", path.display()));
1419    let _ = std::fs::remove_file(wal);
1420    let _ = std::fs::remove_file(shm);
1421    Ok(())
1422}
1423
/// Columns that must exist on `sessions` for the current code to run, paired
/// with the DDL used to add them. `ensure_sessions_columns` backfills any of
/// these missing from a legacy local DB via `ALTER TABLE ... ADD COLUMN`.
const REQUIRED_SESSION_COLUMNS: &[(&str, &str)] = &[
    ("user_id", "TEXT"),
    ("team_id", "TEXT DEFAULT 'personal'"),
    ("tool", "TEXT DEFAULT ''"),
    ("agent_provider", "TEXT"),
    ("agent_model", "TEXT"),
    ("title", "TEXT"),
    ("description", "TEXT"),
    ("tags", "TEXT"),
    ("created_at", "TEXT DEFAULT ''"),
    ("uploaded_at", "TEXT DEFAULT ''"),
    ("message_count", "INTEGER DEFAULT 0"),
    ("user_message_count", "INTEGER DEFAULT 0"),
    ("task_count", "INTEGER DEFAULT 0"),
    ("event_count", "INTEGER DEFAULT 0"),
    ("duration_seconds", "INTEGER DEFAULT 0"),
    ("total_input_tokens", "INTEGER DEFAULT 0"),
    ("total_output_tokens", "INTEGER DEFAULT 0"),
    ("body_storage_key", "TEXT DEFAULT ''"),
    ("body_url", "TEXT"),
    ("git_remote", "TEXT"),
    ("git_branch", "TEXT"),
    ("git_commit", "TEXT"),
    ("git_repo_name", "TEXT"),
    ("pr_number", "INTEGER"),
    ("pr_url", "TEXT"),
    ("working_directory", "TEXT"),
    ("files_modified", "TEXT"),
    ("files_read", "TEXT"),
    ("has_errors", "BOOLEAN DEFAULT 0"),
    ("max_active_agents", "INTEGER DEFAULT 1"),
];
1456
1457fn ensure_sessions_columns(conn: &Connection) -> Result<()> {
1458    let mut existing = HashSet::new();
1459    let mut stmt = conn.prepare("PRAGMA table_info(sessions)")?;
1460    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
1461    for row in rows {
1462        existing.insert(row?);
1463    }
1464
1465    for (name, decl) in REQUIRED_SESSION_COLUMNS {
1466        if existing.contains(*name) {
1467            continue;
1468        }
1469        let sql = format!("ALTER TABLE sessions ADD COLUMN {name} {decl};");
1470        conn.execute_batch(&sql)
1471            .with_context(|| format!("add legacy sessions column '{name}'"))?;
1472    }
1473
1474    Ok(())
1475}
1476
/// Column list for SELECT queries against sessions + session_sync + users.
///
/// Order is significant: `row_to_local_session` reads these columns by
/// ordinal index, so any change here must be mirrored there (and vice versa).
pub const LOCAL_SESSION_COLUMNS: &str = "\
s.id, ss.source_path, COALESCE(ss.sync_status, 'unknown') AS sync_status, ss.last_synced_at, \
s.user_id, u.nickname, s.team_id, s.tool, s.agent_provider, s.agent_model, \
s.title, s.description, s.tags, s.created_at, s.uploaded_at, \
s.message_count, COALESCE(s.user_message_count, 0), s.task_count, s.event_count, s.duration_seconds, \
s.total_input_tokens, s.total_output_tokens, \
s.git_remote, s.git_branch, s.git_commit, s.git_repo_name, \
s.pr_number, s.pr_url, s.working_directory, \
s.files_modified, s.files_read, s.has_errors, COALESCE(s.max_active_agents, 1)";
1487
/// Map one row of a `SELECT {LOCAL_SESSION_COLUMNS}` query to a
/// `LocalSessionRow`.
///
/// The ordinals are positional and must match the column order in
/// `LOCAL_SESSION_COLUMNS` exactly — update both together.
fn row_to_local_session(row: &rusqlite::Row) -> rusqlite::Result<LocalSessionRow> {
    Ok(LocalSessionRow {
        id: row.get(0)?,
        source_path: row.get(1)?,
        sync_status: row.get(2)?,
        last_synced_at: row.get(3)?,
        user_id: row.get(4)?,
        nickname: row.get(5)?,
        team_id: row.get(6)?,
        tool: row.get(7)?,
        agent_provider: row.get(8)?,
        agent_model: row.get(9)?,
        title: row.get(10)?,
        description: row.get(11)?,
        tags: row.get(12)?,
        created_at: row.get(13)?,
        uploaded_at: row.get(14)?,
        message_count: row.get(15)?,
        user_message_count: row.get(16)?,
        task_count: row.get(17)?,
        event_count: row.get(18)?,
        duration_seconds: row.get(19)?,
        total_input_tokens: row.get(20)?,
        total_output_tokens: row.get(21)?,
        git_remote: row.get(22)?,
        git_branch: row.get(23)?,
        git_commit: row.get(24)?,
        git_repo_name: row.get(25)?,
        pr_number: row.get(26)?,
        pr_url: row.get(27)?,
        working_directory: row.get(28)?,
        files_modified: row.get(29)?,
        files_read: row.get(30)?,
        // Stored as INTEGER 0/1; a NULL or unreadable value reads as false.
        has_errors: row.get::<_, i64>(31).unwrap_or(0) != 0,
        // Defaults to 1 for rows written before the column existed.
        max_active_agents: row.get(32).unwrap_or(1),
    })
}
1525
1526fn default_db_path() -> Result<PathBuf> {
1527    let home = std::env::var("HOME")
1528        .or_else(|_| std::env::var("USERPROFILE"))
1529        .context("Could not determine home directory")?;
1530    Ok(PathBuf::from(home)
1531        .join(".local")
1532        .join("share")
1533        .join("opensession")
1534        .join("local.db"))
1535}
1536
1537#[cfg(test)]
1538mod tests {
1539    use super::*;
1540
1541    use std::collections::BTreeSet;
1542    use std::fs::{create_dir_all, write};
1543    use tempfile::tempdir;
1544
1545    fn test_db() -> LocalDb {
1546        let dir = tempdir().unwrap();
1547        let path = dir.keep().join("test.db");
1548        LocalDb::open_path(&path).unwrap()
1549    }
1550
    // Fresh temporary directory for tests that need on-disk fixture files.
    fn temp_root() -> tempfile::TempDir {
        tempdir().unwrap()
    }
1554
    // Minimal LocalSessionRow fixture: only id/tool/source_path vary per
    // test; every other field is a zero/None/default placeholder.
    fn make_row(id: &str, tool: &str, source_path: Option<&str>) -> LocalSessionRow {
        LocalSessionRow {
            id: id.to_string(),
            source_path: source_path.map(String::from),
            sync_status: "local_only".to_string(),
            last_synced_at: None,
            user_id: None,
            nickname: None,
            team_id: None,
            tool: tool.to_string(),
            agent_provider: None,
            agent_model: None,
            title: Some("test".to_string()),
            description: None,
            tags: None,
            created_at: "2024-01-01T00:00:00Z".to_string(),
            uploaded_at: None,
            message_count: 0,
            user_message_count: 0,
            task_count: 0,
            event_count: 0,
            duration_seconds: 0,
            total_input_tokens: 0,
            total_output_tokens: 0,
            git_remote: None,
            git_branch: None,
            git_commit: None,
            git_repo_name: None,
            pr_number: None,
            pr_url: None,
            working_directory: None,
            files_modified: None,
            files_read: None,
            has_errors: false,
            max_active_agents: 1,
        }
    }
1592
    #[test]
    fn test_open_and_schema() {
        // Opening a fresh DB must apply every migration without error.
        let _db = test_db();
    }
1597
1598    #[test]
1599    fn test_open_backfills_legacy_sessions_columns() {
1600        let dir = tempfile::tempdir().unwrap();
1601        let path = dir.path().join("legacy.db");
1602        {
1603            let conn = Connection::open(&path).unwrap();
1604            conn.execute_batch(
1605                "CREATE TABLE sessions (id TEXT PRIMARY KEY);
1606                 INSERT INTO sessions (id) VALUES ('legacy-1');",
1607            )
1608            .unwrap();
1609        }
1610
1611        let db = LocalDb::open_path(&path).unwrap();
1612        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1613        assert_eq!(rows.len(), 1);
1614        assert_eq!(rows[0].id, "legacy-1");
1615        assert_eq!(rows[0].user_message_count, 0);
1616    }
1617
1618    #[test]
1619    fn test_open_rotates_incompatible_legacy_schema() {
1620        let dir = tempfile::tempdir().unwrap();
1621        let path = dir.path().join("broken.db");
1622        {
1623            let conn = Connection::open(&path).unwrap();
1624            conn.execute_batch("CREATE VIEW sessions AS SELECT 'x' AS id;")
1625                .unwrap();
1626        }
1627
1628        let db = LocalDb::open_path(&path).unwrap();
1629        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1630        assert!(rows.is_empty());
1631
1632        let rotated = std::fs::read_dir(dir.path())
1633            .unwrap()
1634            .filter_map(Result::ok)
1635            .any(|entry| {
1636                let name = entry.file_name();
1637                let name = name.to_string_lossy();
1638                name.starts_with("broken.db.legacy-") && name.ends_with(".bak")
1639            });
1640        assert!(rotated, "expected rotated legacy backup file");
1641    }
1642
1643    #[test]
1644    fn test_is_opencode_child_session() {
1645        let root = temp_root();
1646        let dir = root.path().join("sessions");
1647        create_dir_all(&dir).unwrap();
1648        let parent_session = dir.join("parent.json");
1649        write(
1650            &parent_session,
1651            r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1652        )
1653        .unwrap();
1654        let child_session = dir.join("child.json");
1655        write(
1656            &child_session,
1657            r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1658        )
1659        .unwrap();
1660
1661        let parent = make_row(
1662            "ses_parent",
1663            "opencode",
1664            Some(parent_session.to_str().unwrap()),
1665        );
1666        let child = make_row(
1667            "ses_child",
1668            "opencode",
1669            Some(child_session.to_str().unwrap()),
1670        );
1671        let codex = make_row("ses_other", "codex", Some(child_session.to_str().unwrap()));
1672
1673        assert!(!is_opencode_child_session(&parent));
1674        assert!(is_opencode_child_session(&child));
1675        assert!(!is_opencode_child_session(&codex));
1676    }
1677
1678    #[test]
1679    fn test_is_opencode_child_session_uses_event_shape_heuristic() {
1680        let child = LocalSessionRow {
1681            id: "sess_child".to_string(),
1682            source_path: None,
1683            sync_status: "local_only".to_string(),
1684            last_synced_at: None,
1685            user_id: None,
1686            nickname: None,
1687            team_id: None,
1688            tool: "opencode".to_string(),
1689            agent_provider: None,
1690            agent_model: None,
1691            title: None,
1692            description: None,
1693            tags: None,
1694            created_at: "2024-01-01T00:00:00Z".to_string(),
1695            uploaded_at: None,
1696            message_count: 1,
1697            user_message_count: 0,
1698            task_count: 4,
1699            event_count: 4,
1700            duration_seconds: 0,
1701            total_input_tokens: 0,
1702            total_output_tokens: 0,
1703            git_remote: None,
1704            git_branch: None,
1705            git_commit: None,
1706            git_repo_name: None,
1707            pr_number: None,
1708            pr_url: None,
1709            working_directory: None,
1710            files_modified: None,
1711            files_read: None,
1712            has_errors: false,
1713            max_active_agents: 1,
1714        };
1715
1716        let parent = LocalSessionRow {
1717            id: "sess_parent".to_string(),
1718            source_path: None,
1719            sync_status: "local_only".to_string(),
1720            last_synced_at: None,
1721            user_id: None,
1722            nickname: None,
1723            team_id: None,
1724            tool: "opencode".to_string(),
1725            agent_provider: None,
1726            agent_model: None,
1727            title: Some("regular".to_string()),
1728            description: None,
1729            tags: None,
1730            created_at: "2024-01-01T00:00:00Z".to_string(),
1731            uploaded_at: None,
1732            message_count: 1,
1733            user_message_count: 1,
1734            task_count: 2,
1735            event_count: 20,
1736            duration_seconds: 0,
1737            total_input_tokens: 0,
1738            total_output_tokens: 0,
1739            git_remote: None,
1740            git_branch: None,
1741            git_commit: None,
1742            git_repo_name: None,
1743            pr_number: None,
1744            pr_url: None,
1745            working_directory: None,
1746            files_modified: None,
1747            files_read: None,
1748            has_errors: false,
1749            max_active_agents: 1,
1750        };
1751
1752        assert!(is_opencode_child_session(&child));
1753        assert!(!is_opencode_child_session(&parent));
1754    }
1755
1756    #[test]
1757    fn test_is_opencode_child_session_with_more_messages_is_hidden_if_task_count_small() {
1758        let child = LocalSessionRow {
1759            id: "sess_child_2".to_string(),
1760            source_path: None,
1761            sync_status: "local_only".to_string(),
1762            last_synced_at: None,
1763            user_id: None,
1764            nickname: None,
1765            team_id: None,
1766            tool: "opencode".to_string(),
1767            agent_provider: None,
1768            agent_model: None,
1769            title: None,
1770            description: None,
1771            tags: None,
1772            created_at: "2024-01-01T00:00:00Z".to_string(),
1773            uploaded_at: None,
1774            message_count: 2,
1775            user_message_count: 0,
1776            task_count: 4,
1777            event_count: 4,
1778            duration_seconds: 0,
1779            total_input_tokens: 0,
1780            total_output_tokens: 0,
1781            git_remote: None,
1782            git_branch: None,
1783            git_commit: None,
1784            git_repo_name: None,
1785            pr_number: None,
1786            pr_url: None,
1787            working_directory: None,
1788            files_modified: None,
1789            files_read: None,
1790            has_errors: false,
1791            max_active_agents: 1,
1792        };
1793
1794        let parent = LocalSessionRow {
1795            id: "sess_parent".to_string(),
1796            source_path: None,
1797            sync_status: "local_only".to_string(),
1798            last_synced_at: None,
1799            user_id: None,
1800            nickname: None,
1801            team_id: None,
1802            tool: "opencode".to_string(),
1803            agent_provider: None,
1804            agent_model: None,
1805            title: Some("regular".to_string()),
1806            description: None,
1807            tags: None,
1808            created_at: "2024-01-01T00:00:00Z".to_string(),
1809            uploaded_at: None,
1810            message_count: 2,
1811            user_message_count: 1,
1812            task_count: 5,
1813            event_count: 20,
1814            duration_seconds: 0,
1815            total_input_tokens: 0,
1816            total_output_tokens: 0,
1817            git_remote: None,
1818            git_branch: None,
1819            git_commit: None,
1820            git_repo_name: None,
1821            pr_number: None,
1822            pr_url: None,
1823            working_directory: None,
1824            files_modified: None,
1825            files_read: None,
1826            has_errors: false,
1827            max_active_agents: 1,
1828        };
1829
1830        assert!(is_opencode_child_session(&child));
1831        assert!(!is_opencode_child_session(&parent));
1832    }
1833
1834    #[test]
1835    fn test_is_opencode_child_session_with_more_messages_but_few_tasks() {
1836        let child = LocalSessionRow {
1837            id: "sess_child_3".to_string(),
1838            source_path: None,
1839            sync_status: "local_only".to_string(),
1840            last_synced_at: None,
1841            user_id: None,
1842            nickname: None,
1843            team_id: None,
1844            tool: "opencode".to_string(),
1845            agent_provider: None,
1846            agent_model: None,
1847            title: None,
1848            description: None,
1849            tags: None,
1850            created_at: "2024-01-01T00:00:00Z".to_string(),
1851            uploaded_at: None,
1852            message_count: 3,
1853            user_message_count: 0,
1854            task_count: 2,
1855            event_count: 6,
1856            duration_seconds: 0,
1857            total_input_tokens: 0,
1858            total_output_tokens: 0,
1859            git_remote: None,
1860            git_branch: None,
1861            git_commit: None,
1862            git_repo_name: None,
1863            pr_number: None,
1864            pr_url: None,
1865            working_directory: None,
1866            files_modified: None,
1867            files_read: None,
1868            has_errors: false,
1869            max_active_agents: 1,
1870        };
1871
1872        assert!(is_opencode_child_session(&child));
1873    }
1874
1875    #[test]
1876    fn test_parse_opencode_parent_session_id_aliases() {
1877        let root = temp_root();
1878        let dir = root.path().join("session-aliases");
1879        create_dir_all(&dir).unwrap();
1880        let child_session = dir.join("child.json");
1881        write(
1882            &child_session,
1883            r#"{"id":"ses_child","parentUUID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1884        )
1885        .unwrap();
1886        assert_eq!(
1887            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
1888            Some("ses_parent")
1889        );
1890    }
1891
1892    #[test]
1893    fn test_parse_opencode_parent_session_id_nested_metadata() {
1894        let root = temp_root();
1895        let dir = root.path().join("session-nested");
1896        create_dir_all(&dir).unwrap();
1897        let child_session = dir.join("child.json");
1898        write(
1899            &child_session,
1900            r#"{"id":"ses_child","metadata":{"links":{"parentSessionId":"ses_parent","trace":"x"}}}"#,
1901        )
1902        .unwrap();
1903        assert_eq!(
1904            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
1905            Some("ses_parent")
1906        );
1907    }
1908
1909    #[test]
1910    fn test_is_claude_subagent_session() {
1911        let row = make_row(
1912            "ses_parent",
1913            "claude-code",
1914            Some("/Users/test/.claude/projects/foo/subagents/agent-abc.jsonl"),
1915        );
1916        assert!(!is_opencode_child_session(&row));
1917        assert!(is_claude_subagent_session(&row));
1918        assert!(hide_opencode_child_sessions(vec![row]).is_empty());
1919    }
1920
1921    #[test]
1922    fn test_hide_opencode_child_sessions() {
1923        let root = temp_root();
1924        let dir = root.path().join("sessions");
1925        create_dir_all(&dir).unwrap();
1926        let parent_session = dir.join("parent.json");
1927        let child_session = dir.join("child.json");
1928        let orphan_session = dir.join("orphan.json");
1929
1930        write(
1931            &parent_session,
1932            r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1933        )
1934        .unwrap();
1935        write(
1936            &child_session,
1937            r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1938        )
1939        .unwrap();
1940        write(
1941            &orphan_session,
1942            r#"{"id":"ses_orphan","time":{"created":1000,"updated":1000}}"#,
1943        )
1944        .unwrap();
1945
1946        let rows = vec![
1947            make_row(
1948                "ses_child",
1949                "opencode",
1950                Some(child_session.to_str().unwrap()),
1951            ),
1952            make_row(
1953                "ses_parent",
1954                "opencode",
1955                Some(parent_session.to_str().unwrap()),
1956            ),
1957            {
1958                let mut row = make_row("ses_other", "codex", None);
1959                row.user_message_count = 1;
1960                row
1961            },
1962            make_row(
1963                "ses_orphan",
1964                "opencode",
1965                Some(orphan_session.to_str().unwrap()),
1966            ),
1967        ];
1968
1969        let filtered = hide_opencode_child_sessions(rows);
1970        assert_eq!(filtered.len(), 3);
1971        assert!(filtered.iter().all(|r| r.id != "ses_child"));
1972    }
1973
1974    #[test]
1975    fn test_sync_cursor() {
1976        let db = test_db();
1977        assert_eq!(db.get_sync_cursor("team1").unwrap(), None);
1978        db.set_sync_cursor("team1", "2024-01-01T00:00:00Z").unwrap();
1979        assert_eq!(
1980            db.get_sync_cursor("team1").unwrap(),
1981            Some("2024-01-01T00:00:00Z".to_string())
1982        );
1983        // Update
1984        db.set_sync_cursor("team1", "2024-06-01T00:00:00Z").unwrap();
1985        assert_eq!(
1986            db.get_sync_cursor("team1").unwrap(),
1987            Some("2024-06-01T00:00:00Z".to_string())
1988        );
1989    }
1990
1991    #[test]
1992    fn test_body_cache() {
1993        let db = test_db();
1994        assert_eq!(db.get_cached_body("s1").unwrap(), None);
1995        db.cache_body("s1", b"hello world").unwrap();
1996        assert_eq!(
1997            db.get_cached_body("s1").unwrap(),
1998            Some(b"hello world".to_vec())
1999        );
2000    }
2001
2002    #[test]
2003    fn test_timeline_summary_cache_roundtrip() {
2004        let db = test_db();
2005        db.upsert_timeline_summary_cache(
2006            "k1",
2007            "timeline:v1",
2008            "compact text",
2009            "{\"kind\":\"turn-summary\"}",
2010            "raw text",
2011        )
2012        .unwrap();
2013
2014        let rows = db
2015            .list_timeline_summary_cache_by_namespace("timeline:v1")
2016            .unwrap();
2017        assert_eq!(rows.len(), 1);
2018        assert_eq!(rows[0].lookup_key, "k1");
2019        assert_eq!(rows[0].namespace, "timeline:v1");
2020        assert_eq!(rows[0].compact, "compact text");
2021        assert_eq!(rows[0].payload, "{\"kind\":\"turn-summary\"}");
2022        assert_eq!(rows[0].raw, "raw text");
2023
2024        let cleared = db.clear_timeline_summary_cache().unwrap();
2025        assert_eq!(cleared, 1);
2026        let rows_after = db
2027            .list_timeline_summary_cache_by_namespace("timeline:v1")
2028            .unwrap();
2029        assert!(rows_after.is_empty());
2030    }
2031
2032    #[test]
2033    fn test_local_migrations_include_timeline_summary_cache() {
2034        let db = test_db();
2035        let conn = db.conn();
2036        let applied: bool = conn
2037            .query_row(
2038                "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
2039                params!["local_0003_timeline_summary_cache"],
2040                |row| row.get(0),
2041            )
2042            .unwrap();
2043        assert!(applied);
2044    }
2045
2046    #[test]
2047    fn test_local_migration_files_match_api_local_migrations() {
2048        fn collect_local_sql(dir: PathBuf) -> BTreeSet<String> {
2049            std::fs::read_dir(dir)
2050                .expect("read migrations directory")
2051                .filter_map(Result::ok)
2052                .map(|entry| entry.file_name().to_string_lossy().to_string())
2053                .filter(|name| name.starts_with("local_") && name.ends_with(".sql"))
2054                .collect()
2055        }
2056
2057        let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
2058        let local_files = collect_local_sql(manifest_dir.join("migrations"));
2059        let api_files = collect_local_sql(manifest_dir.join("../api/migrations"));
2060
2061        assert_eq!(
2062            local_files, api_files,
2063            "local-db local migrations must stay in parity with api local migrations"
2064        );
2065    }
2066
2067    #[test]
2068    fn test_upsert_remote_session() {
2069        let db = test_db();
2070        let summary = RemoteSessionSummary {
2071            id: "remote-1".to_string(),
2072            user_id: Some("u1".to_string()),
2073            nickname: Some("alice".to_string()),
2074            team_id: "t1".to_string(),
2075            tool: "claude-code".to_string(),
2076            agent_provider: None,
2077            agent_model: None,
2078            title: Some("Test session".to_string()),
2079            description: None,
2080            tags: None,
2081            created_at: "2024-01-01T00:00:00Z".to_string(),
2082            uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2083            message_count: 10,
2084            task_count: 2,
2085            event_count: 20,
2086            duration_seconds: 300,
2087            total_input_tokens: 1000,
2088            total_output_tokens: 500,
2089            git_remote: None,
2090            git_branch: None,
2091            git_commit: None,
2092            git_repo_name: None,
2093            pr_number: None,
2094            pr_url: None,
2095            working_directory: None,
2096            files_modified: None,
2097            files_read: None,
2098            has_errors: false,
2099            max_active_agents: 1,
2100        };
2101        db.upsert_remote_session(&summary).unwrap();
2102
2103        let sessions = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2104        assert_eq!(sessions.len(), 1);
2105        assert_eq!(sessions[0].id, "remote-1");
2106        assert_eq!(sessions[0].sync_status, "remote_only");
2107        assert_eq!(sessions[0].nickname, None); // no user in local users table
2108    }
2109
2110    #[test]
2111    fn test_list_filter_by_repo() {
2112        let db = test_db();
2113        // Insert a remote session with team_id
2114        let summary1 = RemoteSessionSummary {
2115            id: "s1".to_string(),
2116            user_id: None,
2117            nickname: None,
2118            team_id: "t1".to_string(),
2119            tool: "claude-code".to_string(),
2120            agent_provider: None,
2121            agent_model: None,
2122            title: Some("Session 1".to_string()),
2123            description: None,
2124            tags: None,
2125            created_at: "2024-01-01T00:00:00Z".to_string(),
2126            uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2127            message_count: 5,
2128            task_count: 0,
2129            event_count: 10,
2130            duration_seconds: 60,
2131            total_input_tokens: 100,
2132            total_output_tokens: 50,
2133            git_remote: None,
2134            git_branch: None,
2135            git_commit: None,
2136            git_repo_name: None,
2137            pr_number: None,
2138            pr_url: None,
2139            working_directory: None,
2140            files_modified: None,
2141            files_read: None,
2142            has_errors: false,
2143            max_active_agents: 1,
2144        };
2145        db.upsert_remote_session(&summary1).unwrap();
2146
2147        // Filter by team
2148        let filter = LocalSessionFilter {
2149            team_id: Some("t1".to_string()),
2150            ..Default::default()
2151        };
2152        assert_eq!(db.list_sessions(&filter).unwrap().len(), 1);
2153
2154        let filter = LocalSessionFilter {
2155            team_id: Some("t999".to_string()),
2156            ..Default::default()
2157        };
2158        assert_eq!(db.list_sessions(&filter).unwrap().len(), 0);
2159    }
2160
2161    // ── Helpers for inserting test sessions ────────────────────────────
2162
2163    fn make_summary(id: &str, tool: &str, title: &str, created_at: &str) -> RemoteSessionSummary {
2164        RemoteSessionSummary {
2165            id: id.to_string(),
2166            user_id: None,
2167            nickname: None,
2168            team_id: "t1".to_string(),
2169            tool: tool.to_string(),
2170            agent_provider: Some("anthropic".to_string()),
2171            agent_model: Some("claude-opus-4-6".to_string()),
2172            title: Some(title.to_string()),
2173            description: None,
2174            tags: None,
2175            created_at: created_at.to_string(),
2176            uploaded_at: created_at.to_string(),
2177            message_count: 5,
2178            task_count: 1,
2179            event_count: 10,
2180            duration_seconds: 300,
2181            total_input_tokens: 1000,
2182            total_output_tokens: 500,
2183            git_remote: None,
2184            git_branch: None,
2185            git_commit: None,
2186            git_repo_name: None,
2187            pr_number: None,
2188            pr_url: None,
2189            working_directory: None,
2190            files_modified: None,
2191            files_read: None,
2192            has_errors: false,
2193            max_active_agents: 1,
2194        }
2195    }
2196
2197    fn seed_sessions(db: &LocalDb) {
2198        // Insert 5 sessions across two tools, ordered by created_at
2199        db.upsert_remote_session(&make_summary(
2200            "s1",
2201            "claude-code",
2202            "First session",
2203            "2024-01-01T00:00:00Z",
2204        ))
2205        .unwrap();
2206        db.upsert_remote_session(&make_summary(
2207            "s2",
2208            "claude-code",
2209            "JWT auth work",
2210            "2024-01-02T00:00:00Z",
2211        ))
2212        .unwrap();
2213        db.upsert_remote_session(&make_summary(
2214            "s3",
2215            "gemini",
2216            "Gemini test",
2217            "2024-01-03T00:00:00Z",
2218        ))
2219        .unwrap();
2220        db.upsert_remote_session(&make_summary(
2221            "s4",
2222            "claude-code",
2223            "Error handling",
2224            "2024-01-04T00:00:00Z",
2225        ))
2226        .unwrap();
2227        db.upsert_remote_session(&make_summary(
2228            "s5",
2229            "claude-code",
2230            "Final polish",
2231            "2024-01-05T00:00:00Z",
2232        ))
2233        .unwrap();
2234    }
2235
2236    // ── list_sessions_log tests ────────────────────────────────────────
2237
2238    #[test]
2239    fn test_log_no_filters() {
2240        let db = test_db();
2241        seed_sessions(&db);
2242        let filter = LogFilter::default();
2243        let results = db.list_sessions_log(&filter).unwrap();
2244        assert_eq!(results.len(), 5);
2245        // Should be ordered by created_at DESC
2246        assert_eq!(results[0].id, "s5");
2247        assert_eq!(results[4].id, "s1");
2248    }
2249
2250    #[test]
2251    fn test_log_filter_by_tool() {
2252        let db = test_db();
2253        seed_sessions(&db);
2254        let filter = LogFilter {
2255            tool: Some("claude-code".to_string()),
2256            ..Default::default()
2257        };
2258        let results = db.list_sessions_log(&filter).unwrap();
2259        assert_eq!(results.len(), 4);
2260        assert!(results.iter().all(|s| s.tool == "claude-code"));
2261    }
2262
2263    #[test]
2264    fn test_log_filter_by_model_wildcard() {
2265        let db = test_db();
2266        seed_sessions(&db);
2267        let filter = LogFilter {
2268            model: Some("claude*".to_string()),
2269            ..Default::default()
2270        };
2271        let results = db.list_sessions_log(&filter).unwrap();
2272        assert_eq!(results.len(), 5); // all have claude-opus model
2273    }
2274
2275    #[test]
2276    fn test_log_filter_since() {
2277        let db = test_db();
2278        seed_sessions(&db);
2279        let filter = LogFilter {
2280            since: Some("2024-01-03T00:00:00Z".to_string()),
2281            ..Default::default()
2282        };
2283        let results = db.list_sessions_log(&filter).unwrap();
2284        assert_eq!(results.len(), 3); // s3, s4, s5
2285    }
2286
2287    #[test]
2288    fn test_log_filter_before() {
2289        let db = test_db();
2290        seed_sessions(&db);
2291        let filter = LogFilter {
2292            before: Some("2024-01-03T00:00:00Z".to_string()),
2293            ..Default::default()
2294        };
2295        let results = db.list_sessions_log(&filter).unwrap();
2296        assert_eq!(results.len(), 2); // s1, s2
2297    }
2298
2299    #[test]
2300    fn test_log_filter_since_and_before() {
2301        let db = test_db();
2302        seed_sessions(&db);
2303        let filter = LogFilter {
2304            since: Some("2024-01-02T00:00:00Z".to_string()),
2305            before: Some("2024-01-04T00:00:00Z".to_string()),
2306            ..Default::default()
2307        };
2308        let results = db.list_sessions_log(&filter).unwrap();
2309        assert_eq!(results.len(), 2); // s2, s3
2310    }
2311
2312    #[test]
2313    fn test_log_filter_grep() {
2314        let db = test_db();
2315        seed_sessions(&db);
2316        let filter = LogFilter {
2317            grep: Some("JWT".to_string()),
2318            ..Default::default()
2319        };
2320        let results = db.list_sessions_log(&filter).unwrap();
2321        assert_eq!(results.len(), 1);
2322        assert_eq!(results[0].id, "s2");
2323    }
2324
2325    #[test]
2326    fn test_log_limit_and_offset() {
2327        let db = test_db();
2328        seed_sessions(&db);
2329        let filter = LogFilter {
2330            limit: Some(2),
2331            offset: Some(1),
2332            ..Default::default()
2333        };
2334        let results = db.list_sessions_log(&filter).unwrap();
2335        assert_eq!(results.len(), 2);
2336        assert_eq!(results[0].id, "s4"); // second most recent
2337        assert_eq!(results[1].id, "s3");
2338    }
2339
2340    #[test]
2341    fn test_log_limit_only() {
2342        let db = test_db();
2343        seed_sessions(&db);
2344        let filter = LogFilter {
2345            limit: Some(3),
2346            ..Default::default()
2347        };
2348        let results = db.list_sessions_log(&filter).unwrap();
2349        assert_eq!(results.len(), 3);
2350    }
2351
2352    #[test]
2353    fn test_list_sessions_limit_offset() {
2354        let db = test_db();
2355        seed_sessions(&db);
2356        let filter = LocalSessionFilter {
2357            limit: Some(2),
2358            offset: Some(1),
2359            ..Default::default()
2360        };
2361        let results = db.list_sessions(&filter).unwrap();
2362        assert_eq!(results.len(), 2);
2363        assert_eq!(results[0].id, "s4");
2364        assert_eq!(results[1].id, "s3");
2365    }
2366
2367    #[test]
2368    fn test_count_sessions_filtered() {
2369        let db = test_db();
2370        seed_sessions(&db);
2371        let count = db
2372            .count_sessions_filtered(&LocalSessionFilter::default())
2373            .unwrap();
2374        assert_eq!(count, 5);
2375    }
2376
2377    #[test]
2378    fn test_list_working_directories_distinct_non_empty() {
2379        let db = test_db();
2380
2381        let mut a = make_summary("wd-1", "claude-code", "One", "2024-01-01T00:00:00Z");
2382        a.working_directory = Some("/tmp/repo-a".to_string());
2383        let mut b = make_summary("wd-2", "claude-code", "Two", "2024-01-02T00:00:00Z");
2384        b.working_directory = Some("/tmp/repo-a".to_string());
2385        let mut c = make_summary("wd-3", "claude-code", "Three", "2024-01-03T00:00:00Z");
2386        c.working_directory = Some("/tmp/repo-b".to_string());
2387        let mut d = make_summary("wd-4", "claude-code", "Four", "2024-01-04T00:00:00Z");
2388        d.working_directory = Some("".to_string());
2389
2390        db.upsert_remote_session(&a).unwrap();
2391        db.upsert_remote_session(&b).unwrap();
2392        db.upsert_remote_session(&c).unwrap();
2393        db.upsert_remote_session(&d).unwrap();
2394
2395        let dirs = db.list_working_directories().unwrap();
2396        assert_eq!(
2397            dirs,
2398            vec!["/tmp/repo-a".to_string(), "/tmp/repo-b".to_string()]
2399        );
2400    }
2401
2402    #[test]
2403    fn test_list_session_tools() {
2404        let db = test_db();
2405        seed_sessions(&db);
2406        let tools = db
2407            .list_session_tools(&LocalSessionFilter::default())
2408            .unwrap();
2409        assert_eq!(tools, vec!["claude-code".to_string(), "gemini".to_string()]);
2410    }
2411
2412    #[test]
2413    fn test_log_combined_filters() {
2414        let db = test_db();
2415        seed_sessions(&db);
2416        let filter = LogFilter {
2417            tool: Some("claude-code".to_string()),
2418            since: Some("2024-01-03T00:00:00Z".to_string()),
2419            limit: Some(1),
2420            ..Default::default()
2421        };
2422        let results = db.list_sessions_log(&filter).unwrap();
2423        assert_eq!(results.len(), 1);
2424        assert_eq!(results[0].id, "s5"); // most recent claude-code after Jan 3
2425    }
2426
2427    // ── Session offset/latest tests ────────────────────────────────────
2428
2429    #[test]
2430    fn test_get_session_by_offset() {
2431        let db = test_db();
2432        seed_sessions(&db);
2433        let row = db.get_session_by_offset(0).unwrap().unwrap();
2434        assert_eq!(row.id, "s5"); // most recent
2435        let row = db.get_session_by_offset(2).unwrap().unwrap();
2436        assert_eq!(row.id, "s3");
2437        assert!(db.get_session_by_offset(10).unwrap().is_none());
2438    }
2439
2440    #[test]
2441    fn test_get_session_by_tool_offset() {
2442        let db = test_db();
2443        seed_sessions(&db);
2444        let row = db
2445            .get_session_by_tool_offset("claude-code", 0)
2446            .unwrap()
2447            .unwrap();
2448        assert_eq!(row.id, "s5");
2449        let row = db
2450            .get_session_by_tool_offset("claude-code", 1)
2451            .unwrap()
2452            .unwrap();
2453        assert_eq!(row.id, "s4");
2454        let row = db.get_session_by_tool_offset("gemini", 0).unwrap().unwrap();
2455        assert_eq!(row.id, "s3");
2456        assert!(db
2457            .get_session_by_tool_offset("gemini", 1)
2458            .unwrap()
2459            .is_none());
2460    }
2461
2462    #[test]
2463    fn test_get_sessions_latest() {
2464        let db = test_db();
2465        seed_sessions(&db);
2466        let rows = db.get_sessions_latest(3).unwrap();
2467        assert_eq!(rows.len(), 3);
2468        assert_eq!(rows[0].id, "s5");
2469        assert_eq!(rows[1].id, "s4");
2470        assert_eq!(rows[2].id, "s3");
2471    }
2472
2473    #[test]
2474    fn test_get_sessions_by_tool_latest() {
2475        let db = test_db();
2476        seed_sessions(&db);
2477        let rows = db.get_sessions_by_tool_latest("claude-code", 2).unwrap();
2478        assert_eq!(rows.len(), 2);
2479        assert_eq!(rows[0].id, "s5");
2480        assert_eq!(rows[1].id, "s4");
2481    }
2482
2483    #[test]
2484    fn test_get_sessions_latest_more_than_available() {
2485        let db = test_db();
2486        seed_sessions(&db);
2487        let rows = db.get_sessions_by_tool_latest("gemini", 10).unwrap();
2488        assert_eq!(rows.len(), 1); // only 1 gemini session
2489    }
2490
2491    #[test]
2492    fn test_session_count() {
2493        let db = test_db();
2494        assert_eq!(db.session_count().unwrap(), 0);
2495        seed_sessions(&db);
2496        assert_eq!(db.session_count().unwrap(), 5);
2497    }
2498
2499    // ── Commit link tests ─────────────────────────────────────────────
2500
2501    #[test]
2502    fn test_link_commit_session() {
2503        let db = test_db();
2504        seed_sessions(&db);
2505        db.link_commit_session("abc123", "s1", Some("/tmp/repo"), Some("main"))
2506            .unwrap();
2507
2508        let commits = db.get_commits_by_session("s1").unwrap();
2509        assert_eq!(commits.len(), 1);
2510        assert_eq!(commits[0].commit_hash, "abc123");
2511        assert_eq!(commits[0].session_id, "s1");
2512        assert_eq!(commits[0].repo_path.as_deref(), Some("/tmp/repo"));
2513        assert_eq!(commits[0].branch.as_deref(), Some("main"));
2514
2515        let sessions = db.get_sessions_by_commit("abc123").unwrap();
2516        assert_eq!(sessions.len(), 1);
2517        assert_eq!(sessions[0].id, "s1");
2518    }
2519
2520    #[test]
2521    fn test_get_sessions_by_commit() {
2522        let db = test_db();
2523        seed_sessions(&db);
2524        // Link multiple sessions to the same commit
2525        db.link_commit_session("abc123", "s1", None, None).unwrap();
2526        db.link_commit_session("abc123", "s2", None, None).unwrap();
2527        db.link_commit_session("abc123", "s3", None, None).unwrap();
2528
2529        let sessions = db.get_sessions_by_commit("abc123").unwrap();
2530        assert_eq!(sessions.len(), 3);
2531        // Ordered by created_at DESC
2532        assert_eq!(sessions[0].id, "s3");
2533        assert_eq!(sessions[1].id, "s2");
2534        assert_eq!(sessions[2].id, "s1");
2535    }
2536
2537    #[test]
2538    fn test_get_commits_by_session() {
2539        let db = test_db();
2540        seed_sessions(&db);
2541        // Link multiple commits to the same session
2542        db.link_commit_session("aaa111", "s1", Some("/repo"), Some("main"))
2543            .unwrap();
2544        db.link_commit_session("bbb222", "s1", Some("/repo"), Some("main"))
2545            .unwrap();
2546        db.link_commit_session("ccc333", "s1", Some("/repo"), Some("feat"))
2547            .unwrap();
2548
2549        let commits = db.get_commits_by_session("s1").unwrap();
2550        assert_eq!(commits.len(), 3);
2551        // All linked to s1
2552        assert!(commits.iter().all(|c| c.session_id == "s1"));
2553    }
2554
2555    #[test]
2556    fn test_duplicate_link_ignored() {
2557        let db = test_db();
2558        seed_sessions(&db);
2559        db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2560            .unwrap();
2561        // Inserting the same link again should not error
2562        db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2563            .unwrap();
2564
2565        let commits = db.get_commits_by_session("s1").unwrap();
2566        assert_eq!(commits.len(), 1);
2567    }
2568
2569    #[test]
2570    fn test_log_filter_by_commit() {
2571        let db = test_db();
2572        seed_sessions(&db);
2573        db.link_commit_session("abc123", "s2", None, None).unwrap();
2574        db.link_commit_session("abc123", "s4", None, None).unwrap();
2575
2576        let filter = LogFilter {
2577            commit: Some("abc123".to_string()),
2578            ..Default::default()
2579        };
2580        let results = db.list_sessions_log(&filter).unwrap();
2581        assert_eq!(results.len(), 2);
2582        assert_eq!(results[0].id, "s4");
2583        assert_eq!(results[1].id, "s2");
2584
2585        // Non-existent commit returns nothing
2586        let filter = LogFilter {
2587            commit: Some("nonexistent".to_string()),
2588            ..Default::default()
2589        };
2590        let results = db.list_sessions_log(&filter).unwrap();
2591        assert_eq!(results.len(), 0);
2592    }
2593}