Skip to main content

open_loops/index/
mod.rs

1//! SQLite-backed disposable index for cached scan and session data.
2//!
3//! The index lives at `<base>/index.db` (WAL mode). It is a **cache** only —
4//! git is the source of truth. Any open/migrate/integrity failure deletes the
5//! db file (and its `-wal`/`-shm` siblings) and recreates it from scratch.
6//! The program never panics or aborts on index failure.
7//!
//! Schema ends at `user_version = 2` once all migrations have run (v1 schema
//! creation followed by the v1→v2 FTS heal). Callers in later tasks wire
//! read/write logic on top of the tables created here.
10
11use chrono::{DateTime, TimeZone, Utc};
12use rusqlite::{Connection, OpenFlags};
13use std::path::{Path, PathBuf};
14
/// One cached open-loop row for a repo, persisted in the `loops` table.
///
/// Mirrors the heavy-phase output of `scanner::open_loops` for a single
/// unmerged branch. `ahead`/`behind` are `None` when the cached scan ran
/// without `need_ahead_behind` (light phase only).
///
/// Every field maps to the same-named column of `loops`; the owning repo's
/// `common_dir_hash` is supplied separately by the accessor methods on
/// `Index` and is not part of this struct.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LoopRow {
    /// Branch name; together with the repo's `common_dir_hash` it forms the
    /// primary key of the `loops` table.
    pub branch: String,
    /// Presumably the commit id at the tip of `branch` — confirm against the scanner.
    pub head_sha: String,
    /// Presumably the commit id of the base the branch was compared against —
    /// confirm against the scanner.
    pub base_sha: String,
    /// Ahead count, stored as a nullable INTEGER column.
    pub ahead: Option<u32>,
    /// Behind count, stored as a nullable INTEGER column.
    pub behind: Option<u32>,
    /// Persisted as whole seconds since the Unix epoch, so sub-second
    /// precision does not survive a round trip through the index.
    pub last_commit: DateTime<Utc>,
    /// Persisted via `to_string_lossy`, so non-UTF-8 path bytes do not
    /// round-trip exactly.
    pub worktree_path: PathBuf,
}
30
/// SQLite-backed cache index.
///
/// Wraps a single `Connection`. All public methods in this module treat index
/// errors as non-fatal: they warn to stderr and fall back gracefully, matching
/// the tolerant pattern used in `inventory.rs`.
///
/// Construct with [`Index::open`] (on-disk, self-healing) or
/// [`Index::open_in_memory`] (no disk I/O).
pub struct Index {
    /// The only handle to the database; every accessor goes through it.
    conn: Connection,
}
39
40impl Index {
41    /// Opens (or creates) the index at `<base>/index.db`.
42    ///
43    /// Behaviour on failure at any stage (open, WAL pragma, migration,
44    /// integrity check):
45    /// 1. Print `warning: …` to stderr.
46    /// 2. Delete `index.db`, `index.db-wal`, `index.db-shm` from `base`.
47    /// 3. Attempt to create a fresh db in the same location.
48    /// 4. If that also fails, fall back to an in-memory db so the command
49    ///    continues without an index — never panic, never abort.
50    pub fn open(base: &Path) -> Self {
51        let db_path = base.join("index.db");
52        match Self::try_open_disk(&db_path) {
53            Ok(index) => index,
54            Err(e) => {
55                eprintln!("warning: index open/migrate failed ({e:#}); rebuilding");
56                Self::delete_db_files(base);
57                match Self::try_open_disk(&db_path) {
58                    Ok(index) => index,
59                    Err(e2) => {
60                        eprintln!(
61                            "warning: index rebuild also failed ({e2:#}); \
62                             falling back to in-memory index"
63                        );
64                        // In-memory fallback so the command still runs.
65                        Self::open_in_memory()
66                    }
67                }
68            }
69        }
70    }
71
72    /// Opens an in-memory index for tests (same migration, no disk I/O).
73    pub fn open_in_memory() -> Self {
74        let conn = Connection::open_in_memory().expect("in-memory SQLite must always open");
75        let mut index = Self { conn };
76        // In-memory: migration cannot fail; panic only here (test/fallback path).
77        index
78            .apply_pragmas()
79            .expect("in-memory pragma must succeed");
80        index
81            .run_migrations()
82            .expect("in-memory migration must succeed");
83        index
84    }
85
86    // -------------------------------------------------------------------------
87    // Public cache accessors (Task 2)
88    // -------------------------------------------------------------------------
89
90    /// Returns `(common_dir_hash, common_dir)` cached for `path`, or `None` on
91    /// miss or any index error.
92    pub fn cached_common_dir(&self, path: &Path) -> Option<(String, PathBuf)> {
93        let path_str = path.to_string_lossy();
94        match self.conn.query_row(
95            "SELECT common_dir_hash, common_dir FROM repos WHERE path = ?1",
96            rusqlite::params![path_str.as_ref()],
97            |row| {
98                let hash: String = row.get(0)?;
99                let cd: String = row.get(1)?;
100                Ok((hash, PathBuf::from(cd)))
101            },
102        ) {
103            Ok(pair) => Some(pair),
104            Err(rusqlite::Error::QueryReturnedNoRows) => None,
105            Err(e) => {
106                eprintln!("warning: index cached_common_dir query failed: {e:#}");
107                None
108            }
109        }
110    }
111
112    /// Upserts `(path, common_dir_hash, common_dir)` into `repos`, leaving
113    /// the remaining columns (default_branch, default_sha, refs_fingerprint,
114    /// last_indexed) NULL. On any index error, prints a warning and continues.
115    pub fn put_repo_common_dir(&self, path: &Path, common_dir_hash: &str, common_dir: &Path) {
116        let path_str = path.to_string_lossy();
117        let cd_str = common_dir.to_string_lossy();
118        if let Err(e) = self.conn.execute(
119            "INSERT INTO repos (common_dir_hash, path, common_dir)
120             VALUES (?1, ?2, ?3)
121             ON CONFLICT(path) DO UPDATE SET
122                 common_dir_hash = excluded.common_dir_hash,
123                 common_dir      = excluded.common_dir",
124            rusqlite::params![common_dir_hash, path_str.as_ref(), cd_str.as_ref()],
125        ) {
126            eprintln!("warning: index put_repo_common_dir failed: {e:#}");
127        }
128    }
129
130    // -------------------------------------------------------------------------
131    // Public cache accessors (Task 3): refs-fingerprint gate
132    // -------------------------------------------------------------------------
133
134    /// Returns the cached loops for `hash`, but ONLY when the stored repo row
135    /// proves the cache is still valid:
136    ///
137    /// 1. `repos.refs_fingerprint == refs_fp` (refs haven't changed), AND
138    /// 2. `repos.default_sha == default_sha` (the base hasn't moved).
139    ///
140    /// Returns `None` on any mismatch, on a missing/un-populated repo row, or on
141    /// any index error. A NULL `default_sha` / `refs_fingerprint` (a repo row
142    /// inserted by `put_repo_common_dir` but never `put_loops`'d) is a clean
143    /// miss — no warning is emitted, since it is the normal pre-`put_loops` state.
144    pub fn cached_loops(
145        &self,
146        hash: &str,
147        refs_fp: i64,
148        default_sha: &str,
149    ) -> Option<Vec<LoopRow>> {
150        // Read the gate columns. NULL columns map to `None` so an un-populated
151        // repos row is a clean miss rather than a warning.
152        let gate: Option<(i64, String)> = match self.conn.query_row(
153            "SELECT refs_fingerprint, default_sha FROM repos WHERE common_dir_hash = ?1",
154            rusqlite::params![hash],
155            |row| {
156                let fp: Option<i64> = row.get(0)?;
157                let sha: Option<String> = row.get(1)?;
158                Ok(fp.zip(sha))
159            },
160        ) {
161            Ok(g) => g,
162            Err(rusqlite::Error::QueryReturnedNoRows) => return None,
163            Err(e) => {
164                eprintln!("warning: index cached_loops gate query failed: {e:#}");
165                return None;
166            }
167        };
168
169        let (stored_fp, stored_sha) = gate?;
170        if stored_fp != refs_fp || stored_sha != default_sha {
171            return None;
172        }
173
174        // Gate passed: load the loop rows.
175        let mut stmt = match self.conn.prepare(
176            "SELECT branch, head_sha, base_sha, ahead, behind, last_commit, worktree_path
177             FROM loops WHERE common_dir_hash = ?1 ORDER BY branch",
178        ) {
179            Ok(s) => s,
180            Err(e) => {
181                eprintln!("warning: index cached_loops prepare failed: {e:#}");
182                return None;
183            }
184        };
185        let rows = stmt.query_map(rusqlite::params![hash], |row| {
186            let branch: String = row.get(0)?;
187            let head_sha: String = row.get(1)?;
188            let base_sha: String = row.get(2)?;
189            let ahead: Option<i64> = row.get(3)?;
190            let behind: Option<i64> = row.get(4)?;
191            let last_commit_secs: i64 = row.get(5)?;
192            let worktree_path: String = row.get(6)?;
193            Ok(LoopRow {
194                branch,
195                head_sha,
196                base_sha,
197                ahead: ahead.map(|v| v as u32),
198                behind: behind.map(|v| v as u32),
199                last_commit: Utc
200                    .timestamp_opt(last_commit_secs, 0)
201                    .single()
202                    .unwrap_or_default(),
203                worktree_path: PathBuf::from(worktree_path),
204            })
205        });
206        let rows = match rows {
207            Ok(mapped) => mapped.collect::<Result<Vec<_>, _>>(),
208            Err(e) => {
209                eprintln!("warning: index cached_loops query failed: {e:#}");
210                return None;
211            }
212        };
213        match rows {
214            Ok(v) => Some(v),
215            Err(e) => {
216                eprintln!("warning: index cached_loops row decode failed: {e:#}");
217                None
218            }
219        }
220    }
221
222    /// Write-through for a completed scan of one repo: upserts the `repos` row
223    /// (default branch/SHA, refs fingerprint, last_indexed) and REPLACES the
224    /// repo's `loops` rows — all in a single transaction.
225    ///
226    /// On any index error, prints a warning and continues (git is the source of
227    /// truth; the index is disposable).
228    #[allow(clippy::too_many_arguments)]
229    pub fn put_loops(
230        &self,
231        hash: &str,
232        path: &Path,
233        common_dir: &Path,
234        default_branch: &str,
235        default_sha: &str,
236        refs_fp: i64,
237        rows: &[LoopRow],
238    ) {
239        if let Err(e) = self.put_loops_tx(
240            hash,
241            path,
242            common_dir,
243            default_branch,
244            default_sha,
245            refs_fp,
246            rows,
247        ) {
248            eprintln!("warning: index put_loops failed: {e:#}");
249        }
250    }
251
252    /// Inner fallible body of [`Self::put_loops`], run inside one transaction.
253    #[allow(clippy::too_many_arguments)]
254    fn put_loops_tx(
255        &self,
256        hash: &str,
257        path: &Path,
258        common_dir: &Path,
259        default_branch: &str,
260        default_sha: &str,
261        refs_fp: i64,
262        rows: &[LoopRow],
263    ) -> Result<(), rusqlite::Error> {
264        let path_str = path.to_string_lossy();
265        let cd_str = common_dir.to_string_lossy();
266        let now = Utc::now().timestamp();
267
268        self.conn.execute_batch("BEGIN")?;
269        let res = (|| -> Result<(), rusqlite::Error> {
270            // Upsert the repos row. Key on common_dir_hash (PK) so a row that
271            // already exists from put_repo_common_dir is updated in place; also
272            // resolve a possible path UNIQUE conflict the same way.
273            self.conn.execute(
274                "INSERT INTO repos
275                     (common_dir_hash, path, common_dir, default_branch,
276                      default_sha, refs_fingerprint, last_indexed)
277                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
278                 ON CONFLICT(common_dir_hash) DO UPDATE SET
279                     path             = excluded.path,
280                     common_dir       = excluded.common_dir,
281                     default_branch   = excluded.default_branch,
282                     default_sha      = excluded.default_sha,
283                     refs_fingerprint = excluded.refs_fingerprint,
284                     last_indexed     = excluded.last_indexed",
285                rusqlite::params![
286                    hash,
287                    path_str.as_ref(),
288                    cd_str.as_ref(),
289                    default_branch,
290                    default_sha,
291                    refs_fp,
292                    now,
293                ],
294            )?;
295
296            // Replace the repo's loops rows: delete then re-insert.
297            self.conn.execute(
298                "DELETE FROM loops WHERE common_dir_hash = ?1",
299                rusqlite::params![hash],
300            )?;
301            for row in rows {
302                self.conn.execute(
303                    "INSERT INTO loops
304                         (common_dir_hash, branch, head_sha, base_sha,
305                          ahead, behind, last_commit, worktree_path)
306                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
307                    rusqlite::params![
308                        hash,
309                        row.branch,
310                        row.head_sha,
311                        row.base_sha,
312                        row.ahead.map(i64::from),
313                        row.behind.map(i64::from),
314                        row.last_commit.timestamp(),
315                        row.worktree_path.to_string_lossy().as_ref(),
316                    ],
317                )?;
318            }
319            Ok(())
320        })();
321
322        match res {
323            Ok(()) => {
324                self.conn.execute_batch("COMMIT")?;
325                Ok(())
326            }
327            Err(e) => {
328                // Best-effort rollback; report the original error.
329                let _ = self.conn.execute_batch("ROLLBACK");
330                Err(e)
331            }
332        }
333    }
334
335    // -------------------------------------------------------------------------
336    // Public session accessors (Task 4): FTS index for mention probe
337    // -------------------------------------------------------------------------
338
339    /// Upserts a session's bounded tail text into the `sessions` table and the
340    /// `sessions_fts` virtual table.
341    ///
342    /// Reindexes ONLY when the stored `(mtime, size)` for `path` differs from
343    /// the supplied values. Unchanged files are skipped (no I/O, no FTS write).
344    /// `size` is compared alongside `mtime` so a same-second append that grows
345    /// the file still forces a reindex (closes the same-second FTS staleness
346    /// window, I-2). On any index error, prints a warning and continues.
347    pub fn upsert_session(&self, path: &Path, repo_path: &Path, mtime: i64, size: i64, text: &str) {
348        if let Err(e) = self.upsert_session_inner(path, repo_path, mtime, size, text) {
349            eprintln!("warning: index upsert_session failed: {e:#}");
350        }
351    }
352
353    fn upsert_session_inner(
354        &self,
355        path: &Path,
356        repo_path: &Path,
357        mtime: i64,
358        size: i64,
359        text: &str,
360    ) -> Result<(), rusqlite::Error> {
361        let path_str = path.to_string_lossy();
362        let repo_str = repo_path.to_string_lossy();
363
364        // Check whether a row with the same (path, mtime, size) already exists.
365        // Also retrieve the rowid so we can delete the old FTS entry by rowid.
366        // `size` is compared alongside `mtime` to close the same-second
367        // staleness window (I-2): a file appended to twice within one wall-clock
368        // second keeps the same whole-second mtime, so mtime alone would skip the
369        // reindex and serve a stale tail. Any change to size (or mtime) reindexes.
370        let existing: Option<(i64, i64, i64)> = match self.conn.query_row(
371            "SELECT rowid, mtime, size FROM sessions WHERE path = ?1",
372            rusqlite::params![path_str.as_ref()],
373            |row| {
374                Ok((
375                    row.get::<_, i64>(0)?,
376                    row.get::<_, i64>(1)?,
377                    row.get::<_, i64>(2)?,
378                ))
379            },
380        ) {
381            Ok(triple) => Some(triple),
382            Err(rusqlite::Error::QueryReturnedNoRows) => None,
383            Err(e) => return Err(e),
384        };
385
386        if existing.map(|(_, m, s)| (m, s)) == Some((mtime, size)) {
387            // Neither mtime nor size changed — skip reindex.
388            return Ok(());
389        }
390
391        // If a previous row exists, remove the old FTS entry by rowid.
392        if let Some((old_rowid, _, _)) = existing {
393            self.conn.execute(
394                "DELETE FROM sessions_fts WHERE rowid = ?1",
395                rusqlite::params![old_rowid],
396            )?;
397        }
398
399        // Upsert the metadata row.
400        self.conn.execute(
401            "INSERT INTO sessions (path, repo_path, mtime, size)
402             VALUES (?1, ?2, ?3, ?4)
403             ON CONFLICT(path) DO UPDATE SET
404                 repo_path = excluded.repo_path,
405                 mtime     = excluded.mtime,
406                 size      = excluded.size",
407            rusqlite::params![path_str.as_ref(), repo_str.as_ref(), mtime, size,],
408        )?;
409
410        // Get the rowid of the upserted sessions row so we can link it to FTS.
411        let sessions_rowid: i64 = self.conn.query_row(
412            "SELECT rowid FROM sessions WHERE path = ?1",
413            rusqlite::params![path_str.as_ref()],
414            |row| row.get(0),
415        )?;
416
417        // Insert the new FTS row with the same rowid as the sessions row.
418        // This lets us join sessions_fts.rowid = sessions.rowid in queries.
419        self.conn.execute(
420            "INSERT INTO sessions_fts (rowid, text) VALUES (?1, ?2)",
421            rusqlite::params![sessions_rowid, text],
422        )?;
423
424        Ok(())
425    }
426
427    /// Returns the set of session file paths (scoped to `repo_path`) whose
428    /// indexed text matches `branch` via FTS5.
429    ///
430    /// No file reads. On any index error, returns an empty set.
431    pub fn session_mentions(
432        &self,
433        repo_path: &Path,
434        branch: &str,
435    ) -> std::collections::HashSet<PathBuf> {
436        match self.session_mentions_inner(repo_path, branch) {
437            Ok(set) => set,
438            Err(e) => {
439                eprintln!("warning: index session_mentions failed: {e:#}");
440                std::collections::HashSet::new()
441            }
442        }
443    }
444
445    fn session_mentions_inner(
446        &self,
447        repo_path: &Path,
448        branch: &str,
449    ) -> Result<std::collections::HashSet<PathBuf>, rusqlite::Error> {
450        let repo_str = repo_path.to_string_lossy();
451        // Wrap in double-quotes for FTS5 phrase/literal match.
452        // Double any embedded double-quotes to escape them.
453        let fts_query = format!("\"{}\"", branch.replace('"', "\"\""));
454
455        // Join sessions_fts to sessions via rowid (FTS5 always exposes rowid).
456        // Join via rowid to recover `path` alongside the FTS MATCH.
457        let mut stmt = self.conn.prepare(
458            "SELECT s.path FROM sessions_fts f
459             JOIN sessions s ON s.rowid = f.rowid
460             WHERE sessions_fts MATCH ?1
461               AND s.repo_path = ?2",
462        )?;
463        let paths = stmt.query_map(rusqlite::params![fts_query, repo_str.as_ref()], |row| {
464            let p: String = row.get(0)?;
465            Ok(PathBuf::from(p))
466        })?;
467        paths.collect::<Result<std::collections::HashSet<_>, _>>()
468    }
469
470    // -------------------------------------------------------------------------
471    // Public maintenance (Task 5): prune orphans
472    // -------------------------------------------------------------------------
473
474    /// Deletes `repos` rows (and their dependent `loops` rows) whose repo is gone
475    /// from disk. This is **stricter** than `inventory::prune_orphans`, which
476    /// prunes on a single `repo_path` existence check (and also reclaims
477    /// unreadable files); here a row is removed only when BOTH the scanned `path`
478    /// AND the `common_dir` are gone.
479    ///
480    /// A repo is an orphan only when BOTH its scanned `path` and its `common_dir`
481    /// no longer exist: a worktree directory may be removed while the shared bare
482    /// store under `common_dir` survives (its branches are still real), so we must
483    /// keep the row in that case. Removal is self-healing — a returning repo is
484    /// simply re-discovered and re-indexed on the next scan.
485    ///
486    /// On any index error, prints a warning and continues (git is the source of
487    /// truth; the index is disposable).
488    pub fn prune_missing_repos(&self) {
489        if let Err(e) = self.prune_missing_repos_inner() {
490            eprintln!("warning: index prune_missing_repos failed: {e:#}");
491        }
492    }
493
494    fn prune_missing_repos_inner(&self) -> Result<(), rusqlite::Error> {
495        // Collect candidate rows first so we don't mutate while iterating a stmt.
496        let rows: Vec<(String, String, String)> = {
497            let mut stmt = self
498                .conn
499                .prepare("SELECT common_dir_hash, path, common_dir FROM repos")?;
500            let mapped = stmt.query_map([], |row| {
501                Ok((
502                    row.get::<_, String>(0)?,
503                    row.get::<_, String>(1)?,
504                    row.get::<_, String>(2)?,
505                ))
506            })?;
507            mapped.collect::<Result<Vec<_>, _>>()?
508        };
509
510        for (hash, path, common_dir) in rows {
511            let path_gone = !Path::new(&path).exists();
512            let common_gone = !Path::new(&common_dir).exists();
513            // Orphan only when the worktree path AND the shared store are both gone.
514            if path_gone && common_gone {
515                self.conn.execute(
516                    "DELETE FROM loops WHERE common_dir_hash = ?1",
517                    rusqlite::params![hash],
518                )?;
519                self.conn.execute(
520                    "DELETE FROM repos WHERE common_dir_hash = ?1",
521                    rusqlite::params![hash],
522                )?;
523                eprintln!("warning: removed orphan index entry for {path}");
524            }
525        }
526        Ok(())
527    }
528
529    // -------------------------------------------------------------------------
530    // Internal helpers
531    // -------------------------------------------------------------------------
532
533    /// Attempts to open the db at `path`, apply pragmas, run migrations, and
534    /// verify integrity. Returns an error string on any failure.
535    fn try_open_disk(db_path: &Path) -> Result<Self, anyhow::Error> {
536        // Ensure the parent directory exists.
537        if let Some(parent) = db_path.parent() {
538            std::fs::create_dir_all(parent)
539                .map_err(|e| anyhow::anyhow!("creating index dir {}: {e}", parent.display()))?;
540        }
541
542        let conn = Connection::open_with_flags(
543            db_path,
544            OpenFlags::SQLITE_OPEN_READ_WRITE
545                | OpenFlags::SQLITE_OPEN_CREATE
546                | OpenFlags::SQLITE_OPEN_NO_MUTEX,
547        )
548        .map_err(|e| anyhow::anyhow!("opening {}: {e}", db_path.display()))?;
549
550        let mut index = Self { conn };
551        index.apply_pragmas()?;
552        index.run_migrations()?;
553        index.check_integrity()?;
554        Ok(index)
555    }
556
557    /// Sets WAL mode and enables foreign keys.
558    fn apply_pragmas(&mut self) -> Result<(), anyhow::Error> {
559        self.conn
560            .execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")
561            .map_err(|e| anyhow::anyhow!("applying pragmas: {e}"))
562    }
563
564    /// Reads `user_version` and applies all pending migrations in order.
565    ///
566    /// * `user_version = 0` → v1 schema (all four tables) → v1→v2 FTS heal = end at 2.
567    /// * `user_version = 1` → stale contentless `sessions_fts` from an intermediate
568    ///   build of this branch; v1→v2 migration drops and recreates it contentful.
569    /// * `user_version ≥ 2` → up to date; no-op.
570    fn run_migrations(&mut self) -> Result<(), anyhow::Error> {
571        let version: i32 = self
572            .conn
573            .query_row("PRAGMA user_version", [], |row| row.get(0))
574            .map_err(|e| anyhow::anyhow!("reading user_version: {e}"))?;
575
576        if version < 1 {
577            self.create_schema_v1()?;
578        }
579        if version < 2 {
580            self.migrate_v1_to_v2()?;
581        }
582        Ok(())
583    }
584
585    /// Heals a stale `sessions_fts` created by earlier builds of this branch
586    /// that used `content=''` (contentless). Drops the old virtual table,
587    /// recreates it as a contentful FTS5 table, and bumps `user_version` to 2.
588    ///
589    /// When coming from a fresh create (`user_version` was 0), `create_schema_v1`
590    /// already built the contentful table, so this migration's DROP + recreate is
591    /// a fast no-op in terms of data: it leaves the schema at version 2 without
592    /// touching `repos`, `loops`, or `sessions`.
593    fn migrate_v1_to_v2(&mut self) -> Result<(), anyhow::Error> {
594        self.conn
595            .execute_batch(
596                "
597                BEGIN;
598                DROP TABLE IF EXISTS sessions_fts;
599                CREATE VIRTUAL TABLE sessions_fts USING fts5(
600                    text,
601                    path UNINDEXED
602                );
603                PRAGMA user_version = 2;
604                COMMIT;
605                ",
606            )
607            .map_err(|e| anyhow::anyhow!("migrating v1→v2 (FTS heal): {e}"))
608    }
609
    /// Creates all four tables (`repos`, `loops`, `sessions`, `sessions_fts`)
    /// and sets `user_version = 1`.
    ///
    /// The statements are wrapped in an explicit `BEGIN … COMMIT` inside one
    /// `execute_batch` call, so the schema and the version bump are committed
    /// together. The tables are created without `IF NOT EXISTS`, so the batch
    /// fails if any of them already exists.
    ///
    /// Schema notes:
    /// * `repos` is keyed by `common_dir_hash`; `path` is additionally UNIQUE.
    /// * `loops` is keyed by `(common_dir_hash, branch)`. No FOREIGN KEY to
    ///   `repos` is declared, so dependent rows must be deleted explicitly.
    /// * `sessions` is keyed by `path`; `sessions_fts` rows are linked to it
    ///   by rowid in the accessor methods.
    ///
    /// # Errors
    ///
    /// Returns an `anyhow::Error` prefixed with `creating schema v1:` when the
    /// batch fails.
    fn create_schema_v1(&mut self) -> Result<(), anyhow::Error> {
        self.conn
            .execute_batch(
                "
                BEGIN;

                CREATE TABLE repos (
                    common_dir_hash TEXT PRIMARY KEY,
                    path            TEXT NOT NULL UNIQUE,
                    common_dir      TEXT NOT NULL,
                    default_branch  TEXT,
                    default_sha     TEXT,
                    refs_fingerprint INTEGER,
                    last_indexed    INTEGER
                );

                CREATE TABLE loops (
                    common_dir_hash TEXT NOT NULL,
                    branch          TEXT NOT NULL,
                    head_sha        TEXT NOT NULL,
                    base_sha        TEXT NOT NULL,
                    ahead           INTEGER,
                    behind          INTEGER,
                    last_commit     INTEGER NOT NULL,
                    worktree_path   TEXT NOT NULL,
                    PRIMARY KEY (common_dir_hash, branch)
                );

                CREATE TABLE sessions (
                    path        TEXT PRIMARY KEY,
                    repo_path   TEXT NOT NULL,
                    mtime       INTEGER NOT NULL,
                    size        INTEGER NOT NULL
                );

                -- NOT a contentless table: contentless FTS5 (content='') rejects
                -- `DELETE ... WHERE rowid = ?`, which the reindex path needs when a
                -- session file changes (I-2 same-second size bump). Letting the
                -- table own its `text` keeps row-level delete/replace working; the
                -- per-row text is tiny (a bounded tail) so the storage cost is
                -- negligible. `path` stays UNINDEXED (stored, not tokenized).
                CREATE VIRTUAL TABLE sessions_fts USING fts5(
                    text,
                    path UNINDEXED
                );

                PRAGMA user_version = 1;

                COMMIT;
                ",
            )
            .map_err(|e| anyhow::anyhow!("creating schema v1: {e}"))
    }
666
667    /// Runs `PRAGMA integrity_check` and returns an error if it reports problems.
668    fn check_integrity(&self) -> Result<(), anyhow::Error> {
669        let result: String = self
670            .conn
671            .query_row("PRAGMA integrity_check", [], |row| row.get(0))
672            .map_err(|e| anyhow::anyhow!("integrity_check query failed: {e}"))?;
673
674        if result != "ok" {
675            return Err(anyhow::anyhow!("integrity_check: {result}"));
676        }
677        Ok(())
678    }
679
680    /// Deletes `index.db`, `index.db-wal`, and `index.db-shm` from `base`.
681    ///
682    /// Missing files are silently ignored (may already be absent on a fresh dir).
683    fn delete_db_files(base: &Path) {
684        for suffix in &["index.db", "index.db-wal", "index.db-shm"] {
685            let path = base.join(suffix);
686            match std::fs::remove_file(&path) {
687                Ok(()) => {}
688                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
689                Err(e) => {
690                    eprintln!("warning: failed to remove {}: {e:#}", path.display());
691                }
692            }
693        }
694    }
695}
696
697#[cfg(test)]
698mod tests {
699    use super::*;
700    use tempfile::TempDir;
701
702    // -----------------------------------------------------------------------
703    // Helpers
704    // -----------------------------------------------------------------------
705
706    /// Returns every table/virtual-table name present in the connection.
707    fn get_tables(conn: &Connection) -> Vec<String> {
708        let mut stmt = conn
709            .prepare("SELECT name FROM sqlite_master WHERE type IN ('table') ORDER BY name")
710            .unwrap();
711        stmt.query_map([], |row| row.get::<_, String>(0))
712            .unwrap()
713            .map(|r| r.unwrap())
714            .collect()
715    }
716
717    fn all_four_tables_present(tables: &[String]) -> bool {
718        ["loops", "repos", "sessions", "sessions_fts"]
719            .iter()
720            .all(|t| tables.iter().any(|n| n == t))
721    }
722
723    fn user_version(conn: &Connection) -> i32 {
724        conn.query_row("PRAGMA user_version", [], |r| r.get(0))
725            .unwrap()
726    }
727
728    // -----------------------------------------------------------------------
729    // (a) Fresh dir: open creates db with all four tables
730    // -----------------------------------------------------------------------
731
732    #[test]
733    fn open_fresh_dir_creates_all_four_tables() {
734        let tmp = TempDir::new().unwrap();
735        let index = Index::open(tmp.path());
736        let tables = get_tables(&index.conn);
737        assert!(
738            all_four_tables_present(&tables),
739            "expected repos, loops, sessions, sessions_fts — got: {tables:?}"
740        );
741        assert_eq!(user_version(&index.conn), 2);
742        assert!(tmp.path().join("index.db").exists());
743    }
744
745    // -----------------------------------------------------------------------
    // (b) Reopening is idempotent (user_version stays 2, no error)
747    // -----------------------------------------------------------------------
748
749    #[test]
750    fn reopen_is_idempotent() {
751        let tmp = TempDir::new().unwrap();
752        {
753            let _first = Index::open(tmp.path());
754        }
755        // Drop first connection, then reopen.
756        let second = Index::open(tmp.path());
757        assert_eq!(user_version(&second.conn), 2);
758        let tables = get_tables(&second.conn);
759        assert!(
760            all_four_tables_present(&tables),
761            "tables missing after reopen: {tables:?}"
762        );
763    }
764
765    // -----------------------------------------------------------------------
766    // (c) Corrupt db file → open rebuilds and tables exist
767    // -----------------------------------------------------------------------
768
769    #[test]
770    fn corrupt_db_is_rebuilt() {
771        let tmp = TempDir::new().unwrap();
772        let db_path = tmp.path().join("index.db");
773
774        // Write garbage bytes where the db would be.
775        std::fs::write(&db_path, b"not a sqlite database at all").unwrap();
776
777        // open must recover, not panic.
778        let index = Index::open(tmp.path());
779        let tables = get_tables(&index.conn);
780        assert!(
781            all_four_tables_present(&tables),
782            "tables missing after corrupt-rebuild: {tables:?}"
783        );
784        assert_eq!(user_version(&index.conn), 2);
785    }
786
787    // -----------------------------------------------------------------------
788    // (d) open_in_memory has the schema
789    // -----------------------------------------------------------------------
790
791    #[test]
792    fn open_in_memory_has_schema() {
793        let index = Index::open_in_memory();
794        let tables = get_tables(&index.conn);
795        assert!(
796            all_four_tables_present(&tables),
797            "in-memory index missing tables: {tables:?}"
798        );
799        assert_eq!(user_version(&index.conn), 2);
800    }
801
802    // -----------------------------------------------------------------------
803    // (d2) v1 contentless FTS → migrated to v2 contentful FTS
804    // -----------------------------------------------------------------------
805
806    /// Simulates a DB created by an earlier build of this branch that used
807    /// `content=''` (contentless) for `sessions_fts` and left `user_version = 1`.
808    /// After `run_migrations` the DB must be at `user_version = 2` with a
809    /// contentful `sessions_fts`, so that a DELETE-then-insert (reindex) no longer
810    /// errors and the session is findable via `session_mentions`.
811    #[test]
812    fn migrate_v1_contentless_fts_to_v2_contentful() {
813        // Build the stale "v1 contentless" state manually in a temp-file DB.
814        let tmp = TempDir::new().unwrap();
815        let db_path = tmp.path().join("index.db");
816
817        // Phase A: create the stale schema in its own connection, then close it.
818        {
819            let conn = Connection::open(&db_path).unwrap();
820            conn.execute_batch(
821                "
822                PRAGMA journal_mode=WAL;
823                BEGIN;
824                CREATE TABLE repos (
825                    common_dir_hash TEXT PRIMARY KEY,
826                    path            TEXT NOT NULL UNIQUE,
827                    common_dir      TEXT NOT NULL,
828                    default_branch  TEXT,
829                    default_sha     TEXT,
830                    refs_fingerprint INTEGER,
831                    last_indexed    INTEGER
832                );
833                CREATE TABLE loops (
834                    common_dir_hash TEXT NOT NULL,
835                    branch          TEXT NOT NULL,
836                    head_sha        TEXT NOT NULL,
837                    base_sha        TEXT NOT NULL,
838                    ahead           INTEGER,
839                    behind          INTEGER,
840                    last_commit     INTEGER NOT NULL,
841                    worktree_path   TEXT NOT NULL,
842                    PRIMARY KEY (common_dir_hash, branch)
843                );
844                CREATE TABLE sessions (
845                    path        TEXT PRIMARY KEY,
846                    repo_path   TEXT NOT NULL,
847                    mtime       INTEGER NOT NULL,
848                    size        INTEGER NOT NULL
849                );
850                CREATE VIRTUAL TABLE sessions_fts USING fts5(
851                    text,
852                    path UNINDEXED,
853                    content=''
854                );
855                PRAGMA user_version = 1;
856                COMMIT;
857                ",
858            )
859            .unwrap();
860        } // conn dropped / file closed
861
862        // Phase B: open via Index::open — migration must heal the stale FTS.
863        let index = Index::open(tmp.path());
864
865        // (a) user_version must be 2 after migration.
866        assert_eq!(
867            user_version(&index.conn),
868            2,
869            "migration must bump user_version to 2"
870        );
871
872        // (b) upsert_session + session_mentions must work (DELETE-then-insert no longer errors).
873        let path = std::path::Path::new("/fake/migrated-sess.jsonl");
874        let repo = std::path::Path::new("/home/g/app");
875        index.upsert_session(
876            path,
877            repo,
878            1_700_000_000,
879            100,
880            "[user] working on feat/migrated",
881        );
882        let mentions = index.session_mentions(repo, "feat/migrated");
883        assert!(
884            mentions.contains(&path.to_path_buf()),
885            "session must be findable via FTS after v1→v2 migration"
886        );
887
888        // Also verify that a second upsert (triggers DELETE old rowid + reinsert) works.
889        index.upsert_session(
890            path,
891            repo,
892            1_700_000_000,
893            200, // size grew → forces reindex DELETE
894            "[user] working on feat/migrated — extended",
895        );
896        assert!(
897            index
898                .session_mentions(repo, "feat/migrated")
899                .contains(&path.to_path_buf()),
900            "reindex DELETE must succeed on contentful FTS after migration"
901        );
902    }
903
904    // -----------------------------------------------------------------------
905    // (e) cached_common_dir / put_repo_common_dir round-trip
906    // -----------------------------------------------------------------------
907
908    #[test]
909    fn put_then_get_common_dir() {
910        let index = Index::open_in_memory();
911        let path = std::path::Path::new("/home/u/project");
912        let common_dir = std::path::Path::new("/home/u/project/.git");
913        let hash = "aabbccddeeff0011";
914
915        // Miss before insert.
916        assert!(index.cached_common_dir(path).is_none());
917
918        index.put_repo_common_dir(path, hash, common_dir);
919
920        let (got_hash, got_cd) = index.cached_common_dir(path).expect("should hit after put");
921        assert_eq!(got_hash, hash);
922        assert_eq!(got_cd, common_dir);
923    }
924
925    #[test]
926    fn put_is_idempotent_upsert() {
927        let index = Index::open_in_memory();
928        let path = std::path::Path::new("/home/u/project");
929        let cd1 = std::path::Path::new("/home/u/project/.git");
930        let cd2 = std::path::Path::new("/home/u/project/.bare");
931
932        index.put_repo_common_dir(path, "hash1", cd1);
933        index.put_repo_common_dir(path, "hash2", cd2);
934
935        let (h, cd) = index.cached_common_dir(path).unwrap();
936        assert_eq!(h, "hash2");
937        assert_eq!(cd, cd2);
938    }
939
940    // -----------------------------------------------------------------------
941    // (f) Task 3: put_loops / cached_loops refs-fingerprint gate
942    // -----------------------------------------------------------------------
943
944    fn sample_rows() -> Vec<LoopRow> {
945        vec![
946            LoopRow {
947                branch: "feat/a".into(),
948                head_sha: "a".repeat(40),
949                base_sha: "d".repeat(40),
950                ahead: Some(3),
951                behind: Some(1),
952                last_commit: Utc.timestamp_opt(1_700_000_000, 0).single().unwrap(),
953                worktree_path: PathBuf::from("/wt/a"),
954            },
955            LoopRow {
956                branch: "feat/b".into(),
957                head_sha: "b".repeat(40),
958                base_sha: "d".repeat(40),
959                ahead: Some(7),
960                behind: Some(0),
961                last_commit: Utc.timestamp_opt(1_700_000_100, 0).single().unwrap(),
962                worktree_path: PathBuf::from("/wt/b"),
963            },
964        ]
965    }
966
967    #[test]
968    fn put_loops_then_cached_loops_round_trip_on_matching_gate() {
969        let index = Index::open_in_memory();
970        let hash = "deadbeef00000000";
971        let default_sha = "d".repeat(40);
972        let rows = sample_rows();
973
974        // Miss before any write.
975        assert!(index.cached_loops(hash, 42, &default_sha).is_none());
976
977        index.put_loops(
978            hash,
979            std::path::Path::new("/repo"),
980            std::path::Path::new("/repo/.git"),
981            "main",
982            &default_sha,
983            42,
984            &rows,
985        );
986
987        let got = index
988            .cached_loops(hash, 42, &default_sha)
989            .expect("matching fingerprint + default_sha must hit");
990        assert_eq!(got, rows);
991    }
992
993    #[test]
994    fn cached_loops_misses_on_fingerprint_mismatch() {
995        let index = Index::open_in_memory();
996        let hash = "deadbeef00000001";
997        let default_sha = "d".repeat(40);
998        index.put_loops(
999            hash,
1000            std::path::Path::new("/repo"),
1001            std::path::Path::new("/repo/.git"),
1002            "main",
1003            &default_sha,
1004            42,
1005            &sample_rows(),
1006        );
1007        // Different fingerprint → miss.
1008        assert!(index.cached_loops(hash, 43, &default_sha).is_none());
1009        // Same fingerprint → hit.
1010        assert!(index.cached_loops(hash, 42, &default_sha).is_some());
1011    }
1012
1013    #[test]
1014    fn cached_loops_misses_on_default_sha_mismatch() {
1015        let index = Index::open_in_memory();
1016        let hash = "deadbeef00000002";
1017        index.put_loops(
1018            hash,
1019            std::path::Path::new("/repo"),
1020            std::path::Path::new("/repo/.git"),
1021            "main",
1022            &"d".repeat(40),
1023            42,
1024            &sample_rows(),
1025        );
1026        // Same fingerprint but a different default_sha (base moved) → miss.
1027        assert!(index.cached_loops(hash, 42, &"e".repeat(40)).is_none());
1028    }
1029
1030    #[test]
1031    fn cached_loops_unpopulated_repos_row_is_clean_miss() {
1032        let index = Index::open_in_memory();
1033        let path = std::path::Path::new("/repo");
1034        let cd = std::path::Path::new("/repo/.git");
1035        let hash = "deadbeef00000003";
1036        // Insert a repos row WITHOUT loops data (NULL default_sha / fingerprint).
1037        index.put_repo_common_dir(path, hash, cd);
1038        // Must be a clean miss (no panic, no spurious behaviour).
1039        assert!(index.cached_loops(hash, 0, "").is_none());
1040        assert!(index.cached_loops(hash, 42, &"d".repeat(40)).is_none());
1041    }
1042
1043    #[test]
1044    fn put_loops_replaces_previous_rows_in_one_transaction() {
1045        let index = Index::open_in_memory();
1046        let hash = "deadbeef00000004";
1047        let default_sha = "d".repeat(40);
1048        index.put_loops(
1049            hash,
1050            std::path::Path::new("/repo"),
1051            std::path::Path::new("/repo/.git"),
1052            "main",
1053            &default_sha,
1054            42,
1055            &sample_rows(), // 2 rows
1056        );
1057        // Re-write with a single row and a new fingerprint.
1058        let one = vec![LoopRow {
1059            branch: "feat/only".into(),
1060            head_sha: "c".repeat(40),
1061            base_sha: default_sha.clone(),
1062            ahead: Some(1),
1063            behind: Some(0),
1064            last_commit: Utc.timestamp_opt(1_700_000_500, 0).single().unwrap(),
1065            worktree_path: PathBuf::from("/wt/only"),
1066        }];
1067        index.put_loops(
1068            hash,
1069            std::path::Path::new("/repo"),
1070            std::path::Path::new("/repo/.git"),
1071            "main",
1072            &default_sha,
1073            99,
1074            &one,
1075        );
1076        let got = index.cached_loops(hash, 99, &default_sha).unwrap();
1077        assert_eq!(got, one, "old rows must be replaced, not appended");
1078    }
1079
1080    #[test]
1081    fn put_loops_upgrades_existing_common_dir_row() {
1082        // A repos row created by put_repo_common_dir (Task 2, NULL gate columns)
1083        // must be upgraded in place by put_loops — same common_dir_hash PK — so
1084        // the gate hits afterwards and no duplicate row is created.
1085        let index = Index::open_in_memory();
1086        let path = std::path::Path::new("/repo");
1087        let cd = std::path::Path::new("/repo/.git");
1088        let hash = "deadbeef00000005";
1089        index.put_repo_common_dir(path, hash, cd);
1090        // Pre-upgrade: repos row exists but gate columns are NULL → clean miss.
1091        let default_sha = "d".repeat(40);
1092        assert!(index.cached_loops(hash, 7, &default_sha).is_none());
1093
1094        index.put_loops(hash, path, cd, "main", &default_sha, 7, &sample_rows());
1095
1096        // Exactly one repos row for this hash, now populated → gate hits.
1097        let repo_count: i64 = index
1098            .conn
1099            .query_row(
1100                "SELECT COUNT(*) FROM repos WHERE common_dir_hash = ?1",
1101                rusqlite::params![hash],
1102                |r| r.get(0),
1103            )
1104            .unwrap();
1105        assert_eq!(
1106            repo_count, 1,
1107            "put_loops must upgrade in place, not duplicate"
1108        );
1109        assert!(index.cached_loops(hash, 7, &default_sha).is_some());
1110    }
1111
1112    #[test]
1113    fn cached_loops_preserves_null_ahead_behind() {
1114        // Light-phase rows (no ahead/behind) round-trip as None.
1115        let index = Index::open_in_memory();
1116        let hash = "deadbeef00000006";
1117        let default_sha = "d".repeat(40);
1118        let rows = vec![LoopRow {
1119            branch: "feat/light".into(),
1120            head_sha: "a".repeat(40),
1121            base_sha: default_sha.clone(),
1122            ahead: None,
1123            behind: None,
1124            last_commit: Utc.timestamp_opt(1_700_000_000, 0).single().unwrap(),
1125            worktree_path: PathBuf::from("/wt/light"),
1126        }];
1127        index.put_loops(
1128            hash,
1129            std::path::Path::new("/repo"),
1130            std::path::Path::new("/repo/.git"),
1131            "main",
1132            &default_sha,
1133            1,
1134            &rows,
1135        );
1136        let got = index.cached_loops(hash, 1, &default_sha).unwrap();
1137        assert_eq!(got[0].ahead, None);
1138        assert_eq!(got[0].behind, None);
1139    }
1140
1141    // -----------------------------------------------------------------------
    // Task 4: upsert_session / session_mentions (tests are at the end of this module)
1143    // -----------------------------------------------------------------------
1144
1145    // -----------------------------------------------------------------------
1146    // Task 5: prune_missing_repos
1147    // -----------------------------------------------------------------------
1148
1149    fn repos_count(index: &Index) -> i64 {
1150        index
1151            .conn
1152            .query_row("SELECT COUNT(*) FROM repos", [], |r| r.get(0))
1153            .unwrap()
1154    }
1155
1156    fn loops_count(index: &Index, hash: &str) -> i64 {
1157        index
1158            .conn
1159            .query_row(
1160                "SELECT COUNT(*) FROM loops WHERE common_dir_hash = ?1",
1161                rusqlite::params![hash],
1162                |r| r.get(0),
1163            )
1164            .unwrap()
1165    }
1166
1167    #[test]
1168    fn prune_missing_repos_removes_gone_repo_and_keeps_live_one() {
1169        // A live repo (its dir exists on disk) must survive; a gone repo (path and
1170        // common_dir both absent) must be deleted along with its loops.
1171        let tmp = TempDir::new().unwrap();
1172        let live_dir = tmp.path().join("live");
1173        let live_common = live_dir.join(".git");
1174        std::fs::create_dir_all(&live_common).unwrap();
1175
1176        let index = Index::open_in_memory();
1177        let default_sha = "d".repeat(40);
1178
1179        // Live repo: real dir on disk.
1180        let live_hash = "live000000000000";
1181        index.put_loops(
1182            live_hash,
1183            &live_dir,
1184            &live_common,
1185            "main",
1186            &default_sha,
1187            1,
1188            &sample_rows(),
1189        );
1190
1191        // Gone repo: paths that do not exist.
1192        let gone_hash = "gone000000000000";
1193        index.put_loops(
1194            gone_hash,
1195            std::path::Path::new("/no/such/repo"),
1196            std::path::Path::new("/no/such/repo/.git"),
1197            "main",
1198            &default_sha,
1199            1,
1200            &sample_rows(),
1201        );
1202
1203        assert_eq!(repos_count(&index), 2);
1204        assert_eq!(loops_count(&index, gone_hash), 2);
1205
1206        index.prune_missing_repos();
1207
1208        assert_eq!(repos_count(&index), 1, "only the live repo must remain");
1209        assert!(
1210            index.cached_loops(live_hash, 1, &default_sha).is_some(),
1211            "live repo loops must survive prune"
1212        );
1213        assert_eq!(
1214            loops_count(&index, gone_hash),
1215            0,
1216            "gone repo loops must be deleted"
1217        );
1218    }
1219
1220    #[test]
1221    fn prune_missing_repos_keeps_repo_when_common_dir_survives() {
1222        // A worktree dir removed while the shared common-dir store still exists is
1223        // NOT an orphan: its branches are still real, so the row must be kept.
1224        let tmp = TempDir::new().unwrap();
1225        let common = tmp.path().join("my-app/.bare");
1226        std::fs::create_dir_all(&common).unwrap();
1227
1228        let index = Index::open_in_memory();
1229        let hash = "wtstore000000000";
1230        index.put_loops(
1231            hash,
1232            std::path::Path::new("/gone/worktree"), // path gone
1233            &common,                                // common_dir survives
1234            "main",
1235            &"d".repeat(40),
1236            1,
1237            &sample_rows(),
1238        );
1239
1240        index.prune_missing_repos();
1241
1242        assert_eq!(
1243            repos_count(&index),
1244            1,
1245            "row must survive while common_dir exists"
1246        );
1247    }
1248
1249    #[test]
1250    fn upsert_and_session_mentions_basic() {
1251        let index = Index::open_in_memory();
1252        let path = std::path::Path::new("/fake/sess.jsonl");
1253        let repo = std::path::Path::new("/home/g/app");
1254        index.upsert_session(path, repo, 12345, 100, "[user] working on feat/login");
1255        let mentions = index.session_mentions(repo, "feat/login");
1256        assert!(
1257            mentions.contains(&path.to_path_buf()),
1258            "FTS must find the session"
1259        );
1260    }
1261
1262    /// I-2: a same-second append that GROWS the file (mtime unchanged, size up)
1263    /// must force a reindex so the newly written branch mention is findable via
1264    /// the mention probe. Comparing only `(path, mtime)` would skip the reindex
1265    /// and leave a stale tail — the identical same-second staleness window the
1266    /// refs-fingerprint gate closed with nanoseconds.
1267    #[test]
1268    fn upsert_session_reindexes_on_same_second_size_change() {
1269        let index = Index::open_in_memory();
1270        let path = std::path::Path::new("/fake/hot-session.jsonl");
1271        let repo = std::path::Path::new("/home/g/app");
1272        let mtime: i64 = 1_700_000_000; // identical across both writes
1273
1274        // First write: a short tail that does NOT mention the new branch.
1275        index.upsert_session(path, repo, mtime, 50, "[user] starting work");
1276        assert!(
1277            !index
1278                .session_mentions(repo, "feat/just-written")
1279                .contains(&path.to_path_buf()),
1280            "branch is not mentioned yet"
1281        );
1282
1283        // Same-second append grows the file and adds the branch mention.
1284        index.upsert_session(
1285            path,
1286            repo,
1287            mtime, // SAME second
1288            200,   // size grew
1289            "[user] starting work\n[assistant] pushing feat/just-written",
1290        );
1291
1292        assert!(
1293            index
1294                .session_mentions(repo, "feat/just-written")
1295                .contains(&path.to_path_buf()),
1296            "size change in the same second must force reindex so the new mention is findable"
1297        );
1298    }
1299}