Skip to main content

codelens_engine/db/
mod.rs

1use anyhow::{Context, Result};
2use rusqlite::{Connection, OptionalExtension};
3use sha2::{Digest, Sha256};
4use std::fs;
5use std::path::{Path, PathBuf};
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7
8mod ops;
9
10#[cfg(test)]
11mod tests;
12
13const SCHEMA_VERSION: i64 = 6;
14
/// SQLite-backed symbol and import index for a single project.
///
/// Wraps a single `rusqlite::Connection`. Instances are created through
/// `IndexDb::open`, `IndexDb::open_readonly` or `IndexDb::open_memory`.
pub struct IndexDb {
    /// Underlying SQLite connection; visible to the parent module only.
    pub(super) conn: Connection,
}
19
/// In-memory form of a `files` table row (all columns except `indexed_at`).
#[derive(Debug, Clone)]
pub struct FileRow {
    /// `files.id`, the table's integer primary key.
    pub id: i64,
    /// `files.relative_path`; the column is declared `UNIQUE NOT NULL`.
    pub relative_path: String,
    /// `files.mtime_ms`.
    /// NOTE(review): presumably the file's modification time in milliseconds — confirm against the writer.
    pub mtime_ms: i64,
    /// `files.content_hash`.
    /// NOTE(review): presumably the hex digest produced by `content_hash` — confirm against the writer.
    pub content_hash: String,
    /// `files.size_bytes`.
    pub size_bytes: i64,
    /// `files.language`; the column is nullable, hence the `Option`.
    pub language: Option<String>,
}
29
/// In-memory form of a `symbols` table row; the fields map one-to-one onto
/// the table's columns.
#[derive(Debug, Clone)]
pub struct SymbolRow {
    /// `symbols.id`, the table's integer primary key.
    pub id: i64,
    /// `symbols.file_id`, a foreign key into `files(id)`.
    pub file_id: i64,
    /// `symbols.name`.
    pub name: String,
    /// `symbols.kind`.
    pub kind: String,
    /// `symbols.line`.
    pub line: i64,
    /// `symbols.column_num`.
    pub column_num: i64,
    /// `symbols.start_byte`.
    pub start_byte: i64,
    /// `symbols.end_byte`.
    pub end_byte: i64,
    /// `symbols.signature`.
    pub signature: String,
    /// `symbols.name_path`.
    pub name_path: String,
    /// `symbols.parent_id`, a nullable self-reference into `symbols(id)`.
    pub parent_id: Option<i64>,
}
44
/// Symbol with resolved file path — for embedding pipeline batch processing.
///
/// Carries the symbol's descriptive columns plus the path of its file, but no
/// row ids.
#[derive(Debug, Clone)]
pub struct SymbolWithFile {
    /// Symbol name.
    pub name: String,
    /// Symbol kind.
    pub kind: String,
    /// Path of the file the symbol belongs to.
    /// NOTE(review): presumably `files.relative_path` — confirm against the query that fills this struct.
    pub file_path: String,
    /// Line of the symbol.
    pub line: i64,
    /// Symbol signature text.
    pub signature: String,
    /// Symbol name path.
    pub name_path: String,
    /// Start byte of the symbol.
    pub start_byte: i64,
    /// End byte of the symbol.
    pub end_byte: i64,
}
57
/// In-memory form of an `imports` table row.
#[derive(Debug, Clone)]
pub struct ImportRow {
    /// `imports.source_file_id`, a foreign key into `files(id)`.
    pub source_file_id: i64,
    /// `imports.target_path`; together with `source_file_id` it forms the
    /// table's primary key.
    pub target_path: String,
    /// `imports.raw_import`.
    pub raw_import: String,
}
64
/// Serializable counters summarising recorded indexing failures.
///
/// NOTE(review): the rules that sort a failure into the recent, stale and
/// persistent buckets live in the code that fills this struct, which is not
/// part of this file — confirm there before relying on the split.
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct IndexFailureSummary {
    /// Total number of failures counted.
    pub total_failures: usize,
    /// Number of failures counted as recent.
    pub recent_failures: usize,
    /// Number of failures counted as stale.
    pub stale_failures: usize,
    /// Number of failures counted as persistent.
    pub persistent_failures: usize,
}
72
/// Per-directory aggregate: file count, symbol count, import count.
#[derive(Debug, Clone, serde::Serialize)]
pub struct DirStats {
    /// Directory the counts belong to.
    pub dir: String,
    /// Number of files counted for the directory.
    pub files: usize,
    /// Number of symbols counted for the directory.
    pub symbols: usize,
    /// Import count for the directory.
    /// NOTE(review): the name suggests imports arriving from other directories — confirm the direction against the aggregating query.
    pub imports_from_others: usize,
}
81
/// Symbol data for insertion (no id yet).
/// Uses borrowed references to avoid String clones during bulk insert.
///
/// The fields correspond to the `symbols` table columns other than `id` and
/// `file_id`; all text fields borrow for the lifetime `'a`.
#[derive(Debug, Clone)]
pub struct NewSymbol<'a> {
    /// Symbol name.
    pub name: &'a str,
    /// Symbol kind.
    pub kind: &'a str,
    /// Line of the symbol.
    pub line: i64,
    /// Column of the symbol.
    pub column_num: i64,
    /// Start byte of the symbol.
    pub start_byte: i64,
    /// End byte of the symbol.
    pub end_byte: i64,
    /// Symbol signature text.
    pub signature: &'a str,
    /// Symbol name path.
    pub name_path: &'a str,
    /// Id of the parent symbol, if any.
    pub parent_id: Option<i64>,
}
96
/// Import data for insertion.
///
/// Holds the `imports` columns other than `source_file_id`.
#[derive(Debug, Clone)]
pub struct NewImport {
    /// Path the import points at (`imports.target_path`).
    pub target_path: String,
    /// Import text as stored in `imports.raw_import`.
    pub raw_import: String,
}
103
/// Call edge data for insertion.
///
/// Holds the `calls` columns other than `id` and `caller_file_id`.
#[derive(Debug, Clone)]
pub struct NewCall {
    /// Name of the calling symbol (`calls.caller_name`).
    pub caller_name: String,
    /// Name of the called symbol (`calls.callee_name`).
    pub callee_name: String,
    /// Line stored in `calls.line`.
    pub line: i64,
}
111
112// Re-export free functions for crate-internal use (e.g. symbols::writer uses db::upsert_file)
113pub(crate) use ops::{
114    all_file_paths, delete_file, get_fresh_file, insert_calls, insert_imports, insert_symbols,
115    upsert_file,
116};
117
118impl IndexDb {
119    /// Open or create the index database at the given path.
120    pub fn open(db_path: &Path) -> Result<Self> {
121        open_derived_sqlite_with_recovery(db_path, "symbol index", || {
122            let conn = Connection::open(db_path)
123                .with_context(|| format!("failed to open db at {}", db_path.display()))?;
124            conn.execute_batch(
125                "PRAGMA journal_mode = WAL; PRAGMA synchronous = NORMAL; PRAGMA foreign_keys = ON; PRAGMA busy_timeout = 5000; PRAGMA cache_size = -8000; PRAGMA auto_vacuum = INCREMENTAL;",
126            )?;
127            let mut db = Self { conn };
128            db.migrate()?;
129            Ok(db)
130        })
131    }
132
133    /// Open existing database in read-only mode (no migration, no WAL creation).
134    /// Returns None if the DB file does not exist.
135    pub fn open_readonly(db_path: &Path) -> Result<Option<Self>> {
136        if !db_path.is_file() {
137            return Ok(None);
138        }
139        let conn = Connection::open_with_flags(
140            db_path,
141            rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
142        )
143        .with_context(|| format!("failed to open db readonly at {}", db_path.display()))?;
144        conn.execute_batch("PRAGMA busy_timeout = 5000;")?;
145        Ok(Some(Self { conn }))
146    }
147
148    /// Open an in-memory database (for testing).
149    pub fn open_memory() -> Result<Self> {
150        let conn = Connection::open_in_memory()?;
151        conn.execute_batch("PRAGMA foreign_keys = ON;")?;
152        let mut db = Self { conn };
153        db.migrate()?;
154        Ok(db)
155    }
156
    /// Sequential migrations. Each entry is (version, SQL).
    /// Applied in order; only migrations newer than the current version run.
    ///
    /// The SQL of an entry is executed as one batch by `migrate`. Entries are
    /// listed in ascending version order and the last version equals
    /// `SCHEMA_VERSION`; keep both in step when adding a migration.
    const MIGRATIONS: &'static [(i64, &'static str)] = &[
        (
            1,
            // Base schema: `files`, `symbols` and `imports` tables plus their
            // lookup indexes.
            "CREATE TABLE IF NOT EXISTS files (
                id INTEGER PRIMARY KEY,
                relative_path TEXT UNIQUE NOT NULL,
                mtime_ms INTEGER NOT NULL,
                content_hash TEXT NOT NULL,
                size_bytes INTEGER NOT NULL,
                language TEXT,
                indexed_at INTEGER NOT NULL
            );
            CREATE TABLE IF NOT EXISTS symbols (
                id INTEGER PRIMARY KEY,
                file_id INTEGER NOT NULL REFERENCES files(id) ON DELETE CASCADE,
                name TEXT NOT NULL,
                kind TEXT NOT NULL,
                line INTEGER NOT NULL,
                column_num INTEGER NOT NULL,
                start_byte INTEGER NOT NULL,
                end_byte INTEGER NOT NULL,
                signature TEXT NOT NULL,
                name_path TEXT NOT NULL,
                parent_id INTEGER REFERENCES symbols(id)
            );
            CREATE TABLE IF NOT EXISTS imports (
                source_file_id INTEGER NOT NULL REFERENCES files(id) ON DELETE CASCADE,
                target_path TEXT NOT NULL,
                raw_import TEXT NOT NULL,
                PRIMARY KEY (source_file_id, target_path)
            );
            CREATE INDEX IF NOT EXISTS idx_symbols_name ON symbols(name);
            CREATE INDEX IF NOT EXISTS idx_symbols_file ON symbols(file_id);
            CREATE INDEX IF NOT EXISTS idx_symbols_name_path ON symbols(name_path);
            CREATE INDEX IF NOT EXISTS idx_imports_target ON imports(target_path);",
        ),
        (
            2,
            // Call edges: `calls` table indexed by callee, caller and file.
            "CREATE TABLE IF NOT EXISTS calls (
                id INTEGER PRIMARY KEY,
                caller_file_id INTEGER NOT NULL REFERENCES files(id) ON DELETE CASCADE,
                caller_name TEXT NOT NULL,
                callee_name TEXT NOT NULL,
                line INTEGER NOT NULL
            );
            CREATE INDEX IF NOT EXISTS idx_calls_callee ON calls(callee_name);
            CREATE INDEX IF NOT EXISTS idx_calls_caller ON calls(caller_name);
            CREATE INDEX IF NOT EXISTS idx_calls_file ON calls(caller_file_id);",
        ),
        (
            3,
            // Failure log: one row per file path (`UNIQUE(file_path)`).
            "CREATE TABLE IF NOT EXISTS index_failures (
                id INTEGER PRIMARY KEY,
                file_path TEXT NOT NULL,
                error_type TEXT NOT NULL,
                error_message TEXT NOT NULL,
                failed_at INTEGER NOT NULL,
                retry_count INTEGER NOT NULL DEFAULT 0,
                UNIQUE(file_path)
            );
            CREATE INDEX IF NOT EXISTS idx_failures_path ON index_failures(file_path);",
        ),
        (
            4,
            // FTS5 table over `symbols` (external content, rowid = symbols.id).
            // NOTE(review): this statement is identical to the CREATE in
            // migration 6, including `separators _`; presumably it was edited
            // in place after release and migration 6 exists for databases
            // built with the earlier tokenizer — confirm from history.
            "CREATE VIRTUAL TABLE IF NOT EXISTS symbols_fts USING fts5(
                name, name_path, signature,
                content=symbols, content_rowid=id,
                tokenize='unicode61 remove_diacritics 2 separators _'
            );",
        ),
        (
            5,
            // Composite index: eliminates TEMP B-TREE sort for ranked_context / all_symbols_with_bytes
            // Kind index: accelerates files_with_symbol_kinds (type_hierarchy, etc.)
            "CREATE INDEX IF NOT EXISTS idx_symbols_file_byte ON symbols(file_id, start_byte);
             CREATE INDEX IF NOT EXISTS idx_symbols_kind ON symbols(kind);",
        ),
        (
            6,
            // Rebuild FTS with underscore separator so snake_case names are tokenized:
            // "parse_symbols" → ["parse", "symbols"] enabling FTS match on individual words.
            // NOTE(review): the table is dropped and recreated but no FTS5
            // 'rebuild' command is issued here, so the new index starts out
            // empty; verify that it is repopulated elsewhere.
            "DROP TABLE IF EXISTS symbols_fts;
             CREATE VIRTUAL TABLE IF NOT EXISTS symbols_fts USING fts5(
                name, name_path, signature,
                content=symbols, content_rowid=id,
                tokenize='unicode61 remove_diacritics 2 separators _'
             );",
        ),
    ];
248
249    fn migrate(&mut self) -> Result<()> {
250        self.conn.execute_batch(
251            "CREATE TABLE IF NOT EXISTS meta (
252                key TEXT PRIMARY KEY,
253                value TEXT NOT NULL
254            );",
255        )?;
256
257        let version: Option<i64> = self
258            .conn
259            .query_row(
260                "SELECT CAST(value AS INTEGER) FROM meta WHERE key = 'schema_version'",
261                [],
262                |row| row.get(0),
263            )
264            .optional()?;
265        let current = version.unwrap_or(0);
266
267        if current >= SCHEMA_VERSION {
268            return Ok(());
269        }
270
271        let tx = self.conn.transaction()?;
272        for &(ver, sql) in Self::MIGRATIONS {
273            if current < ver {
274                tx.execute_batch(sql)?;
275                tx.execute(
276                    "INSERT OR REPLACE INTO meta (key, value) VALUES ('schema_version', ?1)",
277                    rusqlite::params![ver.to_string()],
278                )?;
279            }
280        }
281        tx.commit()?;
282        Ok(())
283    }
284
285    // ---- Transaction support ----
286
    /// Execute a closure within a transaction, retrying on lock contention.
    ///
    /// `f` receives the open transaction as a `&Connection`. Its result is
    /// returned only after `commit()` has succeeded; on any other path the
    /// transaction value is dropped without being committed.
    /// NOTE(review): rollback on drop relies on rusqlite's default drop
    /// behavior for `Connection::transaction` — confirm for the pinned version.
    ///
    /// A busy/locked error raised while beginning the transaction, inside
    /// `f`, or during commit triggers a retry. There are at most
    /// `MAX_ATTEMPTS` (4) attempts in total, separated by sleeps of 25 ms,
    /// 75 ms and 150 ms. `f` may therefore run more than once and must be
    /// safe to re-run from scratch.
    ///
    /// # Errors
    /// Returns the first error that is not lock contention, or the lock
    /// contention error seen on the final attempt.
    pub fn with_transaction<F, T>(&mut self, mut f: F) -> Result<T>
    where
        F: FnMut(&Connection) -> Result<T>,
    {
        const MAX_ATTEMPTS: usize = 4;
        // One delay per retry; indexed by the number of attempts already made.
        const BACKOFF_MS: [u64; MAX_ATTEMPTS - 1] = [25, 75, 150];

        let mut attempt = 0usize;
        loop {
            // Phase 1: begin the transaction, retrying while the database is locked.
            let tx = match self.conn.transaction() {
                Ok(tx) => tx,
                Err(error) if is_lock_contention(&error) && attempt + 1 < MAX_ATTEMPTS => {
                    std::thread::sleep(Duration::from_millis(BACKOFF_MS[attempt]));
                    attempt += 1;
                    continue;
                }
                Err(error) => return Err(error.into()),
            };

            // Phase 2: run the caller's work, then commit.
            match f(&tx) {
                Ok(result) => match tx.commit() {
                    Ok(()) => return Ok(result),
                    // Commit hit a lock: `tx` was consumed by `commit`, so loop
                    // around and redo the whole unit of work.
                    Err(error) if is_lock_contention(&error) && attempt + 1 < MAX_ATTEMPTS => {
                        std::thread::sleep(Duration::from_millis(BACKOFF_MS[attempt]));
                        attempt += 1;
                    }
                    Err(error) => return Err(error.into()),
                },
                // The closure itself reported lock contention: release the
                // transaction before sleeping so it is not held during the backoff.
                Err(error) if is_lock_contention_anyhow(&error) && attempt + 1 < MAX_ATTEMPTS => {
                    drop(tx);
                    std::thread::sleep(Duration::from_millis(BACKOFF_MS[attempt]));
                    attempt += 1;
                }
                Err(error) => return Err(error),
            }
        }
    }
326}
327
328pub(crate) fn open_derived_sqlite_with_recovery<T, F>(
329    db_path: &Path,
330    kind: &str,
331    mut init: F,
332) -> Result<T>
333where
334    F: FnMut() -> Result<T>,
335{
336    ensure_db_parent_dir(db_path)?;
337
338    match init() {
339        Ok(value) => Ok(value),
340        Err(error) if is_recoverable_sqlite_anyhow(&error) => {
341            let backups = quarantine_corrupt_sqlite_files(db_path)?;
342            tracing::warn!(
343                path = %db_path.display(),
344                kind,
345                backups = ?backups,
346                error = %error,
347                "recovering derived sqlite index from corruption"
348            );
349            init().with_context(|| {
350                format!(
351                    "failed to recreate recovered {} at {}",
352                    kind,
353                    db_path.display()
354                )
355            })
356        }
357        Err(error) => Err(error),
358    }
359}
360
361fn is_lock_contention(error: &rusqlite::Error) -> bool {
362    matches!(
363        error,
364        rusqlite::Error::SqliteFailure(code, _)
365            if matches!(
366                code.code,
367                rusqlite::ErrorCode::DatabaseBusy | rusqlite::ErrorCode::DatabaseLocked
368            )
369    )
370}
371
372fn is_lock_contention_anyhow(error: &anyhow::Error) -> bool {
373    error.chain().any(|cause| {
374        cause
375            .downcast_ref::<rusqlite::Error>()
376            .is_some_and(is_lock_contention)
377    })
378}
379
380fn ensure_db_parent_dir(db_path: &Path) -> Result<()> {
381    if let Some(parent) = db_path.parent() {
382        fs::create_dir_all(parent)
383            .with_context(|| format!("failed to create {}", parent.display()))?;
384    }
385    Ok(())
386}
387
388fn is_recoverable_sqlite_error(error: &rusqlite::Error) -> bool {
389    matches!(
390        error,
391        rusqlite::Error::SqliteFailure(code, maybe_msg)
392            if matches!(
393                code.code,
394                rusqlite::ErrorCode::SystemIoFailure
395                    | rusqlite::ErrorCode::DatabaseCorrupt
396                    | rusqlite::ErrorCode::NotADatabase
397            ) || maybe_msg
398                .as_deref()
399                .is_some_and(sqlite_message_suggests_recovery)
400    )
401}
402
403fn is_recoverable_sqlite_anyhow(error: &anyhow::Error) -> bool {
404    error.chain().any(|cause| {
405        cause
406            .downcast_ref::<rusqlite::Error>()
407            .is_some_and(is_recoverable_sqlite_error)
408            || sqlite_message_suggests_recovery(&cause.to_string())
409    })
410}
411
/// Report whether an error message contains one of the SQLite phrases that
/// are treated as a sign of a damaged database file. Matching ignores ASCII
/// case.
fn sqlite_message_suggests_recovery(message: &str) -> bool {
    const RECOVERY_MARKERS: [&str; 3] = [
        "disk i/o error",
        "database disk image is malformed",
        "file is not a database",
    ];

    let lowered = message.to_ascii_lowercase();
    RECOVERY_MARKERS
        .iter()
        .any(|marker| lowered.contains(*marker))
}
418
419fn quarantine_corrupt_sqlite_files(db_path: &Path) -> Result<Vec<PathBuf>> {
420    let suffix = format!(
421        "corrupt-{}-{}",
422        SystemTime::now()
423            .duration_since(UNIX_EPOCH)
424            .unwrap_or_default()
425            .as_millis(),
426        std::process::id()
427    );
428    let mut backups = Vec::new();
429
430    for path in sqlite_related_paths(db_path) {
431        if !path.exists() {
432            continue;
433        }
434
435        let file_name = path
436            .file_name()
437            .map(|name| name.to_string_lossy().into_owned())
438            .unwrap_or_else(|| "sqlite-index".to_owned());
439        let backup_path = path.with_file_name(format!("{file_name}.{suffix}"));
440
441        match fs::rename(&path, &backup_path) {
442            Ok(()) => backups.push(backup_path),
443            Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
444            Err(error) => {
445                return Err(error).with_context(|| {
446                    format!(
447                        "failed to quarantine corrupt sqlite file {}",
448                        path.display()
449                    )
450                });
451            }
452        }
453    }
454
455    Ok(backups)
456}
457
/// List the on-disk files that belong to a SQLite database: the database
/// itself followed by its `-wal` and `-shm` siblings, in that order.
fn sqlite_related_paths(db_path: &Path) -> [PathBuf; 3] {
    let base_name = db_path.file_name().unwrap_or_default();

    // Build a sibling path whose file name is the database name plus `suffix`.
    let sidecar = |suffix: &str| {
        let mut name = base_name.to_os_string();
        name.push(suffix);
        db_path.with_file_name(name)
    };

    [db_path.to_path_buf(), sidecar("-wal"), sidecar("-shm")]
}
473
474/// Compute SHA-256 hex digest of content.
475pub fn content_hash(content: &[u8]) -> String {
476    let mut hasher = Sha256::new();
477    hasher.update(content);
478    format!("{:x}", hasher.finalize())
479}
480
/// Standard location of the index database inside a project:
/// `.codelens/index/symbols.db` under `project_root`.
pub fn index_db_path(project_root: &Path) -> PathBuf {
    let mut db_path = project_root.to_path_buf();
    db_path.push(".codelens/index/symbols.db");
    db_path
}