// lexa_core/db.rs

use std::collections::hash_map::DefaultHasher;
use std::ffi::{c_char, c_int};
use std::fs;
use std::hash::{Hash, Hasher};
use std::path::{Path, PathBuf};
use std::sync::{Mutex, Once, OnceLock};
use std::time::{SystemTime, UNIX_EPOCH};

use rusqlite::{params, Connection, OptionalExtension, Transaction};
use walkdir::WalkDir;

use crate::chunk::{chunk_text_for_path, supported_kind};
use crate::embed::{
    matryoshka_truncate, vector_blob, Embedder, EmbeddingConfig, Reranker, EMBEDDING_DIMS,
    PREVIEW_DIMS,
};
use crate::search::{search_impl, SearchOptions};
use crate::types::{Document, IndexStats, LexaError, SearchHit};
use crate::Result;

static SQLITE_VEC: Once = Once::new();
const MAX_FILE_BYTES: u64 = 10 * 1024 * 1024;

pub struct LexaDb {
    path: PathBuf,
    conn: Connection,
    embedding_config: EmbeddingConfig,
    embedder: OnceLock<Mutex<Embedder>>,
    reranker: OnceLock<Mutex<Reranker>>,
}

pub fn default_db_path() -> PathBuf {
    std::env::var_os("HOME")
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from("."))
        .join(".lexa")
        .join("index.sqlite")
}

pub fn open(path: impl AsRef<Path>, embedding_config: EmbeddingConfig) -> Result<LexaDb> {
    LexaDb::open(path, embedding_config)
}

impl LexaDb {
    pub fn open(path: impl AsRef<Path>, embedding_config: EmbeddingConfig) -> Result<Self> {
        register_sqlite_vec();
        let path = path.as_ref().to_path_buf();
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent)?;
        }
        let conn = Connection::open(&path)?;
        apply_pragmas(&conn)?;
        migrate(&conn)?;
        Ok(Self {
            path,
            conn,
            embedding_config,
            embedder: OnceLock::new(),
            reranker: OnceLock::new(),
        })
    }

    pub fn path(&self) -> &Path {
        &self.path
    }

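    // `Embedder::new` / `Reranker::new` are fallible, so construction happens
    // before `get_or_init` (whose closure cannot return an error). If two
    // threads race past the `get()` check, both may build a model and the
    // loser's instance is dropped; `get_or_init` stores exactly one.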
    pub fn embedder(&self) -> Result<&Mutex<Embedder>> {
        if let Some(cached) = self.embedder.get() {
            return Ok(cached);
        }
        let embedder = Embedder::new(&self.embedding_config)?;
        Ok(self.embedder.get_or_init(|| Mutex::new(embedder)))
    }

    pub fn reranker(&self) -> Result<&Mutex<Reranker>> {
        if let Some(cached) = self.reranker.get() {
            return Ok(cached);
        }
        let reranker = Reranker::new(&self.embedding_config)?;
        Ok(self.reranker.get_or_init(|| Mutex::new(reranker)))
    }

    pub fn index_path(&mut self, path: impl AsRef<Path>) -> Result<usize> {
        self.index_path_with_preprocessor::<()>(
            path,
            None::<&dyn Preprocessor<Payload = ()>>,
            |_, _, _| Ok(()),
        )
    }

    /// Index a path with a per-file **preprocessor** and **sidecar
    /// commit hook**.
    ///
    /// `preprocessor` is invoked for every supported file before chunking;
    /// it receives the raw bytes and may return `Some(PreprocessOutput)` to
    /// substitute the text used for chunking (e.g. strip Obsidian
    /// frontmatter so it doesn't leak into the embedding) along with a
    /// caller-defined `payload` of metadata. Returning `None` skips the
    /// file. Returning the unmodified text + `Default::default()` payload
    /// matches the plain `index_path` behaviour.
    ///
    /// `commit_sidecar` runs **inside** the same transaction as the
    /// chunk inserts, so the caller's sidecar tables (e.g.
    /// `note_metadata`, `note_links`, `note_tags`) stay consistent with
    /// `documents` even on crash.
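    ///
    /// A minimal sketch of the call shape (illustrative only: `StripFrontmatter`,
    /// `NoteMeta`, and the `note_metadata` insert are hypothetical, not part of
    /// this crate):
    ///
    /// ```ignore
    /// let pre = StripFrontmatter::default(); // impl Preprocessor<Payload = NoteMeta>
    /// let indexed = db.index_path_with_preprocessor(
    ///     "/path/to/vault",
    ///     Some(&pre),
    ///     |tx, doc_id, meta: &NoteMeta| {
    ///         tx.execute(
    ///             "INSERT INTO note_metadata(doc_id, title) VALUES(?1, ?2)",
    ///             rusqlite::params![doc_id, meta.title],
    ///         )?;
    ///         Ok(())
    ///     },
    /// )?;
    /// ```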
    pub fn index_path_with_preprocessor<P>(
        &mut self,
        path: impl AsRef<Path>,
        preprocessor: Option<&dyn Preprocessor<Payload = P>>,
        commit_sidecar: impl Fn(&Transaction<'_>, i64, &P) -> Result<()>,
    ) -> Result<usize>
    where
        P: Default,
    {
        const BATCH: usize = 64;
        let files = collect_files(path.as_ref())?;
        let mut prepared: Vec<PreparedDoc<P>> = Vec::new();
        let mut pending_texts: Vec<String> = Vec::new();
        let mut indexed = 0;

        for file in files {
            let Some(doc) = prepare_document_with(&file, preprocessor)? else {
                continue;
            };
            if self.is_unchanged(&doc)? {
                continue;
            }
            for chunk in &doc.chunks {
                pending_texts.push(match &chunk.context {
                    Some(context) => format!("{context}\n{}", chunk.text),
                    None => chunk.text.clone(),
                });
            }
            prepared.push(doc);

            if prepared.len() >= BATCH {
                indexed += self.flush_batch(&mut prepared, &mut pending_texts, &commit_sidecar)?;
            }
        }
        if !prepared.is_empty() {
            indexed += self.flush_batch(&mut prepared, &mut pending_texts, &commit_sidecar)?;
        }
        Ok(indexed)
    }

    fn is_unchanged<P>(&self, doc: &PreparedDoc<P>) -> Result<bool> {
        let row: Option<String> = self
            .conn
            .query_row(
                "SELECT content_hash FROM documents WHERE path = ?1",
                params![doc.path],
                |row| row.get(0),
            )
            .optional()?;
        Ok(matches!(row, Some(hash) if hash == doc.content_hash))
    }

    fn flush_batch<P>(
        &mut self,
        prepared: &mut Vec<PreparedDoc<P>>,
        pending_texts: &mut Vec<String>,
        commit_sidecar: &dyn Fn(&Transaction<'_>, i64, &P) -> Result<()>,
    ) -> Result<usize> {
        if prepared.is_empty() {
            return Ok(0);
        }
        let embeddings = {
            let lock = self.embedder()?;
            let mut guard = lock
                .lock()
                .map_err(|err| LexaError::Embedding(err.to_string()))?;
            guard.embed_documents(pending_texts)?
        };
        pending_texts.clear();

        // Defensive guard: an embedding of unexpected length would silently
        // corrupt the binary-quantized vector table. Fail fast instead.
        for embedding in &embeddings {
            if embedding.len() != EMBEDDING_DIMS {
                return Err(LexaError::Embedding(format!(
                    "expected {EMBEDDING_DIMS} embedding dims, got {}",
                    embedding.len()
                )));
            }
        }

        let tx = self.conn.transaction()?;
        let mut cursor = 0usize;
        let mut indexed = 0;
        for doc in prepared.drain(..) {
            let count = doc.chunks.len();
            let slice = &embeddings[cursor..cursor + count];
            cursor += count;
            let doc_id = insert_document(&tx, &doc, slice)?;
            commit_sidecar(&tx, doc_id, &doc.payload)?;
            indexed += 1;
        }
        tx.commit()?;
        Ok(indexed)
    }

    pub fn purge_path(&mut self, path: impl AsRef<Path>) -> Result<usize> {
        let root = canonical(path.as_ref())?;
        let tx = self.conn.transaction()?;
        let docs = matching_docs(&tx, &root)?;
        for doc in &docs {
            delete_document(&tx, doc.id)?;
        }
        tx.commit()?;
        Ok(docs.len())
    }

    pub fn search(&self, options: &SearchOptions) -> Result<Vec<SearchHit>> {
        search_impl(self, options)
    }

    /// Borrow the underlying SQLite connection. Useful for crates that
    /// extend the schema (e.g. `lexa-obsidian` adds `note_metadata`,
    /// `note_links`, `note_tags`, `note_blocks` sidecar tables) and
    /// need to run their own SQL on the same connection rather than
    /// opening a second one and contending for SQLite's single writer lock.
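    ///
    /// A minimal sketch (illustrative; this `note_tags` DDL is hypothetical):
    ///
    /// ```ignore
    /// db.conn().execute_batch(
    ///     "CREATE TABLE IF NOT EXISTS note_tags (
    ///          doc_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
    ///          tag TEXT NOT NULL
    ///      );",
    /// )?;
    /// ```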
    pub fn conn(&self) -> &Connection {
        &self.conn
    }

    pub fn list_documents(&self) -> Result<Vec<Document>> {
        let mut stmt = self.conn.prepare(
            "SELECT id, path, mtime, size, content_hash, indexed_at FROM documents ORDER BY path",
        )?;
        let rows = stmt.query_map([], |row| {
            Ok(Document {
                id: row.get(0)?,
                path: row.get(1)?,
                mtime: row.get(2)?,
                size: row.get(3)?,
                content_hash: row.get(4)?,
                indexed_at: row.get(5)?,
            })
        })?;
        rows.collect::<std::result::Result<Vec<_>, _>>()
            .map_err(Into::into)
    }

    pub fn stats(&self) -> Result<IndexStats> {
        let documents = self
            .conn
            .query_row("SELECT count(*) FROM documents", [], |row| row.get(0))?;
        let chunks = self
            .conn
            .query_row("SELECT count(*) FROM chunks", [], |row| row.get(0))?;
        Ok(IndexStats {
            db_path: self.path.clone(),
            documents,
            chunks,
        })
    }
}

/// Per-file preprocessing hook used by `index_path_with_preprocessor`.
///
/// Receives the file path and the raw bytes; may return `Some(...)` to
/// supply a substitute body (e.g. with frontmatter stripped) and a
/// `payload` with sidecar metadata. Returning `None` skips the file.
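///
/// A minimal implementation sketch (illustrative only; `Passthrough` is not
/// part of this crate):
///
/// ```ignore
/// struct Passthrough;
///
/// impl Preprocessor for Passthrough {
///     type Payload = ();
///
///     fn preprocess(&self, _path: &Path, bytes: &[u8]) -> Result<Option<PreprocessOutput<()>>> {
///         Ok(Some(PreprocessOutput {
///             text: String::from_utf8_lossy(bytes).into_owned(),
///             payload: (),
///         }))
///     }
/// }
/// ```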
pub trait Preprocessor {
    type Payload: Default;

    fn preprocess(
        &self,
        path: &Path,
        bytes: &[u8],
    ) -> Result<Option<PreprocessOutput<Self::Payload>>>;
}

/// Output of [`Preprocessor::preprocess`].
pub struct PreprocessOutput<P> {
    /// Text used for chunking + embedding. Replaces the raw file body.
    pub text: String,
    /// Caller payload threaded into the `commit_sidecar` callback so
    /// custom tables can be populated inside the same transaction as
    /// the chunk insert.
    pub payload: P,
}

struct PreparedDoc<P> {
    path: String,
    mtime: i64,
    size: i64,
    content_hash: String,
    indexed_at: i64,
    chunks: Vec<crate::chunk::RawChunk>,
    payload: P,
}

fn prepare_document_with<P>(
    path: &Path,
    preprocessor: Option<&dyn Preprocessor<Payload = P>>,
) -> Result<Option<PreparedDoc<P>>>
where
    P: Default,
{
    let Some(kind) = supported_kind(path) else {
        return Ok(None);
    };
    let metadata = fs::metadata(path)?;
    if !metadata.is_file() || metadata.len() > MAX_FILE_BYTES {
        return Ok(None);
    }
    let bytes = fs::read(path)?;
    let raw_text = if kind == "pdf" {
        pdf_extract::extract_text(path).map_err(|error| LexaError::Pdf(error.to_string()))?
    } else {
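        // Heuristic binary sniff: a NUL byte in the first 4 KiB means this is
        // almost certainly not a text file, so skip it.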
        if bytes.iter().take(4096).any(|byte| *byte == 0) {
            return Ok(None);
        }
        String::from_utf8_lossy(&bytes).replace("\r\n", "\n")
    };

    let (text, payload): (String, P) = match preprocessor {
        Some(pp) => match pp.preprocess(path, &bytes)? {
            Some(out) => (out.text, out.payload),
            None => return Ok(None),
        },
        None => (raw_text, P::default()),
    };

    let raw_chunks = chunk_text_for_path(&text, kind, Some(path));
    if raw_chunks.is_empty() {
        return Ok(None);
    }
    Ok(Some(PreparedDoc {
        path: canonical(path)?,
        mtime: metadata
            .modified()
            .ok()
            .and_then(epoch_secs)
            .unwrap_or_default() as i64,
        size: metadata.len() as i64,
        content_hash: stable_hash_hex(&bytes),
        indexed_at: epoch_secs(SystemTime::now()).unwrap_or_default() as i64,
        chunks: raw_chunks,
        payload,
    }))
}

fn insert_document<P>(
    tx: &Transaction<'_>,
    doc: &PreparedDoc<P>,
    embeddings: &[Vec<f32>],
) -> Result<i64> {
    if let Some(existing_id) = tx
        .query_row(
            "SELECT id FROM documents WHERE path = ?1",
            params![doc.path],
            |row| row.get::<_, i64>(0),
        )
        .optional()?
    {
        delete_document(tx, existing_id)?;
    }
    tx.execute(
        "INSERT INTO documents(path, mtime, size, content_hash, indexed_at) VALUES(?1, ?2, ?3, ?4, ?5)",
        params![doc.path, doc.mtime, doc.size, doc.content_hash, doc.indexed_at],
    )?;
    let doc_id = tx.last_insert_rowid();

    for (idx, (chunk, embedding)) in doc.chunks.iter().zip(embeddings.iter()).enumerate() {
        tx.execute(
            "INSERT INTO chunks(doc_id, ord, byte_start, byte_end, line_start, line_end, kind, text, context)
             VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
            params![
                doc_id,
                idx as i64,
                chunk.byte_start as i64,
                chunk.byte_end as i64,
                chunk.line_start as i64,
                chunk.line_end as i64,
                chunk.kind,
                chunk.text,
                chunk.context
            ],
        )?;
        let chunk_id = tx.last_insert_rowid();
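        // The full embedding is binary-quantized into `vectors_bin`; a
        // matryoshka-truncated prefix of `PREVIEW_DIMS` dims goes into
        // `vectors_bin_preview`, keeping lower-resolution comparisons cheap.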
        let full_blob = vector_blob(embedding);
        let preview_blob = vector_blob(&matryoshka_truncate(embedding, PREVIEW_DIMS));
        tx.execute(
            "INSERT INTO chunks_fts(rowid, text, context) VALUES(?1, ?2, ?3)",
            params![chunk_id, chunk.text, chunk.context.as_deref().unwrap_or("")],
        )?;
        tx.execute(
            "INSERT INTO vectors_bin(rowid, embedding) VALUES(?1, vec_quantize_binary(?2))",
            params![chunk_id, full_blob],
        )?;
        tx.execute(
            "INSERT INTO vectors_bin_preview(rowid, embedding) VALUES(?1, vec_quantize_binary(?2))",
            params![chunk_id, preview_blob],
        )?;
    }
    Ok(doc_id)
}

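// `sqlite_vec::sqlite3_vec_init` is exported against sqlite-vec's own SQLite
// bindings, so the function pointer is cast to the entry-point type that
// rusqlite's `sqlite3_auto_extension` expects. Once registered, every
// connection opened afterwards loads the vec0 extension automatically.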
fn register_sqlite_vec() {
    SQLITE_VEC.call_once(|| unsafe {
        type ExtensionEntry = unsafe extern "C" fn(
            *mut rusqlite::ffi::sqlite3,
            *mut *const c_char,
            *const rusqlite::ffi::sqlite3_api_routines,
        ) -> c_int;
        let init = std::mem::transmute::<*const (), ExtensionEntry>(
            sqlite_vec::sqlite3_vec_init as *const (),
        );
        rusqlite::ffi::sqlite3_auto_extension(Some(init));
    });
}

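/// Connection tuning: WAL journaling with `synchronous = NORMAL` favours
/// write throughput over the last-commit durability window; `mmap_size`
/// maps up to 256 MiB of the database file.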
fn apply_pragmas(conn: &Connection) -> Result<()> {
    conn.pragma_update(None, "journal_mode", "WAL")?;
    conn.pragma_update(None, "synchronous", "NORMAL")?;
    conn.pragma_update(None, "temp_store", "MEMORY")?;
    conn.pragma_update(None, "foreign_keys", "ON")?;
    conn.pragma_update(None, "mmap_size", 268_435_456i64)?;
    Ok(())
}

/// Schema migration. The `vec0` virtual table fixes its dim at CREATE time,
/// so `EMBEDDING_DIMS` is interpolated into the DDL. Changing the dim in
/// code requires re-indexing into a fresh DB.
fn migrate(conn: &Connection) -> Result<()> {
    conn.execute_batch(&format!(
        "
        CREATE TABLE IF NOT EXISTS documents (
            id INTEGER PRIMARY KEY,
            path TEXT UNIQUE NOT NULL,
            mtime INTEGER NOT NULL,
            size INTEGER NOT NULL,
            content_hash TEXT NOT NULL,
            indexed_at INTEGER NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_documents_path ON documents(path);

        CREATE TABLE IF NOT EXISTS chunks (
            id INTEGER PRIMARY KEY,
            doc_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
            ord INTEGER NOT NULL,
            byte_start INTEGER NOT NULL,
            byte_end INTEGER NOT NULL,
            line_start INTEGER NOT NULL,
            line_end INTEGER NOT NULL,
            kind TEXT NOT NULL,
            text TEXT NOT NULL,
            context TEXT
        );
        CREATE INDEX IF NOT EXISTS idx_chunks_doc_id ON chunks(doc_id);

        CREATE VIRTUAL TABLE IF NOT EXISTS chunks_fts USING fts5(
            text,
            context,
            tokenize='porter unicode61'
        );

        CREATE VIRTUAL TABLE IF NOT EXISTS vectors_bin USING vec0(embedding bit[{EMBEDDING_DIMS}]);
        CREATE VIRTUAL TABLE IF NOT EXISTS vectors_bin_preview USING vec0(embedding bit[{PREVIEW_DIMS}]);
        "
    ))?;
    Ok(())
}

fn delete_document(tx: &Transaction<'_>, doc_id: i64) -> Result<()> {
    let mut stmt = tx.prepare("SELECT id FROM chunks WHERE doc_id = ?1")?;
    let ids = stmt
        .query_map(params![doc_id], |row| row.get::<_, i64>(0))?
        .collect::<std::result::Result<Vec<_>, _>>()?;
    drop(stmt);
    for id in ids {
        tx.execute("DELETE FROM chunks_fts WHERE rowid = ?1", params![id])?;
        tx.execute("DELETE FROM vectors_bin WHERE rowid = ?1", params![id])?;
        tx.execute(
            "DELETE FROM vectors_bin_preview WHERE rowid = ?1",
            params![id],
        )?;
    }
    tx.execute("DELETE FROM documents WHERE id = ?1", params![doc_id])?;
    Ok(())
}

fn matching_docs(tx: &Transaction<'_>, root: &str) -> Result<Vec<Document>> {
    let pattern = format!("{root}/%");
    let mut stmt = tx.prepare(
        "SELECT id, path, mtime, size, content_hash, indexed_at
         FROM documents WHERE path = ?1 OR path LIKE ?2",
    )?;
    let rows = stmt.query_map(params![root, pattern], |row| {
        Ok(Document {
            id: row.get(0)?,
            path: row.get(1)?,
            mtime: row.get(2)?,
            size: row.get(3)?,
            content_hash: row.get(4)?,
            indexed_at: row.get(5)?,
        })
    })?;
    rows.collect::<std::result::Result<Vec<_>, _>>()
        .map_err(Into::into)
}

fn collect_files(path: &Path) -> Result<Vec<PathBuf>> {
    let metadata = fs::metadata(path)?;
    if metadata.is_file() {
        return Ok(vec![path.to_path_buf()]);
    }
    if !metadata.is_dir() {
        return Ok(Vec::new());
    }
    let files = WalkDir::new(path)
        .into_iter()
        .filter_entry(|entry| !skip_name(entry.file_name().to_string_lossy().as_ref()))
        .filter_map(std::result::Result::ok)
        .filter(|entry| entry.file_type().is_file())
        .map(|entry| entry.into_path())
        .collect();
    Ok(files)
}

fn skip_name(name: &str) -> bool {
    matches!(
        name,
        ".git" | "target" | "node_modules" | ".next" | "dist" | "build" | ".venv"
    )
}

fn canonical(path: &Path) -> Result<String> {
    fs::canonicalize(path)
        .map(|path| path.to_string_lossy().into_owned())
        .map_err(Into::into)
}

fn epoch_secs(time: SystemTime) -> Option<u64> {
    time.duration_since(UNIX_EPOCH)
        .ok()
        .map(|duration| duration.as_secs())
}

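// Note: `DefaultHasher::new()` is deterministic for a given Rust release, but
// the algorithm is not guaranteed to stay stable across releases, so a
// toolchain upgrade may change content hashes and trigger a full re-index.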
fn stable_hash_hex(bytes: &[u8]) -> String {
    let mut hasher = DefaultHasher::new();
    bytes.hash(&mut hasher);
    format!("{:016x}", hasher.finish())
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{EmbeddingBackend, SearchTier};

    fn config() -> EmbeddingConfig {
        EmbeddingConfig {
            backend: EmbeddingBackend::Hash,
            show_download_progress: false,
        }
    }

    #[test]
    fn migrations_create_expected_tables() {
        let dir = tempfile::tempdir().unwrap();
        let db = LexaDb::open(dir.path().join("index.sqlite"), config()).unwrap();
        let stats = db.stats().unwrap();
        assert_eq!(stats.documents, 0);
        assert_eq!(stats.chunks, 0);
    }

    #[test]
    fn reindex_replaces_stale_chunks() {
        let dir = tempfile::tempdir().unwrap();
        let source = dir.path().join("repo");
        fs::create_dir_all(&source).unwrap();
        let file = source.join("README.md");
        fs::write(&file, "# Lexa\n\nold search text").unwrap();
        let mut db = LexaDb::open(dir.path().join("index.sqlite"), config()).unwrap();
        assert_eq!(db.index_path(&source).unwrap(), 1);
        fs::write(&file, "# Lexa\n\nconfig validation function").unwrap();
        assert_eq!(db.index_path(&source).unwrap(), 1);
        let stats = db.stats().unwrap();
        assert_eq!(stats.documents, 1);
        assert!(stats.chunks >= 1);
        let hits = db
            .search(&SearchOptions {
                query: "config validation function".to_string(),
                tier: SearchTier::Fast,
                limit: 3,
                additional_queries: Vec::new(),
            })
            .unwrap();
        assert!(!hits.is_empty());
        assert!(hits[0].excerpt.contains("config validation"));
593    }
594}