Skip to main content

opensession_local_db/
lib.rs

1pub mod git;
2
3use anyhow::{Context, Result};
4use opensession_core::trace::Session;
5use rusqlite::{params, Connection, OptionalExtension};
6use serde_json::Value;
7use std::collections::HashSet;
8use std::fs;
9use std::path::PathBuf;
10use std::sync::Mutex;
11
12use git::GitContext;
13
// A migration entry: `(name, sql)`, where `name` identifies the migration and
// `sql` is the script text to run for it.
type Migration = (&'static str, &'static str);
15
// Keep local index/cache schema aligned with remote/session schema migrations.
// Each entry pairs a migration name with the SQL file of the same name, embedded
// at compile time via `include_str!`. Entries are listed in ascending numeric order.
const REMOTE_MIGRATIONS: &[Migration] = &[
    ("0001_schema", include_str!("../migrations/0001_schema.sql")),
    (
        "0002_team_invite_keys",
        include_str!("../migrations/0002_team_invite_keys.sql"),
    ),
    (
        "0003_max_active_agents",
        include_str!("../migrations/0003_max_active_agents.sql"),
    ),
    (
        "0004_oauth_states_provider",
        include_str!("../migrations/0004_oauth_states_provider.sql"),
    ),
    (
        "0005_sessions_body_url_backfill",
        include_str!("../migrations/0005_sessions_body_url_backfill.sql"),
    ),
    (
        "0006_sessions_remove_fk_constraints",
        include_str!("../migrations/0006_sessions_remove_fk_constraints.sql"),
    ),
    (
        "0007_sessions_list_perf_indexes",
        include_str!("../migrations/0007_sessions_list_perf_indexes.sql"),
    ),
    (
        "0008_teams_force_public",
        include_str!("../migrations/0008_teams_force_public.sql"),
    ),
    (
        "0009_session_score_plugin",
        include_str!("../migrations/0009_session_score_plugin.sql"),
    ),
    (
        "0010_api_keys_issuance",
        include_str!("../migrations/0010_api_keys_issuance.sql"),
    ),
];
56
// Migrations that exist only for the local index/cache database (names are
// prefixed with `local_`). Same `(name, sql)` layout as `REMOTE_MIGRATIONS`,
// with the SQL embedded at compile time via `include_str!`.
const LOCAL_MIGRATIONS: &[Migration] = &[
    (
        "local_0001_schema",
        include_str!("../migrations/local_0001_schema.sql"),
    ),
    (
        "local_0002_drop_unused_local_sessions",
        include_str!("../migrations/local_0002_drop_unused_local_sessions.sql"),
    ),
    (
        "local_0003_timeline_summary_cache",
        include_str!("../migrations/local_0003_timeline_summary_cache.sql"),
    ),
];
71
/// A local session row stored in the local SQLite index/cache database.
///
/// This is the row type returned by the session listing queries on `LocalDb`
/// (e.g. `list_sessions`, `list_sessions_log`).
#[derive(Debug, Clone)]
pub struct LocalSessionRow {
    /// Session identifier (primary key of the `sessions` table).
    pub id: String,
    /// Path of the session file this row was parsed from, when known.
    /// Inspected by the sub-agent/child-session heuristics in this module.
    pub source_path: Option<String>,
    /// Sync state. The upsert paths in this file write `local_only`,
    /// `remote_only` and `synced`.
    pub sync_status: String,
    pub last_synced_at: Option<String>,
    pub user_id: Option<String>,
    pub nickname: Option<String>,
    pub team_id: Option<String>,
    /// Tool that produced the session (e.g. `opencode`, `claude-code`).
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    pub title: Option<String>,
    pub description: Option<String>,
    /// Tags as a single string; locally parsed sessions store them comma-joined.
    pub tags: Option<String>,
    /// Creation timestamp as text (RFC 3339 for locally parsed sessions).
    pub created_at: String,
    pub uploaded_at: Option<String>,
    pub message_count: i64,
    pub user_message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    /// Files modified during the session, serialized as text
    /// (the `log` query treats it as JSON containing quoted paths).
    pub files_modified: Option<String>,
    /// Files read during the session, serialized as text.
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
}
109
/// A link between a git commit and an AI session.
#[derive(Debug, Clone)]
pub struct CommitLink {
    /// Hash of the linked git commit.
    pub commit_hash: String,
    /// Identifier of the session associated with the commit.
    pub session_id: String,
    /// Repository path, when recorded.
    pub repo_path: Option<String>,
    /// Branch name, when recorded.
    pub branch: Option<String>,
    /// Timestamp of when the link was created, as text.
    pub created_at: String,
}
119
/// A cached timeline summary row stored in the local index/cache DB.
///
/// All fields are stored as plain text; their exact encoding is defined by the
/// code that reads and writes the cache (not visible in this part of the file).
#[derive(Debug, Clone)]
pub struct TimelineSummaryCacheRow {
    /// Key used to look the entry up.
    pub lookup_key: String,
    pub namespace: String,
    pub compact: String,
    pub payload: String,
    pub raw: String,
    /// Timestamp of when the entry was cached, as text.
    pub cached_at: String,
}
130
131/// Return true when a cached row corresponds to an OpenCode child session.
132pub fn is_opencode_child_session(row: &LocalSessionRow) -> bool {
133    if row.tool != "opencode" {
134        return false;
135    }
136
137    // Strong heuristic: some orchestration artifacts are generated as sessions
138    // that do not carry a user-visible request surface and only contain system
139    // or agent-only events.
140    if row.user_message_count <= 0
141        && row.message_count <= 4
142        && row.task_count <= 4
143        && row.event_count > 0
144        && row.event_count <= 16
145    {
146        return true;
147    }
148
149    if is_opencode_subagent_source(row.source_path.as_deref()) {
150        return true;
151    }
152
153    let source_path = match row.source_path.as_deref() {
154        Some(path) if !path.trim().is_empty() => path,
155        _ => return false,
156    };
157
158    parse_opencode_parent_session_id(source_path)
159        .is_some_and(|parent_id| !parent_id.trim().is_empty())
160}
161
162/// Parse `parentID` / `parentId` from an OpenCode session JSON file.
163pub fn parse_opencode_parent_session_id(source_path: &str) -> Option<String> {
164    let text = fs::read_to_string(source_path).ok()?;
165    let json: Value = serde_json::from_str(&text).ok()?;
166    lookup_parent_session_id(&json)
167}
168
169fn lookup_parent_session_id(value: &Value) -> Option<String> {
170    match value {
171        Value::Object(obj) => {
172            for (key, value) in obj {
173                if is_parent_id_key(key) {
174                    if let Some(parent_id) = value.as_str() {
175                        let parent_id = parent_id.trim();
176                        if !parent_id.is_empty() {
177                            return Some(parent_id.to_string());
178                        }
179                    }
180                }
181                if let Some(parent_id) = lookup_parent_session_id(value) {
182                    return Some(parent_id);
183                }
184            }
185            None
186        }
187        Value::Array(items) => items.iter().find_map(lookup_parent_session_id),
188        _ => None,
189    }
190}
191
/// Decide whether a JSON key names a parent-session identifier.
///
/// The key is normalized by dropping every non-ASCII-alphanumeric character
/// and lowercasing the rest, so `parentID`, `parent_id`, `parent-session-uuid`
/// and similar spellings are all treated alike.
fn is_parent_id_key(key: &str) -> bool {
    let mut normalized = String::with_capacity(key.len());
    for ch in key.chars() {
        if ch.is_ascii_alphanumeric() {
            normalized.push(ch.to_ascii_lowercase());
        }
    }

    // Every accepted spelling ("parentid", "parentuuid", "...parentsessionid",
    // ...) contains "parent" and ends in "id"; note that "uuid" itself ends in
    // "id", so a single suffix test covers both families.
    normalized.contains("parent") && normalized.ends_with("id")
}
207
208/// Remove OpenCode child sessions so only parent sessions remain visible.
209pub fn hide_opencode_child_sessions(mut rows: Vec<LocalSessionRow>) -> Vec<LocalSessionRow> {
210    rows.retain(|row| !is_opencode_child_session(row) && !is_claude_subagent_session(row));
211    rows
212}
213
/// Return true when `source_path` looks like an OpenCode sub-agent session file.
///
/// Currently identical to the tool-agnostic `is_subagent_source` check.
fn is_opencode_subagent_source(source_path: Option<&str>) -> bool {
    is_subagent_source(source_path)
}
217
218fn is_claude_subagent_session(row: &LocalSessionRow) -> bool {
219    if row.tool != "claude-code" {
220        return false;
221    }
222
223    is_subagent_source(row.source_path.as_deref())
224}
225
/// Return true when a session file path looks like it belongs to a sub-agent.
///
/// The comparison is ASCII case-insensitive. A path matches when either
/// * it contains a `subagents` directory component (`/subagents/` or
///   `\subagents\`), or
/// * its file name starts with `agent-`, `agent_`, `subagent-` or `subagent_`.
///
/// `None`, an empty path, or a path without a file name never matches.
fn is_subagent_source(source_path: Option<&str>) -> bool {
    let Some(source_path) = source_path.map(str::to_ascii_lowercase) else {
        return false;
    };

    if source_path.contains("/subagents/") || source_path.contains("\\subagents\\") {
        return true;
    }

    let Some(last_component) = std::path::Path::new(&source_path).file_name() else {
        return false;
    };
    let last_component = last_component.to_string_lossy();

    // On Unix, `Path` does not treat `\` as a separator, so a Windows-style path
    // arrives here as one big component. Keep only what follows the last
    // backslash so the prefix test sees the real file name, consistent with the
    // directory check above that already accepts both separators.
    let filename = last_component.rsplit('\\').next().unwrap_or("");

    ["agent-", "agent_", "subagent-", "subagent_"]
        .iter()
        .any(|prefix| filename.starts_with(*prefix))
}
245
/// Filter for listing sessions from the local DB.
///
/// Every `Option` field set to `None` means "do not filter on this".
#[derive(Debug, Clone)]
pub struct LocalSessionFilter {
    /// Exact match on the session's team id.
    pub team_id: Option<String>,
    /// Exact match on the sync status; rows without sync info compare as `unknown`.
    pub sync_status: Option<String>,
    /// Exact match on the git repository name.
    pub git_repo_name: Option<String>,
    /// Substring search (SQL `LIKE`) over title, description and tags.
    pub search: Option<String>,
    /// Exact match on the tool name.
    pub tool: Option<String>,
    /// Result ordering.
    pub sort: LocalSortOrder,
    /// Restrict to sessions created within a recent time window.
    pub time_range: LocalTimeRange,
    /// Maximum number of rows to fetch.
    pub limit: Option<u32>,
    /// Number of rows to skip; `list_sessions` only applies it when `limit` is set.
    pub offset: Option<u32>,
}
259
260impl Default for LocalSessionFilter {
261    fn default() -> Self {
262        Self {
263            team_id: None,
264            sync_status: None,
265            git_repo_name: None,
266            search: None,
267            tool: None,
268            sort: LocalSortOrder::Recent,
269            time_range: LocalTimeRange::All,
270            limit: None,
271            offset: None,
272        }
273    }
274}
275
/// Sort order for local session listing.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalSortOrder {
    /// Newest first (`created_at` descending).
    #[default]
    Recent,
    /// Highest message count first, ties broken by newest first.
    Popular,
    /// Longest duration first, ties broken by newest first.
    Longest,
}
284
/// Time range filter for local session listing.
///
/// Windows are measured backwards from the current time at query execution.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalTimeRange {
    /// Sessions created within the last day.
    Hours24,
    /// Sessions created within the last 7 days.
    Days7,
    /// Sessions created within the last 30 days.
    Days30,
    /// No time restriction.
    #[default]
    All,
}
294
/// Minimal remote session payload needed for local index/cache upsert.
///
/// Consumed by `LocalDb::upsert_remote_session`, which writes these values into
/// the `sessions` table. Note that `nickname` is carried here but is not among
/// the values that upsert binds.
#[derive(Debug, Clone)]
pub struct RemoteSessionSummary {
    /// Session identifier (conflict key for the upsert).
    pub id: String,
    pub user_id: Option<String>,
    pub nickname: Option<String>,
    pub team_id: String,
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,
    pub title: Option<String>,
    pub description: Option<String>,
    pub tags: Option<String>,
    /// Creation timestamp as text.
    pub created_at: String,
    /// Upload timestamp as text.
    pub uploaded_at: String,
    pub message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,
    pub working_directory: Option<String>,
    /// Files modified during the session, serialized as text.
    pub files_modified: Option<String>,
    /// Files read during the session, serialized as text.
    pub files_read: Option<String>,
    pub has_errors: bool,
    pub max_active_agents: i64,
}
328
/// Extended filter for the `log` command.
///
/// All set fields are combined with `AND`; `None` means "do not filter".
#[derive(Debug, Default)]
pub struct LogFilter {
    /// Filter by tool name (exact match).
    pub tool: Option<String>,
    /// Filter by model (glob-like: each `*` is translated to `%` and matched with LIKE).
    pub model: Option<String>,
    /// Filter sessions created at or after this ISO8601 timestamp (text comparison).
    pub since: Option<String>,
    /// Filter sessions created strictly before this ISO8601 timestamp (text comparison).
    pub before: Option<String>,
    /// Filter sessions that touched this file path (looks for the double-quoted
    /// path inside the files_modified JSON text).
    pub touches: Option<String>,
    /// Free-text search in title, description, tags.
    pub grep: Option<String>,
    /// Only sessions with errors. Only `Some(true)` has an effect;
    /// `Some(false)` and `None` both leave results unfiltered.
    pub has_errors: Option<bool>,
    /// Filter by working directory (prefix match).
    pub working_directory: Option<String>,
    /// Filter by git repo name (exact match).
    pub git_repo_name: Option<String>,
    /// Filter sessions linked to this git commit hash.
    pub commit: Option<String>,
    /// Maximum number of results.
    pub limit: Option<u32>,
    /// Offset for pagination; only applied when `limit` is also set.
    pub offset: Option<u32>,
}
357
/// Base FROM clause for session list queries.
///
/// Establishes the table aliases used by the query builders in this file:
/// `s` (sessions), `ss` (session_sync) and `u` (users). Both joins are
/// `LEFT JOIN`s, so a session without sync info or a user row is still listed.
const FROM_CLAUSE: &str = "\
FROM sessions s \
LEFT JOIN session_sync ss ON ss.session_id = s.id \
LEFT JOIN users u ON u.id = s.user_id";
363
/// Local SQLite index/cache shared by TUI and Daemon.
/// This is not the source of truth for canonical session bodies.
/// Thread-safe: wraps the connection in a Mutex so it can be shared via `Arc<LocalDb>`.
pub struct LocalDb {
    // Single connection; every method locks it for the duration of its queries.
    conn: Mutex<Connection>,
}
370
371impl LocalDb {
    /// Open (or create) the local database at the default path.
    /// `~/.local/share/opensession/local.db`
    ///
    /// # Errors
    /// Fails when the default path cannot be determined or when `open_path` fails.
    pub fn open() -> Result<Self> {
        let path = default_db_path()?;
        Self::open_path(&path)
    }
378
379    /// Open (or create) the local database at a specific path.
380    pub fn open_path(path: &PathBuf) -> Result<Self> {
381        if let Some(parent) = path.parent() {
382            std::fs::create_dir_all(parent)
383                .with_context(|| format!("create dir for {}", path.display()))?;
384        }
385        match open_connection_with_latest_schema(path) {
386            Ok(conn) => Ok(Self {
387                conn: Mutex::new(conn),
388            }),
389            Err(err) => {
390                if !is_schema_compat_error(&err) {
391                    return Err(err);
392                }
393
394                // Local DB is a cache. If schema migration cannot safely reconcile
395                // an incompatible/corrupted file, rotate it out and recreate latest schema.
396                rotate_legacy_db(path)?;
397
398                let conn = open_connection_with_latest_schema(path)
399                    .with_context(|| format!("recreate db {}", path.display()))?;
400                Ok(Self {
401                    conn: Mutex::new(conn),
402                })
403            }
404        }
405    }
406
    /// Lock the shared connection and return the guard.
    ///
    /// # Panics
    /// Panics if the mutex is poisoned (a previous holder panicked while locked).
    fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
        self.conn.lock().expect("local db mutex poisoned")
    }
410
411    // ── Upsert local session (parsed from file) ────────────────────────
412
    /// Insert or refresh the index row for a session parsed from a local file.
    ///
    /// * `session` – parsed session; its context, agent and stats feed the row.
    /// * `source_path` – path of the file the session was parsed from.
    /// * `git` – git context stored alongside the session.
    ///
    /// New rows are inserted with team `personal` and an empty
    /// `body_storage_key`. On an id conflict the descriptive and statistic
    /// columns are overwritten, while `team_id` and `created_at` keep their
    /// stored values. The matching `session_sync` row is created as
    /// `local_only`; if it already exists only its `source_path` is updated.
    ///
    /// The two statements are executed one after the other without an explicit
    /// transaction.
    ///
    /// # Errors
    /// Returns any error raised by either SQL statement.
    pub fn upsert_local_session(
        &self,
        session: &Session,
        source_path: &str,
        git: &GitContext,
    ) -> Result<()> {
        let title = session.context.title.as_deref();
        let description = session.context.description.as_deref();
        // Tags are stored as one comma-joined string; no tags is stored as NULL.
        let tags = if session.context.tags.is_empty() {
            None
        } else {
            Some(session.context.tags.join(","))
        };
        let created_at = session.context.created_at.to_rfc3339();
        // Working directory: prefer the `cwd` attribute, fall back to `working_directory`.
        let cwd = session
            .context
            .attributes
            .get("cwd")
            .or_else(|| session.context.attributes.get("working_directory"))
            .and_then(|v| v.as_str().map(String::from));

        // Extract files_modified, files_read, and has_errors from events
        let (files_modified, files_read, has_errors) =
            opensession_core::extract::extract_file_metadata(session);
        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;

        let conn = self.conn();
        // NOTE: `body_storage_key` is kept only for migration/schema parity.
        // Runtime lookup uses canonical body URLs and local body cache tables.
        conn.execute(
            "INSERT INTO sessions \
             (id, team_id, tool, agent_provider, agent_model, \
              title, description, tags, created_at, \
              message_count, user_message_count, task_count, event_count, duration_seconds, \
              total_input_tokens, total_output_tokens, body_storage_key, \
              git_remote, git_branch, git_commit, git_repo_name, working_directory, \
              files_modified, files_read, has_errors, max_active_agents) \
             VALUES (?1,'personal',?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,'',?16,?17,?18,?19,?20,?21,?22,?23,?24) \
             ON CONFLICT(id) DO UPDATE SET \
              tool=excluded.tool, agent_provider=excluded.agent_provider, \
              agent_model=excluded.agent_model, \
              title=excluded.title, description=excluded.description, \
              tags=excluded.tags, \
              message_count=excluded.message_count, user_message_count=excluded.user_message_count, \
              task_count=excluded.task_count, \
              event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
              total_input_tokens=excluded.total_input_tokens, \
              total_output_tokens=excluded.total_output_tokens, \
              git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
              git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
              working_directory=excluded.working_directory, \
              files_modified=excluded.files_modified, files_read=excluded.files_read, \
              has_errors=excluded.has_errors, \
              max_active_agents=excluded.max_active_agents",
            // Parameter order must match the ?1..?24 placeholders above.
            params![
                &session.session_id,
                &session.agent.tool,
                &session.agent.provider,
                &session.agent.model,
                title,
                description,
                &tags,
                &created_at,
                session.stats.message_count as i64,
                session.stats.user_message_count as i64,
                session.stats.task_count as i64,
                session.stats.event_count as i64,
                session.stats.duration_seconds as i64,
                session.stats.total_input_tokens as i64,
                session.stats.total_output_tokens as i64,
                &git.remote,
                &git.branch,
                &git.commit,
                &git.repo_name,
                &cwd,
                &files_modified,
                &files_read,
                has_errors,
                max_active_agents,
            ],
        )?;

        // Record where the session came from; an existing sync status is left untouched.
        conn.execute(
            "INSERT INTO session_sync (session_id, source_path, sync_status) \
             VALUES (?1, ?2, 'local_only') \
             ON CONFLICT(session_id) DO UPDATE SET source_path=excluded.source_path",
            params![&session.session_id, source_path],
        )?;
        Ok(())
    }
503
504    // ── Upsert remote session (from server sync pull) ──────────────────
505
    /// Insert or refresh the index row for a session pulled from the server.
    ///
    /// New rows take every bound value from `summary` and an empty
    /// `body_storage_key`. On an id conflict only the columns listed in the
    /// `DO UPDATE SET` clause are overwritten; `user_id`, `team_id`, `tool`,
    /// `agent_provider`, `agent_model` and `created_at` keep their stored values.
    ///
    /// The matching `session_sync` row is created as `remote_only`. If one
    /// already exists, `local_only` is promoted to `synced` and any other status
    /// is left as is.
    ///
    /// The two statements are executed one after the other without an explicit
    /// transaction.
    ///
    /// # Errors
    /// Returns any error raised by either SQL statement.
    pub fn upsert_remote_session(&self, summary: &RemoteSessionSummary) -> Result<()> {
        let conn = self.conn();
        // NOTE: `body_storage_key` is kept only for migration/schema parity.
        // Runtime lookup uses canonical body URLs and local body cache tables.
        conn.execute(
            "INSERT INTO sessions \
             (id, user_id, team_id, tool, agent_provider, agent_model, \
              title, description, tags, created_at, uploaded_at, \
              message_count, task_count, event_count, duration_seconds, \
              total_input_tokens, total_output_tokens, body_storage_key, \
              git_remote, git_branch, git_commit, git_repo_name, \
              pr_number, pr_url, working_directory, \
              files_modified, files_read, has_errors, max_active_agents) \
             VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,'',?18,?19,?20,?21,?22,?23,?24,?25,?26,?27,?28) \
             ON CONFLICT(id) DO UPDATE SET \
              title=excluded.title, description=excluded.description, \
              tags=excluded.tags, uploaded_at=excluded.uploaded_at, \
              message_count=excluded.message_count, task_count=excluded.task_count, \
              event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
              total_input_tokens=excluded.total_input_tokens, \
              total_output_tokens=excluded.total_output_tokens, \
              git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
              git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
              pr_number=excluded.pr_number, pr_url=excluded.pr_url, \
              working_directory=excluded.working_directory, \
              files_modified=excluded.files_modified, files_read=excluded.files_read, \
              has_errors=excluded.has_errors, \
              max_active_agents=excluded.max_active_agents",
            // Parameter order must match the ?1..?28 placeholders above.
            params![
                &summary.id,
                &summary.user_id,
                &summary.team_id,
                &summary.tool,
                &summary.agent_provider,
                &summary.agent_model,
                &summary.title,
                &summary.description,
                &summary.tags,
                &summary.created_at,
                &summary.uploaded_at,
                summary.message_count,
                summary.task_count,
                summary.event_count,
                summary.duration_seconds,
                summary.total_input_tokens,
                summary.total_output_tokens,
                &summary.git_remote,
                &summary.git_branch,
                &summary.git_commit,
                &summary.git_repo_name,
                summary.pr_number,
                &summary.pr_url,
                &summary.working_directory,
                &summary.files_modified,
                &summary.files_read,
                summary.has_errors,
                summary.max_active_agents,
            ],
        )?;

        // A session that was local-only and now also exists remotely becomes `synced`.
        conn.execute(
            "INSERT INTO session_sync (session_id, sync_status) \
             VALUES (?1, 'remote_only') \
             ON CONFLICT(session_id) DO UPDATE SET \
              sync_status = CASE WHEN session_sync.sync_status = 'local_only' THEN 'synced' ELSE session_sync.sync_status END",
            params![&summary.id],
        )?;
        Ok(())
    }
575
576    // ── List sessions ──────────────────────────────────────────────────
577
578    fn build_local_session_where_clause(
579        filter: &LocalSessionFilter,
580    ) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
581        let mut where_clauses = vec![
582            "1=1".to_string(),
583            "NOT (s.tool = 'claude-code' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
584            "NOT (s.tool = 'opencode' AND (LOWER(COALESCE(ss.source_path, '')) LIKE '%/subagents/%' OR LOWER(COALESCE(ss.source_path, '')) LIKE '%\\\\subagents\\\\%'))".to_string(),
585            "NOT (s.tool = 'opencode' AND COALESCE(s.user_message_count, 0) <= 0 AND COALESCE(s.message_count, 0) <= 4 AND COALESCE(s.task_count, 0) <= 4 AND COALESCE(s.event_count, 0) > 0 AND COALESCE(s.event_count, 0) <= 16)".to_string(),
586        ];
587        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
588        let mut idx = 1u32;
589
590        if let Some(ref team_id) = filter.team_id {
591            where_clauses.push(format!("s.team_id = ?{idx}"));
592            param_values.push(Box::new(team_id.clone()));
593            idx += 1;
594        }
595
596        if let Some(ref sync_status) = filter.sync_status {
597            where_clauses.push(format!("COALESCE(ss.sync_status, 'unknown') = ?{idx}"));
598            param_values.push(Box::new(sync_status.clone()));
599            idx += 1;
600        }
601
602        if let Some(ref repo) = filter.git_repo_name {
603            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
604            param_values.push(Box::new(repo.clone()));
605            idx += 1;
606        }
607
608        if let Some(ref tool) = filter.tool {
609            where_clauses.push(format!("s.tool = ?{idx}"));
610            param_values.push(Box::new(tool.clone()));
611            idx += 1;
612        }
613
614        if let Some(ref search) = filter.search {
615            let like = format!("%{search}%");
616            where_clauses.push(format!(
617                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
618                i1 = idx,
619                i2 = idx + 1,
620                i3 = idx + 2,
621            ));
622            param_values.push(Box::new(like.clone()));
623            param_values.push(Box::new(like.clone()));
624            param_values.push(Box::new(like));
625            idx += 3;
626        }
627
628        let interval = match filter.time_range {
629            LocalTimeRange::Hours24 => Some("-1 day"),
630            LocalTimeRange::Days7 => Some("-7 days"),
631            LocalTimeRange::Days30 => Some("-30 days"),
632            LocalTimeRange::All => None,
633        };
634        if let Some(interval) = interval {
635            where_clauses.push(format!("datetime(s.created_at) >= datetime('now', ?{idx})"));
636            param_values.push(Box::new(interval.to_string()));
637        }
638
639        (where_clauses.join(" AND "), param_values)
640    }
641
642    pub fn list_sessions(&self, filter: &LocalSessionFilter) -> Result<Vec<LocalSessionRow>> {
643        let (where_str, mut param_values) = Self::build_local_session_where_clause(filter);
644        let order_clause = match filter.sort {
645            LocalSortOrder::Popular => "s.message_count DESC, s.created_at DESC",
646            LocalSortOrder::Longest => "s.duration_seconds DESC, s.created_at DESC",
647            LocalSortOrder::Recent => "s.created_at DESC",
648        };
649
650        let mut sql = format!(
651            "SELECT {LOCAL_SESSION_COLUMNS} \
652             {FROM_CLAUSE} WHERE {where_str} \
653             ORDER BY {order_clause}"
654        );
655
656        if let Some(limit) = filter.limit {
657            sql.push_str(" LIMIT ?");
658            param_values.push(Box::new(limit));
659            if let Some(offset) = filter.offset {
660                sql.push_str(" OFFSET ?");
661                param_values.push(Box::new(offset));
662            }
663        }
664
665        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
666            param_values.iter().map(|p| p.as_ref()).collect();
667        let conn = self.conn();
668        let mut stmt = conn.prepare(&sql)?;
669        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
670
671        let mut result = Vec::new();
672        for row in rows {
673            result.push(row?);
674        }
675
676        Ok(hide_opencode_child_sessions(result))
677    }
678
679    /// Count sessions for a given list filter (before UI-level page slicing).
680    pub fn count_sessions_filtered(&self, filter: &LocalSessionFilter) -> Result<i64> {
681        let mut count_filter = filter.clone();
682        count_filter.limit = None;
683        count_filter.offset = None;
684        let (where_str, param_values) = Self::build_local_session_where_clause(&count_filter);
685        let sql = format!("SELECT COUNT(*) {FROM_CLAUSE} WHERE {where_str}");
686        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
687            param_values.iter().map(|p| p.as_ref()).collect();
688        let conn = self.conn();
689        let count = conn.query_row(&sql, param_refs.as_slice(), |row| row.get(0))?;
690        Ok(count)
691    }
692
693    /// List distinct tool names for the current list filter (ignores active tool filter).
694    pub fn list_session_tools(&self, filter: &LocalSessionFilter) -> Result<Vec<String>> {
695        let mut tool_filter = filter.clone();
696        tool_filter.tool = None;
697        tool_filter.limit = None;
698        tool_filter.offset = None;
699        let (where_str, param_values) = Self::build_local_session_where_clause(&tool_filter);
700        let sql = format!(
701            "SELECT DISTINCT s.tool \
702             {FROM_CLAUSE} WHERE {where_str} \
703             ORDER BY s.tool ASC"
704        );
705        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
706            param_values.iter().map(|p| p.as_ref()).collect();
707        let conn = self.conn();
708        let mut stmt = conn.prepare(&sql)?;
709        let rows = stmt.query_map(param_refs.as_slice(), |row| row.get::<_, String>(0))?;
710
711        let mut tools = Vec::new();
712        for row in rows {
713            let tool = row?;
714            if !tool.trim().is_empty() {
715                tools.push(tool);
716            }
717        }
718        Ok(tools)
719    }
720
721    // ── Log query ─────────────────────────────────────────────────────
722
723    /// Query sessions with extended filters for the `log` command.
724    pub fn list_sessions_log(&self, filter: &LogFilter) -> Result<Vec<LocalSessionRow>> {
725        let mut where_clauses = vec!["1=1".to_string()];
726        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
727        let mut idx = 1u32;
728
729        if let Some(ref tool) = filter.tool {
730            where_clauses.push(format!("s.tool = ?{idx}"));
731            param_values.push(Box::new(tool.clone()));
732            idx += 1;
733        }
734
735        if let Some(ref model) = filter.model {
736            let like = model.replace('*', "%");
737            where_clauses.push(format!("s.agent_model LIKE ?{idx}"));
738            param_values.push(Box::new(like));
739            idx += 1;
740        }
741
742        if let Some(ref since) = filter.since {
743            where_clauses.push(format!("s.created_at >= ?{idx}"));
744            param_values.push(Box::new(since.clone()));
745            idx += 1;
746        }
747
748        if let Some(ref before) = filter.before {
749            where_clauses.push(format!("s.created_at < ?{idx}"));
750            param_values.push(Box::new(before.clone()));
751            idx += 1;
752        }
753
754        if let Some(ref touches) = filter.touches {
755            let like = format!("%\"{touches}\"%");
756            where_clauses.push(format!("s.files_modified LIKE ?{idx}"));
757            param_values.push(Box::new(like));
758            idx += 1;
759        }
760
761        if let Some(ref grep) = filter.grep {
762            let like = format!("%{grep}%");
763            where_clauses.push(format!(
764                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
765                i1 = idx,
766                i2 = idx + 1,
767                i3 = idx + 2,
768            ));
769            param_values.push(Box::new(like.clone()));
770            param_values.push(Box::new(like.clone()));
771            param_values.push(Box::new(like));
772            idx += 3;
773        }
774
775        if let Some(true) = filter.has_errors {
776            where_clauses.push("s.has_errors = 1".to_string());
777        }
778
779        if let Some(ref wd) = filter.working_directory {
780            where_clauses.push(format!("s.working_directory LIKE ?{idx}"));
781            param_values.push(Box::new(format!("{wd}%")));
782            idx += 1;
783        }
784
785        if let Some(ref repo) = filter.git_repo_name {
786            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
787            param_values.push(Box::new(repo.clone()));
788            idx += 1;
789        }
790
791        // Optional JOIN for commit hash filter
792        let mut extra_join = String::new();
793        if let Some(ref commit) = filter.commit {
794            extra_join =
795                " INNER JOIN commit_session_links csl ON csl.session_id = s.id".to_string();
796            where_clauses.push(format!("csl.commit_hash = ?{idx}"));
797            param_values.push(Box::new(commit.clone()));
798            idx += 1;
799        }
800
801        let _ = idx; // suppress unused warning
802
803        let where_str = where_clauses.join(" AND ");
804        let mut sql = format!(
805            "SELECT {LOCAL_SESSION_COLUMNS} \
806             {FROM_CLAUSE}{extra_join} WHERE {where_str} \
807             ORDER BY s.created_at DESC"
808        );
809
810        if let Some(limit) = filter.limit {
811            sql.push_str(" LIMIT ?");
812            param_values.push(Box::new(limit));
813            if let Some(offset) = filter.offset {
814                sql.push_str(" OFFSET ?");
815                param_values.push(Box::new(offset));
816            }
817        }
818
819        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
820            param_values.iter().map(|p| p.as_ref()).collect();
821        let conn = self.conn();
822        let mut stmt = conn.prepare(&sql)?;
823        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
824
825        let mut result = Vec::new();
826        for row in rows {
827            result.push(row?);
828        }
829        Ok(hide_opencode_child_sessions(result))
830    }
831
832    /// Get the latest N sessions for a specific tool, ordered by created_at DESC.
833    pub fn get_sessions_by_tool_latest(
834        &self,
835        tool: &str,
836        count: u32,
837    ) -> Result<Vec<LocalSessionRow>> {
838        let sql = format!(
839            "SELECT {LOCAL_SESSION_COLUMNS} \
840             {FROM_CLAUSE} WHERE s.tool = ?1 \
841             ORDER BY s.created_at DESC"
842        );
843        let conn = self.conn();
844        let mut stmt = conn.prepare(&sql)?;
845        let rows = stmt.query_map(params![tool], row_to_local_session)?;
846        let mut result = Vec::new();
847        for row in rows {
848            result.push(row?);
849        }
850
851        let mut filtered = hide_opencode_child_sessions(result);
852        filtered.truncate(count as usize);
853        Ok(filtered)
854    }
855
856    /// Get the latest N sessions across all tools, ordered by created_at DESC.
857    pub fn get_sessions_latest(&self, count: u32) -> Result<Vec<LocalSessionRow>> {
858        let sql = format!(
859            "SELECT {LOCAL_SESSION_COLUMNS} \
860             {FROM_CLAUSE} \
861             ORDER BY s.created_at DESC"
862        );
863        let conn = self.conn();
864        let mut stmt = conn.prepare(&sql)?;
865        let rows = stmt.query_map([], row_to_local_session)?;
866        let mut result = Vec::new();
867        for row in rows {
868            result.push(row?);
869        }
870
871        let mut filtered = hide_opencode_child_sessions(result);
872        filtered.truncate(count as usize);
873        Ok(filtered)
874    }
875
876    /// Get the Nth most recent session for a specific tool (0 = HEAD, 1 = HEAD~1, etc.).
877    pub fn get_session_by_tool_offset(
878        &self,
879        tool: &str,
880        offset: u32,
881    ) -> Result<Option<LocalSessionRow>> {
882        let sql = format!(
883            "SELECT {LOCAL_SESSION_COLUMNS} \
884             {FROM_CLAUSE} WHERE s.tool = ?1 \
885             ORDER BY s.created_at DESC"
886        );
887        let conn = self.conn();
888        let mut stmt = conn.prepare(&sql)?;
889        let rows = stmt.query_map(params![tool], row_to_local_session)?;
890        let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
891        Ok(filtered.into_iter().nth(offset as usize))
892    }
893
894    /// Get the Nth most recent session across all tools (0 = HEAD, 1 = HEAD~1, etc.).
895    pub fn get_session_by_offset(&self, offset: u32) -> Result<Option<LocalSessionRow>> {
896        let sql = format!(
897            "SELECT {LOCAL_SESSION_COLUMNS} \
898             {FROM_CLAUSE} \
899             ORDER BY s.created_at DESC"
900        );
901        let conn = self.conn();
902        let mut stmt = conn.prepare(&sql)?;
903        let rows = stmt.query_map([], row_to_local_session)?;
904        let filtered = hide_opencode_child_sessions(rows.collect::<Result<Vec<_>, _>>()?);
905        Ok(filtered.into_iter().nth(offset as usize))
906    }
907
908    /// Fetch the source path used when the session was last parsed/loaded.
909    pub fn get_session_source_path(&self, session_id: &str) -> Result<Option<String>> {
910        let conn = self.conn();
911        let result = conn
912            .query_row(
913                "SELECT source_path FROM session_sync WHERE session_id = ?1",
914                params![session_id],
915                |row| row.get(0),
916            )
917            .optional()?;
918
919        Ok(result)
920    }
921
922    /// Count total sessions in the local DB.
923    pub fn session_count(&self) -> Result<i64> {
924        let count = self
925            .conn()
926            .query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))?;
927        Ok(count)
928    }
929
930    // ── Delete session ─────────────────────────────────────────────────
931
932    pub fn delete_session(&self, session_id: &str) -> Result<()> {
933        let conn = self.conn();
934        conn.execute(
935            "DELETE FROM body_cache WHERE session_id = ?1",
936            params![session_id],
937        )?;
938        conn.execute(
939            "DELETE FROM session_sync WHERE session_id = ?1",
940            params![session_id],
941        )?;
942        conn.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
943        Ok(())
944    }
945
946    // ── Sync cursor ────────────────────────────────────────────────────
947
948    pub fn get_sync_cursor(&self, team_id: &str) -> Result<Option<String>> {
949        let cursor = self
950            .conn()
951            .query_row(
952                "SELECT cursor FROM sync_cursors WHERE team_id = ?1",
953                params![team_id],
954                |row| row.get(0),
955            )
956            .optional()?;
957        Ok(cursor)
958    }
959
960    pub fn set_sync_cursor(&self, team_id: &str, cursor: &str) -> Result<()> {
961        self.conn().execute(
962            "INSERT INTO sync_cursors (team_id, cursor, updated_at) \
963             VALUES (?1, ?2, datetime('now')) \
964             ON CONFLICT(team_id) DO UPDATE SET cursor=excluded.cursor, updated_at=datetime('now')",
965            params![team_id, cursor],
966        )?;
967        Ok(())
968    }
969
970    // ── Upload tracking ────────────────────────────────────────────────
971
972    /// Get sessions that are local_only and need to be uploaded.
973    pub fn pending_uploads(&self, team_id: &str) -> Result<Vec<LocalSessionRow>> {
974        let sql = format!(
975            "SELECT {LOCAL_SESSION_COLUMNS} \
976             FROM sessions s \
977             INNER JOIN session_sync ss ON ss.session_id = s.id \
978             LEFT JOIN users u ON u.id = s.user_id \
979             WHERE ss.sync_status = 'local_only' AND s.team_id = ?1 \
980             ORDER BY s.created_at ASC"
981        );
982        let conn = self.conn();
983        let mut stmt = conn.prepare(&sql)?;
984        let rows = stmt.query_map(params![team_id], row_to_local_session)?;
985        let mut result = Vec::new();
986        for row in rows {
987            result.push(row?);
988        }
989        Ok(result)
990    }
991
992    pub fn mark_synced(&self, session_id: &str) -> Result<()> {
993        self.conn().execute(
994            "UPDATE session_sync SET sync_status = 'synced', last_synced_at = datetime('now') \
995             WHERE session_id = ?1",
996            params![session_id],
997        )?;
998        Ok(())
999    }
1000
1001    /// Check if a session was already uploaded (synced or remote_only) since the given modification time.
1002    pub fn was_uploaded_after(
1003        &self,
1004        source_path: &str,
1005        modified: &chrono::DateTime<chrono::Utc>,
1006    ) -> Result<bool> {
1007        let result: Option<String> = self
1008            .conn()
1009            .query_row(
1010                "SELECT last_synced_at FROM session_sync \
1011                 WHERE source_path = ?1 AND sync_status = 'synced' AND last_synced_at IS NOT NULL",
1012                params![source_path],
1013                |row| row.get(0),
1014            )
1015            .optional()?;
1016
1017        if let Some(synced_at) = result {
1018            if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&synced_at) {
1019                return Ok(dt >= *modified);
1020            }
1021        }
1022        Ok(false)
1023    }
1024
1025    // ── Body cache (local read acceleration) ───────────────────────────
1026
1027    pub fn cache_body(&self, session_id: &str, body: &[u8]) -> Result<()> {
1028        self.conn().execute(
1029            "INSERT INTO body_cache (session_id, body, cached_at) \
1030             VALUES (?1, ?2, datetime('now')) \
1031             ON CONFLICT(session_id) DO UPDATE SET body=excluded.body, cached_at=datetime('now')",
1032            params![session_id, body],
1033        )?;
1034        Ok(())
1035    }
1036
1037    pub fn get_cached_body(&self, session_id: &str) -> Result<Option<Vec<u8>>> {
1038        let body = self
1039            .conn()
1040            .query_row(
1041                "SELECT body FROM body_cache WHERE session_id = ?1",
1042                params![session_id],
1043                |row| row.get(0),
1044            )
1045            .optional()?;
1046        Ok(body)
1047    }
1048
1049    // ── Timeline summary cache ────────────────────────────────────────
1050
1051    pub fn upsert_timeline_summary_cache(
1052        &self,
1053        lookup_key: &str,
1054        namespace: &str,
1055        compact: &str,
1056        payload: &str,
1057        raw: &str,
1058    ) -> Result<()> {
1059        self.conn().execute(
1060            "INSERT INTO timeline_summary_cache \
1061             (lookup_key, namespace, compact, payload, raw, cached_at) \
1062             VALUES (?1, ?2, ?3, ?4, ?5, datetime('now')) \
1063             ON CONFLICT(lookup_key) DO UPDATE SET \
1064               namespace = excluded.namespace, \
1065               compact = excluded.compact, \
1066               payload = excluded.payload, \
1067               raw = excluded.raw, \
1068               cached_at = datetime('now')",
1069            params![lookup_key, namespace, compact, payload, raw],
1070        )?;
1071        Ok(())
1072    }
1073
1074    pub fn list_timeline_summary_cache_by_namespace(
1075        &self,
1076        namespace: &str,
1077    ) -> Result<Vec<TimelineSummaryCacheRow>> {
1078        let conn = self.conn();
1079        let mut stmt = conn.prepare(
1080            "SELECT lookup_key, namespace, compact, payload, raw, cached_at \
1081             FROM timeline_summary_cache \
1082             WHERE namespace = ?1 \
1083             ORDER BY cached_at DESC",
1084        )?;
1085        let rows = stmt.query_map(params![namespace], |row| {
1086            Ok(TimelineSummaryCacheRow {
1087                lookup_key: row.get(0)?,
1088                namespace: row.get(1)?,
1089                compact: row.get(2)?,
1090                payload: row.get(3)?,
1091                raw: row.get(4)?,
1092                cached_at: row.get(5)?,
1093            })
1094        })?;
1095
1096        let mut out = Vec::new();
1097        for row in rows {
1098            out.push(row?);
1099        }
1100        Ok(out)
1101    }
1102
1103    pub fn clear_timeline_summary_cache(&self) -> Result<usize> {
1104        let affected = self
1105            .conn()
1106            .execute("DELETE FROM timeline_summary_cache", [])?;
1107        Ok(affected)
1108    }
1109
1110    // ── Migration helper ───────────────────────────────────────────────
1111
1112    /// Migrate entries from the old state.json UploadState into the local DB.
1113    /// Marks them as `synced` with no metadata (we only know the file path was uploaded).
1114    pub fn migrate_from_state_json(
1115        &self,
1116        uploaded: &std::collections::HashMap<String, chrono::DateTime<chrono::Utc>>,
1117    ) -> Result<usize> {
1118        let mut count = 0;
1119        for (path, uploaded_at) in uploaded {
1120            let exists: bool = self
1121                .conn()
1122                .query_row(
1123                    "SELECT COUNT(*) > 0 FROM session_sync WHERE source_path = ?1",
1124                    params![path],
1125                    |row| row.get(0),
1126                )
1127                .unwrap_or(false);
1128
1129            if exists {
1130                self.conn().execute(
1131                    "UPDATE session_sync SET sync_status = 'synced', last_synced_at = ?1 \
1132                     WHERE source_path = ?2 AND sync_status = 'local_only'",
1133                    params![uploaded_at.to_rfc3339(), path],
1134                )?;
1135                count += 1;
1136            }
1137        }
1138        Ok(count)
1139    }
1140
1141    // ── Commit ↔ session linking ────────────────────────────────────
1142
1143    /// Link a git commit to an AI session.
1144    pub fn link_commit_session(
1145        &self,
1146        commit_hash: &str,
1147        session_id: &str,
1148        repo_path: Option<&str>,
1149        branch: Option<&str>,
1150    ) -> Result<()> {
1151        self.conn().execute(
1152            "INSERT INTO commit_session_links (commit_hash, session_id, repo_path, branch) \
1153             VALUES (?1, ?2, ?3, ?4) \
1154             ON CONFLICT(commit_hash, session_id) DO NOTHING",
1155            params![commit_hash, session_id, repo_path, branch],
1156        )?;
1157        Ok(())
1158    }
1159
1160    /// Get all sessions linked to a git commit.
1161    pub fn get_sessions_by_commit(&self, commit_hash: &str) -> Result<Vec<LocalSessionRow>> {
1162        let sql = format!(
1163            "SELECT {LOCAL_SESSION_COLUMNS} \
1164             {FROM_CLAUSE} \
1165             INNER JOIN commit_session_links csl ON csl.session_id = s.id \
1166             WHERE csl.commit_hash = ?1 \
1167             ORDER BY s.created_at DESC"
1168        );
1169        let conn = self.conn();
1170        let mut stmt = conn.prepare(&sql)?;
1171        let rows = stmt.query_map(params![commit_hash], row_to_local_session)?;
1172        let mut result = Vec::new();
1173        for row in rows {
1174            result.push(row?);
1175        }
1176        Ok(result)
1177    }
1178
1179    /// Get all commits linked to a session.
1180    pub fn get_commits_by_session(&self, session_id: &str) -> Result<Vec<CommitLink>> {
1181        let conn = self.conn();
1182        let mut stmt = conn.prepare(
1183            "SELECT commit_hash, session_id, repo_path, branch, created_at \
1184             FROM commit_session_links WHERE session_id = ?1 \
1185             ORDER BY created_at DESC",
1186        )?;
1187        let rows = stmt.query_map(params![session_id], |row| {
1188            Ok(CommitLink {
1189                commit_hash: row.get(0)?,
1190                session_id: row.get(1)?,
1191                repo_path: row.get(2)?,
1192                branch: row.get(3)?,
1193                created_at: row.get(4)?,
1194            })
1195        })?;
1196        let mut result = Vec::new();
1197        for row in rows {
1198            result.push(row?);
1199        }
1200        Ok(result)
1201    }
1202
1203    /// Find the most recently active session for a given repo path.
1204    /// "Active" means the session's working_directory matches the repo path
1205    /// and was created within the last `since_minutes` minutes.
1206    pub fn find_active_session_for_repo(
1207        &self,
1208        repo_path: &str,
1209        since_minutes: u32,
1210    ) -> Result<Option<LocalSessionRow>> {
1211        let sql = format!(
1212            "SELECT {LOCAL_SESSION_COLUMNS} \
1213             {FROM_CLAUSE} \
1214             WHERE s.working_directory LIKE ?1 \
1215             AND s.created_at >= datetime('now', ?2) \
1216             ORDER BY s.created_at DESC LIMIT 1"
1217        );
1218        let since = format!("-{since_minutes} minutes");
1219        let like = format!("{repo_path}%");
1220        let conn = self.conn();
1221        let mut stmt = conn.prepare(&sql)?;
1222        let row = stmt
1223            .query_map(params![like, since], row_to_local_session)?
1224            .next()
1225            .transpose()?;
1226        Ok(row)
1227    }
1228
1229    /// Get all session IDs currently in the local DB.
1230    pub fn existing_session_ids(&self) -> std::collections::HashSet<String> {
1231        let conn = self.conn();
1232        let mut stmt = conn
1233            .prepare("SELECT id FROM sessions")
1234            .unwrap_or_else(|_| panic!("failed to prepare existing_session_ids query"));
1235        let rows = stmt.query_map([], |row| row.get::<_, String>(0));
1236        let mut set = std::collections::HashSet::new();
1237        if let Ok(rows) = rows {
1238            for row in rows.flatten() {
1239                set.insert(row);
1240            }
1241        }
1242        set
1243    }
1244
1245    /// Update only stats fields for an existing session (no git context re-extraction).
1246    pub fn update_session_stats(&self, session: &Session) -> Result<()> {
1247        let title = session.context.title.as_deref();
1248        let description = session.context.description.as_deref();
1249        let (files_modified, files_read, has_errors) =
1250            opensession_core::extract::extract_file_metadata(session);
1251        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;
1252
1253        self.conn().execute(
1254            "UPDATE sessions SET \
1255             title=?2, description=?3, \
1256             message_count=?4, user_message_count=?5, task_count=?6, \
1257             event_count=?7, duration_seconds=?8, \
1258             total_input_tokens=?9, total_output_tokens=?10, \
1259             files_modified=?11, files_read=?12, has_errors=?13, \
1260             max_active_agents=?14 \
1261             WHERE id=?1",
1262            params![
1263                &session.session_id,
1264                title,
1265                description,
1266                session.stats.message_count as i64,
1267                session.stats.user_message_count as i64,
1268                session.stats.task_count as i64,
1269                session.stats.event_count as i64,
1270                session.stats.duration_seconds as i64,
1271                session.stats.total_input_tokens as i64,
1272                session.stats.total_output_tokens as i64,
1273                &files_modified,
1274                &files_read,
1275                has_errors,
1276                max_active_agents,
1277            ],
1278        )?;
1279        Ok(())
1280    }
1281
1282    /// Update only sync metadata path for an existing session.
1283    pub fn set_session_sync_path(&self, session_id: &str, source_path: &str) -> Result<()> {
1284        self.conn().execute(
1285            "INSERT INTO session_sync (session_id, source_path) \
1286             VALUES (?1, ?2) \
1287             ON CONFLICT(session_id) DO UPDATE SET source_path = excluded.source_path",
1288            params![session_id, source_path],
1289        )?;
1290        Ok(())
1291    }
1292
1293    /// Get a list of distinct git repo names present in the DB.
1294    pub fn list_repos(&self) -> Result<Vec<String>> {
1295        let conn = self.conn();
1296        let mut stmt = conn.prepare(
1297            "SELECT DISTINCT git_repo_name FROM sessions \
1298             WHERE git_repo_name IS NOT NULL ORDER BY git_repo_name ASC",
1299        )?;
1300        let rows = stmt.query_map([], |row| row.get(0))?;
1301        let mut result = Vec::new();
1302        for row in rows {
1303            result.push(row?);
1304        }
1305        Ok(result)
1306    }
1307
1308    /// Get a list of distinct, non-empty working directories present in the DB.
1309    pub fn list_working_directories(&self) -> Result<Vec<String>> {
1310        let conn = self.conn();
1311        let mut stmt = conn.prepare(
1312            "SELECT DISTINCT working_directory FROM sessions \
1313             WHERE working_directory IS NOT NULL AND TRIM(working_directory) <> '' \
1314             ORDER BY working_directory ASC",
1315        )?;
1316        let rows = stmt.query_map([], |row| row.get(0))?;
1317        let mut result = Vec::new();
1318        for row in rows {
1319            result.push(row?);
1320        }
1321        Ok(result)
1322    }
1323}
1324
1325// ── Schema backfill for existing local DB files ───────────────────────
1326
1327fn open_connection_with_latest_schema(path: &PathBuf) -> Result<Connection> {
1328    let conn = Connection::open(path).with_context(|| format!("open db {}", path.display()))?;
1329    conn.execute_batch("PRAGMA journal_mode=WAL;")?;
1330
1331    // Disable FK constraints for local DB (index/cache, not source of truth)
1332    conn.execute_batch("PRAGMA foreign_keys=OFF;")?;
1333
1334    apply_local_migrations(&conn)?;
1335
1336    // Backfill missing columns for existing local DB files where `sessions`
1337    // existed before newer fields were introduced.
1338    ensure_sessions_columns(&conn)?;
1339    validate_local_schema(&conn)?;
1340
1341    Ok(conn)
1342}
1343
1344fn apply_local_migrations(conn: &Connection) -> Result<()> {
1345    conn.execute_batch(
1346        "CREATE TABLE IF NOT EXISTS _migrations (
1347            id INTEGER PRIMARY KEY,
1348            name TEXT NOT NULL UNIQUE,
1349            applied_at TEXT NOT NULL DEFAULT (datetime('now'))
1350        );",
1351    )
1352    .context("create _migrations table for local db")?;
1353
1354    for (name, sql) in REMOTE_MIGRATIONS.iter().chain(LOCAL_MIGRATIONS.iter()) {
1355        let already_applied: bool = conn
1356            .query_row(
1357                "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
1358                [name],
1359                |row| row.get(0),
1360            )
1361            .unwrap_or(false);
1362
1363        if already_applied {
1364            continue;
1365        }
1366
1367        if let Err(e) = conn.execute_batch(sql) {
1368            let msg = e.to_string().to_ascii_lowercase();
1369            if !is_local_migration_compat_error(&msg) {
1370                return Err(e).with_context(|| format!("apply local migration {name}"));
1371            }
1372        }
1373
1374        conn.execute(
1375            "INSERT OR IGNORE INTO _migrations (name) VALUES (?1)",
1376            [name],
1377        )
1378        .with_context(|| format!("record local migration {name}"))?;
1379    }
1380
1381    Ok(())
1382}
1383
/// Report whether a migration error message is one of the tolerated kinds.
///
/// `msg` is matched by case-sensitive substring search, so callers are
/// expected to pass an already lower-cased message.
fn is_local_migration_compat_error(msg: &str) -> bool {
    const TOLERATED: [&str; 3] = [
        "duplicate column name",
        "no such column",
        "already exists",
    ];
    TOLERATED.iter().any(|needle| msg.contains(*needle))
}
1389
1390fn validate_local_schema(conn: &Connection) -> Result<()> {
1391    let sql = format!("SELECT {LOCAL_SESSION_COLUMNS} {FROM_CLAUSE} WHERE 1=0");
1392    conn.prepare(&sql)
1393        .map(|_| ())
1394        .context("validate local session schema")
1395}
1396
1397fn is_schema_compat_error(err: &anyhow::Error) -> bool {
1398    let msg = format!("{err:#}").to_ascii_lowercase();
1399    msg.contains("no such column")
1400        || msg.contains("no such table")
1401        || msg.contains("cannot add a column")
1402        || msg.contains("already exists")
1403        || msg.contains("views may not be indexed")
1404        || msg.contains("malformed database schema")
1405        || msg.contains("duplicate column name")
1406}
1407
1408fn rotate_legacy_db(path: &PathBuf) -> Result<()> {
1409    if !path.exists() {
1410        return Ok(());
1411    }
1412
1413    let ts = chrono::Utc::now().format("%Y%m%d%H%M%S");
1414    let backup_name = format!(
1415        "{}.legacy-{}.bak",
1416        path.file_name()
1417            .and_then(|n| n.to_str())
1418            .unwrap_or("local.db"),
1419        ts
1420    );
1421    let backup_path = path.with_file_name(backup_name);
1422    std::fs::rename(path, &backup_path).with_context(|| {
1423        format!(
1424            "rotate local db backup {} -> {}",
1425            path.display(),
1426            backup_path.display()
1427        )
1428    })?;
1429
1430    let wal = PathBuf::from(format!("{}-wal", path.display()));
1431    let shm = PathBuf::from(format!("{}-shm", path.display()));
1432    let _ = std::fs::remove_file(wal);
1433    let _ = std::fs::remove_file(shm);
1434    Ok(())
1435}
1436
/// Columns the `sessions` table must have, as `(column name, declaration)` pairs.
///
/// `ensure_sessions_columns` adds each missing column with
/// `ALTER TABLE sessions ADD COLUMN <name> <declaration>`, so the second
/// element is spliced into SQL verbatim.
const REQUIRED_SESSION_COLUMNS: &[(&str, &str)] = &[
    ("user_id", "TEXT"),
    ("team_id", "TEXT DEFAULT 'personal'"),
    ("tool", "TEXT DEFAULT ''"),
    ("agent_provider", "TEXT"),
    ("agent_model", "TEXT"),
    ("title", "TEXT"),
    ("description", "TEXT"),
    ("tags", "TEXT"),
    ("created_at", "TEXT DEFAULT ''"),
    ("uploaded_at", "TEXT DEFAULT ''"),
    ("message_count", "INTEGER DEFAULT 0"),
    ("user_message_count", "INTEGER DEFAULT 0"),
    ("task_count", "INTEGER DEFAULT 0"),
    ("event_count", "INTEGER DEFAULT 0"),
    ("duration_seconds", "INTEGER DEFAULT 0"),
    ("total_input_tokens", "INTEGER DEFAULT 0"),
    ("total_output_tokens", "INTEGER DEFAULT 0"),
    // Migration-only compatibility column from pre git-native body storage.
    ("body_storage_key", "TEXT DEFAULT ''"),
    ("body_url", "TEXT"),
    ("git_remote", "TEXT"),
    ("git_branch", "TEXT"),
    ("git_commit", "TEXT"),
    ("git_repo_name", "TEXT"),
    ("pr_number", "INTEGER"),
    ("pr_url", "TEXT"),
    ("working_directory", "TEXT"),
    ("files_modified", "TEXT"),
    ("files_read", "TEXT"),
    ("has_errors", "BOOLEAN DEFAULT 0"),
    ("max_active_agents", "INTEGER DEFAULT 1"),
];
1470
1471fn ensure_sessions_columns(conn: &Connection) -> Result<()> {
1472    let mut existing = HashSet::new();
1473    let mut stmt = conn.prepare("PRAGMA table_info(sessions)")?;
1474    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
1475    for row in rows {
1476        existing.insert(row?);
1477    }
1478
1479    for (name, decl) in REQUIRED_SESSION_COLUMNS {
1480        if existing.contains(*name) {
1481            continue;
1482        }
1483        let sql = format!("ALTER TABLE sessions ADD COLUMN {name} {decl};");
1484        conn.execute_batch(&sql)
1485            .with_context(|| format!("add sessions column '{name}'"))?;
1486    }
1487
1488    Ok(())
1489}
1490
/// Column list for SELECT queries against sessions + session_sync + users.
///
/// The list refers to the table aliases `s`, `ss` and `u`, so the query using
/// it must supply them. It yields 33 columns whose order must stay in step
/// with the positional indices (0..=32) read by `row_to_local_session`.
pub const LOCAL_SESSION_COLUMNS: &str = "\
s.id, ss.source_path, COALESCE(ss.sync_status, 'unknown') AS sync_status, ss.last_synced_at, \
s.user_id, u.nickname, s.team_id, s.tool, s.agent_provider, s.agent_model, \
s.title, s.description, s.tags, s.created_at, s.uploaded_at, \
s.message_count, COALESCE(s.user_message_count, 0), s.task_count, s.event_count, s.duration_seconds, \
s.total_input_tokens, s.total_output_tokens, \
s.git_remote, s.git_branch, s.git_commit, s.git_repo_name, \
s.pr_number, s.pr_url, s.working_directory, \
s.files_modified, s.files_read, s.has_errors, COALESCE(s.max_active_agents, 1)";
1501
1502fn row_to_local_session(row: &rusqlite::Row) -> rusqlite::Result<LocalSessionRow> {
1503    Ok(LocalSessionRow {
1504        id: row.get(0)?,
1505        source_path: row.get(1)?,
1506        sync_status: row.get(2)?,
1507        last_synced_at: row.get(3)?,
1508        user_id: row.get(4)?,
1509        nickname: row.get(5)?,
1510        team_id: row.get(6)?,
1511        tool: row.get(7)?,
1512        agent_provider: row.get(8)?,
1513        agent_model: row.get(9)?,
1514        title: row.get(10)?,
1515        description: row.get(11)?,
1516        tags: row.get(12)?,
1517        created_at: row.get(13)?,
1518        uploaded_at: row.get(14)?,
1519        message_count: row.get(15)?,
1520        user_message_count: row.get(16)?,
1521        task_count: row.get(17)?,
1522        event_count: row.get(18)?,
1523        duration_seconds: row.get(19)?,
1524        total_input_tokens: row.get(20)?,
1525        total_output_tokens: row.get(21)?,
1526        git_remote: row.get(22)?,
1527        git_branch: row.get(23)?,
1528        git_commit: row.get(24)?,
1529        git_repo_name: row.get(25)?,
1530        pr_number: row.get(26)?,
1531        pr_url: row.get(27)?,
1532        working_directory: row.get(28)?,
1533        files_modified: row.get(29)?,
1534        files_read: row.get(30)?,
1535        has_errors: row.get::<_, i64>(31).unwrap_or(0) != 0,
1536        max_active_agents: row.get(32).unwrap_or(1),
1537    })
1538}
1539
1540fn default_db_path() -> Result<PathBuf> {
1541    let home = std::env::var("HOME")
1542        .or_else(|_| std::env::var("USERPROFILE"))
1543        .context("Could not determine home directory")?;
1544    Ok(PathBuf::from(home)
1545        .join(".local")
1546        .join("share")
1547        .join("opensession")
1548        .join("local.db"))
1549}
1550
1551#[cfg(test)]
1552mod tests {
1553    use super::*;
1554
1555    use std::collections::BTreeSet;
1556    use std::fs::{create_dir_all, write};
1557    use tempfile::tempdir;
1558
1559    fn test_db() -> LocalDb {
1560        let dir = tempdir().unwrap();
1561        let path = dir.keep().join("test.db");
1562        LocalDb::open_path(&path).unwrap()
1563    }
1564
    /// Create a temporary directory for a test; the returned guard owns it.
    fn temp_root() -> tempfile::TempDir {
        tempdir().unwrap()
    }
1568
    /// Build a `LocalSessionRow` fixture with the given `id`, `tool` and
    /// optional `source_path`.
    ///
    /// Everything else is fixed: status `local_only`, title `"test"`,
    /// `created_at` of 2024-01-01T00:00:00Z, zeroed counters, no git or file
    /// metadata, `has_errors` false and `max_active_agents` 1.
    fn make_row(id: &str, tool: &str, source_path: Option<&str>) -> LocalSessionRow {
        LocalSessionRow {
            id: id.to_string(),
            source_path: source_path.map(String::from),
            sync_status: "local_only".to_string(),
            last_synced_at: None,
            user_id: None,
            nickname: None,
            team_id: None,
            tool: tool.to_string(),
            agent_provider: None,
            agent_model: None,
            title: Some("test".to_string()),
            description: None,
            tags: None,
            created_at: "2024-01-01T00:00:00Z".to_string(),
            uploaded_at: None,
            message_count: 0,
            user_message_count: 0,
            task_count: 0,
            event_count: 0,
            duration_seconds: 0,
            total_input_tokens: 0,
            total_output_tokens: 0,
            git_remote: None,
            git_branch: None,
            git_commit: None,
            git_repo_name: None,
            pr_number: None,
            pr_url: None,
            working_directory: None,
            files_modified: None,
            files_read: None,
            has_errors: false,
            max_active_agents: 1,
        }
    }
1606
    /// Smoke test: opening a brand-new database must succeed (the helper
    /// unwraps the result of `LocalDb::open_path`).
    #[test]
    fn test_open_and_schema() {
        let _db = test_db();
    }
1611
1612    #[test]
1613    fn test_open_backfills_legacy_sessions_columns() {
1614        let dir = tempfile::tempdir().unwrap();
1615        let path = dir.path().join("legacy.db");
1616        {
1617            let conn = Connection::open(&path).unwrap();
1618            conn.execute_batch(
1619                "CREATE TABLE sessions (id TEXT PRIMARY KEY);
1620                 INSERT INTO sessions (id) VALUES ('legacy-1');",
1621            )
1622            .unwrap();
1623        }
1624
1625        let db = LocalDb::open_path(&path).unwrap();
1626        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1627        assert_eq!(rows.len(), 1);
1628        assert_eq!(rows[0].id, "legacy-1");
1629        assert_eq!(rows[0].user_message_count, 0);
1630    }
1631
1632    #[test]
1633    fn test_open_rotates_incompatible_legacy_schema() {
1634        let dir = tempfile::tempdir().unwrap();
1635        let path = dir.path().join("broken.db");
1636        {
1637            let conn = Connection::open(&path).unwrap();
1638            conn.execute_batch("CREATE VIEW sessions AS SELECT 'x' AS id;")
1639                .unwrap();
1640        }
1641
1642        let db = LocalDb::open_path(&path).unwrap();
1643        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1644        assert!(rows.is_empty());
1645
1646        let rotated = std::fs::read_dir(dir.path())
1647            .unwrap()
1648            .filter_map(Result::ok)
1649            .any(|entry| {
1650                let name = entry.file_name();
1651                let name = name.to_string_lossy();
1652                name.starts_with("broken.db.legacy-") && name.ends_with(".bak")
1653            });
1654        assert!(rotated, "expected rotated legacy backup file");
1655    }
1656
1657    #[test]
1658    fn test_is_opencode_child_session() {
1659        let root = temp_root();
1660        let dir = root.path().join("sessions");
1661        create_dir_all(&dir).unwrap();
1662        let parent_session = dir.join("parent.json");
1663        write(
1664            &parent_session,
1665            r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1666        )
1667        .unwrap();
1668        let child_session = dir.join("child.json");
1669        write(
1670            &child_session,
1671            r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1672        )
1673        .unwrap();
1674
1675        let parent = make_row(
1676            "ses_parent",
1677            "opencode",
1678            Some(parent_session.to_str().unwrap()),
1679        );
1680        let child = make_row(
1681            "ses_child",
1682            "opencode",
1683            Some(child_session.to_str().unwrap()),
1684        );
1685        let codex = make_row("ses_other", "codex", Some(child_session.to_str().unwrap()));
1686
1687        assert!(!is_opencode_child_session(&parent));
1688        assert!(is_opencode_child_session(&child));
1689        assert!(!is_opencode_child_session(&codex));
1690    }
1691
1692    #[test]
1693    fn test_is_opencode_child_session_uses_event_shape_heuristic() {
1694        let child = LocalSessionRow {
1695            id: "sess_child".to_string(),
1696            source_path: None,
1697            sync_status: "local_only".to_string(),
1698            last_synced_at: None,
1699            user_id: None,
1700            nickname: None,
1701            team_id: None,
1702            tool: "opencode".to_string(),
1703            agent_provider: None,
1704            agent_model: None,
1705            title: None,
1706            description: None,
1707            tags: None,
1708            created_at: "2024-01-01T00:00:00Z".to_string(),
1709            uploaded_at: None,
1710            message_count: 1,
1711            user_message_count: 0,
1712            task_count: 4,
1713            event_count: 4,
1714            duration_seconds: 0,
1715            total_input_tokens: 0,
1716            total_output_tokens: 0,
1717            git_remote: None,
1718            git_branch: None,
1719            git_commit: None,
1720            git_repo_name: None,
1721            pr_number: None,
1722            pr_url: None,
1723            working_directory: None,
1724            files_modified: None,
1725            files_read: None,
1726            has_errors: false,
1727            max_active_agents: 1,
1728        };
1729
1730        let parent = LocalSessionRow {
1731            id: "sess_parent".to_string(),
1732            source_path: None,
1733            sync_status: "local_only".to_string(),
1734            last_synced_at: None,
1735            user_id: None,
1736            nickname: None,
1737            team_id: None,
1738            tool: "opencode".to_string(),
1739            agent_provider: None,
1740            agent_model: None,
1741            title: Some("regular".to_string()),
1742            description: None,
1743            tags: None,
1744            created_at: "2024-01-01T00:00:00Z".to_string(),
1745            uploaded_at: None,
1746            message_count: 1,
1747            user_message_count: 1,
1748            task_count: 2,
1749            event_count: 20,
1750            duration_seconds: 0,
1751            total_input_tokens: 0,
1752            total_output_tokens: 0,
1753            git_remote: None,
1754            git_branch: None,
1755            git_commit: None,
1756            git_repo_name: None,
1757            pr_number: None,
1758            pr_url: None,
1759            working_directory: None,
1760            files_modified: None,
1761            files_read: None,
1762            has_errors: false,
1763            max_active_agents: 1,
1764        };
1765
1766        assert!(is_opencode_child_session(&child));
1767        assert!(!is_opencode_child_session(&parent));
1768    }
1769
1770    #[test]
1771    fn test_is_opencode_child_session_with_more_messages_is_hidden_if_task_count_small() {
1772        let child = LocalSessionRow {
1773            id: "sess_child_2".to_string(),
1774            source_path: None,
1775            sync_status: "local_only".to_string(),
1776            last_synced_at: None,
1777            user_id: None,
1778            nickname: None,
1779            team_id: None,
1780            tool: "opencode".to_string(),
1781            agent_provider: None,
1782            agent_model: None,
1783            title: None,
1784            description: None,
1785            tags: None,
1786            created_at: "2024-01-01T00:00:00Z".to_string(),
1787            uploaded_at: None,
1788            message_count: 2,
1789            user_message_count: 0,
1790            task_count: 4,
1791            event_count: 4,
1792            duration_seconds: 0,
1793            total_input_tokens: 0,
1794            total_output_tokens: 0,
1795            git_remote: None,
1796            git_branch: None,
1797            git_commit: None,
1798            git_repo_name: None,
1799            pr_number: None,
1800            pr_url: None,
1801            working_directory: None,
1802            files_modified: None,
1803            files_read: None,
1804            has_errors: false,
1805            max_active_agents: 1,
1806        };
1807
1808        let parent = LocalSessionRow {
1809            id: "sess_parent".to_string(),
1810            source_path: None,
1811            sync_status: "local_only".to_string(),
1812            last_synced_at: None,
1813            user_id: None,
1814            nickname: None,
1815            team_id: None,
1816            tool: "opencode".to_string(),
1817            agent_provider: None,
1818            agent_model: None,
1819            title: Some("regular".to_string()),
1820            description: None,
1821            tags: None,
1822            created_at: "2024-01-01T00:00:00Z".to_string(),
1823            uploaded_at: None,
1824            message_count: 2,
1825            user_message_count: 1,
1826            task_count: 5,
1827            event_count: 20,
1828            duration_seconds: 0,
1829            total_input_tokens: 0,
1830            total_output_tokens: 0,
1831            git_remote: None,
1832            git_branch: None,
1833            git_commit: None,
1834            git_repo_name: None,
1835            pr_number: None,
1836            pr_url: None,
1837            working_directory: None,
1838            files_modified: None,
1839            files_read: None,
1840            has_errors: false,
1841            max_active_agents: 1,
1842        };
1843
1844        assert!(is_opencode_child_session(&child));
1845        assert!(!is_opencode_child_session(&parent));
1846    }
1847
1848    #[test]
1849    fn test_is_opencode_child_session_with_more_messages_but_few_tasks() {
1850        let child = LocalSessionRow {
1851            id: "sess_child_3".to_string(),
1852            source_path: None,
1853            sync_status: "local_only".to_string(),
1854            last_synced_at: None,
1855            user_id: None,
1856            nickname: None,
1857            team_id: None,
1858            tool: "opencode".to_string(),
1859            agent_provider: None,
1860            agent_model: None,
1861            title: None,
1862            description: None,
1863            tags: None,
1864            created_at: "2024-01-01T00:00:00Z".to_string(),
1865            uploaded_at: None,
1866            message_count: 3,
1867            user_message_count: 0,
1868            task_count: 2,
1869            event_count: 6,
1870            duration_seconds: 0,
1871            total_input_tokens: 0,
1872            total_output_tokens: 0,
1873            git_remote: None,
1874            git_branch: None,
1875            git_commit: None,
1876            git_repo_name: None,
1877            pr_number: None,
1878            pr_url: None,
1879            working_directory: None,
1880            files_modified: None,
1881            files_read: None,
1882            has_errors: false,
1883            max_active_agents: 1,
1884        };
1885
1886        assert!(is_opencode_child_session(&child));
1887    }
1888
1889    #[test]
1890    fn test_parse_opencode_parent_session_id_aliases() {
1891        let root = temp_root();
1892        let dir = root.path().join("session-aliases");
1893        create_dir_all(&dir).unwrap();
1894        let child_session = dir.join("child.json");
1895        write(
1896            &child_session,
1897            r#"{"id":"ses_child","parentUUID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1898        )
1899        .unwrap();
1900        assert_eq!(
1901            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
1902            Some("ses_parent")
1903        );
1904    }
1905
1906    #[test]
1907    fn test_parse_opencode_parent_session_id_nested_metadata() {
1908        let root = temp_root();
1909        let dir = root.path().join("session-nested");
1910        create_dir_all(&dir).unwrap();
1911        let child_session = dir.join("child.json");
1912        write(
1913            &child_session,
1914            r#"{"id":"ses_child","metadata":{"links":{"parentSessionId":"ses_parent","trace":"x"}}}"#,
1915        )
1916        .unwrap();
1917        assert_eq!(
1918            parse_opencode_parent_session_id(child_session.to_str().unwrap()).as_deref(),
1919            Some("ses_parent")
1920        );
1921    }
1922
1923    #[test]
1924    fn test_is_claude_subagent_session() {
1925        let row = make_row(
1926            "ses_parent",
1927            "claude-code",
1928            Some("/Users/test/.claude/projects/foo/subagents/agent-abc.jsonl"),
1929        );
1930        assert!(!is_opencode_child_session(&row));
1931        assert!(is_claude_subagent_session(&row));
1932        assert!(hide_opencode_child_sessions(vec![row]).is_empty());
1933    }
1934
1935    #[test]
1936    fn test_hide_opencode_child_sessions() {
1937        let root = temp_root();
1938        let dir = root.path().join("sessions");
1939        create_dir_all(&dir).unwrap();
1940        let parent_session = dir.join("parent.json");
1941        let child_session = dir.join("child.json");
1942        let orphan_session = dir.join("orphan.json");
1943
1944        write(
1945            &parent_session,
1946            r#"{"id":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1947        )
1948        .unwrap();
1949        write(
1950            &child_session,
1951            r#"{"id":"ses_child","parentID":"ses_parent","time":{"created":1000,"updated":1000}}"#,
1952        )
1953        .unwrap();
1954        write(
1955            &orphan_session,
1956            r#"{"id":"ses_orphan","time":{"created":1000,"updated":1000}}"#,
1957        )
1958        .unwrap();
1959
1960        let rows = vec![
1961            make_row(
1962                "ses_child",
1963                "opencode",
1964                Some(child_session.to_str().unwrap()),
1965            ),
1966            make_row(
1967                "ses_parent",
1968                "opencode",
1969                Some(parent_session.to_str().unwrap()),
1970            ),
1971            {
1972                let mut row = make_row("ses_other", "codex", None);
1973                row.user_message_count = 1;
1974                row
1975            },
1976            make_row(
1977                "ses_orphan",
1978                "opencode",
1979                Some(orphan_session.to_str().unwrap()),
1980            ),
1981        ];
1982
1983        let filtered = hide_opencode_child_sessions(rows);
1984        assert_eq!(filtered.len(), 3);
1985        assert!(filtered.iter().all(|r| r.id != "ses_child"));
1986    }
1987
1988    #[test]
1989    fn test_sync_cursor() {
1990        let db = test_db();
1991        assert_eq!(db.get_sync_cursor("team1").unwrap(), None);
1992        db.set_sync_cursor("team1", "2024-01-01T00:00:00Z").unwrap();
1993        assert_eq!(
1994            db.get_sync_cursor("team1").unwrap(),
1995            Some("2024-01-01T00:00:00Z".to_string())
1996        );
1997        // Update
1998        db.set_sync_cursor("team1", "2024-06-01T00:00:00Z").unwrap();
1999        assert_eq!(
2000            db.get_sync_cursor("team1").unwrap(),
2001            Some("2024-06-01T00:00:00Z".to_string())
2002        );
2003    }
2004
2005    #[test]
2006    fn test_body_cache() {
2007        let db = test_db();
2008        assert_eq!(db.get_cached_body("s1").unwrap(), None);
2009        db.cache_body("s1", b"hello world").unwrap();
2010        assert_eq!(
2011            db.get_cached_body("s1").unwrap(),
2012            Some(b"hello world".to_vec())
2013        );
2014    }
2015
2016    #[test]
2017    fn test_timeline_summary_cache_roundtrip() {
2018        let db = test_db();
2019        db.upsert_timeline_summary_cache(
2020            "k1",
2021            "timeline:v1",
2022            "compact text",
2023            "{\"kind\":\"turn-summary\"}",
2024            "raw text",
2025        )
2026        .unwrap();
2027
2028        let rows = db
2029            .list_timeline_summary_cache_by_namespace("timeline:v1")
2030            .unwrap();
2031        assert_eq!(rows.len(), 1);
2032        assert_eq!(rows[0].lookup_key, "k1");
2033        assert_eq!(rows[0].namespace, "timeline:v1");
2034        assert_eq!(rows[0].compact, "compact text");
2035        assert_eq!(rows[0].payload, "{\"kind\":\"turn-summary\"}");
2036        assert_eq!(rows[0].raw, "raw text");
2037
2038        let cleared = db.clear_timeline_summary_cache().unwrap();
2039        assert_eq!(cleared, 1);
2040        let rows_after = db
2041            .list_timeline_summary_cache_by_namespace("timeline:v1")
2042            .unwrap();
2043        assert!(rows_after.is_empty());
2044    }
2045
2046    #[test]
2047    fn test_local_migrations_include_timeline_summary_cache() {
2048        let db = test_db();
2049        let conn = db.conn();
2050        let applied: bool = conn
2051            .query_row(
2052                "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
2053                params!["local_0003_timeline_summary_cache"],
2054                |row| row.get(0),
2055            )
2056            .unwrap();
2057        assert!(applied);
2058    }
2059
2060    #[test]
2061    fn test_local_migration_files_match_api_local_migrations() {
2062        fn collect_local_sql(dir: PathBuf) -> BTreeSet<String> {
2063            std::fs::read_dir(dir)
2064                .expect("read migrations directory")
2065                .filter_map(Result::ok)
2066                .map(|entry| entry.file_name().to_string_lossy().to_string())
2067                .filter(|name| name.starts_with("local_") && name.ends_with(".sql"))
2068                .collect()
2069        }
2070
2071        let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
2072        let local_files = collect_local_sql(manifest_dir.join("migrations"));
2073        let api_files = collect_local_sql(manifest_dir.join("../api/migrations"));
2074
2075        assert_eq!(
2076            local_files, api_files,
2077            "local-db local migrations must stay in parity with api local migrations"
2078        );
2079    }
2080
2081    #[test]
2082    fn test_upsert_remote_session() {
2083        let db = test_db();
2084        let summary = RemoteSessionSummary {
2085            id: "remote-1".to_string(),
2086            user_id: Some("u1".to_string()),
2087            nickname: Some("alice".to_string()),
2088            team_id: "t1".to_string(),
2089            tool: "claude-code".to_string(),
2090            agent_provider: None,
2091            agent_model: None,
2092            title: Some("Test session".to_string()),
2093            description: None,
2094            tags: None,
2095            created_at: "2024-01-01T00:00:00Z".to_string(),
2096            uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2097            message_count: 10,
2098            task_count: 2,
2099            event_count: 20,
2100            duration_seconds: 300,
2101            total_input_tokens: 1000,
2102            total_output_tokens: 500,
2103            git_remote: None,
2104            git_branch: None,
2105            git_commit: None,
2106            git_repo_name: None,
2107            pr_number: None,
2108            pr_url: None,
2109            working_directory: None,
2110            files_modified: None,
2111            files_read: None,
2112            has_errors: false,
2113            max_active_agents: 1,
2114        };
2115        db.upsert_remote_session(&summary).unwrap();
2116
2117        let sessions = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2118        assert_eq!(sessions.len(), 1);
2119        assert_eq!(sessions[0].id, "remote-1");
2120        assert_eq!(sessions[0].sync_status, "remote_only");
2121        assert_eq!(sessions[0].nickname, None); // no user in local users table
2122    }
2123
2124    #[test]
2125    fn test_list_filter_by_repo() {
2126        let db = test_db();
2127        // Insert a remote session with team_id
2128        let summary1 = RemoteSessionSummary {
2129            id: "s1".to_string(),
2130            user_id: None,
2131            nickname: None,
2132            team_id: "t1".to_string(),
2133            tool: "claude-code".to_string(),
2134            agent_provider: None,
2135            agent_model: None,
2136            title: Some("Session 1".to_string()),
2137            description: None,
2138            tags: None,
2139            created_at: "2024-01-01T00:00:00Z".to_string(),
2140            uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2141            message_count: 5,
2142            task_count: 0,
2143            event_count: 10,
2144            duration_seconds: 60,
2145            total_input_tokens: 100,
2146            total_output_tokens: 50,
2147            git_remote: None,
2148            git_branch: None,
2149            git_commit: None,
2150            git_repo_name: None,
2151            pr_number: None,
2152            pr_url: None,
2153            working_directory: None,
2154            files_modified: None,
2155            files_read: None,
2156            has_errors: false,
2157            max_active_agents: 1,
2158        };
2159        db.upsert_remote_session(&summary1).unwrap();
2160
2161        // Filter by team
2162        let filter = LocalSessionFilter {
2163            team_id: Some("t1".to_string()),
2164            ..Default::default()
2165        };
2166        assert_eq!(db.list_sessions(&filter).unwrap().len(), 1);
2167
2168        let filter = LocalSessionFilter {
2169            team_id: Some("t999".to_string()),
2170            ..Default::default()
2171        };
2172        assert_eq!(db.list_sessions(&filter).unwrap().len(), 0);
2173    }
2174
2175    // ── Helpers for inserting test sessions ────────────────────────────
2176
2177    fn make_summary(id: &str, tool: &str, title: &str, created_at: &str) -> RemoteSessionSummary {
2178        RemoteSessionSummary {
2179            id: id.to_string(),
2180            user_id: None,
2181            nickname: None,
2182            team_id: "t1".to_string(),
2183            tool: tool.to_string(),
2184            agent_provider: Some("anthropic".to_string()),
2185            agent_model: Some("claude-opus-4-6".to_string()),
2186            title: Some(title.to_string()),
2187            description: None,
2188            tags: None,
2189            created_at: created_at.to_string(),
2190            uploaded_at: created_at.to_string(),
2191            message_count: 5,
2192            task_count: 1,
2193            event_count: 10,
2194            duration_seconds: 300,
2195            total_input_tokens: 1000,
2196            total_output_tokens: 500,
2197            git_remote: None,
2198            git_branch: None,
2199            git_commit: None,
2200            git_repo_name: None,
2201            pr_number: None,
2202            pr_url: None,
2203            working_directory: None,
2204            files_modified: None,
2205            files_read: None,
2206            has_errors: false,
2207            max_active_agents: 1,
2208        }
2209    }
2210
2211    fn seed_sessions(db: &LocalDb) {
2212        // Insert 5 sessions across two tools, ordered by created_at
2213        db.upsert_remote_session(&make_summary(
2214            "s1",
2215            "claude-code",
2216            "First session",
2217            "2024-01-01T00:00:00Z",
2218        ))
2219        .unwrap();
2220        db.upsert_remote_session(&make_summary(
2221            "s2",
2222            "claude-code",
2223            "JWT auth work",
2224            "2024-01-02T00:00:00Z",
2225        ))
2226        .unwrap();
2227        db.upsert_remote_session(&make_summary(
2228            "s3",
2229            "gemini",
2230            "Gemini test",
2231            "2024-01-03T00:00:00Z",
2232        ))
2233        .unwrap();
2234        db.upsert_remote_session(&make_summary(
2235            "s4",
2236            "claude-code",
2237            "Error handling",
2238            "2024-01-04T00:00:00Z",
2239        ))
2240        .unwrap();
2241        db.upsert_remote_session(&make_summary(
2242            "s5",
2243            "claude-code",
2244            "Final polish",
2245            "2024-01-05T00:00:00Z",
2246        ))
2247        .unwrap();
2248    }
2249
2250    // ── list_sessions_log tests ────────────────────────────────────────
2251
2252    #[test]
2253    fn test_log_no_filters() {
2254        let db = test_db();
2255        seed_sessions(&db);
2256        let filter = LogFilter::default();
2257        let results = db.list_sessions_log(&filter).unwrap();
2258        assert_eq!(results.len(), 5);
2259        // Should be ordered by created_at DESC
2260        assert_eq!(results[0].id, "s5");
2261        assert_eq!(results[4].id, "s1");
2262    }
2263
2264    #[test]
2265    fn test_log_filter_by_tool() {
2266        let db = test_db();
2267        seed_sessions(&db);
2268        let filter = LogFilter {
2269            tool: Some("claude-code".to_string()),
2270            ..Default::default()
2271        };
2272        let results = db.list_sessions_log(&filter).unwrap();
2273        assert_eq!(results.len(), 4);
2274        assert!(results.iter().all(|s| s.tool == "claude-code"));
2275    }
2276
2277    #[test]
2278    fn test_log_filter_by_model_wildcard() {
2279        let db = test_db();
2280        seed_sessions(&db);
2281        let filter = LogFilter {
2282            model: Some("claude*".to_string()),
2283            ..Default::default()
2284        };
2285        let results = db.list_sessions_log(&filter).unwrap();
2286        assert_eq!(results.len(), 5); // all have claude-opus model
2287    }
2288
2289    #[test]
2290    fn test_log_filter_since() {
2291        let db = test_db();
2292        seed_sessions(&db);
2293        let filter = LogFilter {
2294            since: Some("2024-01-03T00:00:00Z".to_string()),
2295            ..Default::default()
2296        };
2297        let results = db.list_sessions_log(&filter).unwrap();
2298        assert_eq!(results.len(), 3); // s3, s4, s5
2299    }
2300
2301    #[test]
2302    fn test_log_filter_before() {
2303        let db = test_db();
2304        seed_sessions(&db);
2305        let filter = LogFilter {
2306            before: Some("2024-01-03T00:00:00Z".to_string()),
2307            ..Default::default()
2308        };
2309        let results = db.list_sessions_log(&filter).unwrap();
2310        assert_eq!(results.len(), 2); // s1, s2
2311    }
2312
2313    #[test]
2314    fn test_log_filter_since_and_before() {
2315        let db = test_db();
2316        seed_sessions(&db);
2317        let filter = LogFilter {
2318            since: Some("2024-01-02T00:00:00Z".to_string()),
2319            before: Some("2024-01-04T00:00:00Z".to_string()),
2320            ..Default::default()
2321        };
2322        let results = db.list_sessions_log(&filter).unwrap();
2323        assert_eq!(results.len(), 2); // s2, s3
2324    }
2325
2326    #[test]
2327    fn test_log_filter_grep() {
2328        let db = test_db();
2329        seed_sessions(&db);
2330        let filter = LogFilter {
2331            grep: Some("JWT".to_string()),
2332            ..Default::default()
2333        };
2334        let results = db.list_sessions_log(&filter).unwrap();
2335        assert_eq!(results.len(), 1);
2336        assert_eq!(results[0].id, "s2");
2337    }
2338
2339    #[test]
2340    fn test_log_limit_and_offset() {
2341        let db = test_db();
2342        seed_sessions(&db);
2343        let filter = LogFilter {
2344            limit: Some(2),
2345            offset: Some(1),
2346            ..Default::default()
2347        };
2348        let results = db.list_sessions_log(&filter).unwrap();
2349        assert_eq!(results.len(), 2);
2350        assert_eq!(results[0].id, "s4"); // second most recent
2351        assert_eq!(results[1].id, "s3");
2352    }
2353
2354    #[test]
2355    fn test_log_limit_only() {
2356        let db = test_db();
2357        seed_sessions(&db);
2358        let filter = LogFilter {
2359            limit: Some(3),
2360            ..Default::default()
2361        };
2362        let results = db.list_sessions_log(&filter).unwrap();
2363        assert_eq!(results.len(), 3);
2364    }
2365
2366    #[test]
2367    fn test_list_sessions_limit_offset() {
2368        let db = test_db();
2369        seed_sessions(&db);
2370        let filter = LocalSessionFilter {
2371            limit: Some(2),
2372            offset: Some(1),
2373            ..Default::default()
2374        };
2375        let results = db.list_sessions(&filter).unwrap();
2376        assert_eq!(results.len(), 2);
2377        assert_eq!(results[0].id, "s4");
2378        assert_eq!(results[1].id, "s3");
2379    }
2380
2381    #[test]
2382    fn test_count_sessions_filtered() {
2383        let db = test_db();
2384        seed_sessions(&db);
2385        let count = db
2386            .count_sessions_filtered(&LocalSessionFilter::default())
2387            .unwrap();
2388        assert_eq!(count, 5);
2389    }
2390
2391    #[test]
2392    fn test_list_working_directories_distinct_non_empty() {
2393        let db = test_db();
2394
2395        let mut a = make_summary("wd-1", "claude-code", "One", "2024-01-01T00:00:00Z");
2396        a.working_directory = Some("/tmp/repo-a".to_string());
2397        let mut b = make_summary("wd-2", "claude-code", "Two", "2024-01-02T00:00:00Z");
2398        b.working_directory = Some("/tmp/repo-a".to_string());
2399        let mut c = make_summary("wd-3", "claude-code", "Three", "2024-01-03T00:00:00Z");
2400        c.working_directory = Some("/tmp/repo-b".to_string());
2401        let mut d = make_summary("wd-4", "claude-code", "Four", "2024-01-04T00:00:00Z");
2402        d.working_directory = Some("".to_string());
2403
2404        db.upsert_remote_session(&a).unwrap();
2405        db.upsert_remote_session(&b).unwrap();
2406        db.upsert_remote_session(&c).unwrap();
2407        db.upsert_remote_session(&d).unwrap();
2408
2409        let dirs = db.list_working_directories().unwrap();
2410        assert_eq!(
2411            dirs,
2412            vec!["/tmp/repo-a".to_string(), "/tmp/repo-b".to_string()]
2413        );
2414    }
2415
2416    #[test]
2417    fn test_list_session_tools() {
2418        let db = test_db();
2419        seed_sessions(&db);
2420        let tools = db
2421            .list_session_tools(&LocalSessionFilter::default())
2422            .unwrap();
2423        assert_eq!(tools, vec!["claude-code".to_string(), "gemini".to_string()]);
2424    }
2425
2426    #[test]
2427    fn test_log_combined_filters() {
2428        let db = test_db();
2429        seed_sessions(&db);
2430        let filter = LogFilter {
2431            tool: Some("claude-code".to_string()),
2432            since: Some("2024-01-03T00:00:00Z".to_string()),
2433            limit: Some(1),
2434            ..Default::default()
2435        };
2436        let results = db.list_sessions_log(&filter).unwrap();
2437        assert_eq!(results.len(), 1);
2438        assert_eq!(results[0].id, "s5"); // most recent claude-code after Jan 3
2439    }
2440
2441    // ── Session offset/latest tests ────────────────────────────────────
2442
2443    #[test]
2444    fn test_get_session_by_offset() {
2445        let db = test_db();
2446        seed_sessions(&db);
2447        let row = db.get_session_by_offset(0).unwrap().unwrap();
2448        assert_eq!(row.id, "s5"); // most recent
2449        let row = db.get_session_by_offset(2).unwrap().unwrap();
2450        assert_eq!(row.id, "s3");
2451        assert!(db.get_session_by_offset(10).unwrap().is_none());
2452    }
2453
2454    #[test]
2455    fn test_get_session_by_tool_offset() {
2456        let db = test_db();
2457        seed_sessions(&db);
2458        let row = db
2459            .get_session_by_tool_offset("claude-code", 0)
2460            .unwrap()
2461            .unwrap();
2462        assert_eq!(row.id, "s5");
2463        let row = db
2464            .get_session_by_tool_offset("claude-code", 1)
2465            .unwrap()
2466            .unwrap();
2467        assert_eq!(row.id, "s4");
2468        let row = db.get_session_by_tool_offset("gemini", 0).unwrap().unwrap();
2469        assert_eq!(row.id, "s3");
2470        assert!(db
2471            .get_session_by_tool_offset("gemini", 1)
2472            .unwrap()
2473            .is_none());
2474    }
2475
2476    #[test]
2477    fn test_get_sessions_latest() {
2478        let db = test_db();
2479        seed_sessions(&db);
2480        let rows = db.get_sessions_latest(3).unwrap();
2481        assert_eq!(rows.len(), 3);
2482        assert_eq!(rows[0].id, "s5");
2483        assert_eq!(rows[1].id, "s4");
2484        assert_eq!(rows[2].id, "s3");
2485    }
2486
2487    #[test]
2488    fn test_get_sessions_by_tool_latest() {
2489        let db = test_db();
2490        seed_sessions(&db);
2491        let rows = db.get_sessions_by_tool_latest("claude-code", 2).unwrap();
2492        assert_eq!(rows.len(), 2);
2493        assert_eq!(rows[0].id, "s5");
2494        assert_eq!(rows[1].id, "s4");
2495    }
2496
2497    #[test]
2498    fn test_get_sessions_latest_more_than_available() {
2499        let db = test_db();
2500        seed_sessions(&db);
2501        let rows = db.get_sessions_by_tool_latest("gemini", 10).unwrap();
2502        assert_eq!(rows.len(), 1); // only 1 gemini session
2503    }
2504
2505    #[test]
2506    fn test_session_count() {
2507        let db = test_db();
2508        assert_eq!(db.session_count().unwrap(), 0);
2509        seed_sessions(&db);
2510        assert_eq!(db.session_count().unwrap(), 5);
2511    }
2512
2513    // ── Commit link tests ─────────────────────────────────────────────
2514
2515    #[test]
2516    fn test_link_commit_session() {
2517        let db = test_db();
2518        seed_sessions(&db);
2519        db.link_commit_session("abc123", "s1", Some("/tmp/repo"), Some("main"))
2520            .unwrap();
2521
2522        let commits = db.get_commits_by_session("s1").unwrap();
2523        assert_eq!(commits.len(), 1);
2524        assert_eq!(commits[0].commit_hash, "abc123");
2525        assert_eq!(commits[0].session_id, "s1");
2526        assert_eq!(commits[0].repo_path.as_deref(), Some("/tmp/repo"));
2527        assert_eq!(commits[0].branch.as_deref(), Some("main"));
2528
2529        let sessions = db.get_sessions_by_commit("abc123").unwrap();
2530        assert_eq!(sessions.len(), 1);
2531        assert_eq!(sessions[0].id, "s1");
2532    }
2533
2534    #[test]
2535    fn test_get_sessions_by_commit() {
2536        let db = test_db();
2537        seed_sessions(&db);
2538        // Link multiple sessions to the same commit
2539        db.link_commit_session("abc123", "s1", None, None).unwrap();
2540        db.link_commit_session("abc123", "s2", None, None).unwrap();
2541        db.link_commit_session("abc123", "s3", None, None).unwrap();
2542
2543        let sessions = db.get_sessions_by_commit("abc123").unwrap();
2544        assert_eq!(sessions.len(), 3);
2545        // Ordered by created_at DESC
2546        assert_eq!(sessions[0].id, "s3");
2547        assert_eq!(sessions[1].id, "s2");
2548        assert_eq!(sessions[2].id, "s1");
2549    }
2550
2551    #[test]
2552    fn test_get_commits_by_session() {
2553        let db = test_db();
2554        seed_sessions(&db);
2555        // Link multiple commits to the same session
2556        db.link_commit_session("aaa111", "s1", Some("/repo"), Some("main"))
2557            .unwrap();
2558        db.link_commit_session("bbb222", "s1", Some("/repo"), Some("main"))
2559            .unwrap();
2560        db.link_commit_session("ccc333", "s1", Some("/repo"), Some("feat"))
2561            .unwrap();
2562
2563        let commits = db.get_commits_by_session("s1").unwrap();
2564        assert_eq!(commits.len(), 3);
2565        // All linked to s1
2566        assert!(commits.iter().all(|c| c.session_id == "s1"));
2567    }
2568
2569    #[test]
2570    fn test_duplicate_link_ignored() {
2571        let db = test_db();
2572        seed_sessions(&db);
2573        db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2574            .unwrap();
2575        // Inserting the same link again should not error
2576        db.link_commit_session("abc123", "s1", Some("/repo"), Some("main"))
2577            .unwrap();
2578
2579        let commits = db.get_commits_by_session("s1").unwrap();
2580        assert_eq!(commits.len(), 1);
2581    }
2582
2583    #[test]
2584    fn test_log_filter_by_commit() {
2585        let db = test_db();
2586        seed_sessions(&db);
2587        db.link_commit_session("abc123", "s2", None, None).unwrap();
2588        db.link_commit_session("abc123", "s4", None, None).unwrap();
2589
2590        let filter = LogFilter {
2591            commit: Some("abc123".to_string()),
2592            ..Default::default()
2593        };
2594        let results = db.list_sessions_log(&filter).unwrap();
2595        assert_eq!(results.len(), 2);
2596        assert_eq!(results[0].id, "s4");
2597        assert_eq!(results[1].id, "s2");
2598
2599        // Non-existent commit returns nothing
2600        let filter = LogFilter {
2601            commit: Some("nonexistent".to_string()),
2602            ..Default::default()
2603        };
2604        let results = db.list_sessions_log(&filter).unwrap();
2605        assert_eq!(results.len(), 0);
2606    }
2607}