Skip to main content

codelens_engine/db/
mod.rs

1use anyhow::{Context, Result};
2use rusqlite::{Connection, OptionalExtension};
3use sha2::{Digest, Sha256};
4use std::fs;
5use std::path::{Path, PathBuf};
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7
8mod ops;
9
10#[cfg(test)]
11mod tests;
12
/// Schema version this build migrates databases up to.
///
/// `IndexDb::migrate` skips all work once the stored `schema_version` reaches
/// this value, so it has to be raised whenever an entry is appended to
/// `IndexDb::MIGRATIONS`.
const SCHEMA_VERSION: i64 = 6;
14
15/// SQLite-backed symbol and import index for a single project.
16pub struct IndexDb {
17    pub(super) conn: Connection,
18}
19
/// One indexed file.
///
/// Field names and types follow the columns of the `files` table; the
/// table's `indexed_at` column is not carried here.
#[derive(Debug, Clone)]
pub struct FileRow {
    /// Primary key of the row.
    pub id: i64,
    /// Path of the file (unique per table); presumably relative to the project root.
    pub relative_path: String,
    /// Modification time; presumably milliseconds, going by the name.
    pub mtime_ms: i64,
    /// Hash of the file content as stored in the index.
    pub content_hash: String,
    /// File size in bytes.
    pub size_bytes: i64,
    /// Language label, when one was recorded.
    pub language: Option<String>,
}
29
/// One indexed symbol.
///
/// Field names and types follow the columns of the `symbols` table.
#[derive(Debug, Clone)]
pub struct SymbolRow {
    /// Primary key of the row.
    pub id: i64,
    /// Key of the owning row in `files`.
    pub file_id: i64,
    /// Symbol name.
    pub name: String,
    /// Symbol kind label.
    pub kind: String,
    /// Line of the symbol.
    pub line: i64,
    /// Column of the symbol.
    pub column_num: i64,
    /// Byte offset where the symbol starts.
    pub start_byte: i64,
    /// Byte offset where the symbol ends.
    pub end_byte: i64,
    /// Signature text of the symbol.
    pub signature: String,
    /// Name path of the symbol; presumably its qualified name.
    pub name_path: String,
    /// Key of the enclosing symbol, if any.
    pub parent_id: Option<i64>,
}
44
/// A symbol paired with the path of its file rather than a file id.
///
/// Intended for batch processing in the embedding pipeline.
#[derive(Debug, Clone)]
pub struct SymbolWithFile {
    /// Symbol name.
    pub name: String,
    /// Symbol kind label.
    pub kind: String,
    /// Resolved path of the file containing the symbol.
    pub file_path: String,
    /// Line of the symbol.
    pub line: i64,
    /// Signature text of the symbol.
    pub signature: String,
    /// Name path of the symbol; presumably its qualified name.
    pub name_path: String,
    /// Byte offset where the symbol starts.
    pub start_byte: i64,
    /// Byte offset where the symbol ends.
    pub end_byte: i64,
}
57
/// One import edge.
///
/// Field names and types follow the columns of the `imports` table.
#[derive(Debug, Clone)]
pub struct ImportRow {
    /// Key of the importing file in `files`.
    pub source_file_id: i64,
    /// Path the import points at.
    pub target_path: String,
    /// Import text as stored in the index.
    pub raw_import: String,
}
64
65#[derive(Debug, Clone, Default, serde::Serialize)]
66pub struct IndexFailureSummary {
67    pub total_failures: usize,
68    pub recent_failures: usize,
69    pub stale_failures: usize,
70    pub persistent_failures: usize,
71}
72
73/// Per-directory aggregate: file count, symbol count, import count.
74#[derive(Debug, Clone, serde::Serialize)]
75pub struct DirStats {
76    pub dir: String,
77    pub files: usize,
78    pub symbols: usize,
79    pub imports_from_others: usize,
80}
81
/// A symbol that has not been inserted yet and therefore has no id.
///
/// Text fields are borrowed (`&'a str`) so bulk inserts do not have to clone
/// `String`s.
#[derive(Debug, Clone)]
pub struct NewSymbol<'a> {
    /// Symbol name.
    pub name: &'a str,
    /// Symbol kind label.
    pub kind: &'a str,
    /// Line of the symbol.
    pub line: i64,
    /// Column of the symbol.
    pub column_num: i64,
    /// Byte offset where the symbol starts.
    pub start_byte: i64,
    /// Byte offset where the symbol ends.
    pub end_byte: i64,
    /// Signature text of the symbol.
    pub signature: &'a str,
    /// Name path of the symbol; presumably its qualified name.
    pub name_path: &'a str,
    /// Key of the enclosing symbol, if any.
    pub parent_id: Option<i64>,
}
96
/// An import that has not been inserted yet.
///
/// It carries no source file id; presumably the caller supplies that at
/// insert time.
#[derive(Debug, Clone)]
pub struct NewImport {
    /// Path the import points at.
    pub target_path: String,
    /// Import text to store.
    pub raw_import: String,
}
103
/// A call edge that has not been inserted yet.
///
/// It carries no file id; presumably the caller supplies that at insert time.
#[derive(Debug, Clone)]
pub struct NewCall {
    /// Name of the calling symbol.
    pub caller_name: String,
    /// Name of the called symbol.
    pub callee_name: String,
    /// Line of the call.
    pub line: i64,
}
111
112// Re-export free functions for crate-internal use (e.g. symbols::writer uses db::upsert_file)
113pub(crate) use ops::{
114    all_file_paths, clear_symbol_index, delete_file, get_fresh_file, insert_calls, insert_imports,
115    insert_symbols, upsert_file,
116};
117
118impl IndexDb {
119    /// Open or create the index database at the given path.
120    pub fn open(db_path: &Path) -> Result<Self> {
121        open_derived_sqlite_with_recovery(db_path, "symbol index", || {
122            let conn = Connection::open(db_path)
123                .with_context(|| format!("failed to open db at {}", db_path.display()))?;
124            // `busy_timeout` first — every subsequent PRAGMA (especially
125            // `journal_mode = WAL`, which takes a schema-level write lock)
126            // would otherwise fail with `SQLITE_BUSY` immediately under
127            // contention. See #332 (Error code 5). `page_size` is a silent
128            // no-op on existing files and applies only at DB creation, so
129            // its placement after `busy_timeout` is harmless. `mmap_size`/
130            // `cache_size`/`wal_autocheckpoint` are tuned for the 1+ GB
131            // symbol index on 16 KB Apple Silicon pages (cold-start page-
132            // fault burst was the main pain point).
133            conn.execute_batch(
134                "PRAGMA busy_timeout = 5000; PRAGMA page_size = 16384; PRAGMA journal_mode = WAL; PRAGMA synchronous = NORMAL; PRAGMA foreign_keys = ON; PRAGMA cache_size = -32000; PRAGMA mmap_size = 268435456; PRAGMA wal_autocheckpoint = 8000; PRAGMA auto_vacuum = INCREMENTAL;",
135            )?;
136            let mut db = Self { conn };
137            db.migrate()?;
138            Ok(db)
139        })
140    }
141
142    /// Open existing database in read-only mode (no migration, no WAL creation).
143    /// Returns None if the DB file does not exist.
144    pub fn open_readonly(db_path: &Path) -> Result<Option<Self>> {
145        if !db_path.is_file() {
146            return Ok(None);
147        }
148        let conn = Connection::open_with_flags(
149            db_path,
150            rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
151        )
152        .with_context(|| format!("failed to open db readonly at {}", db_path.display()))?;
153        // Read-only path: mmap + larger page cache shrinks the cold-start
154        // b-tree-traversal page-fault burst on 1+ GB indexes (16 KB pages).
155        conn.execute_batch(
156            "PRAGMA busy_timeout = 5000; PRAGMA mmap_size = 268435456; PRAGMA cache_size = -32000;",
157        )?;
158        Ok(Some(Self { conn }))
159    }
160
161    /// Open an in-memory database (for testing).
162    pub fn open_memory() -> Result<Self> {
163        let conn = Connection::open_in_memory()?;
164        conn.execute_batch("PRAGMA foreign_keys = ON;")?;
165        let mut db = Self { conn };
166        db.migrate()?;
167        Ok(db)
168    }
169
170    /// Sequential migrations. Each entry is (version, SQL).
171    /// Applied in order; only migrations newer than the current version run.
172    const MIGRATIONS: &'static [(i64, &'static str)] = &[
173        (
174            1,
175            "CREATE TABLE IF NOT EXISTS files (
176                id INTEGER PRIMARY KEY,
177                relative_path TEXT UNIQUE NOT NULL,
178                mtime_ms INTEGER NOT NULL,
179                content_hash TEXT NOT NULL,
180                size_bytes INTEGER NOT NULL,
181                language TEXT,
182                indexed_at INTEGER NOT NULL
183            );
184            CREATE TABLE IF NOT EXISTS symbols (
185                id INTEGER PRIMARY KEY,
186                file_id INTEGER NOT NULL REFERENCES files(id) ON DELETE CASCADE,
187                name TEXT NOT NULL,
188                kind TEXT NOT NULL,
189                line INTEGER NOT NULL,
190                column_num INTEGER NOT NULL,
191                start_byte INTEGER NOT NULL,
192                end_byte INTEGER NOT NULL,
193                signature TEXT NOT NULL,
194                name_path TEXT NOT NULL,
195                parent_id INTEGER REFERENCES symbols(id)
196            );
197            CREATE TABLE IF NOT EXISTS imports (
198                source_file_id INTEGER NOT NULL REFERENCES files(id) ON DELETE CASCADE,
199                target_path TEXT NOT NULL,
200                raw_import TEXT NOT NULL,
201                PRIMARY KEY (source_file_id, target_path)
202            );
203            CREATE INDEX IF NOT EXISTS idx_symbols_name ON symbols(name);
204            CREATE INDEX IF NOT EXISTS idx_symbols_file ON symbols(file_id);
205            CREATE INDEX IF NOT EXISTS idx_symbols_name_path ON symbols(name_path);
206            CREATE INDEX IF NOT EXISTS idx_imports_target ON imports(target_path);",
207        ),
208        (
209            2,
210            "CREATE TABLE IF NOT EXISTS calls (
211                id INTEGER PRIMARY KEY,
212                caller_file_id INTEGER NOT NULL REFERENCES files(id) ON DELETE CASCADE,
213                caller_name TEXT NOT NULL,
214                callee_name TEXT NOT NULL,
215                line INTEGER NOT NULL
216            );
217            CREATE INDEX IF NOT EXISTS idx_calls_callee ON calls(callee_name);
218            CREATE INDEX IF NOT EXISTS idx_calls_caller ON calls(caller_name);
219            CREATE INDEX IF NOT EXISTS idx_calls_file ON calls(caller_file_id);",
220        ),
221        (
222            3,
223            "CREATE TABLE IF NOT EXISTS index_failures (
224                id INTEGER PRIMARY KEY,
225                file_path TEXT NOT NULL,
226                error_type TEXT NOT NULL,
227                error_message TEXT NOT NULL,
228                failed_at INTEGER NOT NULL,
229                retry_count INTEGER NOT NULL DEFAULT 0,
230                UNIQUE(file_path)
231            );
232            CREATE INDEX IF NOT EXISTS idx_failures_path ON index_failures(file_path);",
233        ),
234        (
235            4,
236            "CREATE VIRTUAL TABLE IF NOT EXISTS symbols_fts USING fts5(
237                name, name_path, signature,
238                content=symbols, content_rowid=id,
239                tokenize='unicode61 remove_diacritics 2 separators _'
240            );",
241        ),
242        (
243            5,
244            // Composite index: eliminates TEMP B-TREE sort for ranked_context / all_symbols_with_bytes
245            // Kind index: accelerates files_with_symbol_kinds (type_hierarchy, etc.)
246            "CREATE INDEX IF NOT EXISTS idx_symbols_file_byte ON symbols(file_id, start_byte);
247             CREATE INDEX IF NOT EXISTS idx_symbols_kind ON symbols(kind);",
248        ),
249        (
250            6,
251            // Rebuild FTS with underscore separator so snake_case names are tokenized:
252            // "parse_symbols" → ["parse", "symbols"] enabling FTS match on individual words.
253            "DROP TABLE IF EXISTS symbols_fts;
254             CREATE VIRTUAL TABLE IF NOT EXISTS symbols_fts USING fts5(
255                name, name_path, signature,
256                content=symbols, content_rowid=id,
257                tokenize='unicode61 remove_diacritics 2 separators _'
258             );",
259        ),
260    ];
261
262    fn migrate(&mut self) -> Result<()> {
263        self.conn.execute_batch(
264            "CREATE TABLE IF NOT EXISTS meta (
265                key TEXT PRIMARY KEY,
266                value TEXT NOT NULL
267            );",
268        )?;
269
270        let version: Option<i64> = self
271            .conn
272            .query_row(
273                "SELECT CAST(value AS INTEGER) FROM meta WHERE key = 'schema_version'",
274                [],
275                |row| row.get(0),
276            )
277            .optional()?;
278        let current = version.unwrap_or(0);
279
280        if current >= SCHEMA_VERSION {
281            return Ok(());
282        }
283
284        let tx = self.conn.transaction()?;
285        for &(ver, sql) in Self::MIGRATIONS {
286            if current < ver {
287                tx.execute_batch(sql)?;
288                tx.execute(
289                    "INSERT OR REPLACE INTO meta (key, value) VALUES ('schema_version', ?1)",
290                    rusqlite::params![ver.to_string()],
291                )?;
292            }
293        }
294        tx.commit()?;
295        Ok(())
296    }
297
298    // ---- Transaction support ----
299
300    /// Execute a closure within an RAII transaction.
301    /// Automatically rolls back on error or panic; commits only on success.
302    pub fn with_transaction<F, T>(&mut self, mut f: F) -> Result<T>
303    where
304        F: FnMut(&Connection) -> Result<T>,
305    {
306        const MAX_ATTEMPTS: usize = 4;
307        const BACKOFF_MS: [u64; MAX_ATTEMPTS - 1] = [25, 75, 150];
308
309        let mut attempt = 0usize;
310        loop {
311            let tx = match self.conn.transaction() {
312                Ok(tx) => tx,
313                Err(error) if is_lock_contention(&error) && attempt + 1 < MAX_ATTEMPTS => {
314                    std::thread::sleep(Duration::from_millis(BACKOFF_MS[attempt]));
315                    attempt += 1;
316                    continue;
317                }
318                Err(error) => return Err(error.into()),
319            };
320
321            match f(&tx) {
322                Ok(result) => match tx.commit() {
323                    Ok(()) => return Ok(result),
324                    Err(error) if is_lock_contention(&error) && attempt + 1 < MAX_ATTEMPTS => {
325                        std::thread::sleep(Duration::from_millis(BACKOFF_MS[attempt]));
326                        attempt += 1;
327                    }
328                    Err(error) => return Err(error.into()),
329                },
330                Err(error) if is_lock_contention_anyhow(&error) && attempt + 1 < MAX_ATTEMPTS => {
331                    drop(tx);
332                    std::thread::sleep(Duration::from_millis(BACKOFF_MS[attempt]));
333                    attempt += 1;
334                }
335                Err(error) => return Err(error),
336            }
337        }
338    }
339
340    pub(crate) fn checkpoint_wal_passive(&self) -> Result<(i64, i64, i64)> {
341        let summary = self
342            .conn
343            .query_row("PRAGMA wal_checkpoint(PASSIVE)", [], |row| {
344                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
345            })?;
346        Ok(summary)
347    }
348}
349
350pub(crate) fn open_derived_sqlite_with_recovery<T, F>(
351    db_path: &Path,
352    kind: &str,
353    mut init: F,
354) -> Result<T>
355where
356    F: FnMut() -> Result<T>,
357{
358    ensure_db_parent_dir(db_path)?;
359
360    match init() {
361        Ok(value) => Ok(value),
362        Err(error) if is_recoverable_sqlite_anyhow(&error) => {
363            let backups = quarantine_corrupt_sqlite_files(db_path)?;
364            tracing::warn!(
365                path = %db_path.display(),
366                kind,
367                backups = ?backups,
368                error = %error,
369                "recovering derived sqlite index from corruption"
370            );
371            init().with_context(|| {
372                format!(
373                    "failed to recreate recovered {} at {}",
374                    kind,
375                    db_path.display()
376                )
377            })
378        }
379        Err(error) => Err(error),
380    }
381}
382
383fn is_lock_contention(error: &rusqlite::Error) -> bool {
384    matches!(
385        error,
386        rusqlite::Error::SqliteFailure(code, _)
387            if matches!(
388                code.code,
389                rusqlite::ErrorCode::DatabaseBusy | rusqlite::ErrorCode::DatabaseLocked
390            )
391    )
392}
393
394fn is_lock_contention_anyhow(error: &anyhow::Error) -> bool {
395    error.chain().any(|cause| {
396        cause
397            .downcast_ref::<rusqlite::Error>()
398            .is_some_and(is_lock_contention)
399    })
400}
401
402fn ensure_db_parent_dir(db_path: &Path) -> Result<()> {
403    if let Some(parent) = db_path.parent() {
404        fs::create_dir_all(parent)
405            .with_context(|| format!("failed to create {}", parent.display()))?;
406    }
407    Ok(())
408}
409
410fn is_recoverable_sqlite_error(error: &rusqlite::Error) -> bool {
411    matches!(
412        error,
413        rusqlite::Error::SqliteFailure(code, maybe_msg)
414            if matches!(
415                code.code,
416                rusqlite::ErrorCode::SystemIoFailure
417                    | rusqlite::ErrorCode::DatabaseCorrupt
418                    | rusqlite::ErrorCode::NotADatabase
419            ) || maybe_msg
420                .as_deref()
421                .is_some_and(sqlite_message_suggests_recovery)
422    )
423}
424
425fn is_recoverable_sqlite_anyhow(error: &anyhow::Error) -> bool {
426    error.chain().any(|cause| {
427        cause
428            .downcast_ref::<rusqlite::Error>()
429            .is_some_and(is_recoverable_sqlite_error)
430            || sqlite_message_suggests_recovery(&cause.to_string())
431    })
432}
433
/// Whether `message` contains, ignoring ASCII case, one of the SQLite error
/// phrases that are treated as grounds for rebuilding the database.
fn sqlite_message_suggests_recovery(message: &str) -> bool {
    const RECOVERABLE_PHRASES: [&str; 3] = [
        "disk i/o error",
        "database disk image is malformed",
        "file is not a database",
    ];

    let lowered = message.to_ascii_lowercase();
    RECOVERABLE_PHRASES
        .iter()
        .any(|phrase| lowered.contains(*phrase))
}
440
441fn quarantine_corrupt_sqlite_files(db_path: &Path) -> Result<Vec<PathBuf>> {
442    let suffix = format!(
443        "corrupt-{}-{}",
444        SystemTime::now()
445            .duration_since(UNIX_EPOCH)
446            .unwrap_or_default()
447            .as_millis(),
448        std::process::id()
449    );
450    let mut backups = Vec::new();
451
452    for path in sqlite_related_paths(db_path) {
453        if !path.exists() {
454            continue;
455        }
456
457        let file_name = path
458            .file_name()
459            .map(|name| name.to_string_lossy().into_owned())
460            .unwrap_or_else(|| "sqlite-index".to_owned());
461        let backup_path = path.with_file_name(format!("{file_name}.{suffix}"));
462
463        match fs::rename(&path, &backup_path) {
464            Ok(()) => backups.push(backup_path),
465            Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
466            Err(error) => {
467                return Err(error).with_context(|| {
468                    format!(
469                        "failed to quarantine corrupt sqlite file {}",
470                        path.display()
471                    )
472                });
473            }
474        }
475    }
476
477    Ok(backups)
478}
479
/// The three paths that make up a SQLite database on disk, in order: the
/// database itself, then the same file name with `-wal` and with `-shm`
/// appended.
fn sqlite_related_paths(db_path: &Path) -> [PathBuf; 3] {
    let base_name = db_path.file_name().unwrap_or_default();

    // Sibling of `db_path` whose file name is the base name plus `suffix`.
    let sibling = |suffix: &str| {
        let mut name = base_name.to_os_string();
        name.push(suffix);
        db_path.with_file_name(name)
    };

    let wal_path = sibling("-wal");
    let shm_path = sibling("-shm");
    [db_path.to_path_buf(), wal_path, shm_path]
}
495
496/// Compute SHA-256 hex digest of content.
497pub fn content_hash(content: &[u8]) -> String {
498    let mut hasher = Sha256::new();
499    hasher.update(content);
500    format!("{:x}", hasher.finalize())
501}
502
/// Location of the symbol index database for a project:
/// `<project_root>/.codelens/index/symbols.db`.
pub fn index_db_path(project_root: &Path) -> PathBuf {
    let mut db_path = project_root.to_path_buf();
    db_path.push(".codelens/index/symbols.db");
    db_path
}