Skip to main content

opensession_local_db/
lib.rs

1pub mod git;
2
3use anyhow::{Context, Result};
4use opensession_api::db::migrations::{LOCAL_MIGRATIONS, MIGRATIONS};
5use opensession_core::session::{is_auxiliary_session, working_directory};
6use opensession_core::trace::Session;
7use rusqlite::{params, Connection, OptionalExtension};
8use serde_json::Value;
9use std::fs;
10use std::io::{BufRead, BufReader};
11use std::path::PathBuf;
12use std::sync::Mutex;
13
14use git::{normalize_repo_name, GitContext};
15
/// Lowercase title prefix used to recognise summary-worker sessions.
///
/// The session list queries in this file exclude `codex` sessions whose lowercased title
/// starts with this text.
const SUMMARY_WORKER_TITLE_PREFIX_LOWER: &str =
    "convert a real coding session into semantic compression.";
18
/// One session as read back from the local SQLite index/cache database.
///
/// Field names follow the columns written by the upsert queries in this crate
/// (`sessions` plus the joined `session_sync` bookkeeping).
#[derive(Debug, Clone)]
pub struct LocalSessionRow {
    /// Session identifier (primary key of the `sessions` table).
    pub id: String,

    // ── Sync bookkeeping ──
    /// Path of the local file the session was parsed from, when known.
    pub source_path: Option<String>,
    /// Sync state label; the upserts in this crate write `local_only`, `remote_only` and `synced`.
    pub sync_status: String,
    pub last_synced_at: Option<String>,

    // ── Ownership ──
    pub user_id: Option<String>,
    pub nickname: Option<String>,
    pub team_id: Option<String>,

    // ── Tool / agent identity ──
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,

    // ── Descriptive metadata ──
    pub title: Option<String>,
    pub description: Option<String>,
    /// Tags as a single string (locally parsed sessions store them comma-joined).
    pub tags: Option<String>,
    pub created_at: String,
    pub uploaded_at: Option<String>,

    // ── Counters ──
    pub message_count: i64,
    pub user_message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,

    // ── Git / pull-request context ──
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,

    // ── Filesystem footprint ──
    pub working_directory: Option<String>,
    pub files_modified: Option<String>,
    pub files_read: Option<String>,

    // ── Flags and metrics ──
    pub has_errors: bool,
    pub max_active_agents: i64,
    /// Auxiliary sessions are hidden by the list queries in this crate.
    pub is_auxiliary: bool,
}
57
/// A lightweight, locally stored relationship between two sessions.
#[derive(Debug, Clone)]
pub struct LocalSessionLink {
    /// Session the link starts from.
    pub session_id: String,
    /// Session the link points at.
    pub linked_session_id: String,
    /// Free-form label describing the kind of relationship.
    pub link_type: String,
    /// Creation timestamp, kept as text.
    pub created_at: String,
}
66
/// A session-level semantic summary as persisted in the local SQLite database.
///
/// This is the owned, read-side counterpart of [`SessionSemanticSummaryUpsert`], with an
/// additional `updated_at` column.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionSemanticSummaryRow {
    /// Session the summary belongs to.
    pub session_id: String,
    /// Summary payload, stored as JSON text.
    pub summary_json: String,
    pub generated_at: String,

    // ── Provenance of the generated summary ──
    pub provider: String,
    pub model: Option<String>,
    pub source_kind: String,
    pub generation_kind: String,
    pub prompt_fingerprint: Option<String>,

    // ── Optional JSON side payloads ──
    pub source_details_json: Option<String>,
    pub diff_tree_json: Option<String>,

    /// Error text recorded with the summary, if any.
    pub error: Option<String>,
    pub updated_at: String,
}
83
/// Borrowed write-side payload for inserting or updating a session-level semantic summary.
///
/// Every field borrows from the caller for `'a`, so building one does not copy any text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionSemanticSummaryUpsert<'a> {
    /// Session the summary belongs to.
    pub session_id: &'a str,
    /// Summary payload as JSON text.
    pub summary_json: &'a str,
    pub generated_at: &'a str,

    // ── Provenance of the generated summary ──
    pub provider: &'a str,
    pub model: Option<&'a str>,
    pub source_kind: &'a str,
    pub generation_kind: &'a str,
    pub prompt_fingerprint: Option<&'a str>,

    // ── Optional JSON side payloads ──
    pub source_details_json: Option<&'a str>,
    pub diff_tree_json: Option<&'a str>,

    /// Error text to record with the summary, if any.
    pub error: Option<&'a str>,
}
99
/// One embedded text chunk of a session, as written to the local vector store.
#[derive(Debug, Clone, PartialEq)]
pub struct VectorChunkUpsert {
    pub chunk_id: String,
    /// Session the chunk was cut from.
    pub session_id: String,
    /// Position of the chunk within its session.
    pub chunk_index: u32,

    // ── Line span covered by the chunk ──
    pub start_line: u32,
    pub end_line: u32,
    pub line_count: u32,

    /// Chunk text.
    pub content: String,
    /// Hash of `content`, kept as text.
    pub content_hash: String,
    /// Embedding vector for `content`.
    pub embedding: Vec<f32>,
}
113
/// A stored chunk loaded as a candidate for local semantic vector ranking.
#[derive(Debug, Clone, PartialEq)]
pub struct VectorChunkCandidateRow {
    pub chunk_id: String,
    /// Session the chunk belongs to.
    pub session_id: String,

    // ── Line span covered by the chunk ──
    pub start_line: u32,
    pub end_line: u32,

    /// Chunk text.
    pub content: String,
    /// Embedding vector stored for the chunk.
    pub embedding: Vec<f32>,
}
124
/// Snapshot of the vector indexing job: its status label plus progress counters.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VectorIndexJobRow {
    /// Job status label, kept as free-form text.
    pub status: String,

    // ── Progress ──
    pub processed_sessions: u32,
    pub total_sessions: u32,

    /// Optional human-readable note attached to the job.
    pub message: Option<String>,

    // ── Lifecycle timestamps (text) ──
    pub started_at: Option<String>,
    pub finished_at: Option<String>,
}
135
/// Snapshot of the summary batch generation job: its status label plus progress counters.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SummaryBatchJobRow {
    /// Job status label, kept as free-form text.
    pub status: String,

    // ── Progress ──
    pub processed_sessions: u32,
    pub total_sessions: u32,
    pub failed_sessions: u32,

    /// Optional human-readable note attached to the job.
    pub message: Option<String>,

    // ── Lifecycle timestamps (text) ──
    pub started_at: Option<String>,
    pub finished_at: Option<String>,
}
147
/// Guess which tool produced a session from where its file lives on disk.
///
/// The path is ASCII-lowercased and searched for known session-directory fragments, in both
/// `/` and `\` separator forms. Codex fragments are checked first, so a path containing both
/// kinds resolves to `"codex"`.
///
/// Returns `None` when no path is given or no fragment occurs in it.
fn infer_tool_from_source_path(source_path: Option<&str>) -> Option<&'static str> {
    const CODEX_MARKERS: [&str; 4] = [
        "/.codex/sessions/",
        "\\.codex\\sessions\\",
        "/codex/sessions/",
        "\\codex\\sessions\\",
    ];
    const CLAUDE_MARKERS: [&str; 4] = [
        "/.claude/projects/",
        "\\.claude\\projects\\",
        "/claude/projects/",
        "\\claude\\projects\\",
    ];

    let lowered = source_path?.to_ascii_lowercase();
    let mentions_any = |markers: &[&str]| markers.iter().any(|marker| lowered.contains(*marker));

    if mentions_any(&CODEX_MARKERS) {
        Some("codex")
    } else if mentions_any(&CLAUDE_MARKERS) {
        Some("claude-code")
    } else {
        None
    }
}
169
170fn normalize_tool_for_source_path(current_tool: &str, source_path: Option<&str>) -> String {
171    infer_tool_from_source_path(source_path)
172        .unwrap_or(current_tool)
173        .to_string()
174}
175
/// Trim an optional string and drop it when nothing but whitespace remains.
///
/// Returns the trimmed text as an owned `String`, or `None` for a missing or blank input.
fn normalize_non_empty(value: Option<&str>) -> Option<String> {
    let trimmed = value?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
182
/// Turn free-form search text into a full-text-search query string.
///
/// Each whitespace-separated token becomes a double-quoted phrase, with any embedded `"`
/// doubled so it stays inside the phrase, and the phrases are joined with ` OR `.
///
/// Returns `None` when `raw` contains no tokens (empty or whitespace only).
fn build_fts_query(raw: &str) -> Option<String> {
    // `split_whitespace` never yields empty or whitespace-padded tokens, so no extra
    // trimming or emptiness check is needed per token.
    let phrases: Vec<String> = raw
        .split_whitespace()
        .map(|token| format!("\"{}\"", token.replace('"', "\"\"")))
        .collect();

    if phrases.is_empty() {
        None
    } else {
        Some(phrases.join(" OR "))
    }
}
198
199fn json_object_string(value: &Value, keys: &[&str]) -> Option<String> {
200    let obj = value.as_object()?;
201    for key in keys {
202        if let Some(found) = obj.get(*key).and_then(Value::as_str) {
203            let normalized = found.trim();
204            if !normalized.is_empty() {
205                return Some(normalized.to_string());
206            }
207        }
208    }
209    None
210}
211
212fn git_context_from_session_attributes(session: &Session) -> GitContext {
213    let attrs = &session.context.attributes;
214
215    let mut remote = normalize_non_empty(attrs.get("git_remote").and_then(Value::as_str));
216    let mut branch = normalize_non_empty(attrs.get("git_branch").and_then(Value::as_str));
217    let mut commit = normalize_non_empty(attrs.get("git_commit").and_then(Value::as_str));
218    let mut repo_name = normalize_non_empty(attrs.get("git_repo_name").and_then(Value::as_str));
219
220    if let Some(git_value) = attrs.get("git") {
221        if remote.is_none() {
222            remote = json_object_string(
223                git_value,
224                &["remote", "repository_url", "repo_url", "origin", "url"],
225            );
226        }
227        if branch.is_none() {
228            branch = json_object_string(
229                git_value,
230                &["branch", "git_branch", "current_branch", "ref", "head"],
231            );
232        }
233        if commit.is_none() {
234            commit = json_object_string(git_value, &["commit", "commit_hash", "sha", "git_commit"]);
235        }
236        if repo_name.is_none() {
237            repo_name = json_object_string(git_value, &["repo_name", "repository", "repo", "name"]);
238        }
239    }
240
241    if repo_name.is_none() {
242        repo_name = remote
243            .as_deref()
244            .and_then(normalize_repo_name)
245            .map(ToOwned::to_owned);
246    }
247
248    GitContext {
249        remote,
250        branch,
251        commit,
252        repo_name,
253    }
254}
255
256fn git_context_has_any_field(git: &GitContext) -> bool {
257    git.remote.is_some() || git.branch.is_some() || git.commit.is_some() || git.repo_name.is_some()
258}
259
260fn merge_git_context(preferred: &GitContext, fallback: &GitContext) -> GitContext {
261    GitContext {
262        remote: preferred.remote.clone().or_else(|| fallback.remote.clone()),
263        branch: preferred.branch.clone().or_else(|| fallback.branch.clone()),
264        commit: preferred.commit.clone().or_else(|| fallback.commit.clone()),
265        repo_name: preferred
266            .repo_name
267            .clone()
268            .or_else(|| fallback.repo_name.clone()),
269    }
270}
271
/// Filter for listing sessions from the local DB.
///
/// The derived `Default` applies no optional criteria: every `Option` is `None`,
/// `exclude_low_signal` is `false`, sorting is [`LocalSortOrder::Recent`] and the time range is
/// [`LocalTimeRange::All`].
#[derive(Debug, Clone, Default)]
pub struct LocalSessionFilter {
    /// Exact match on the session's team id.
    pub team_id: Option<String>,
    /// Exact match on the sync status; sessions without a sync row compare as `unknown`.
    pub sync_status: Option<String>,
    /// Exact match on the git repository name.
    pub git_repo_name: Option<String>,
    /// Substring search (SQL `LIKE`) over title, description and tags.
    pub search: Option<String>,
    /// Hide sessions with no messages, user messages or tasks, at most two events and a blank
    /// title.
    pub exclude_low_signal: bool,
    /// Exact match on the tool name.
    pub tool: Option<String>,
    /// Result ordering.
    pub sort: LocalSortOrder,
    /// Restrict results to recently created sessions.
    pub time_range: LocalTimeRange,
    /// Maximum number of rows to return.
    pub limit: Option<u32>,
    /// Number of rows to skip (pagination).
    pub offset: Option<u32>,
}

/// Sort order for local session listing.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalSortOrder {
    /// Newest first (by creation time).
    #[default]
    Recent,
    /// Highest message count first, newest first among ties.
    Popular,
    /// Longest duration first, newest first among ties.
    Longest,
}

/// Time range filter for local session listing.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum LocalTimeRange {
    /// Created within the last day.
    Hours24,
    /// Created within the last 7 days.
    Days7,
    /// Created within the last 30 days.
    Days30,
    /// No time restriction.
    #[default]
    All,
}
322
/// The subset of a server-side session needed to upsert it into the local index/cache.
///
/// Unlike [`LocalSessionRow`], `team_id` and `uploaded_at` are always present here, and there
/// is no user-message counter or auxiliary flag.
#[derive(Debug, Clone)]
pub struct RemoteSessionSummary {
    /// Session identifier (primary key of the `sessions` table).
    pub id: String,

    // ── Ownership ──
    pub user_id: Option<String>,
    /// NOTE(review): not among the values bound by `LocalDb::upsert_remote_session`.
    pub nickname: Option<String>,
    pub team_id: String,

    // ── Tool / agent identity ──
    pub tool: String,
    pub agent_provider: Option<String>,
    pub agent_model: Option<String>,

    // ── Descriptive metadata ──
    pub title: Option<String>,
    pub description: Option<String>,
    pub tags: Option<String>,
    pub created_at: String,
    pub uploaded_at: String,

    // ── Counters ──
    pub message_count: i64,
    pub task_count: i64,
    pub event_count: i64,
    pub duration_seconds: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,

    // ── Git / pull-request context ──
    pub git_remote: Option<String>,
    pub git_branch: Option<String>,
    pub git_commit: Option<String>,
    pub git_repo_name: Option<String>,
    pub pr_number: Option<i64>,
    pub pr_url: Option<String>,

    // ── Filesystem footprint ──
    pub working_directory: Option<String>,
    pub files_modified: Option<String>,
    pub files_read: Option<String>,

    // ── Flags and metrics ──
    pub has_errors: bool,
    pub max_active_agents: i64,
}
356
/// Extended filter for the `log` command.
///
/// Every criterion is optional; the derived `Default` leaves all of them unset.
#[derive(Debug, Default)]
pub struct LogFilter {
    /// Exact match on the tool name.
    pub tool: Option<String>,
    /// Model pattern matched with SQL `LIKE`; `*` in the pattern is translated to `%`.
    pub model: Option<String>,
    /// Keep sessions created at or after this ISO8601 timestamp.
    pub since: Option<String>,
    /// Keep sessions created strictly before this ISO8601 timestamp.
    pub before: Option<String>,
    /// Keep sessions whose `files_modified` JSON contains this path as a quoted string.
    pub touches: Option<String>,
    /// Substring search over title, description and tags.
    pub grep: Option<String>,
    /// `Some(true)` keeps only sessions with errors; other values add no restriction.
    pub has_errors: Option<bool>,
    /// Keep sessions whose working directory starts with this prefix.
    pub working_directory: Option<String>,
    /// Exact match on the git repository name.
    pub git_repo_name: Option<String>,
    /// Maximum number of results.
    pub limit: Option<u32>,
    /// Number of results to skip (pagination).
    pub offset: Option<u32>,
}
383
/// Shared `FROM` clause of the session list queries.
///
/// Joins each session (`s`) with its optional sync bookkeeping row (`ss`) and its optional
/// user row (`u`); both are `LEFT JOIN`s, so sessions without either still appear.
const FROM_CLAUSE: &str = concat!(
    "FROM sessions s ",
    "LEFT JOIN session_sync ss ON ss.session_id = s.id ",
    "LEFT JOIN users u ON u.id = s.user_id",
);
389
390/// Local SQLite index/cache shared by TUI and Daemon.
391/// This is not the source of truth for canonical session bodies.
392/// Thread-safe: wraps the connection in a Mutex so it can be shared via `Arc<LocalDb>`.
393pub struct LocalDb {
394    conn: Mutex<Connection>,
395}
396
397impl LocalDb {
398    /// Open (or create) the local database at the default path.
399    /// `~/.local/share/opensession/local.db`
400    pub fn open() -> Result<Self> {
401        let path = default_db_path()?;
402        Self::open_path(&path)
403    }
404
405    /// Open (or create) the local database at a specific path.
406    pub fn open_path(path: &PathBuf) -> Result<Self> {
407        if let Some(parent) = path.parent() {
408            std::fs::create_dir_all(parent)
409                .with_context(|| format!("create dir for {}", path.display()))?;
410        }
411        let conn = open_connection_with_latest_schema(path)
412            .with_context(|| format!("open local db {}", path.display()))?;
413        Ok(Self {
414            conn: Mutex::new(conn),
415        })
416    }
417
418    fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
419        self.conn.lock().expect("local db mutex poisoned")
420    }
421
422    // ── Upsert local session (parsed from file) ────────────────────────
423
424    pub fn upsert_local_session(
425        &self,
426        session: &Session,
427        source_path: &str,
428        git: &GitContext,
429    ) -> Result<()> {
430        let is_empty_signal = session.stats.event_count == 0
431            && session.stats.message_count == 0
432            && session.stats.user_message_count == 0
433            && session.stats.task_count == 0;
434        if is_empty_signal {
435            // Some local tools create placeholder thread files before any real conversation.
436            // Do not index these rows; if one already exists, drop it.
437            self.delete_session(&session.session_id)?;
438            return Ok(());
439        }
440
441        let title = session.context.title.as_deref();
442        let description = session.context.description.as_deref();
443        let tags = if session.context.tags.is_empty() {
444            None
445        } else {
446            Some(session.context.tags.join(","))
447        };
448        let created_at = session.context.created_at.to_rfc3339();
449        let cwd = working_directory(session).map(String::from);
450        let is_auxiliary = is_auxiliary_session(session);
451
452        // Extract files_modified, files_read, and has_errors from events
453        let (files_modified, files_read, has_errors) =
454            opensession_core::extract::extract_file_metadata(session);
455        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;
456        let normalized_tool =
457            normalize_tool_for_source_path(&session.agent.tool, Some(source_path));
458        let git_from_session = git_context_from_session_attributes(session);
459        let has_session_git = git_context_has_any_field(&git_from_session);
460        let merged_git = merge_git_context(&git_from_session, git);
461
462        let conn = self.conn();
463        // Body contents are resolved via canonical body URLs and local body cache.
464        conn.execute(
465            "INSERT INTO sessions \
466             (id, team_id, tool, agent_provider, agent_model, \
467              title, description, tags, created_at, \
468             message_count, user_message_count, task_count, event_count, duration_seconds, \
469              total_input_tokens, total_output_tokens, body_storage_key, \
470              git_remote, git_branch, git_commit, git_repo_name, working_directory, \
471              files_modified, files_read, has_errors, max_active_agents, is_auxiliary) \
472             VALUES (?1,'personal',?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,'',?16,?17,?18,?19,?20,?21,?22,?23,?24,?25) \
473             ON CONFLICT(id) DO UPDATE SET \
474              tool=excluded.tool, agent_provider=excluded.agent_provider, \
475              agent_model=excluded.agent_model, \
476              title=excluded.title, description=excluded.description, \
477              tags=excluded.tags, \
478              message_count=excluded.message_count, user_message_count=excluded.user_message_count, \
479              task_count=excluded.task_count, \
480              event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
481              total_input_tokens=excluded.total_input_tokens, \
482              total_output_tokens=excluded.total_output_tokens, \
483              git_remote=CASE WHEN ?26=1 THEN excluded.git_remote ELSE COALESCE(git_remote, excluded.git_remote) END, \
484              git_branch=CASE WHEN ?26=1 THEN excluded.git_branch ELSE COALESCE(git_branch, excluded.git_branch) END, \
485              git_commit=CASE WHEN ?26=1 THEN excluded.git_commit ELSE COALESCE(git_commit, excluded.git_commit) END, \
486              git_repo_name=CASE WHEN ?26=1 THEN excluded.git_repo_name ELSE COALESCE(git_repo_name, excluded.git_repo_name) END, \
487              working_directory=excluded.working_directory, \
488              files_modified=excluded.files_modified, files_read=excluded.files_read, \
489              has_errors=excluded.has_errors, \
490              max_active_agents=excluded.max_active_agents, \
491              is_auxiliary=excluded.is_auxiliary",
492            params![
493                &session.session_id,
494                &normalized_tool,
495                &session.agent.provider,
496                &session.agent.model,
497                title,
498                description,
499                &tags,
500                &created_at,
501                session.stats.message_count as i64,
502                session.stats.user_message_count as i64,
503                session.stats.task_count as i64,
504                session.stats.event_count as i64,
505                session.stats.duration_seconds as i64,
506                session.stats.total_input_tokens as i64,
507                session.stats.total_output_tokens as i64,
508                &merged_git.remote,
509                &merged_git.branch,
510                &merged_git.commit,
511                &merged_git.repo_name,
512                &cwd,
513                &files_modified,
514                &files_read,
515                has_errors,
516                max_active_agents,
517                is_auxiliary as i64,
518                has_session_git as i64,
519            ],
520        )?;
521
522        conn.execute(
523            "INSERT INTO session_sync (session_id, source_path, sync_status) \
524             VALUES (?1, ?2, 'local_only') \
525             ON CONFLICT(session_id) DO UPDATE SET source_path=excluded.source_path",
526            params![&session.session_id, source_path],
527        )?;
528        Ok(())
529    }
530
531    // ── Upsert remote session (from server sync pull) ──────────────────
532
533    pub fn upsert_remote_session(&self, summary: &RemoteSessionSummary) -> Result<()> {
534        let conn = self.conn();
535        // Body contents are resolved via canonical body URLs and local body cache.
536        conn.execute(
537            "INSERT INTO sessions \
538             (id, user_id, team_id, tool, agent_provider, agent_model, \
539              title, description, tags, created_at, uploaded_at, \
540              message_count, task_count, event_count, duration_seconds, \
541              total_input_tokens, total_output_tokens, body_storage_key, \
542              git_remote, git_branch, git_commit, git_repo_name, \
543              pr_number, pr_url, working_directory, \
544              files_modified, files_read, has_errors, max_active_agents, is_auxiliary) \
545             VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,'',?18,?19,?20,?21,?22,?23,?24,?25,?26,?27,?28,0) \
546             ON CONFLICT(id) DO UPDATE SET \
547              title=excluded.title, description=excluded.description, \
548              tags=excluded.tags, uploaded_at=excluded.uploaded_at, \
549              message_count=excluded.message_count, task_count=excluded.task_count, \
550              event_count=excluded.event_count, duration_seconds=excluded.duration_seconds, \
551              total_input_tokens=excluded.total_input_tokens, \
552              total_output_tokens=excluded.total_output_tokens, \
553              git_remote=excluded.git_remote, git_branch=excluded.git_branch, \
554              git_commit=excluded.git_commit, git_repo_name=excluded.git_repo_name, \
555              pr_number=excluded.pr_number, pr_url=excluded.pr_url, \
556              working_directory=excluded.working_directory, \
557              files_modified=excluded.files_modified, files_read=excluded.files_read, \
558              has_errors=excluded.has_errors, \
559              max_active_agents=excluded.max_active_agents, \
560              is_auxiliary=excluded.is_auxiliary",
561            params![
562                &summary.id,
563                &summary.user_id,
564                &summary.team_id,
565                &summary.tool,
566                &summary.agent_provider,
567                &summary.agent_model,
568                &summary.title,
569                &summary.description,
570                &summary.tags,
571                &summary.created_at,
572                &summary.uploaded_at,
573                summary.message_count,
574                summary.task_count,
575                summary.event_count,
576                summary.duration_seconds,
577                summary.total_input_tokens,
578                summary.total_output_tokens,
579                &summary.git_remote,
580                &summary.git_branch,
581                &summary.git_commit,
582                &summary.git_repo_name,
583                summary.pr_number,
584                &summary.pr_url,
585                &summary.working_directory,
586                &summary.files_modified,
587                &summary.files_read,
588                summary.has_errors,
589                summary.max_active_agents,
590            ],
591        )?;
592
593        conn.execute(
594            "INSERT INTO session_sync (session_id, sync_status) \
595             VALUES (?1, 'remote_only') \
596             ON CONFLICT(session_id) DO UPDATE SET \
597              sync_status = CASE WHEN session_sync.sync_status = 'local_only' THEN 'synced' ELSE session_sync.sync_status END",
598            params![&summary.id],
599        )?;
600        Ok(())
601    }
602
603    // ── List sessions ──────────────────────────────────────────────────
604
605    fn build_local_session_where_clause(
606        filter: &LocalSessionFilter,
607    ) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
608        let mut where_clauses = vec![
609            "1=1".to_string(),
610            "COALESCE(s.is_auxiliary, 0) = 0".to_string(),
611            format!(
612                "NOT (LOWER(COALESCE(s.tool, '')) = 'codex' \
613                 AND LOWER(COALESCE(s.title, '')) LIKE '{}%')",
614                SUMMARY_WORKER_TITLE_PREFIX_LOWER
615            ),
616        ];
617        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
618        let mut idx = 1u32;
619
620        if let Some(ref team_id) = filter.team_id {
621            where_clauses.push(format!("s.team_id = ?{idx}"));
622            param_values.push(Box::new(team_id.clone()));
623            idx += 1;
624        }
625
626        if let Some(ref sync_status) = filter.sync_status {
627            where_clauses.push(format!("COALESCE(ss.sync_status, 'unknown') = ?{idx}"));
628            param_values.push(Box::new(sync_status.clone()));
629            idx += 1;
630        }
631
632        if let Some(ref repo) = filter.git_repo_name {
633            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
634            param_values.push(Box::new(repo.clone()));
635            idx += 1;
636        }
637
638        if let Some(ref tool) = filter.tool {
639            where_clauses.push(format!("s.tool = ?{idx}"));
640            param_values.push(Box::new(tool.clone()));
641            idx += 1;
642        }
643
644        if let Some(ref search) = filter.search {
645            let like = format!("%{search}%");
646            where_clauses.push(format!(
647                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
648                i1 = idx,
649                i2 = idx + 1,
650                i3 = idx + 2,
651            ));
652            param_values.push(Box::new(like.clone()));
653            param_values.push(Box::new(like.clone()));
654            param_values.push(Box::new(like));
655            idx += 3;
656        }
657
658        if filter.exclude_low_signal {
659            where_clauses.push(
660                "NOT (COALESCE(s.message_count, 0) = 0 \
661                  AND COALESCE(s.user_message_count, 0) = 0 \
662                  AND COALESCE(s.task_count, 0) = 0 \
663                  AND COALESCE(s.event_count, 0) <= 2 \
664                  AND (s.title IS NULL OR TRIM(s.title) = ''))"
665                    .to_string(),
666            );
667        }
668
669        let interval = match filter.time_range {
670            LocalTimeRange::Hours24 => Some("-1 day"),
671            LocalTimeRange::Days7 => Some("-7 days"),
672            LocalTimeRange::Days30 => Some("-30 days"),
673            LocalTimeRange::All => None,
674        };
675        if let Some(interval) = interval {
676            where_clauses.push(format!("datetime(s.created_at) >= datetime('now', ?{idx})"));
677            param_values.push(Box::new(interval.to_string()));
678        }
679
680        (where_clauses.join(" AND "), param_values)
681    }
682
683    pub fn list_sessions(&self, filter: &LocalSessionFilter) -> Result<Vec<LocalSessionRow>> {
684        let (where_str, mut param_values) = Self::build_local_session_where_clause(filter);
685        let order_clause = match filter.sort {
686            LocalSortOrder::Popular => "s.message_count DESC, s.created_at DESC",
687            LocalSortOrder::Longest => "s.duration_seconds DESC, s.created_at DESC",
688            LocalSortOrder::Recent => "s.created_at DESC",
689        };
690
691        let mut sql = format!(
692            "SELECT {LOCAL_SESSION_COLUMNS} \
693             {FROM_CLAUSE} WHERE {where_str} \
694             ORDER BY {order_clause}"
695        );
696
697        if let Some(limit) = filter.limit {
698            sql.push_str(" LIMIT ?");
699            param_values.push(Box::new(limit));
700            if let Some(offset) = filter.offset {
701                sql.push_str(" OFFSET ?");
702                param_values.push(Box::new(offset));
703            }
704        }
705
706        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
707            param_values.iter().map(|p| p.as_ref()).collect();
708        let conn = self.conn();
709        let mut stmt = conn.prepare(&sql)?;
710        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
711
712        let mut result = Vec::new();
713        for row in rows {
714            result.push(row?);
715        }
716
717        Ok(result)
718    }
719
720    /// Count sessions for a given list filter (before UI-level page slicing).
721    pub fn count_sessions_filtered(&self, filter: &LocalSessionFilter) -> Result<i64> {
722        let mut count_filter = filter.clone();
723        count_filter.limit = None;
724        count_filter.offset = None;
725        let (where_str, param_values) = Self::build_local_session_where_clause(&count_filter);
726        let sql = format!("SELECT COUNT(*) {FROM_CLAUSE} WHERE {where_str}");
727        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
728            param_values.iter().map(|p| p.as_ref()).collect();
729        let conn = self.conn();
730        let count = conn.query_row(&sql, param_refs.as_slice(), |row| row.get(0))?;
731        Ok(count)
732    }
733
734    /// List distinct tool names for the current list filter (ignores active tool filter).
735    pub fn list_session_tools(&self, filter: &LocalSessionFilter) -> Result<Vec<String>> {
736        let mut tool_filter = filter.clone();
737        tool_filter.tool = None;
738        tool_filter.limit = None;
739        tool_filter.offset = None;
740        let (where_str, param_values) = Self::build_local_session_where_clause(&tool_filter);
741        let sql = format!(
742            "SELECT DISTINCT s.tool \
743             {FROM_CLAUSE} WHERE {where_str} \
744             ORDER BY s.tool ASC"
745        );
746        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
747            param_values.iter().map(|p| p.as_ref()).collect();
748        let conn = self.conn();
749        let mut stmt = conn.prepare(&sql)?;
750        let rows = stmt.query_map(param_refs.as_slice(), |row| row.get::<_, String>(0))?;
751
752        let mut tools = Vec::new();
753        for row in rows {
754            let tool = row?;
755            if !tool.trim().is_empty() {
756                tools.push(tool);
757            }
758        }
759        Ok(tools)
760    }
761
762    // ── Log query ─────────────────────────────────────────────────────
763
764    /// Query sessions with extended filters for the `log` command.
765    pub fn list_sessions_log(&self, filter: &LogFilter) -> Result<Vec<LocalSessionRow>> {
766        let mut where_clauses = vec![
767            "1=1".to_string(),
768            "COALESCE(s.is_auxiliary, 0) = 0".to_string(),
769            format!(
770                "NOT (LOWER(COALESCE(s.tool, '')) = 'codex' \
771                 AND LOWER(COALESCE(s.title, '')) LIKE '{}%')",
772                SUMMARY_WORKER_TITLE_PREFIX_LOWER
773            ),
774        ];
775        let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
776        let mut idx = 1u32;
777
778        if let Some(ref tool) = filter.tool {
779            where_clauses.push(format!("s.tool = ?{idx}"));
780            param_values.push(Box::new(tool.clone()));
781            idx += 1;
782        }
783
784        if let Some(ref model) = filter.model {
785            let like = model.replace('*', "%");
786            where_clauses.push(format!("s.agent_model LIKE ?{idx}"));
787            param_values.push(Box::new(like));
788            idx += 1;
789        }
790
791        if let Some(ref since) = filter.since {
792            where_clauses.push(format!("s.created_at >= ?{idx}"));
793            param_values.push(Box::new(since.clone()));
794            idx += 1;
795        }
796
797        if let Some(ref before) = filter.before {
798            where_clauses.push(format!("s.created_at < ?{idx}"));
799            param_values.push(Box::new(before.clone()));
800            idx += 1;
801        }
802
803        if let Some(ref touches) = filter.touches {
804            let like = format!("%\"{touches}\"%");
805            where_clauses.push(format!("s.files_modified LIKE ?{idx}"));
806            param_values.push(Box::new(like));
807            idx += 1;
808        }
809
810        if let Some(ref grep) = filter.grep {
811            let like = format!("%{grep}%");
812            where_clauses.push(format!(
813                "(s.title LIKE ?{i1} OR s.description LIKE ?{i2} OR s.tags LIKE ?{i3})",
814                i1 = idx,
815                i2 = idx + 1,
816                i3 = idx + 2,
817            ));
818            param_values.push(Box::new(like.clone()));
819            param_values.push(Box::new(like.clone()));
820            param_values.push(Box::new(like));
821            idx += 3;
822        }
823
824        if let Some(true) = filter.has_errors {
825            where_clauses.push("s.has_errors = 1".to_string());
826        }
827
828        if let Some(ref wd) = filter.working_directory {
829            where_clauses.push(format!("s.working_directory LIKE ?{idx}"));
830            param_values.push(Box::new(format!("{wd}%")));
831            idx += 1;
832        }
833
834        if let Some(ref repo) = filter.git_repo_name {
835            where_clauses.push(format!("s.git_repo_name = ?{idx}"));
836            param_values.push(Box::new(repo.clone()));
837            idx += 1;
838        }
839
840        let _ = idx; // suppress unused warning
841
842        let where_str = where_clauses.join(" AND ");
843        let mut sql = format!(
844            "SELECT {LOCAL_SESSION_COLUMNS} \
845             {FROM_CLAUSE} WHERE {where_str} \
846             ORDER BY s.created_at DESC"
847        );
848
849        if let Some(limit) = filter.limit {
850            sql.push_str(" LIMIT ?");
851            param_values.push(Box::new(limit));
852            if let Some(offset) = filter.offset {
853                sql.push_str(" OFFSET ?");
854                param_values.push(Box::new(offset));
855            }
856        }
857
858        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
859            param_values.iter().map(|p| p.as_ref()).collect();
860        let conn = self.conn();
861        let mut stmt = conn.prepare(&sql)?;
862        let rows = stmt.query_map(param_refs.as_slice(), row_to_local_session)?;
863
864        let mut result = Vec::new();
865        for row in rows {
866            result.push(row?);
867        }
868        Ok(result)
869    }
870
871    /// Get the latest N sessions for a specific tool, ordered by created_at DESC.
872    pub fn get_sessions_by_tool_latest(
873        &self,
874        tool: &str,
875        count: u32,
876    ) -> Result<Vec<LocalSessionRow>> {
877        let sql = format!(
878            "SELECT {LOCAL_SESSION_COLUMNS} \
879             {FROM_CLAUSE} WHERE s.tool = ?1 AND COALESCE(s.is_auxiliary, 0) = 0 \
880             ORDER BY s.created_at DESC"
881        );
882        let conn = self.conn();
883        let mut stmt = conn.prepare(&sql)?;
884        let rows = stmt.query_map(params![tool], row_to_local_session)?;
885        let mut result = Vec::new();
886        for row in rows {
887            result.push(row?);
888        }
889
890        result.truncate(count as usize);
891        Ok(result)
892    }
893
894    /// Get the latest N sessions across all tools, ordered by created_at DESC.
895    pub fn get_sessions_latest(&self, count: u32) -> Result<Vec<LocalSessionRow>> {
896        let sql = format!(
897            "SELECT {LOCAL_SESSION_COLUMNS} \
898             {FROM_CLAUSE} WHERE COALESCE(s.is_auxiliary, 0) = 0 \
899             ORDER BY s.created_at DESC"
900        );
901        let conn = self.conn();
902        let mut stmt = conn.prepare(&sql)?;
903        let rows = stmt.query_map([], row_to_local_session)?;
904        let mut result = Vec::new();
905        for row in rows {
906            result.push(row?);
907        }
908
909        result.truncate(count as usize);
910        Ok(result)
911    }
912
913    /// Get the Nth most recent session for a specific tool (0 = HEAD, 1 = HEAD~1, etc.).
914    pub fn get_session_by_tool_offset(
915        &self,
916        tool: &str,
917        offset: u32,
918    ) -> Result<Option<LocalSessionRow>> {
919        let sql = format!(
920            "SELECT {LOCAL_SESSION_COLUMNS} \
921             {FROM_CLAUSE} WHERE s.tool = ?1 AND COALESCE(s.is_auxiliary, 0) = 0 \
922             ORDER BY s.created_at DESC"
923        );
924        let conn = self.conn();
925        let mut stmt = conn.prepare(&sql)?;
926        let rows = stmt.query_map(params![tool], row_to_local_session)?;
927        let result = rows.collect::<Result<Vec<_>, _>>()?;
928        Ok(result.into_iter().nth(offset as usize))
929    }
930
931    /// Get the Nth most recent session across all tools (0 = HEAD, 1 = HEAD~1, etc.).
932    pub fn get_session_by_offset(&self, offset: u32) -> Result<Option<LocalSessionRow>> {
933        let sql = format!(
934            "SELECT {LOCAL_SESSION_COLUMNS} \
935             {FROM_CLAUSE} WHERE COALESCE(s.is_auxiliary, 0) = 0 \
936             ORDER BY s.created_at DESC"
937        );
938        let conn = self.conn();
939        let mut stmt = conn.prepare(&sql)?;
940        let rows = stmt.query_map([], row_to_local_session)?;
941        let result = rows.collect::<Result<Vec<_>, _>>()?;
942        Ok(result.into_iter().nth(offset as usize))
943    }
944
945    /// Fetch the source path used when the session was last parsed/loaded.
946    pub fn get_session_source_path(&self, session_id: &str) -> Result<Option<String>> {
947        let conn = self.conn();
948        let result = conn
949            .query_row(
950                "SELECT source_path FROM session_sync WHERE session_id = ?1",
951                params![session_id],
952                |row| row.get(0),
953            )
954            .optional()?;
955
956        Ok(result)
957    }
958
959    /// List every session id with a non-empty source path from session_sync.
960    pub fn list_session_source_paths(&self) -> Result<Vec<(String, String)>> {
961        let conn = self.conn();
962        let mut stmt = conn.prepare(
963            "SELECT session_id, source_path \
964             FROM session_sync \
965             WHERE source_path IS NOT NULL AND TRIM(source_path) != ''",
966        )?;
967        let rows = stmt.query_map([], |row| {
968            let session_id: String = row.get(0)?;
969            let source_path: String = row.get(1)?;
970            Ok((session_id, source_path))
971        })?;
972        rows.collect::<std::result::Result<Vec<_>, _>>()
973            .map_err(Into::into)
974    }
975
976    /// Get a single session row by id.
977    pub fn get_session_by_id(&self, session_id: &str) -> Result<Option<LocalSessionRow>> {
978        let sql = format!(
979            "SELECT {LOCAL_SESSION_COLUMNS} \
980             {FROM_CLAUSE} WHERE s.id = ?1 LIMIT 1"
981        );
982        let conn = self.conn();
983        let mut stmt = conn.prepare(&sql)?;
984        let row = stmt
985            .query_map(params![session_id], row_to_local_session)?
986            .next()
987            .transpose()?;
988        Ok(row)
989    }
990
991    /// List links where the given session is the source session.
992    pub fn list_session_links(&self, session_id: &str) -> Result<Vec<LocalSessionLink>> {
993        let conn = self.conn();
994        let mut stmt = conn.prepare(
995            "SELECT session_id, linked_session_id, link_type, created_at \
996             FROM session_links WHERE session_id = ?1 ORDER BY created_at ASC",
997        )?;
998        let rows = stmt.query_map(params![session_id], |row| {
999            Ok(LocalSessionLink {
1000                session_id: row.get(0)?,
1001                linked_session_id: row.get(1)?,
1002                link_type: row.get(2)?,
1003                created_at: row.get(3)?,
1004            })
1005        })?;
1006        rows.collect::<std::result::Result<Vec<_>, _>>()
1007            .map_err(Into::into)
1008    }
1009
1010    /// Count total sessions in the local DB.
1011    pub fn session_count(&self) -> Result<i64> {
1012        let count = self
1013            .conn()
1014            .query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))?;
1015        Ok(count)
1016    }
1017
1018    // ── Delete session ─────────────────────────────────────────────────
1019
1020    pub fn delete_session(&self, session_id: &str) -> Result<()> {
1021        let conn = self.conn();
1022        conn.execute(
1023            "DELETE FROM session_links WHERE session_id = ?1 OR linked_session_id = ?1",
1024            params![session_id],
1025        )?;
1026        conn.execute(
1027            "DELETE FROM vector_embeddings \
1028             WHERE chunk_id IN (SELECT id FROM vector_chunks WHERE session_id = ?1)",
1029            params![session_id],
1030        )?;
1031        conn.execute(
1032            "DELETE FROM vector_chunks_fts WHERE session_id = ?1",
1033            params![session_id],
1034        )?;
1035        conn.execute(
1036            "DELETE FROM vector_chunks WHERE session_id = ?1",
1037            params![session_id],
1038        )?;
1039        conn.execute(
1040            "DELETE FROM vector_index_sessions WHERE session_id = ?1",
1041            params![session_id],
1042        )?;
1043        conn.execute(
1044            "DELETE FROM session_semantic_summaries WHERE session_id = ?1",
1045            params![session_id],
1046        )?;
1047        conn.execute(
1048            "DELETE FROM body_cache WHERE session_id = ?1",
1049            params![session_id],
1050        )?;
1051        conn.execute(
1052            "DELETE FROM session_sync WHERE session_id = ?1",
1053            params![session_id],
1054        )?;
1055        conn.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
1056        Ok(())
1057    }
1058
1059    // ── Semantic summary cache ───────────────────────────────────────────
1060
1061    pub fn upsert_session_semantic_summary(
1062        &self,
1063        payload: &SessionSemanticSummaryUpsert<'_>,
1064    ) -> Result<()> {
1065        self.conn().execute(
1066            "INSERT INTO session_semantic_summaries (\
1067                session_id, summary_json, generated_at, provider, model, \
1068                source_kind, generation_kind, prompt_fingerprint, source_details_json, \
1069                diff_tree_json, error, updated_at\
1070             ) VALUES (\
1071                ?1, ?2, ?3, ?4, ?5, \
1072                ?6, ?7, ?8, ?9, \
1073                ?10, ?11, datetime('now')\
1074             ) \
1075             ON CONFLICT(session_id) DO UPDATE SET \
1076                summary_json=excluded.summary_json, \
1077                generated_at=excluded.generated_at, \
1078                provider=excluded.provider, \
1079                model=excluded.model, \
1080                source_kind=excluded.source_kind, \
1081                generation_kind=excluded.generation_kind, \
1082                prompt_fingerprint=excluded.prompt_fingerprint, \
1083                source_details_json=excluded.source_details_json, \
1084                diff_tree_json=excluded.diff_tree_json, \
1085                error=excluded.error, \
1086                updated_at=datetime('now')",
1087            params![
1088                payload.session_id,
1089                payload.summary_json,
1090                payload.generated_at,
1091                payload.provider,
1092                payload.model,
1093                payload.source_kind,
1094                payload.generation_kind,
1095                payload.prompt_fingerprint,
1096                payload.source_details_json,
1097                payload.diff_tree_json,
1098                payload.error,
1099            ],
1100        )?;
1101        Ok(())
1102    }
1103
1104    pub fn list_expired_session_ids(&self, keep_days: u32) -> Result<Vec<String>> {
1105        let conn = self.conn();
1106        let mut stmt = conn.prepare(
1107            "SELECT id FROM sessions \
1108             WHERE julianday(created_at) <= julianday('now') - ?1 \
1109             ORDER BY created_at ASC",
1110        )?;
1111        let rows = stmt.query_map(params![keep_days as i64], |row| row.get(0))?;
1112        rows.collect::<std::result::Result<Vec<_>, _>>()
1113            .map_err(Into::into)
1114    }
1115
1116    /// List all known session ids for migration or maintenance workflows.
1117    pub fn list_all_session_ids(&self) -> Result<Vec<String>> {
1118        let conn = self.conn();
1119        let mut stmt = conn.prepare("SELECT id FROM sessions ORDER BY id ASC")?;
1120        let rows = stmt.query_map([], |row| row.get(0))?;
1121        rows.collect::<std::result::Result<Vec<_>, _>>()
1122            .map_err(Into::into)
1123    }
1124
1125    /// List all session ids that currently have cached semantic summaries.
1126    pub fn list_session_semantic_summary_ids(&self) -> Result<Vec<String>> {
1127        let conn = self.conn();
1128        let mut stmt = conn
1129            .prepare("SELECT session_id FROM session_semantic_summaries ORDER BY session_id ASC")?;
1130        let rows = stmt.query_map([], |row| row.get(0))?;
1131        rows.collect::<std::result::Result<Vec<_>, _>>()
1132            .map_err(Into::into)
1133    }
1134
1135    pub fn get_session_semantic_summary(
1136        &self,
1137        session_id: &str,
1138    ) -> Result<Option<SessionSemanticSummaryRow>> {
1139        let row = self
1140            .conn()
1141            .query_row(
1142                "SELECT session_id, summary_json, generated_at, provider, model, \
1143                        source_kind, generation_kind, prompt_fingerprint, source_details_json, \
1144                        diff_tree_json, error, updated_at \
1145                 FROM session_semantic_summaries WHERE session_id = ?1 LIMIT 1",
1146                params![session_id],
1147                |row| {
1148                    Ok(SessionSemanticSummaryRow {
1149                        session_id: row.get(0)?,
1150                        summary_json: row.get(1)?,
1151                        generated_at: row.get(2)?,
1152                        provider: row.get(3)?,
1153                        model: row.get(4)?,
1154                        source_kind: row.get(5)?,
1155                        generation_kind: row.get(6)?,
1156                        prompt_fingerprint: row.get(7)?,
1157                        source_details_json: row.get(8)?,
1158                        diff_tree_json: row.get(9)?,
1159                        error: row.get(10)?,
1160                        updated_at: row.get(11)?,
1161                    })
1162                },
1163            )
1164            .optional()?;
1165        Ok(row)
1166    }
1167
1168    pub fn delete_expired_session_summaries(&self, keep_days: u32) -> Result<u32> {
1169        let deleted = self.conn().execute(
1170            "DELETE FROM session_semantic_summaries \
1171             WHERE julianday(generated_at) <= julianday('now') - ?1",
1172            params![keep_days as i64],
1173        )?;
1174        Ok(deleted as u32)
1175    }
1176
1177    // ── Semantic vector index cache ────────────────────────────────────
1178
1179    pub fn vector_index_source_hash(&self, session_id: &str) -> Result<Option<String>> {
1180        let hash = self
1181            .conn()
1182            .query_row(
1183                "SELECT source_hash FROM vector_index_sessions WHERE session_id = ?1",
1184                params![session_id],
1185                |row| row.get(0),
1186            )
1187            .optional()?;
1188        Ok(hash)
1189    }
1190
1191    pub fn clear_vector_index(&self) -> Result<()> {
1192        let conn = self.conn();
1193        conn.execute("DELETE FROM vector_embeddings", [])?;
1194        conn.execute("DELETE FROM vector_chunks_fts", [])?;
1195        conn.execute("DELETE FROM vector_chunks", [])?;
1196        conn.execute("DELETE FROM vector_index_sessions", [])?;
1197        Ok(())
1198    }
1199
1200    pub fn replace_session_vector_chunks(
1201        &self,
1202        session_id: &str,
1203        source_hash: &str,
1204        model: &str,
1205        chunks: &[VectorChunkUpsert],
1206    ) -> Result<()> {
1207        let mut conn = self.conn();
1208        let tx = conn.transaction()?;
1209
1210        tx.execute(
1211            "DELETE FROM vector_embeddings \
1212             WHERE chunk_id IN (SELECT id FROM vector_chunks WHERE session_id = ?1)",
1213            params![session_id],
1214        )?;
1215        tx.execute(
1216            "DELETE FROM vector_chunks_fts WHERE session_id = ?1",
1217            params![session_id],
1218        )?;
1219        tx.execute(
1220            "DELETE FROM vector_chunks WHERE session_id = ?1",
1221            params![session_id],
1222        )?;
1223
1224        for chunk in chunks {
1225            let embedding_json = serde_json::to_string(&chunk.embedding)
1226                .context("serialize vector embedding for local cache")?;
1227            tx.execute(
1228                "INSERT INTO vector_chunks \
1229                 (id, session_id, chunk_index, start_line, end_line, line_count, content, content_hash, created_at, updated_at) \
1230                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, datetime('now'), datetime('now'))",
1231                params![
1232                    &chunk.chunk_id,
1233                    &chunk.session_id,
1234                    chunk.chunk_index as i64,
1235                    chunk.start_line as i64,
1236                    chunk.end_line as i64,
1237                    chunk.line_count as i64,
1238                    &chunk.content,
1239                    &chunk.content_hash,
1240                ],
1241            )?;
1242            tx.execute(
1243                "INSERT INTO vector_embeddings \
1244                 (chunk_id, model, embedding_dim, embedding_json, updated_at) \
1245                 VALUES (?1, ?2, ?3, ?4, datetime('now'))",
1246                params![
1247                    &chunk.chunk_id,
1248                    model,
1249                    chunk.embedding.len() as i64,
1250                    &embedding_json
1251                ],
1252            )?;
1253            tx.execute(
1254                "INSERT INTO vector_chunks_fts (chunk_id, session_id, content) VALUES (?1, ?2, ?3)",
1255                params![&chunk.chunk_id, &chunk.session_id, &chunk.content],
1256            )?;
1257        }
1258
1259        tx.execute(
1260            "INSERT INTO vector_index_sessions \
1261             (session_id, source_hash, chunk_count, last_indexed_at, updated_at) \
1262             VALUES (?1, ?2, ?3, datetime('now'), datetime('now')) \
1263             ON CONFLICT(session_id) DO UPDATE SET \
1264             source_hash=excluded.source_hash, \
1265             chunk_count=excluded.chunk_count, \
1266             last_indexed_at=datetime('now'), \
1267             updated_at=datetime('now')",
1268            params![session_id, source_hash, chunks.len() as i64],
1269        )?;
1270
1271        tx.commit()?;
1272        Ok(())
1273    }
1274
1275    pub fn list_vector_chunk_candidates(
1276        &self,
1277        query: &str,
1278        model: &str,
1279        limit: u32,
1280    ) -> Result<Vec<VectorChunkCandidateRow>> {
1281        let Some(fts_query) = build_fts_query(query) else {
1282            return Ok(Vec::new());
1283        };
1284        let conn = self.conn();
1285        let mut stmt = conn.prepare(
1286            "SELECT c.id, c.session_id, c.start_line, c.end_line, c.content, e.embedding_json \
1287             FROM vector_chunks_fts f \
1288             INNER JOIN vector_chunks c ON c.id = f.chunk_id \
1289             INNER JOIN vector_embeddings e ON e.chunk_id = c.id \
1290             WHERE f.content MATCH ?1 AND e.model = ?2 \
1291             ORDER BY bm25(vector_chunks_fts) ASC, c.updated_at DESC \
1292             LIMIT ?3",
1293        )?;
1294        let rows = stmt.query_map(params![fts_query, model, limit as i64], |row| {
1295            let embedding_json: String = row.get(5)?;
1296            let embedding =
1297                serde_json::from_str::<Vec<f32>>(&embedding_json).unwrap_or_else(|_| Vec::new());
1298            Ok(VectorChunkCandidateRow {
1299                chunk_id: row.get(0)?,
1300                session_id: row.get(1)?,
1301                start_line: row.get::<_, i64>(2)?.max(0) as u32,
1302                end_line: row.get::<_, i64>(3)?.max(0) as u32,
1303                content: row.get(4)?,
1304                embedding,
1305            })
1306        })?;
1307        rows.collect::<std::result::Result<Vec<_>, _>>()
1308            .map_err(Into::into)
1309    }
1310
1311    pub fn list_recent_vector_chunks_for_model(
1312        &self,
1313        model: &str,
1314        limit: u32,
1315    ) -> Result<Vec<VectorChunkCandidateRow>> {
1316        let conn = self.conn();
1317        let mut stmt = conn.prepare(
1318            "SELECT c.id, c.session_id, c.start_line, c.end_line, c.content, e.embedding_json \
1319             FROM vector_chunks c \
1320             INNER JOIN vector_embeddings e ON e.chunk_id = c.id \
1321             WHERE e.model = ?1 \
1322             ORDER BY c.updated_at DESC \
1323             LIMIT ?2",
1324        )?;
1325        let rows = stmt.query_map(params![model, limit as i64], |row| {
1326            let embedding_json: String = row.get(5)?;
1327            let embedding =
1328                serde_json::from_str::<Vec<f32>>(&embedding_json).unwrap_or_else(|_| Vec::new());
1329            Ok(VectorChunkCandidateRow {
1330                chunk_id: row.get(0)?,
1331                session_id: row.get(1)?,
1332                start_line: row.get::<_, i64>(2)?.max(0) as u32,
1333                end_line: row.get::<_, i64>(3)?.max(0) as u32,
1334                content: row.get(4)?,
1335                embedding,
1336            })
1337        })?;
1338        rows.collect::<std::result::Result<Vec<_>, _>>()
1339            .map_err(Into::into)
1340    }
1341
1342    pub fn set_vector_index_job(&self, payload: &VectorIndexJobRow) -> Result<()> {
1343        self.conn().execute(
1344            "INSERT INTO vector_index_jobs \
1345             (id, status, processed_sessions, total_sessions, message, started_at, finished_at, updated_at) \
1346             VALUES (1, ?1, ?2, ?3, ?4, ?5, ?6, datetime('now')) \
1347             ON CONFLICT(id) DO UPDATE SET \
1348             status=excluded.status, \
1349             processed_sessions=excluded.processed_sessions, \
1350             total_sessions=excluded.total_sessions, \
1351             message=excluded.message, \
1352             started_at=excluded.started_at, \
1353             finished_at=excluded.finished_at, \
1354             updated_at=datetime('now')",
1355            params![
1356                payload.status,
1357                payload.processed_sessions as i64,
1358                payload.total_sessions as i64,
1359                payload.message,
1360                payload.started_at,
1361                payload.finished_at,
1362            ],
1363        )?;
1364        Ok(())
1365    }
1366
1367    pub fn get_vector_index_job(&self) -> Result<Option<VectorIndexJobRow>> {
1368        let row = self
1369            .conn()
1370            .query_row(
1371                "SELECT status, processed_sessions, total_sessions, message, started_at, finished_at \
1372                 FROM vector_index_jobs WHERE id = 1 LIMIT 1",
1373                [],
1374                |row| {
1375                    Ok(VectorIndexJobRow {
1376                        status: row.get(0)?,
1377                        processed_sessions: row.get::<_, i64>(1)?.max(0) as u32,
1378                        total_sessions: row.get::<_, i64>(2)?.max(0) as u32,
1379                        message: row.get(3)?,
1380                        started_at: row.get(4)?,
1381                        finished_at: row.get(5)?,
1382                    })
1383                },
1384            )
1385            .optional()?;
1386        Ok(row)
1387    }
1388
1389    pub fn set_summary_batch_job(&self, payload: &SummaryBatchJobRow) -> Result<()> {
1390        self.conn().execute(
1391            "INSERT INTO summary_batch_jobs \
1392             (id, status, processed_sessions, total_sessions, failed_sessions, message, started_at, finished_at, updated_at) \
1393             VALUES (1, ?1, ?2, ?3, ?4, ?5, ?6, ?7, datetime('now')) \
1394             ON CONFLICT(id) DO UPDATE SET \
1395             status=excluded.status, \
1396             processed_sessions=excluded.processed_sessions, \
1397             total_sessions=excluded.total_sessions, \
1398             failed_sessions=excluded.failed_sessions, \
1399             message=excluded.message, \
1400             started_at=excluded.started_at, \
1401             finished_at=excluded.finished_at, \
1402             updated_at=datetime('now')",
1403            params![
1404                payload.status,
1405                payload.processed_sessions as i64,
1406                payload.total_sessions as i64,
1407                payload.failed_sessions as i64,
1408                payload.message,
1409                payload.started_at,
1410                payload.finished_at,
1411            ],
1412        )?;
1413        Ok(())
1414    }
1415
1416    pub fn get_summary_batch_job(&self) -> Result<Option<SummaryBatchJobRow>> {
1417        let row = self
1418            .conn()
1419            .query_row(
1420                "SELECT status, processed_sessions, total_sessions, failed_sessions, message, started_at, finished_at \
1421                 FROM summary_batch_jobs WHERE id = 1 LIMIT 1",
1422                [],
1423                |row| {
1424                    Ok(SummaryBatchJobRow {
1425                        status: row.get(0)?,
1426                        processed_sessions: row.get::<_, i64>(1)?.max(0) as u32,
1427                        total_sessions: row.get::<_, i64>(2)?.max(0) as u32,
1428                        failed_sessions: row.get::<_, i64>(3)?.max(0) as u32,
1429                        message: row.get(4)?,
1430                        started_at: row.get(5)?,
1431                        finished_at: row.get(6)?,
1432                    })
1433                },
1434            )
1435            .optional()?;
1436        Ok(row)
1437    }
1438
1439    // ── Sync cursor ────────────────────────────────────────────────────
1440
1441    pub fn get_sync_cursor(&self, team_id: &str) -> Result<Option<String>> {
1442        let cursor = self
1443            .conn()
1444            .query_row(
1445                "SELECT cursor FROM sync_cursors WHERE team_id = ?1",
1446                params![team_id],
1447                |row| row.get(0),
1448            )
1449            .optional()?;
1450        Ok(cursor)
1451    }
1452
1453    pub fn set_sync_cursor(&self, team_id: &str, cursor: &str) -> Result<()> {
1454        self.conn().execute(
1455            "INSERT INTO sync_cursors (team_id, cursor, updated_at) \
1456             VALUES (?1, ?2, datetime('now')) \
1457             ON CONFLICT(team_id) DO UPDATE SET cursor=excluded.cursor, updated_at=datetime('now')",
1458            params![team_id, cursor],
1459        )?;
1460        Ok(())
1461    }
1462
1463    // ── Upload tracking ────────────────────────────────────────────────
1464
1465    /// Get sessions that are local_only and need to be uploaded.
1466    pub fn pending_uploads(&self, team_id: &str) -> Result<Vec<LocalSessionRow>> {
1467        let sql = format!(
1468            "SELECT {LOCAL_SESSION_COLUMNS} \
1469             FROM sessions s \
1470             INNER JOIN session_sync ss ON ss.session_id = s.id \
1471             LEFT JOIN users u ON u.id = s.user_id \
1472             WHERE ss.sync_status = 'local_only' AND s.team_id = ?1 AND COALESCE(s.is_auxiliary, 0) = 0 \
1473             ORDER BY s.created_at ASC"
1474        );
1475        let conn = self.conn();
1476        let mut stmt = conn.prepare(&sql)?;
1477        let rows = stmt.query_map(params![team_id], row_to_local_session)?;
1478        let mut result = Vec::new();
1479        for row in rows {
1480            result.push(row?);
1481        }
1482        Ok(result)
1483    }
1484
1485    pub fn mark_synced(&self, session_id: &str) -> Result<()> {
1486        self.conn().execute(
1487            "UPDATE session_sync SET sync_status = 'synced', last_synced_at = datetime('now') \
1488             WHERE session_id = ?1",
1489            params![session_id],
1490        )?;
1491        Ok(())
1492    }
1493
1494    /// Check if a session was already uploaded (synced or remote_only) since the given modification time.
1495    pub fn was_uploaded_after(
1496        &self,
1497        source_path: &str,
1498        modified: &chrono::DateTime<chrono::Utc>,
1499    ) -> Result<bool> {
1500        let result: Option<String> = self
1501            .conn()
1502            .query_row(
1503                "SELECT last_synced_at FROM session_sync \
1504                 WHERE source_path = ?1 AND sync_status = 'synced' AND last_synced_at IS NOT NULL",
1505                params![source_path],
1506                |row| row.get(0),
1507            )
1508            .optional()?;
1509
1510        if let Some(synced_at) = result {
1511            if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&synced_at) {
1512                return Ok(dt >= *modified);
1513            }
1514        }
1515        Ok(false)
1516    }
1517
1518    // ── Body cache (local read acceleration) ───────────────────────────
1519
1520    pub fn cache_body(&self, session_id: &str, body: &[u8]) -> Result<()> {
1521        self.conn().execute(
1522            "INSERT INTO body_cache (session_id, body, cached_at) \
1523             VALUES (?1, ?2, datetime('now')) \
1524             ON CONFLICT(session_id) DO UPDATE SET body=excluded.body, cached_at=datetime('now')",
1525            params![session_id, body],
1526        )?;
1527        Ok(())
1528    }
1529
1530    pub fn get_cached_body(&self, session_id: &str) -> Result<Option<Vec<u8>>> {
1531        let body = self
1532            .conn()
1533            .query_row(
1534                "SELECT body FROM body_cache WHERE session_id = ?1",
1535                params![session_id],
1536                |row| row.get(0),
1537            )
1538            .optional()?;
1539        Ok(body)
1540    }
1541
1542    /// Find the most recently active session for a given repo path.
1543    /// "Active" means the session's working_directory matches the repo path
1544    /// and was created within the last `since_minutes` minutes.
1545    pub fn find_active_session_for_repo(
1546        &self,
1547        repo_path: &str,
1548        since_minutes: u32,
1549    ) -> Result<Option<LocalSessionRow>> {
1550        let sql = format!(
1551            "SELECT {LOCAL_SESSION_COLUMNS} \
1552             {FROM_CLAUSE} \
1553             WHERE s.working_directory LIKE ?1 \
1554             AND COALESCE(s.is_auxiliary, 0) = 0 \
1555             AND s.created_at >= datetime('now', ?2) \
1556             ORDER BY s.created_at DESC LIMIT 1"
1557        );
1558        let since = format!("-{since_minutes} minutes");
1559        let like = format!("{repo_path}%");
1560        let conn = self.conn();
1561        let mut stmt = conn.prepare(&sql)?;
1562        let row = stmt
1563            .query_map(params![like, since], row_to_local_session)?
1564            .next()
1565            .transpose()?;
1566        Ok(row)
1567    }
1568
1569    /// Get all session IDs currently in the local DB.
1570    pub fn existing_session_ids(&self) -> std::collections::HashSet<String> {
1571        let conn = self.conn();
1572        let mut stmt = conn
1573            .prepare("SELECT id FROM sessions")
1574            .unwrap_or_else(|_| panic!("failed to prepare existing_session_ids query"));
1575        let rows = stmt.query_map([], |row| row.get::<_, String>(0));
1576        let mut set = std::collections::HashSet::new();
1577        if let Ok(rows) = rows {
1578            for row in rows.flatten() {
1579                set.insert(row);
1580            }
1581        }
1582        set
1583    }
1584
1585    /// Update only stats fields for an existing session (no git context re-extraction).
1586    pub fn update_session_stats(&self, session: &Session) -> Result<()> {
1587        let title = session.context.title.as_deref();
1588        let description = session.context.description.as_deref();
1589        let (files_modified, files_read, has_errors) =
1590            opensession_core::extract::extract_file_metadata(session);
1591        let max_active_agents = opensession_core::agent_metrics::max_active_agents(session) as i64;
1592        let is_auxiliary = is_auxiliary_session(session);
1593
1594        self.conn().execute(
1595            "UPDATE sessions SET \
1596             title=?2, description=?3, \
1597             message_count=?4, user_message_count=?5, task_count=?6, \
1598             event_count=?7, duration_seconds=?8, \
1599             total_input_tokens=?9, total_output_tokens=?10, \
1600              files_modified=?11, files_read=?12, has_errors=?13, \
1601             max_active_agents=?14, is_auxiliary=?15 \
1602             WHERE id=?1",
1603            params![
1604                &session.session_id,
1605                title,
1606                description,
1607                session.stats.message_count as i64,
1608                session.stats.user_message_count as i64,
1609                session.stats.task_count as i64,
1610                session.stats.event_count as i64,
1611                session.stats.duration_seconds as i64,
1612                session.stats.total_input_tokens as i64,
1613                session.stats.total_output_tokens as i64,
1614                &files_modified,
1615                &files_read,
1616                has_errors,
1617                max_active_agents,
1618                is_auxiliary as i64,
1619            ],
1620        )?;
1621        Ok(())
1622    }
1623
1624    /// Update only sync metadata path for an existing session.
1625    pub fn set_session_sync_path(&self, session_id: &str, source_path: &str) -> Result<()> {
1626        self.conn().execute(
1627            "INSERT INTO session_sync (session_id, source_path) \
1628             VALUES (?1, ?2) \
1629             ON CONFLICT(session_id) DO UPDATE SET source_path = excluded.source_path",
1630            params![session_id, source_path],
1631        )?;
1632        Ok(())
1633    }
1634
1635    /// Get a list of distinct git repo names present in the DB.
1636    pub fn list_repos(&self) -> Result<Vec<String>> {
1637        let conn = self.conn();
1638        let mut stmt = conn.prepare(
1639            "SELECT DISTINCT git_repo_name FROM sessions \
1640             WHERE git_repo_name IS NOT NULL AND COALESCE(is_auxiliary, 0) = 0 \
1641             ORDER BY git_repo_name ASC",
1642        )?;
1643        let rows = stmt.query_map([], |row| row.get(0))?;
1644        let mut result = Vec::new();
1645        for row in rows {
1646            result.push(row?);
1647        }
1648        Ok(result)
1649    }
1650
1651    /// Get a list of distinct, non-empty working directories present in the DB.
1652    pub fn list_working_directories(&self) -> Result<Vec<String>> {
1653        let conn = self.conn();
1654        let mut stmt = conn.prepare(
1655            "SELECT DISTINCT working_directory FROM sessions \
1656             WHERE working_directory IS NOT NULL AND TRIM(working_directory) <> '' \
1657             AND COALESCE(is_auxiliary, 0) = 0 \
1658             ORDER BY working_directory ASC",
1659        )?;
1660        let rows = stmt.query_map([], |row| row.get(0))?;
1661        let mut result = Vec::new();
1662        for row in rows {
1663            result.push(row?);
1664        }
1665        Ok(result)
1666    }
1667}
1668
1669// ── Schema bootstrap ──────────────────────────────────────────────────
1670
1671fn open_connection_with_latest_schema(path: &PathBuf) -> Result<Connection> {
1672    let conn = Connection::open(path).with_context(|| format!("open db {}", path.display()))?;
1673    conn.execute_batch("PRAGMA journal_mode=WAL;")?;
1674
1675    // Disable FK constraints for local DB (index/cache, not source of truth)
1676    conn.execute_batch("PRAGMA foreign_keys=OFF;")?;
1677
1678    apply_local_migrations(&conn)?;
1679    repair_session_tools_from_source_path(&conn)?;
1680    repair_auxiliary_flags_from_source_path(&conn)?;
1681    validate_local_schema(&conn)?;
1682
1683    Ok(conn)
1684}
1685
1686fn apply_local_migrations(conn: &Connection) -> Result<()> {
1687    conn.execute_batch(
1688        "CREATE TABLE IF NOT EXISTS _migrations (
1689            id INTEGER PRIMARY KEY,
1690            name TEXT NOT NULL UNIQUE,
1691            applied_at TEXT NOT NULL DEFAULT (datetime('now'))
1692        );",
1693    )
1694    .context("create _migrations table for local db")?;
1695
1696    for (name, sql) in MIGRATIONS.iter().chain(LOCAL_MIGRATIONS.iter()) {
1697        let already_applied: bool = conn
1698            .query_row(
1699                "SELECT COUNT(*) > 0 FROM _migrations WHERE name = ?1",
1700                [name],
1701                |row| row.get(0),
1702            )
1703            .unwrap_or(false);
1704
1705        if already_applied {
1706            continue;
1707        }
1708
1709        conn.execute_batch(sql)
1710            .with_context(|| format!("apply local migration {name}"))?;
1711
1712        conn.execute(
1713            "INSERT OR IGNORE INTO _migrations (name) VALUES (?1)",
1714            [name],
1715        )
1716        .with_context(|| format!("record local migration {name}"))?;
1717    }
1718
1719    Ok(())
1720}
1721
1722fn validate_local_schema(conn: &Connection) -> Result<()> {
1723    let sql = format!("SELECT {LOCAL_SESSION_COLUMNS} {FROM_CLAUSE} WHERE 1=0");
1724    conn.prepare(&sql)
1725        .map(|_| ())
1726        .context("validate local session schema")
1727}
1728
1729fn repair_session_tools_from_source_path(conn: &Connection) -> Result<()> {
1730    let mut stmt = conn.prepare(
1731        "SELECT s.id, s.tool, ss.source_path \
1732         FROM sessions s \
1733         LEFT JOIN session_sync ss ON ss.session_id = s.id \
1734         WHERE ss.source_path IS NOT NULL",
1735    )?;
1736    let rows = stmt.query_map([], |row| {
1737        Ok((
1738            row.get::<_, String>(0)?,
1739            row.get::<_, String>(1)?,
1740            row.get::<_, Option<String>>(2)?,
1741        ))
1742    })?;
1743
1744    let mut updates: Vec<(String, String)> = Vec::new();
1745    for row in rows {
1746        let (id, current_tool, source_path) = row?;
1747        let normalized = normalize_tool_for_source_path(&current_tool, source_path.as_deref());
1748        if normalized != current_tool {
1749            updates.push((id, normalized));
1750        }
1751    }
1752    drop(stmt);
1753
1754    for (id, tool) in updates {
1755        conn.execute(
1756            "UPDATE sessions SET tool = ?1 WHERE id = ?2",
1757            params![tool, id],
1758        )?;
1759    }
1760
1761    Ok(())
1762}
1763
1764fn repair_auxiliary_flags_from_source_path(conn: &Connection) -> Result<()> {
1765    let mut stmt = conn.prepare(
1766        "SELECT s.id, ss.source_path \
1767         FROM sessions s \
1768         LEFT JOIN session_sync ss ON ss.session_id = s.id \
1769         WHERE ss.source_path IS NOT NULL \
1770         AND COALESCE(s.is_auxiliary, 0) = 0",
1771    )?;
1772    let rows = stmt.query_map([], |row| {
1773        Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?))
1774    })?;
1775
1776    let mut updates: Vec<String> = Vec::new();
1777    for row in rows {
1778        let (id, source_path) = row?;
1779        let Some(source_path) = source_path else {
1780            continue;
1781        };
1782        if infer_tool_from_source_path(Some(&source_path)) != Some("codex") {
1783            continue;
1784        }
1785        if is_codex_auxiliary_source_file(&source_path) {
1786            updates.push(id);
1787        }
1788    }
1789    drop(stmt);
1790
1791    for id in updates {
1792        conn.execute(
1793            "UPDATE sessions SET is_auxiliary = 1 WHERE id = ?1",
1794            params![id],
1795        )?;
1796    }
1797
1798    Ok(())
1799}
1800
1801fn is_codex_auxiliary_source_file(source_path: &str) -> bool {
1802    let Ok(file) = fs::File::open(source_path) else {
1803        return false;
1804    };
1805    let reader = BufReader::new(file);
1806    for line in reader.lines().take(32) {
1807        let Ok(raw) = line else {
1808            continue;
1809        };
1810        let line = raw.trim();
1811        if line.is_empty() {
1812            continue;
1813        }
1814
1815        if line.contains("\"source\":{\"subagent\"")
1816            || line.contains("\"source\": {\"subagent\"")
1817            || line.contains("\"agent_role\":\"awaiter\"")
1818            || line.contains("\"agent_role\":\"worker\"")
1819            || line.contains("\"agent_role\":\"explorer\"")
1820            || line.contains("\"agent_role\":\"subagent\"")
1821        {
1822            return true;
1823        }
1824
1825        if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(line) {
1826            let is_session_meta =
1827                parsed.get("type").and_then(|v| v.as_str()) == Some("session_meta");
1828            let payload = if is_session_meta {
1829                parsed.get("payload")
1830            } else {
1831                Some(&parsed)
1832            };
1833            if let Some(payload) = payload {
1834                if payload.pointer("/source/subagent").is_some() {
1835                    return true;
1836                }
1837                let role = payload
1838                    .get("agent_role")
1839                    .and_then(|v| v.as_str())
1840                    .map(str::to_ascii_lowercase);
1841                if matches!(
1842                    role.as_deref(),
1843                    Some("awaiter") | Some("worker") | Some("explorer") | Some("subagent")
1844                ) {
1845                    return true;
1846                }
1847            }
1848        }
1849    }
1850    false
1851}
1852
/// Column list for SELECT queries against sessions + session_sync + users.
///
/// The list yields 34 columns (positions 0 through 33). Their order is
/// significant: `row_to_local_session` reads each column by its numeric
/// position, so adding, removing or reordering an entry here requires the
/// matching index change there.
///
/// `sync_status`, `user_message_count`, `max_active_agents` and
/// `is_auxiliary` are wrapped in `COALESCE`, so NULLs in those columns are
/// returned as `'unknown'`, `0`, `1` and `0` respectively.
pub const LOCAL_SESSION_COLUMNS: &str = "\
s.id, ss.source_path, COALESCE(ss.sync_status, 'unknown') AS sync_status, ss.last_synced_at, \
s.user_id, u.nickname, s.team_id, s.tool, s.agent_provider, s.agent_model, \
s.title, s.description, s.tags, s.created_at, s.uploaded_at, \
s.message_count, COALESCE(s.user_message_count, 0), s.task_count, s.event_count, s.duration_seconds, \
s.total_input_tokens, s.total_output_tokens, \
s.git_remote, s.git_branch, s.git_commit, s.git_repo_name, \
s.pr_number, s.pr_url, s.working_directory, \
s.files_modified, s.files_read, s.has_errors, COALESCE(s.max_active_agents, 1), COALESCE(s.is_auxiliary, 0)";
1863
1864fn row_to_local_session(row: &rusqlite::Row) -> rusqlite::Result<LocalSessionRow> {
1865    let source_path: Option<String> = row.get(1)?;
1866    let tool: String = row.get(7)?;
1867    let normalized_tool = normalize_tool_for_source_path(&tool, source_path.as_deref());
1868
1869    Ok(LocalSessionRow {
1870        id: row.get(0)?,
1871        source_path,
1872        sync_status: row.get(2)?,
1873        last_synced_at: row.get(3)?,
1874        user_id: row.get(4)?,
1875        nickname: row.get(5)?,
1876        team_id: row.get(6)?,
1877        tool: normalized_tool,
1878        agent_provider: row.get(8)?,
1879        agent_model: row.get(9)?,
1880        title: row.get(10)?,
1881        description: row.get(11)?,
1882        tags: row.get(12)?,
1883        created_at: row.get(13)?,
1884        uploaded_at: row.get(14)?,
1885        message_count: row.get(15)?,
1886        user_message_count: row.get(16)?,
1887        task_count: row.get(17)?,
1888        event_count: row.get(18)?,
1889        duration_seconds: row.get(19)?,
1890        total_input_tokens: row.get(20)?,
1891        total_output_tokens: row.get(21)?,
1892        git_remote: row.get(22)?,
1893        git_branch: row.get(23)?,
1894        git_commit: row.get(24)?,
1895        git_repo_name: row.get(25)?,
1896        pr_number: row.get(26)?,
1897        pr_url: row.get(27)?,
1898        working_directory: row.get(28)?,
1899        files_modified: row.get(29)?,
1900        files_read: row.get(30)?,
1901        has_errors: row.get::<_, i64>(31).unwrap_or(0) != 0,
1902        max_active_agents: row.get(32).unwrap_or(1),
1903        is_auxiliary: row.get::<_, i64>(33).unwrap_or(0) != 0,
1904    })
1905}
1906
1907fn default_db_path() -> Result<PathBuf> {
1908    let home = std::env::var("HOME")
1909        .or_else(|_| std::env::var("USERPROFILE"))
1910        .context("Could not determine home directory")?;
1911    Ok(PathBuf::from(home)
1912        .join(".local")
1913        .join("share")
1914        .join("opensession")
1915        .join("local.db"))
1916}
1917
1918#[cfg(test)]
1919mod tests {
1920    use super::*;
1921
1922    use std::fs::{create_dir_all, write};
1923    use tempfile::tempdir;
1924
1925    fn test_db() -> LocalDb {
1926        let dir = tempdir().unwrap();
1927        let path = dir.keep().join("test.db");
1928        LocalDb::open_path(&path).unwrap()
1929    }
1930
1931    #[test]
1932    fn test_open_and_schema() {
1933        let _db = test_db();
1934    }
1935
1936    #[test]
1937    fn test_open_repairs_codex_tool_hint_from_source_path() {
1938        let dir = tempfile::tempdir().unwrap();
1939        let path = dir.path().join("repair.db");
1940
1941        {
1942            let _ = LocalDb::open_path(&path).unwrap();
1943        }
1944
1945        {
1946            let conn = Connection::open(&path).unwrap();
1947            conn.execute(
1948                "INSERT INTO sessions (id, team_id, tool, created_at, body_storage_key) VALUES (?1, 'personal', 'claude-code', ?2, '')",
1949                params!["rollout-repair", "2026-02-20T00:00:00Z"],
1950            )
1951            .unwrap();
1952            conn.execute(
1953                "INSERT INTO session_sync (session_id, source_path, sync_status) VALUES (?1, ?2, 'local_only')",
1954                params!["rollout-repair", "/Users/test/.codex/sessions/2026/02/20/rollout-repair.jsonl"],
1955            )
1956            .unwrap();
1957        }
1958
1959        let db = LocalDb::open_path(&path).unwrap();
1960        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
1961        let row = rows
1962            .iter()
1963            .find(|row| row.id == "rollout-repair")
1964            .expect("repaired row");
1965        assert_eq!(row.tool, "codex");
1966    }
1967
1968    #[test]
1969    fn test_open_repairs_codex_auxiliary_flag_from_source_path() {
1970        let dir = tempfile::tempdir().unwrap();
1971        let path = dir.path().join("repair-auxiliary.db");
1972        let codex_dir = dir
1973            .path()
1974            .join(".codex")
1975            .join("sessions")
1976            .join("2026")
1977            .join("02")
1978            .join("20");
1979        create_dir_all(&codex_dir).unwrap();
1980        let source_path = codex_dir.join("rollout-subagent.jsonl");
1981        write(
1982            &source_path,
1983            r#"{"timestamp":"2026-02-20T00:00:00.000Z","type":"session_meta","payload":{"id":"rollout-subagent","timestamp":"2026-02-20T00:00:00.000Z","cwd":"/tmp","originator":"Codex Desktop","cli_version":"0.105.0","source":{"subagent":{"thread_spawn":{"parent_thread_id":"parent-session-id","depth":1,"agent_role":"awaiter"}}},"agent_role":"awaiter"}}\n"#,
1984        )
1985        .unwrap();
1986
1987        {
1988            let _ = LocalDb::open_path(&path).unwrap();
1989        }
1990
1991        {
1992            let conn = Connection::open(&path).unwrap();
1993            conn.execute(
1994                "INSERT INTO sessions (id, team_id, tool, created_at, body_storage_key, is_auxiliary) VALUES (?1, 'personal', 'codex', ?2, '', 0)",
1995                params!["rollout-subagent", "2026-02-20T00:00:00Z"],
1996            )
1997            .unwrap();
1998            conn.execute(
1999                "INSERT INTO session_sync (session_id, source_path, sync_status) VALUES (?1, ?2, 'local_only')",
2000                params!["rollout-subagent", source_path.to_string_lossy().to_string()],
2001            )
2002            .unwrap();
2003        }
2004
2005        let db = LocalDb::open_path(&path).unwrap();
2006        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2007        assert!(
2008            rows.iter().all(|row| row.id != "rollout-subagent"),
2009            "auxiliary codex session should be hidden after repair"
2010        );
2011    }
2012
2013    #[test]
2014    fn test_open_repairs_codex_auxiliary_flag_when_session_meta_is_not_first_line() {
2015        let dir = tempfile::tempdir().unwrap();
2016        let path = dir.path().join("repair-auxiliary-shifted.db");
2017        let codex_dir = dir
2018            .path()
2019            .join(".codex")
2020            .join("sessions")
2021            .join("2026")
2022            .join("03")
2023            .join("03");
2024        create_dir_all(&codex_dir).unwrap();
2025        let source_path = codex_dir.join("rollout-subagent-shifted.jsonl");
2026        write(
2027            &source_path,
2028            [
2029                r#"{"timestamp":"2026-03-03T00:00:00.010Z","type":"event_msg","payload":{"type":"agent_message","message":"bootstrap line"}}"#,
2030                r#"{"timestamp":"2026-03-03T00:00:00.020Z","type":"session_meta","payload":{"id":"rollout-subagent-shifted","timestamp":"2026-03-03T00:00:00.000Z","cwd":"/tmp","originator":"Codex Desktop","cli_version":"0.108.0","source":{"subagent":{"thread_spawn":{"parent_thread_id":"parent-session-id","depth":1,"agent_role":"worker"}}},"agent_role":"worker"}}"#,
2031            ]
2032            .join("\n"),
2033        )
2034        .unwrap();
2035
2036        {
2037            let _ = LocalDb::open_path(&path).unwrap();
2038        }
2039
2040        {
2041            let conn = Connection::open(&path).unwrap();
2042            conn.execute(
2043                "INSERT INTO sessions (id, team_id, tool, created_at, body_storage_key, is_auxiliary) VALUES (?1, 'personal', 'codex', ?2, '', 0)",
2044                params!["rollout-subagent-shifted", "2026-03-03T00:00:00Z"],
2045            )
2046            .unwrap();
2047            conn.execute(
2048                "INSERT INTO session_sync (session_id, source_path, sync_status) VALUES (?1, ?2, 'local_only')",
2049                params!["rollout-subagent-shifted", source_path.to_string_lossy().to_string()],
2050            )
2051            .unwrap();
2052        }
2053
2054        let db = LocalDb::open_path(&path).unwrap();
2055        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2056        assert!(
2057            rows.iter().all(|row| row.id != "rollout-subagent-shifted"),
2058            "auxiliary codex session should be hidden after repair even if session_meta is not the first line"
2059        );
2060    }
2061
2062    #[test]
2063    fn test_upsert_local_session_normalizes_tool_from_source_path() {
2064        let db = test_db();
2065        let mut session = Session::new(
2066            "rollout-upsert".to_string(),
2067            opensession_core::trace::Agent {
2068                provider: "openai".to_string(),
2069                model: "gpt-5".to_string(),
2070                tool: "claude-code".to_string(),
2071                tool_version: None,
2072            },
2073        );
2074        session.stats.event_count = 1;
2075
2076        db.upsert_local_session(
2077            &session,
2078            "/Users/test/.codex/sessions/2026/02/20/rollout-upsert.jsonl",
2079            &crate::git::GitContext::default(),
2080        )
2081        .unwrap();
2082
2083        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2084        let row = rows
2085            .iter()
2086            .find(|row| row.id == "rollout-upsert")
2087            .expect("upserted row");
2088        assert_eq!(row.tool, "codex");
2089    }
2090
2091    #[test]
2092    fn test_upsert_local_session_preserves_existing_git_when_session_has_no_git_metadata() {
2093        let db = test_db();
2094        let mut session = Session::new(
2095            "preserve-git".to_string(),
2096            opensession_core::trace::Agent {
2097                provider: "openai".to_string(),
2098                model: "gpt-5".to_string(),
2099                tool: "codex".to_string(),
2100                tool_version: None,
2101            },
2102        );
2103        session.stats.event_count = 1;
2104
2105        let first_git = crate::git::GitContext {
2106            remote: Some("https://github.com/acme/repo.git".to_string()),
2107            branch: Some("feature/original".to_string()),
2108            commit: Some("1111111".to_string()),
2109            repo_name: Some("acme/repo".to_string()),
2110        };
2111        db.upsert_local_session(
2112            &session,
2113            "/Users/test/.codex/sessions/2026/02/20/preserve-git.jsonl",
2114            &first_git,
2115        )
2116        .unwrap();
2117
2118        let second_git = crate::git::GitContext {
2119            remote: Some("https://github.com/acme/repo.git".to_string()),
2120            branch: Some("feature/current-head".to_string()),
2121            commit: Some("2222222".to_string()),
2122            repo_name: Some("acme/repo".to_string()),
2123        };
2124        db.upsert_local_session(
2125            &session,
2126            "/Users/test/.codex/sessions/2026/02/20/preserve-git.jsonl",
2127            &second_git,
2128        )
2129        .unwrap();
2130
2131        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2132        let row = rows
2133            .iter()
2134            .find(|row| row.id == "preserve-git")
2135            .expect("row exists");
2136        assert_eq!(row.git_branch.as_deref(), Some("feature/original"));
2137        assert_eq!(row.git_commit.as_deref(), Some("1111111"));
2138    }
2139
2140    #[test]
2141    fn test_upsert_local_session_prefers_git_branch_from_session_attributes() {
2142        let db = test_db();
2143        let mut session = Session::new(
2144            "session-git-branch".to_string(),
2145            opensession_core::trace::Agent {
2146                provider: "anthropic".to_string(),
2147                model: "claude-opus-4-6".to_string(),
2148                tool: "claude-code".to_string(),
2149                tool_version: None,
2150            },
2151        );
2152        session.stats.event_count = 1;
2153        session.context.attributes.insert(
2154            "git_branch".to_string(),
2155            serde_json::Value::String("from-session".to_string()),
2156        );
2157
2158        let fallback_git = crate::git::GitContext {
2159            remote: Some("https://github.com/acme/repo.git".to_string()),
2160            branch: Some("fallback-branch".to_string()),
2161            commit: Some("aaaaaaaa".to_string()),
2162            repo_name: Some("acme/repo".to_string()),
2163        };
2164        db.upsert_local_session(
2165            &session,
2166            "/Users/test/.claude/projects/foo/session-git-branch.jsonl",
2167            &fallback_git,
2168        )
2169        .unwrap();
2170
2171        session.context.attributes.insert(
2172            "git_branch".to_string(),
2173            serde_json::Value::String("from-session-updated".to_string()),
2174        );
2175        db.upsert_local_session(
2176            &session,
2177            "/Users/test/.claude/projects/foo/session-git-branch.jsonl",
2178            &fallback_git,
2179        )
2180        .unwrap();
2181
2182        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2183        let row = rows
2184            .iter()
2185            .find(|row| row.id == "session-git-branch")
2186            .expect("row exists");
2187        assert_eq!(row.git_branch.as_deref(), Some("from-session-updated"));
2188    }
2189
2190    #[test]
2191    fn test_upsert_local_session_marks_parented_sessions_auxiliary() {
2192        let db = test_db();
2193        let mut session = Session::new(
2194            "aux-upsert".to_string(),
2195            opensession_core::trace::Agent {
2196                provider: "openai".to_string(),
2197                model: "gpt-5".to_string(),
2198                tool: "opencode".to_string(),
2199                tool_version: None,
2200            },
2201        );
2202        session.stats.event_count = 1;
2203        session.context.attributes.insert(
2204            opensession_core::session::ATTR_PARENT_SESSION_ID.to_string(),
2205            serde_json::Value::String("parent-session".to_string()),
2206        );
2207
2208        db.upsert_local_session(
2209            &session,
2210            "/Users/test/.opencode/storage/session/project/aux-upsert.json",
2211            &crate::git::GitContext::default(),
2212        )
2213        .unwrap();
2214
2215        let is_auxiliary: i64 = db
2216            .conn()
2217            .query_row(
2218                "SELECT is_auxiliary FROM sessions WHERE id = ?1",
2219                params!["aux-upsert"],
2220                |row| row.get(0),
2221            )
2222            .unwrap();
2223        assert_eq!(is_auxiliary, 1);
2224
2225        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2226        assert!(
2227            rows.iter().all(|row| row.id != "aux-upsert"),
2228            "auxiliary sessions should be hidden from default listing"
2229        );
2230    }
2231
2232    #[test]
2233    fn test_upsert_local_session_primary_role_overrides_parent_link() {
2234        let db = test_db();
2235        let mut session = Session::new(
2236            "primary-override".to_string(),
2237            opensession_core::trace::Agent {
2238                provider: "openai".to_string(),
2239                model: "gpt-5".to_string(),
2240                tool: "opencode".to_string(),
2241                tool_version: None,
2242            },
2243        );
2244        session.stats.event_count = 1;
2245        session.context.attributes.insert(
2246            opensession_core::session::ATTR_PARENT_SESSION_ID.to_string(),
2247            serde_json::Value::String("parent-session".to_string()),
2248        );
2249        session.context.attributes.insert(
2250            opensession_core::session::ATTR_SESSION_ROLE.to_string(),
2251            serde_json::Value::String("primary".to_string()),
2252        );
2253
2254        db.upsert_local_session(
2255            &session,
2256            "/Users/test/.opencode/storage/session/project/primary-override.json",
2257            &crate::git::GitContext::default(),
2258        )
2259        .unwrap();
2260
2261        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2262        let row = rows
2263            .iter()
2264            .find(|row| row.id == "primary-override")
2265            .expect("session with explicit primary role should stay visible");
2266        assert!(!row.is_auxiliary);
2267    }
2268
2269    #[test]
2270    fn test_upsert_local_session_skips_empty_signal_rows() {
2271        let db = test_db();
2272        let session = Session::new(
2273            "empty-signal-local".to_string(),
2274            opensession_core::trace::Agent {
2275                provider: "sourcegraph".to_string(),
2276                model: "amp-model".to_string(),
2277                tool: "amp".to_string(),
2278                tool_version: None,
2279            },
2280        );
2281
2282        db.upsert_local_session(
2283            &session,
2284            "/Users/test/.local/share/amp/threads/T-empty-signal-local.json",
2285            &crate::git::GitContext::default(),
2286        )
2287        .expect("upsert empty-signal session should not fail");
2288
2289        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2290        assert!(
2291            rows.iter().all(|row| row.id != "empty-signal-local"),
2292            "empty-signal local sessions must not be listed",
2293        );
2294    }
2295
2296    #[test]
2297    fn test_upsert_local_session_empty_signal_deletes_existing_row() {
2298        let db = test_db();
2299        let mut populated = Session::new(
2300            "empty-signal-replace".to_string(),
2301            opensession_core::trace::Agent {
2302                provider: "sourcegraph".to_string(),
2303                model: "amp-model".to_string(),
2304                tool: "amp".to_string(),
2305                tool_version: None,
2306            },
2307        );
2308        populated.stats.event_count = 2;
2309        populated.stats.message_count = 1;
2310        populated.stats.user_message_count = 1;
2311
2312        db.upsert_local_session(
2313            &populated,
2314            "/Users/test/.local/share/amp/threads/T-empty-signal-replace.json",
2315            &crate::git::GitContext::default(),
2316        )
2317        .expect("seed populated row");
2318        assert!(db
2319            .get_session_by_id("empty-signal-replace")
2320            .unwrap()
2321            .is_some());
2322
2323        let empty = Session::new(
2324            "empty-signal-replace".to_string(),
2325            opensession_core::trace::Agent {
2326                provider: "sourcegraph".to_string(),
2327                model: "amp-model".to_string(),
2328                tool: "amp".to_string(),
2329                tool_version: None,
2330            },
2331        );
2332        db.upsert_local_session(
2333            &empty,
2334            "/Users/test/.local/share/amp/threads/T-empty-signal-replace.json",
2335            &crate::git::GitContext::default(),
2336        )
2337        .expect("upsert empty-signal replacement");
2338
2339        assert!(
2340            db.get_session_by_id("empty-signal-replace")
2341                .unwrap()
2342                .is_none(),
2343            "existing local row must be removed when source becomes empty-signal",
2344        );
2345    }
2346
2347    #[test]
2348    fn test_list_sessions_hides_codex_summary_worker_titles() {
2349        let db = test_db();
2350        let mut codex_summary_worker = Session::new(
2351            "codex-summary-worker".to_string(),
2352            opensession_core::trace::Agent {
2353                provider: "openai".to_string(),
2354                model: "gpt-5".to_string(),
2355                tool: "codex".to_string(),
2356                tool_version: None,
2357            },
2358        );
2359        codex_summary_worker.context.title = Some(
2360            "Convert a real coding session into semantic compression. Pipeline: ...".to_string(),
2361        );
2362        codex_summary_worker.stats.event_count = 2;
2363        codex_summary_worker.stats.message_count = 1;
2364
2365        db.upsert_local_session(
2366            &codex_summary_worker,
2367            "/Users/test/.codex/sessions/2026/03/05/summary-worker.jsonl",
2368            &crate::git::GitContext::default(),
2369        )
2370        .expect("upsert codex summary worker session");
2371
2372        let mut non_codex_same_title = Session::new(
2373            "claude-similar-title".to_string(),
2374            opensession_core::trace::Agent {
2375                provider: "anthropic".to_string(),
2376                model: "claude-opus-4-6".to_string(),
2377                tool: "claude-code".to_string(),
2378                tool_version: None,
2379            },
2380        );
2381        non_codex_same_title.context.title = Some(
2382            "Convert a real coding session into semantic compression. Pipeline: ...".to_string(),
2383        );
2384        non_codex_same_title.stats.event_count = 2;
2385        non_codex_same_title.stats.message_count = 1;
2386
2387        db.upsert_local_session(
2388            &non_codex_same_title,
2389            "/Users/test/.claude/projects/p1/claude-similar-title.jsonl",
2390            &crate::git::GitContext::default(),
2391        )
2392        .expect("upsert non-codex session");
2393
2394        let rows = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2395        assert!(
2396            rows.iter().all(|row| row.id != "codex-summary-worker"),
2397            "codex summary worker sessions should be hidden from default listing"
2398        );
2399        assert!(
2400            rows.iter().any(|row| row.id == "claude-similar-title"),
2401            "non-codex sessions must remain visible even with similar title"
2402        );
2403
2404        let count = db
2405            .count_sessions_filtered(&LocalSessionFilter::default())
2406            .unwrap();
2407        assert_eq!(count, 1);
2408    }
2409
2410    #[test]
2411    fn test_sync_cursor() {
2412        let db = test_db();
2413        assert_eq!(db.get_sync_cursor("team1").unwrap(), None);
2414        db.set_sync_cursor("team1", "2024-01-01T00:00:00Z").unwrap();
2415        assert_eq!(
2416            db.get_sync_cursor("team1").unwrap(),
2417            Some("2024-01-01T00:00:00Z".to_string())
2418        );
2419        // Update
2420        db.set_sync_cursor("team1", "2024-06-01T00:00:00Z").unwrap();
2421        assert_eq!(
2422            db.get_sync_cursor("team1").unwrap(),
2423            Some("2024-06-01T00:00:00Z".to_string())
2424        );
2425    }
2426
2427    #[test]
2428    fn test_list_session_source_paths_returns_non_empty_paths_only() {
2429        let db = test_db();
2430        let mut s1 = Session::new(
2431            "source-path-1".to_string(),
2432            opensession_core::trace::Agent {
2433                provider: "openai".to_string(),
2434                model: "gpt-5".to_string(),
2435                tool: "codex".to_string(),
2436                tool_version: None,
2437            },
2438        );
2439        s1.stats.event_count = 1;
2440        db.upsert_local_session(
2441            &s1,
2442            "/tmp/source-path-1.jsonl",
2443            &crate::git::GitContext::default(),
2444        )
2445        .expect("upsert first session");
2446
2447        let mut s2 = Session::new(
2448            "source-path-2".to_string(),
2449            opensession_core::trace::Agent {
2450                provider: "openai".to_string(),
2451                model: "gpt-5".to_string(),
2452                tool: "codex".to_string(),
2453                tool_version: None,
2454            },
2455        );
2456        s2.stats.event_count = 1;
2457        db.upsert_local_session(&s2, "", &crate::git::GitContext::default())
2458            .expect("upsert second session");
2459
2460        let paths = db
2461            .list_session_source_paths()
2462            .expect("list source paths should work");
2463        assert!(paths
2464            .iter()
2465            .any(|(id, path)| id == "source-path-1" && path == "/tmp/source-path-1.jsonl"));
2466        assert!(paths.iter().all(|(id, _)| id != "source-path-2"));
2467    }
2468
2469    #[test]
2470    fn test_body_cache() {
2471        let db = test_db();
2472        assert_eq!(db.get_cached_body("s1").unwrap(), None);
2473        db.cache_body("s1", b"hello world").unwrap();
2474        assert_eq!(
2475            db.get_cached_body("s1").unwrap(),
2476            Some(b"hello world".to_vec())
2477        );
2478    }
2479
2480    #[test]
2481    fn test_get_session_by_id_and_list_session_links() {
2482        let db = test_db();
2483        db.upsert_remote_session(&make_summary(
2484            "parent-session",
2485            "codex",
2486            "Parent session",
2487            "2024-01-01T00:00:00Z",
2488        ))
2489        .unwrap();
2490        db.upsert_remote_session(&make_summary(
2491            "child-session",
2492            "codex",
2493            "Child session",
2494            "2024-01-01T01:00:00Z",
2495        ))
2496        .unwrap();
2497
2498        db.conn()
2499            .execute(
2500                "INSERT INTO session_links (session_id, linked_session_id, link_type, created_at) VALUES (?1, ?2, ?3, ?4)",
2501                params!["parent-session", "child-session", "handoff", "2024-01-01T01:00:00Z"],
2502            )
2503            .unwrap();
2504
2505        let parent = db
2506            .get_session_by_id("parent-session")
2507            .unwrap()
2508            .expect("session should exist");
2509        assert_eq!(parent.id, "parent-session");
2510        assert_eq!(parent.title.as_deref(), Some("Parent session"));
2511
2512        let links = db.list_session_links("parent-session").unwrap();
2513        assert_eq!(links.len(), 1);
2514        assert_eq!(links[0].session_id, "parent-session");
2515        assert_eq!(links[0].linked_session_id, "child-session");
2516        assert_eq!(links[0].link_type, "handoff");
2517    }
2518
2519    #[test]
2520    fn test_local_migrations_are_loaded_from_api_crate() {
2521        let migration_names: Vec<&str> = super::LOCAL_MIGRATIONS
2522            .iter()
2523            .map(|(name, _)| *name)
2524            .collect();
2525        assert!(
2526            migration_names.contains(&"local_0001_schema"),
2527            "expected local_0001_schema migration from opensession-api"
2528        );
2529        assert!(
2530            migration_names.contains(&"local_0002_session_summaries"),
2531            "expected local_0002_session_summaries migration from opensession-api"
2532        );
2533        assert!(
2534            migration_names.contains(&"local_0003_vector_index"),
2535            "expected local_0003_vector_index migration from opensession-api"
2536        );
2537        assert!(
2538            migration_names.contains(&"local_0004_summary_batch_status"),
2539            "expected local_0004_summary_batch_status migration from opensession-api"
2540        );
2541        assert_eq!(
2542            migration_names.len(),
2543            4,
2544            "local schema should include baseline + summary cache + vector index + summary batch status steps"
2545        );
2546
2547        let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
2548        let migrations_dir = manifest_dir.join("migrations");
2549        if migrations_dir.exists() {
2550            let sql_files = std::fs::read_dir(migrations_dir)
2551                .expect("read local-db migrations directory")
2552                .filter_map(Result::ok)
2553                .map(|entry| entry.file_name().to_string_lossy().to_string())
2554                .filter(|name| name.ends_with(".sql"))
2555                .collect::<Vec<_>>();
2556            assert!(
2557                sql_files.is_empty(),
2558                "local-db must not ship duplicated migration SQL files"
2559            );
2560        }
2561    }
2562
2563    #[test]
2564    fn test_local_schema_bootstrap_includes_is_auxiliary_column() {
2565        let dir = tempdir().unwrap();
2566        let path = dir.path().join("local.db");
2567        let db = LocalDb::open_path(&path).unwrap();
2568        let conn = db.conn();
2569        let mut stmt = conn.prepare("PRAGMA table_info(sessions)").unwrap();
2570        let columns = stmt
2571            .query_map([], |row| row.get::<_, String>(1))
2572            .unwrap()
2573            .collect::<std::result::Result<Vec<_>, _>>()
2574            .unwrap();
2575        assert!(
2576            columns.iter().any(|name| name == "is_auxiliary"),
2577            "sessions schema must include is_auxiliary column in bootstrap migration"
2578        );
2579    }
2580
2581    #[test]
2582    fn test_upsert_remote_session() {
2583        let db = test_db();
2584        let summary = RemoteSessionSummary {
2585            id: "remote-1".to_string(),
2586            user_id: Some("u1".to_string()),
2587            nickname: Some("alice".to_string()),
2588            team_id: "t1".to_string(),
2589            tool: "claude-code".to_string(),
2590            agent_provider: None,
2591            agent_model: None,
2592            title: Some("Test session".to_string()),
2593            description: None,
2594            tags: None,
2595            created_at: "2024-01-01T00:00:00Z".to_string(),
2596            uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2597            message_count: 10,
2598            task_count: 2,
2599            event_count: 20,
2600            duration_seconds: 300,
2601            total_input_tokens: 1000,
2602            total_output_tokens: 500,
2603            git_remote: None,
2604            git_branch: None,
2605            git_commit: None,
2606            git_repo_name: None,
2607            pr_number: None,
2608            pr_url: None,
2609            working_directory: None,
2610            files_modified: None,
2611            files_read: None,
2612            has_errors: false,
2613            max_active_agents: 1,
2614        };
2615        db.upsert_remote_session(&summary).unwrap();
2616
2617        let sessions = db.list_sessions(&LocalSessionFilter::default()).unwrap();
2618        assert_eq!(sessions.len(), 1);
2619        assert_eq!(sessions[0].id, "remote-1");
2620        assert_eq!(sessions[0].sync_status, "remote_only");
2621        assert_eq!(sessions[0].nickname, None); // no user in local users table
2622        assert!(!sessions[0].is_auxiliary);
2623    }
2624
2625    #[test]
2626    fn test_list_filter_by_repo() {
2627        let db = test_db();
2628        // Insert a remote session with team_id
2629        let summary1 = RemoteSessionSummary {
2630            id: "s1".to_string(),
2631            user_id: None,
2632            nickname: None,
2633            team_id: "t1".to_string(),
2634            tool: "claude-code".to_string(),
2635            agent_provider: None,
2636            agent_model: None,
2637            title: Some("Session 1".to_string()),
2638            description: None,
2639            tags: None,
2640            created_at: "2024-01-01T00:00:00Z".to_string(),
2641            uploaded_at: "2024-01-01T01:00:00Z".to_string(),
2642            message_count: 5,
2643            task_count: 0,
2644            event_count: 10,
2645            duration_seconds: 60,
2646            total_input_tokens: 100,
2647            total_output_tokens: 50,
2648            git_remote: None,
2649            git_branch: None,
2650            git_commit: None,
2651            git_repo_name: None,
2652            pr_number: None,
2653            pr_url: None,
2654            working_directory: None,
2655            files_modified: None,
2656            files_read: None,
2657            has_errors: false,
2658            max_active_agents: 1,
2659        };
2660        db.upsert_remote_session(&summary1).unwrap();
2661
2662        // Filter by team
2663        let filter = LocalSessionFilter {
2664            team_id: Some("t1".to_string()),
2665            ..Default::default()
2666        };
2667        assert_eq!(db.list_sessions(&filter).unwrap().len(), 1);
2668
2669        let filter = LocalSessionFilter {
2670            team_id: Some("t999".to_string()),
2671            ..Default::default()
2672        };
2673        assert_eq!(db.list_sessions(&filter).unwrap().len(), 0);
2674    }
2675
2676    // ── Helpers for inserting test sessions ────────────────────────────
2677
2678    fn make_summary(id: &str, tool: &str, title: &str, created_at: &str) -> RemoteSessionSummary {
2679        RemoteSessionSummary {
2680            id: id.to_string(),
2681            user_id: None,
2682            nickname: None,
2683            team_id: "t1".to_string(),
2684            tool: tool.to_string(),
2685            agent_provider: Some("anthropic".to_string()),
2686            agent_model: Some("claude-opus-4-6".to_string()),
2687            title: Some(title.to_string()),
2688            description: None,
2689            tags: None,
2690            created_at: created_at.to_string(),
2691            uploaded_at: created_at.to_string(),
2692            message_count: 5,
2693            task_count: 1,
2694            event_count: 10,
2695            duration_seconds: 300,
2696            total_input_tokens: 1000,
2697            total_output_tokens: 500,
2698            git_remote: None,
2699            git_branch: None,
2700            git_commit: None,
2701            git_repo_name: None,
2702            pr_number: None,
2703            pr_url: None,
2704            working_directory: None,
2705            files_modified: None,
2706            files_read: None,
2707            has_errors: false,
2708            max_active_agents: 1,
2709        }
2710    }
2711
2712    fn seed_sessions(db: &LocalDb) {
2713        // Insert 5 sessions across two tools, ordered by created_at
2714        db.upsert_remote_session(&make_summary(
2715            "s1",
2716            "claude-code",
2717            "First session",
2718            "2024-01-01T00:00:00Z",
2719        ))
2720        .unwrap();
2721        db.upsert_remote_session(&make_summary(
2722            "s2",
2723            "claude-code",
2724            "JWT auth work",
2725            "2024-01-02T00:00:00Z",
2726        ))
2727        .unwrap();
2728        db.upsert_remote_session(&make_summary(
2729            "s3",
2730            "gemini",
2731            "Gemini test",
2732            "2024-01-03T00:00:00Z",
2733        ))
2734        .unwrap();
2735        db.upsert_remote_session(&make_summary(
2736            "s4",
2737            "claude-code",
2738            "Error handling",
2739            "2024-01-04T00:00:00Z",
2740        ))
2741        .unwrap();
2742        db.upsert_remote_session(&make_summary(
2743            "s5",
2744            "claude-code",
2745            "Final polish",
2746            "2024-01-05T00:00:00Z",
2747        ))
2748        .unwrap();
2749    }
2750
2751    // ── list_sessions_log tests ────────────────────────────────────────
2752
2753    #[test]
2754    fn test_log_no_filters() {
2755        let db = test_db();
2756        seed_sessions(&db);
2757        let filter = LogFilter::default();
2758        let results = db.list_sessions_log(&filter).unwrap();
2759        assert_eq!(results.len(), 5);
2760        // Should be ordered by created_at DESC
2761        assert_eq!(results[0].id, "s5");
2762        assert_eq!(results[4].id, "s1");
2763    }
2764
2765    #[test]
2766    fn test_log_filter_by_tool() {
2767        let db = test_db();
2768        seed_sessions(&db);
2769        let filter = LogFilter {
2770            tool: Some("claude-code".to_string()),
2771            ..Default::default()
2772        };
2773        let results = db.list_sessions_log(&filter).unwrap();
2774        assert_eq!(results.len(), 4);
2775        assert!(results.iter().all(|s| s.tool == "claude-code"));
2776    }
2777
2778    #[test]
2779    fn test_log_filter_by_model_wildcard() {
2780        let db = test_db();
2781        seed_sessions(&db);
2782        let filter = LogFilter {
2783            model: Some("claude*".to_string()),
2784            ..Default::default()
2785        };
2786        let results = db.list_sessions_log(&filter).unwrap();
2787        assert_eq!(results.len(), 5); // all have claude-opus model
2788    }
2789
2790    #[test]
2791    fn test_log_filter_since() {
2792        let db = test_db();
2793        seed_sessions(&db);
2794        let filter = LogFilter {
2795            since: Some("2024-01-03T00:00:00Z".to_string()),
2796            ..Default::default()
2797        };
2798        let results = db.list_sessions_log(&filter).unwrap();
2799        assert_eq!(results.len(), 3); // s3, s4, s5
2800    }
2801
2802    #[test]
2803    fn test_log_filter_before() {
2804        let db = test_db();
2805        seed_sessions(&db);
2806        let filter = LogFilter {
2807            before: Some("2024-01-03T00:00:00Z".to_string()),
2808            ..Default::default()
2809        };
2810        let results = db.list_sessions_log(&filter).unwrap();
2811        assert_eq!(results.len(), 2); // s1, s2
2812    }
2813
2814    #[test]
2815    fn test_log_filter_since_and_before() {
2816        let db = test_db();
2817        seed_sessions(&db);
2818        let filter = LogFilter {
2819            since: Some("2024-01-02T00:00:00Z".to_string()),
2820            before: Some("2024-01-04T00:00:00Z".to_string()),
2821            ..Default::default()
2822        };
2823        let results = db.list_sessions_log(&filter).unwrap();
2824        assert_eq!(results.len(), 2); // s2, s3
2825    }
2826
2827    #[test]
2828    fn test_log_filter_grep() {
2829        let db = test_db();
2830        seed_sessions(&db);
2831        let filter = LogFilter {
2832            grep: Some("JWT".to_string()),
2833            ..Default::default()
2834        };
2835        let results = db.list_sessions_log(&filter).unwrap();
2836        assert_eq!(results.len(), 1);
2837        assert_eq!(results[0].id, "s2");
2838    }
2839
2840    #[test]
2841    fn test_log_limit_and_offset() {
2842        let db = test_db();
2843        seed_sessions(&db);
2844        let filter = LogFilter {
2845            limit: Some(2),
2846            offset: Some(1),
2847            ..Default::default()
2848        };
2849        let results = db.list_sessions_log(&filter).unwrap();
2850        assert_eq!(results.len(), 2);
2851        assert_eq!(results[0].id, "s4"); // second most recent
2852        assert_eq!(results[1].id, "s3");
2853    }
2854
2855    #[test]
2856    fn test_log_limit_only() {
2857        let db = test_db();
2858        seed_sessions(&db);
2859        let filter = LogFilter {
2860            limit: Some(3),
2861            ..Default::default()
2862        };
2863        let results = db.list_sessions_log(&filter).unwrap();
2864        assert_eq!(results.len(), 3);
2865    }
2866
2867    #[test]
2868    fn test_list_sessions_limit_offset() {
2869        let db = test_db();
2870        seed_sessions(&db);
2871        let filter = LocalSessionFilter {
2872            limit: Some(2),
2873            offset: Some(1),
2874            ..Default::default()
2875        };
2876        let results = db.list_sessions(&filter).unwrap();
2877        assert_eq!(results.len(), 2);
2878        assert_eq!(results[0].id, "s4");
2879        assert_eq!(results[1].id, "s3");
2880    }
2881
2882    #[test]
2883    fn test_count_sessions_filtered() {
2884        let db = test_db();
2885        seed_sessions(&db);
2886        let count = db
2887            .count_sessions_filtered(&LocalSessionFilter::default())
2888            .unwrap();
2889        assert_eq!(count, 5);
2890    }
2891
2892    #[test]
2893    fn test_list_and_count_filters_match_when_auxiliary_rows_exist() {
2894        let db = test_db();
2895        seed_sessions(&db);
2896        db.conn()
2897            .execute(
2898                "UPDATE sessions SET is_auxiliary = 1 WHERE id IN ('s2', 's3')",
2899                [],
2900            )
2901            .unwrap();
2902
2903        let default_filter = LocalSessionFilter::default();
2904        let rows = db.list_sessions(&default_filter).unwrap();
2905        let count = db.count_sessions_filtered(&default_filter).unwrap();
2906        assert_eq!(rows.len() as i64, count);
2907        assert!(rows.iter().all(|row| !row.is_auxiliary));
2908
2909        let gemini_filter = LocalSessionFilter {
2910            tool: Some("gemini".to_string()),
2911            ..Default::default()
2912        };
2913        let gemini_rows = db.list_sessions(&gemini_filter).unwrap();
2914        let gemini_count = db.count_sessions_filtered(&gemini_filter).unwrap();
2915        assert_eq!(gemini_rows.len() as i64, gemini_count);
2916        assert!(gemini_rows.is_empty());
2917        assert_eq!(gemini_count, 0);
2918    }
2919
2920    #[test]
2921    fn test_exclude_low_signal_filter_hides_metadata_only_sessions() {
2922        let db = test_db();
2923
2924        let mut low_signal = make_summary("meta-only", "claude-code", "", "2024-01-01T00:00:00Z");
2925        low_signal.title = None;
2926        low_signal.message_count = 0;
2927        low_signal.task_count = 0;
2928        low_signal.event_count = 2;
2929        low_signal.git_repo_name = Some("frontend/aviss-react-front".to_string());
2930
2931        let mut normal = make_summary(
2932            "real-work",
2933            "opencode",
2934            "Socket.IO decision",
2935            "2024-01-02T00:00:00Z",
2936        );
2937        normal.message_count = 14;
2938        normal.task_count = 2;
2939        normal.event_count = 38;
2940        normal.git_repo_name = Some("frontend/aviss-react-front".to_string());
2941
2942        db.upsert_remote_session(&low_signal).unwrap();
2943        db.upsert_remote_session(&normal).unwrap();
2944
2945        let default_filter = LocalSessionFilter {
2946            git_repo_name: Some("frontend/aviss-react-front".to_string()),
2947            ..Default::default()
2948        };
2949        assert_eq!(db.list_sessions(&default_filter).unwrap().len(), 2);
2950        assert_eq!(db.count_sessions_filtered(&default_filter).unwrap(), 2);
2951
2952        let repo_filter = LocalSessionFilter {
2953            git_repo_name: Some("frontend/aviss-react-front".to_string()),
2954            exclude_low_signal: true,
2955            ..Default::default()
2956        };
2957        let rows = db.list_sessions(&repo_filter).unwrap();
2958        assert_eq!(rows.len(), 1);
2959        assert_eq!(rows[0].id, "real-work");
2960        assert_eq!(db.count_sessions_filtered(&repo_filter).unwrap(), 1);
2961    }
2962
2963    #[test]
2964    fn test_list_working_directories_distinct_non_empty() {
2965        let db = test_db();
2966
2967        let mut a = make_summary("wd-1", "claude-code", "One", "2024-01-01T00:00:00Z");
2968        a.working_directory = Some("/tmp/repo-a".to_string());
2969        let mut b = make_summary("wd-2", "claude-code", "Two", "2024-01-02T00:00:00Z");
2970        b.working_directory = Some("/tmp/repo-a".to_string());
2971        let mut c = make_summary("wd-3", "claude-code", "Three", "2024-01-03T00:00:00Z");
2972        c.working_directory = Some("/tmp/repo-b".to_string());
2973        let mut d = make_summary("wd-4", "claude-code", "Four", "2024-01-04T00:00:00Z");
2974        d.working_directory = Some("".to_string());
2975
2976        db.upsert_remote_session(&a).unwrap();
2977        db.upsert_remote_session(&b).unwrap();
2978        db.upsert_remote_session(&c).unwrap();
2979        db.upsert_remote_session(&d).unwrap();
2980
2981        let dirs = db.list_working_directories().unwrap();
2982        assert_eq!(
2983            dirs,
2984            vec!["/tmp/repo-a".to_string(), "/tmp/repo-b".to_string()]
2985        );
2986    }
2987
2988    #[test]
2989    fn test_list_session_tools() {
2990        let db = test_db();
2991        seed_sessions(&db);
2992        let tools = db
2993            .list_session_tools(&LocalSessionFilter::default())
2994            .unwrap();
2995        assert_eq!(tools, vec!["claude-code".to_string(), "gemini".to_string()]);
2996    }
2997
2998    #[test]
2999    fn test_log_combined_filters() {
3000        let db = test_db();
3001        seed_sessions(&db);
3002        let filter = LogFilter {
3003            tool: Some("claude-code".to_string()),
3004            since: Some("2024-01-03T00:00:00Z".to_string()),
3005            limit: Some(1),
3006            ..Default::default()
3007        };
3008        let results = db.list_sessions_log(&filter).unwrap();
3009        assert_eq!(results.len(), 1);
3010        assert_eq!(results[0].id, "s5"); // most recent claude-code after Jan 3
3011    }
3012
3013    // ── Session offset/latest tests ────────────────────────────────────
3014
3015    #[test]
3016    fn test_get_session_by_offset() {
3017        let db = test_db();
3018        seed_sessions(&db);
3019        let row = db.get_session_by_offset(0).unwrap().unwrap();
3020        assert_eq!(row.id, "s5"); // most recent
3021        let row = db.get_session_by_offset(2).unwrap().unwrap();
3022        assert_eq!(row.id, "s3");
3023        assert!(db.get_session_by_offset(10).unwrap().is_none());
3024    }
3025
3026    #[test]
3027    fn test_get_session_by_tool_offset() {
3028        let db = test_db();
3029        seed_sessions(&db);
3030        let row = db
3031            .get_session_by_tool_offset("claude-code", 0)
3032            .unwrap()
3033            .unwrap();
3034        assert_eq!(row.id, "s5");
3035        let row = db
3036            .get_session_by_tool_offset("claude-code", 1)
3037            .unwrap()
3038            .unwrap();
3039        assert_eq!(row.id, "s4");
3040        let row = db.get_session_by_tool_offset("gemini", 0).unwrap().unwrap();
3041        assert_eq!(row.id, "s3");
3042        assert!(db
3043            .get_session_by_tool_offset("gemini", 1)
3044            .unwrap()
3045            .is_none());
3046    }
3047
3048    #[test]
3049    fn test_get_sessions_latest() {
3050        let db = test_db();
3051        seed_sessions(&db);
3052        let rows = db.get_sessions_latest(3).unwrap();
3053        assert_eq!(rows.len(), 3);
3054        assert_eq!(rows[0].id, "s5");
3055        assert_eq!(rows[1].id, "s4");
3056        assert_eq!(rows[2].id, "s3");
3057    }
3058
3059    #[test]
3060    fn test_get_sessions_by_tool_latest() {
3061        let db = test_db();
3062        seed_sessions(&db);
3063        let rows = db.get_sessions_by_tool_latest("claude-code", 2).unwrap();
3064        assert_eq!(rows.len(), 2);
3065        assert_eq!(rows[0].id, "s5");
3066        assert_eq!(rows[1].id, "s4");
3067    }
3068
3069    #[test]
3070    fn test_get_sessions_latest_more_than_available() {
3071        let db = test_db();
3072        seed_sessions(&db);
3073        let rows = db.get_sessions_by_tool_latest("gemini", 10).unwrap();
3074        assert_eq!(rows.len(), 1); // only 1 gemini session
3075    }
3076
3077    #[test]
3078    fn test_upsert_and_get_session_semantic_summary() {
3079        let db = test_db();
3080        seed_sessions(&db);
3081
3082        db.upsert_session_semantic_summary(&SessionSemanticSummaryUpsert {
3083            session_id: "s1",
3084            summary_json: r#"{"changes":"updated files","auth_security":"none detected","layer_file_changes":[]}"#,
3085            generated_at: "2026-03-04T10:00:00Z",
3086            provider: "codex_exec",
3087            model: Some("gpt-5"),
3088            source_kind: "session_signals",
3089            generation_kind: "provider",
3090            prompt_fingerprint: Some("abc123"),
3091            source_details_json: Some(r#"{"source":"session"}"#),
3092            diff_tree_json: Some(r#"[]"#),
3093            error: None,
3094        })
3095        .expect("upsert semantic summary");
3096
3097        let row = db
3098            .get_session_semantic_summary("s1")
3099            .expect("query semantic summary")
3100            .expect("summary row exists");
3101        assert_eq!(row.session_id, "s1");
3102        assert_eq!(row.provider, "codex_exec");
3103        assert_eq!(row.model.as_deref(), Some("gpt-5"));
3104        assert_eq!(row.source_kind, "session_signals");
3105        assert_eq!(row.generation_kind, "provider");
3106        assert_eq!(row.prompt_fingerprint.as_deref(), Some("abc123"));
3107        assert!(row.error.is_none());
3108    }
3109
3110    #[test]
3111    fn test_delete_session_removes_semantic_summary_row() {
3112        let db = test_db();
3113        seed_sessions(&db);
3114
3115        db.upsert_session_semantic_summary(&SessionSemanticSummaryUpsert {
3116            session_id: "s1",
3117            summary_json: r#"{"changes":"updated files","auth_security":"none detected","layer_file_changes":[]}"#,
3118            generated_at: "2026-03-04T10:00:00Z",
3119            provider: "heuristic",
3120            model: None,
3121            source_kind: "heuristic",
3122            generation_kind: "heuristic_fallback",
3123            prompt_fingerprint: None,
3124            source_details_json: None,
3125            diff_tree_json: None,
3126            error: Some("provider disabled"),
3127        })
3128        .expect("upsert semantic summary");
3129
3130        db.delete_session("s1").expect("delete session");
3131
3132        let missing = db
3133            .get_session_semantic_summary("s1")
3134            .expect("query semantic summary");
3135        assert!(missing.is_none());
3136    }
3137
3138    #[test]
3139    fn test_delete_session_removes_session_links_bidirectionally() {
3140        let db = test_db();
3141        seed_sessions(&db);
3142
3143        db.conn()
3144            .execute(
3145                "INSERT INTO session_links (session_id, linked_session_id, link_type, created_at) \
3146                 VALUES (?1, ?2, 'handoff', datetime('now'))",
3147                params!["s1", "s2"],
3148            )
3149            .expect("insert forward link");
3150        db.conn()
3151            .execute(
3152                "INSERT INTO session_links (session_id, linked_session_id, link_type, created_at) \
3153                 VALUES (?1, ?2, 'related', datetime('now'))",
3154                params!["s3", "s1"],
3155            )
3156            .expect("insert reverse link");
3157
3158        db.delete_session("s1").expect("delete root session");
3159
3160        let remaining: i64 = db
3161            .conn()
3162            .query_row(
3163                "SELECT COUNT(*) FROM session_links WHERE session_id = ?1 OR linked_session_id = ?1",
3164                params!["s1"],
3165                |row| row.get(0),
3166            )
3167            .expect("count linked rows");
3168        assert_eq!(remaining, 0);
3169    }
3170
3171    #[test]
3172    fn test_delete_expired_session_summaries_uses_generated_at_ttl() {
3173        let db = test_db();
3174        seed_sessions(&db);
3175
3176        db.upsert_session_semantic_summary(&SessionSemanticSummaryUpsert {
3177            session_id: "s1",
3178            summary_json: r#"{"changes":"old"}"#,
3179            generated_at: "2020-01-01T00:00:00Z",
3180            provider: "codex_exec",
3181            model: None,
3182            source_kind: "session_signals",
3183            generation_kind: "provider",
3184            prompt_fingerprint: None,
3185            source_details_json: None,
3186            diff_tree_json: None,
3187            error: None,
3188        })
3189        .expect("upsert old summary");
3190        db.upsert_session_semantic_summary(&SessionSemanticSummaryUpsert {
3191            session_id: "s2",
3192            summary_json: r#"{"changes":"new"}"#,
3193            generated_at: "2999-01-01T00:00:00Z",
3194            provider: "codex_exec",
3195            model: None,
3196            source_kind: "session_signals",
3197            generation_kind: "provider",
3198            prompt_fingerprint: None,
3199            source_details_json: None,
3200            diff_tree_json: None,
3201            error: None,
3202        })
3203        .expect("upsert new summary");
3204
3205        let deleted = db
3206            .delete_expired_session_summaries(30)
3207            .expect("delete expired summaries");
3208        assert_eq!(deleted, 1);
3209        assert!(db
3210            .get_session_semantic_summary("s1")
3211            .expect("query old summary")
3212            .is_none());
3213        assert!(db
3214            .get_session_semantic_summary("s2")
3215            .expect("query new summary")
3216            .is_some());
3217    }
3218
3219    #[test]
3220    fn test_list_all_session_ids_returns_sorted_ids() {
3221        let db = test_db();
3222        seed_sessions(&db);
3223
3224        let ids = db.list_all_session_ids().expect("list all session ids");
3225        assert_eq!(ids, vec!["s1", "s2", "s3", "s4", "s5"]);
3226    }
3227
3228    #[test]
3229    fn test_list_session_semantic_summary_ids_returns_sorted_ids() {
3230        let db = test_db();
3231        seed_sessions(&db);
3232
3233        db.upsert_session_semantic_summary(&SessionSemanticSummaryUpsert {
3234            session_id: "s4",
3235            summary_json: r#"{"changes":"delta"}"#,
3236            generated_at: "2026-03-04T10:00:00Z",
3237            provider: "codex_exec",
3238            model: Some("gpt-5"),
3239            source_kind: "session_signals",
3240            generation_kind: "provider",
3241            prompt_fingerprint: Some("fingerprint"),
3242            source_details_json: Some(r#"{"source":"session"}"#),
3243            diff_tree_json: Some(r#"[]"#),
3244            error: None,
3245        })
3246        .expect("upsert summary s4");
3247        db.upsert_session_semantic_summary(&SessionSemanticSummaryUpsert {
3248            session_id: "s2",
3249            summary_json: r#"{"changes":"delta"}"#,
3250            generated_at: "2026-03-04T10:00:00Z",
3251            provider: "codex_exec",
3252            model: Some("gpt-5"),
3253            source_kind: "session_signals",
3254            generation_kind: "provider",
3255            prompt_fingerprint: Some("fingerprint"),
3256            source_details_json: Some(r#"{"source":"session"}"#),
3257            diff_tree_json: Some(r#"[]"#),
3258            error: None,
3259        })
3260        .expect("upsert summary s2");
3261
3262        let ids = db
3263            .list_session_semantic_summary_ids()
3264            .expect("list semantic summary ids");
3265        assert_eq!(ids, vec!["s2", "s4"]);
3266    }
3267
3268    #[test]
3269    fn test_list_expired_session_ids_uses_created_at_ttl() {
3270        let db = test_db();
3271        seed_sessions(&db);
3272
3273        let expired = db
3274            .list_expired_session_ids(30)
3275            .expect("list expired sessions");
3276        assert!(
3277            expired.contains(&"s1".to_string()),
3278            "older seeded sessions should be expired for 30-day keep"
3279        );
3280
3281        let none_expired = db
3282            .list_expired_session_ids(10_000)
3283            .expect("list non-expired sessions");
3284        assert!(
3285            none_expired.is_empty(),
3286            "seeded sessions should be retained with a large keep window"
3287        );
3288    }
3289
3290    #[test]
3291    fn test_build_fts_query_quotes_tokens() {
3292        assert_eq!(
3293            build_fts_query("parser retry"),
3294            Some("\"parser\" OR \"retry\"".to_string())
3295        );
3296        assert!(build_fts_query("   ").is_none());
3297    }
3298
3299    #[test]
3300    fn test_vector_chunk_replace_and_candidate_query() {
3301        let db = test_db();
3302        seed_sessions(&db);
3303
3304        let chunks = vec![
3305            VectorChunkUpsert {
3306                chunk_id: "chunk-s1-0".to_string(),
3307                session_id: "s1".to_string(),
3308                chunk_index: 0,
3309                start_line: 1,
3310                end_line: 8,
3311                line_count: 8,
3312                content: "parser selection retry after auth error".to_string(),
3313                content_hash: "hash-0".to_string(),
3314                embedding: vec![0.1, 0.2, 0.3],
3315            },
3316            VectorChunkUpsert {
3317                chunk_id: "chunk-s1-1".to_string(),
3318                session_id: "s1".to_string(),
3319                chunk_index: 1,
3320                start_line: 9,
3321                end_line: 15,
3322                line_count: 7,
3323                content: "session list refresh control wired to runtime".to_string(),
3324                content_hash: "hash-1".to_string(),
3325                embedding: vec![0.3, 0.2, 0.1],
3326            },
3327        ];
3328
3329        db.replace_session_vector_chunks("s1", "source-hash-s1", "bge-m3", &chunks)
3330            .expect("replace vector chunks");
3331
3332        let source_hash = db
3333            .vector_index_source_hash("s1")
3334            .expect("read source hash")
3335            .expect("source hash should exist");
3336        assert_eq!(source_hash, "source-hash-s1");
3337
3338        let matches = db
3339            .list_vector_chunk_candidates("parser retry", "bge-m3", 10)
3340            .expect("query vector chunk candidates");
3341        assert!(
3342            !matches.is_empty(),
3343            "vector FTS query should return at least one candidate"
3344        );
3345        assert_eq!(matches[0].session_id, "s1");
3346        assert!(matches[0].content.contains("parser"));
3347    }
3348
3349    #[test]
3350    fn test_delete_session_removes_vector_index_rows() {
3351        let db = test_db();
3352        seed_sessions(&db);
3353
3354        let chunks = vec![VectorChunkUpsert {
3355            chunk_id: "chunk-s1-delete".to_string(),
3356            session_id: "s1".to_string(),
3357            chunk_index: 0,
3358            start_line: 1,
3359            end_line: 2,
3360            line_count: 2,
3361            content: "delete me from vector cache".to_string(),
3362            content_hash: "hash-delete".to_string(),
3363            embedding: vec![0.7, 0.1, 0.2],
3364        }];
3365        db.replace_session_vector_chunks("s1", "delete-hash", "bge-m3", &chunks)
3366            .expect("insert vector chunk");
3367
3368        db.delete_session("s1")
3369            .expect("delete session with vector rows");
3370
3371        let candidates = db
3372            .list_vector_chunk_candidates("delete", "bge-m3", 10)
3373            .expect("query candidates after delete");
3374        assert!(
3375            candidates.iter().all(|row| row.session_id != "s1"),
3376            "vector rows for deleted session should be removed"
3377        );
3378    }
3379
3380    #[test]
3381    fn test_vector_index_job_round_trip() {
3382        let db = test_db();
3383        let payload = VectorIndexJobRow {
3384            status: "running".to_string(),
3385            processed_sessions: 2,
3386            total_sessions: 10,
3387            message: Some("indexing".to_string()),
3388            started_at: Some("2026-03-05T10:00:00Z".to_string()),
3389            finished_at: None,
3390        };
3391        db.set_vector_index_job(&payload)
3392            .expect("set vector index job snapshot");
3393
3394        let loaded = db
3395            .get_vector_index_job()
3396            .expect("read vector index job snapshot")
3397            .expect("vector index job row should exist");
3398        assert_eq!(loaded.status, "running");
3399        assert_eq!(loaded.processed_sessions, 2);
3400        assert_eq!(loaded.total_sessions, 10);
3401        assert_eq!(loaded.message.as_deref(), Some("indexing"));
3402    }
3403
3404    #[test]
3405    fn test_summary_batch_job_round_trip() {
3406        let db = test_db();
3407        let payload = SummaryBatchJobRow {
3408            status: "running".to_string(),
3409            processed_sessions: 4,
3410            total_sessions: 12,
3411            failed_sessions: 1,
3412            message: Some("processing summaries".to_string()),
3413            started_at: Some("2026-03-05T10:00:00Z".to_string()),
3414            finished_at: None,
3415        };
3416        db.set_summary_batch_job(&payload)
3417            .expect("set summary batch job snapshot");
3418
3419        let loaded = db
3420            .get_summary_batch_job()
3421            .expect("read summary batch job snapshot")
3422            .expect("summary batch job row should exist");
3423        assert_eq!(loaded.status, "running");
3424        assert_eq!(loaded.processed_sessions, 4);
3425        assert_eq!(loaded.total_sessions, 12);
3426        assert_eq!(loaded.failed_sessions, 1);
3427        assert_eq!(loaded.message.as_deref(), Some("processing summaries"));
3428    }
3429
3430    #[test]
3431    fn test_session_count() {
3432        let db = test_db();
3433        assert_eq!(db.session_count().unwrap(), 0);
3434        seed_sessions(&db);
3435        assert_eq!(db.session_count().unwrap(), 5);
3436    }
3437}