use std::collections::hash_map::DefaultHasher;
use std::ffi::{c_char, c_int};
use std::fs;
use std::hash::{Hash, Hasher};
use std::path::{Path, PathBuf};
use std::sync::{Mutex, Once, OnceLock};
use std::time::{SystemTime, UNIX_EPOCH};

use rusqlite::{params, Connection, OptionalExtension, Transaction};
use walkdir::WalkDir;

use crate::chunk::{chunk_text_for_path, supported_kind};
use crate::embed::{
    matryoshka_truncate, vector_blob, Embedder, EmbeddingConfig, Reranker, EMBEDDING_DIMS,
    PREVIEW_DIMS,
};
use crate::search::{search_impl, SearchOptions};
use crate::types::{Document, IndexStats, LexaError, SearchHit};
use crate::Result;

static SQLITE_VEC: Once = Once::new();
const MAX_FILE_BYTES: u64 = 10 * 1024 * 1024;

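/// Handle to a Lexa index: one SQLite connection plus lazily-initialized,
/// mutex-guarded embedding and reranking models.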
pub struct LexaDb {
    path: PathBuf,
    conn: Connection,
    embedding_config: EmbeddingConfig,
    embedder: OnceLock<Mutex<Embedder>>,
    reranker: OnceLock<Mutex<Reranker>>,
}

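/// Default index location: `$HOME/.lexa/index.sqlite`, falling back to the
/// current directory when `HOME` is unset.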
pub fn default_db_path() -> PathBuf {
    std::env::var_os("HOME")
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from("."))
        .join(".lexa")
        .join("index.sqlite")
}

pub fn open(path: impl AsRef<Path>, embedding_config: EmbeddingConfig) -> Result<LexaDb> {
    LexaDb::open(path, embedding_config)
}

impl LexaDb {
    pub fn open(path: impl AsRef<Path>, embedding_config: EmbeddingConfig) -> Result<Self> {
        register_sqlite_vec();
        let path = path.as_ref().to_path_buf();
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent)?;
        }
        let conn = Connection::open(&path)?;
        apply_pragmas(&conn)?;
        migrate(&conn)?;
        Ok(Self {
            path,
            conn,
            embedding_config,
            embedder: OnceLock::new(),
            reranker: OnceLock::new(),
        })
    }

    pub fn path(&self) -> &Path {
        &self.path
    }

    pub fn embedder(&self) -> Result<&Mutex<Embedder>> {
        if let Some(cached) = self.embedder.get() {
            return Ok(cached);
        }
        let embedder = Embedder::new(&self.embedding_config)?;
        Ok(self.embedder.get_or_init(|| Mutex::new(embedder)))
    }

    pub fn reranker(&self) -> Result<&Mutex<Reranker>> {
        if let Some(cached) = self.reranker.get() {
            return Ok(cached);
        }
        let reranker = Reranker::new(&self.embedding_config)?;
        Ok(self.reranker.get_or_init(|| Mutex::new(reranker)))
    }

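    /// Indexes a file or directory tree with no preprocessor and no sidecar data.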
    pub fn index_path(&mut self, path: impl AsRef<Path>) -> Result<usize> {
        self.index_path_with_preprocessor::<()>(
            path,
            None::<&dyn Preprocessor<Payload = ()>>,
            |_, _, _| Ok(()),
        )
    }

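    /// Indexes a file or directory tree, optionally routing each file through a
    /// [`Preprocessor`] and persisting its payload via `commit_sidecar` inside the
    /// same transaction as the document. Files whose stored content hash is
    /// unchanged are skipped; documents are embedded and committed in small batches.
    /// Returns the number of documents written.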
    pub fn index_path_with_preprocessor<P>(
        &mut self,
        path: impl AsRef<Path>,
        preprocessor: Option<&dyn Preprocessor<Payload = P>>,
        commit_sidecar: impl Fn(&Transaction<'_>, i64, &P) -> Result<()>,
    ) -> Result<usize>
    where
        P: Default,
    {
        const BATCH: usize = 64;
        let files = collect_files(path.as_ref())?;
        let mut prepared: Vec<PreparedDoc<P>> = Vec::new();
        let mut pending_texts: Vec<String> = Vec::new();
        let mut indexed = 0;

        for file in files {
            let Some(doc) = prepare_document_with(&file, preprocessor)? else {
                continue;
            };
            if self.is_unchanged(&doc)? {
                continue;
            }
            for chunk in &doc.chunks {
                pending_texts.push(match &chunk.context {
                    Some(context) => format!("{context}\n{}", chunk.text),
                    None => chunk.text.clone(),
                });
            }
            prepared.push(doc);

            if prepared.len() >= BATCH {
                indexed += self.flush_batch(&mut prepared, &mut pending_texts, &commit_sidecar)?;
            }
        }
        if !prepared.is_empty() {
            indexed += self.flush_batch(&mut prepared, &mut pending_texts, &commit_sidecar)?;
        }
        Ok(indexed)
    }

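    /// Returns true when the content hash stored for `doc.path` matches the current one.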
    fn is_unchanged<P>(&self, doc: &PreparedDoc<P>) -> Result<bool> {
        let row: Option<String> = self
            .conn
            .query_row(
                "SELECT content_hash FROM documents WHERE path = ?1",
                params![doc.path],
                |row| row.get(0),
            )
            .optional()?;
        Ok(matches!(row, Some(hash) if hash == doc.content_hash))
    }

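    /// Embeds all pending chunk texts, validates their dimensionality, and writes
    /// the prepared documents (plus any sidecar payloads) in one transaction.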
    fn flush_batch<P>(
        &mut self,
        prepared: &mut Vec<PreparedDoc<P>>,
        pending_texts: &mut Vec<String>,
        commit_sidecar: &dyn Fn(&Transaction<'_>, i64, &P) -> Result<()>,
    ) -> Result<usize> {
        if prepared.is_empty() {
            return Ok(0);
        }
        let embeddings = {
            let lock = self.embedder()?;
            let mut guard = lock
                .lock()
                .map_err(|err| LexaError::Embedding(err.to_string()))?;
            guard.embed_documents(pending_texts)?
        };
        pending_texts.clear();

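        // Reject the whole batch if the embedder returned vectors of the wrong
        // size, before anything is written to the database.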
        for embedding in &embeddings {
            if embedding.len() != EMBEDDING_DIMS {
                return Err(LexaError::Embedding(format!(
                    "expected {EMBEDDING_DIMS} embedding dims, got {}",
                    embedding.len()
                )));
            }
        }

        let tx = self.conn.transaction()?;
        let mut cursor = 0usize;
        let mut indexed = 0;
        for doc in prepared.drain(..) {
            let count = doc.chunks.len();
            let slice = &embeddings[cursor..cursor + count];
            cursor += count;
            let doc_id = insert_document(&tx, &doc, slice)?;
            commit_sidecar(&tx, doc_id, &doc.payload)?;
            indexed += 1;
        }
        tx.commit()?;
        Ok(indexed)
    }

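    /// Removes every indexed document whose path equals `path` or lives beneath it,
    /// returning the number of documents deleted.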
    pub fn purge_path(&mut self, path: impl AsRef<Path>) -> Result<usize> {
        let root = canonical(path.as_ref())?;
        let tx = self.conn.transaction()?;
        let docs = matching_docs(&tx, &root)?;
        for doc in &docs {
            delete_document(&tx, doc.id)?;
        }
        tx.commit()?;
        Ok(docs.len())
    }

    pub fn search(&self, options: &SearchOptions) -> Result<Vec<SearchHit>> {
        search_impl(self, options)
    }

    pub fn conn(&self) -> &Connection {
        &self.conn
    }

    pub fn list_documents(&self) -> Result<Vec<Document>> {
        let mut stmt = self.conn.prepare(
            "SELECT id, path, mtime, size, content_hash, indexed_at FROM documents ORDER BY path",
        )?;
        let rows = stmt.query_map([], |row| {
            Ok(Document {
                id: row.get(0)?,
                path: row.get(1)?,
                mtime: row.get(2)?,
                size: row.get(3)?,
                content_hash: row.get(4)?,
                indexed_at: row.get(5)?,
            })
        })?;
        rows.collect::<std::result::Result<Vec<_>, _>>()
            .map_err(Into::into)
    }

    pub fn stats(&self) -> Result<IndexStats> {
        let documents = self
            .conn
            .query_row("SELECT count(*) FROM documents", [], |row| row.get(0))?;
        let chunks = self
            .conn
            .query_row("SELECT count(*) FROM chunks", [], |row| row.get(0))?;
        Ok(IndexStats {
            db_path: self.path.clone(),
            documents,
            chunks,
        })
    }
}

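/// Transforms raw file bytes into the text that will be chunked and embedded,
/// optionally attaching a per-document payload that the caller persists through
/// the sidecar hook of `index_path_with_preprocessor`. Returning `Ok(None)`
/// skips the file entirely.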
pub trait Preprocessor {
    type Payload: Default;

    fn preprocess(
        &self,
        path: &Path,
        bytes: &[u8],
    ) -> Result<Option<PreprocessOutput<Self::Payload>>>;
}

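/// Output of [`Preprocessor::preprocess`]: the text to index and the payload to
/// hand back to the caller's sidecar commit hook.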
pub struct PreprocessOutput<P> {
    pub text: String,
    pub payload: P,
}

struct PreparedDoc<P> {
    path: String,
    mtime: i64,
    size: i64,
    content_hash: String,
    indexed_at: i64,
    chunks: Vec<crate::chunk::RawChunk>,
    payload: P,
}

fn prepare_document_with<P>(
    path: &Path,
    preprocessor: Option<&dyn Preprocessor<Payload = P>>,
) -> Result<Option<PreparedDoc<P>>>
where
    P: Default,
{
    let Some(kind) = supported_kind(path) else {
        return Ok(None);
    };
    let metadata = fs::metadata(path)?;
    if !metadata.is_file() || metadata.len() > MAX_FILE_BYTES {
        return Ok(None);
    }
    let bytes = fs::read(path)?;
    let raw_text = if kind == "pdf" {
        pdf_extract::extract_text(path).map_err(|error| LexaError::Pdf(error.to_string()))?
    } else {
        if bytes.iter().take(4096).any(|byte| *byte == 0) {
            return Ok(None);
        }
        String::from_utf8_lossy(&bytes).replace("\r\n", "\n")
    };

    let (text, payload): (String, P) = match preprocessor {
        Some(pp) => match pp.preprocess(path, &bytes)? {
            Some(out) => (out.text, out.payload),
            None => return Ok(None),
        },
        None => (raw_text, P::default()),
    };

    let raw_chunks = chunk_text_for_path(&text, kind, Some(path));
    if raw_chunks.is_empty() {
        return Ok(None);
    }
    Ok(Some(PreparedDoc {
        path: canonical(path)?,
        mtime: metadata
            .modified()
            .ok()
            .and_then(epoch_secs)
            .unwrap_or_default() as i64,
        size: metadata.len() as i64,
        content_hash: stable_hash_hex(&bytes),
        indexed_at: epoch_secs(SystemTime::now()).unwrap_or_default() as i64,
        chunks: raw_chunks,
        payload,
    }))
}

fn insert_document<P>(
    tx: &Transaction<'_>,
    doc: &PreparedDoc<P>,
    embeddings: &[Vec<f32>],
) -> Result<i64> {
    if let Some(existing_id) = tx
        .query_row(
            "SELECT id FROM documents WHERE path = ?1",
            params![doc.path],
            |row| row.get::<_, i64>(0),
        )
        .optional()?
    {
        delete_document(tx, existing_id)?;
    }
    tx.execute(
        "INSERT INTO documents(path, mtime, size, content_hash, indexed_at) VALUES(?1, ?2, ?3, ?4, ?5)",
        params![doc.path, doc.mtime, doc.size, doc.content_hash, doc.indexed_at],
    )?;
    let doc_id = tx.last_insert_rowid();

    for (idx, (chunk, embedding)) in doc.chunks.iter().zip(embeddings.iter()).enumerate() {
        tx.execute(
            "INSERT INTO chunks(doc_id, ord, byte_start, byte_end, line_start, line_end, kind, text, context)
             VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
            params![
                doc_id,
                idx as i64,
                chunk.byte_start as i64,
                chunk.byte_end as i64,
                chunk.line_start as i64,
                chunk.line_end as i64,
                chunk.kind,
                chunk.text,
                chunk.context
            ],
        )?;
        let chunk_id = tx.last_insert_rowid();
        let full_blob = vector_blob(embedding);
        let preview_blob = vector_blob(&matryoshka_truncate(embedding, PREVIEW_DIMS));
        tx.execute(
            "INSERT INTO chunks_fts(rowid, text, context) VALUES(?1, ?2, ?3)",
            params![chunk_id, chunk.text, chunk.context.as_deref().unwrap_or("")],
        )?;
        tx.execute(
            "INSERT INTO vectors_bin(rowid, embedding) VALUES(?1, vec_quantize_binary(?2))",
            params![chunk_id, full_blob],
        )?;
        tx.execute(
            "INSERT INTO vectors_bin_preview(rowid, embedding) VALUES(?1, vec_quantize_binary(?2))",
            params![chunk_id, preview_blob],
        )?;
    }
    Ok(doc_id)
}

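// Registers sqlite-vec as an auto extension exactly once. The transmute adapts
// `sqlite3_vec_init`'s signature to the entry-point type that
// `sqlite3_auto_extension` expects.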
fn register_sqlite_vec() {
    SQLITE_VEC.call_once(|| unsafe {
        type ExtensionEntry = unsafe extern "C" fn(
            *mut rusqlite::ffi::sqlite3,
            *mut *const c_char,
            *const rusqlite::ffi::sqlite3_api_routines,
        ) -> c_int;
        let init = std::mem::transmute::<*const (), ExtensionEntry>(
            sqlite_vec::sqlite3_vec_init as *const (),
        );
        rusqlite::ffi::sqlite3_auto_extension(Some(init));
    });
}

fn apply_pragmas(conn: &Connection) -> Result<()> {
    conn.pragma_update(None, "journal_mode", "WAL")?;
    conn.pragma_update(None, "synchronous", "NORMAL")?;
    conn.pragma_update(None, "temp_store", "MEMORY")?;
    conn.pragma_update(None, "foreign_keys", "ON")?;
    conn.pragma_update(None, "mmap_size", 268_435_456i64)?;
    Ok(())
}

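// Creates the schema on open: `documents` and `chunks` row tables, an FTS5 index
// over chunk text/context, and two sqlite-vec `vec0` tables holding
// binary-quantized embeddings at full and preview dimensionality.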
fn migrate(conn: &Connection) -> Result<()> {
    conn.execute_batch(&format!(
        "
        CREATE TABLE IF NOT EXISTS documents (
            id INTEGER PRIMARY KEY,
            path TEXT UNIQUE NOT NULL,
            mtime INTEGER NOT NULL,
            size INTEGER NOT NULL,
            content_hash TEXT NOT NULL,
            indexed_at INTEGER NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_documents_path ON documents(path);

        CREATE TABLE IF NOT EXISTS chunks (
            id INTEGER PRIMARY KEY,
            doc_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
            ord INTEGER NOT NULL,
            byte_start INTEGER NOT NULL,
            byte_end INTEGER NOT NULL,
            line_start INTEGER NOT NULL,
            line_end INTEGER NOT NULL,
            kind TEXT NOT NULL,
            text TEXT NOT NULL,
            context TEXT
        );
        CREATE INDEX IF NOT EXISTS idx_chunks_doc_id ON chunks(doc_id);

        CREATE VIRTUAL TABLE IF NOT EXISTS chunks_fts USING fts5(
            text,
            context,
            tokenize='porter unicode61'
        );

        CREATE VIRTUAL TABLE IF NOT EXISTS vectors_bin USING vec0(embedding bit[{EMBEDDING_DIMS}]);
        CREATE VIRTUAL TABLE IF NOT EXISTS vectors_bin_preview USING vec0(embedding bit[{PREVIEW_DIMS}]);
        "
    ))?;
    Ok(())
}

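// Deletes a document and its chunk rows. The FTS and vector rows are removed
// explicitly because the `ON DELETE CASCADE` on `chunks` does not reach the
// virtual tables.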
fn delete_document(tx: &Transaction<'_>, doc_id: i64) -> Result<()> {
    let mut stmt = tx.prepare("SELECT id FROM chunks WHERE doc_id = ?1")?;
    let ids = stmt
        .query_map(params![doc_id], |row| row.get::<_, i64>(0))?
        .collect::<std::result::Result<Vec<_>, _>>()?;
    drop(stmt);
    for id in ids {
        tx.execute("DELETE FROM chunks_fts WHERE rowid = ?1", params![id])?;
        tx.execute("DELETE FROM vectors_bin WHERE rowid = ?1", params![id])?;
        tx.execute(
            "DELETE FROM vectors_bin_preview WHERE rowid = ?1",
            params![id],
        )?;
    }
    tx.execute("DELETE FROM documents WHERE id = ?1", params![doc_id])?;
    Ok(())
}

fn matching_docs(tx: &Transaction<'_>, root: &str) -> Result<Vec<Document>> {
    let pattern = format!("{root}/%");
    let mut stmt = tx.prepare(
        "SELECT id, path, mtime, size, content_hash, indexed_at
         FROM documents WHERE path = ?1 OR path LIKE ?2",
    )?;
    let rows = stmt.query_map(params![root, pattern], |row| {
        Ok(Document {
            id: row.get(0)?,
            path: row.get(1)?,
            mtime: row.get(2)?,
            size: row.get(3)?,
            content_hash: row.get(4)?,
            indexed_at: row.get(5)?,
        })
    })?;
    rows.collect::<std::result::Result<Vec<_>, _>>()
        .map_err(Into::into)
}

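// Expands a path into the list of regular files to index, walking directories
// while skipping common build and dependency folders (see `skip_name`).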
fn collect_files(path: &Path) -> Result<Vec<PathBuf>> {
    let metadata = fs::metadata(path)?;
    if metadata.is_file() {
        return Ok(vec![path.to_path_buf()]);
    }
    if !metadata.is_dir() {
        return Ok(Vec::new());
    }
    let files = WalkDir::new(path)
        .into_iter()
        .filter_entry(|entry| !skip_name(entry.file_name().to_string_lossy().as_ref()))
        .filter_map(std::result::Result::ok)
        .filter(|entry| entry.file_type().is_file())
        .map(|entry| entry.into_path())
        .collect();
    Ok(files)
}

fn skip_name(name: &str) -> bool {
    matches!(
        name,
        ".git" | "target" | "node_modules" | ".next" | "dist" | "build" | ".venv"
    )
}

fn canonical(path: &Path) -> Result<String> {
    fs::canonicalize(path)
        .map(|path| path.to_string_lossy().into_owned())
        .map_err(Into::into)
}

fn epoch_secs(time: SystemTime) -> Option<u64> {
    time.duration_since(UNIX_EPOCH)
        .ok()
        .map(|duration| duration.as_secs())
}

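// Caveat: `DefaultHasher`'s algorithm is not guaranteed to be stable across Rust
// releases, so hashes persisted in the index may change after a toolchain
// upgrade and cause unchanged files to be reindexed once.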
fn stable_hash_hex(bytes: &[u8]) -> String {
    let mut hasher = DefaultHasher::new();
    bytes.hash(&mut hasher);
    format!("{:016x}", hasher.finish())
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{EmbeddingBackend, SearchTier};

    fn config() -> EmbeddingConfig {
        EmbeddingConfig {
            backend: EmbeddingBackend::Hash,
            show_download_progress: false,
        }
    }

    #[test]
    fn migrations_create_expected_tables() {
        let dir = tempfile::tempdir().unwrap();
        let db = LexaDb::open(dir.path().join("index.sqlite"), config()).unwrap();
        let stats = db.stats().unwrap();
        assert_eq!(stats.documents, 0);
        assert_eq!(stats.chunks, 0);
    }

    #[test]
    fn reindex_replaces_stale_chunks() {
        let dir = tempfile::tempdir().unwrap();
        let source = dir.path().join("repo");
        fs::create_dir_all(&source).unwrap();
        let file = source.join("README.md");
        fs::write(&file, "# Lexa\n\nold search text").unwrap();
        let mut db = LexaDb::open(dir.path().join("index.sqlite"), config()).unwrap();
        assert_eq!(db.index_path(&source).unwrap(), 1);
        fs::write(&file, "# Lexa\n\nconfig validation function").unwrap();
        assert_eq!(db.index_path(&source).unwrap(), 1);
        let stats = db.stats().unwrap();
        assert_eq!(stats.documents, 1);
        assert!(stats.chunks >= 1);
        let hits = db
            .search(&SearchOptions {
                query: "config validation function".to_string(),
                tier: SearchTier::Fast,
                limit: 3,
                additional_queries: Vec::new(),
            })
            .unwrap();
        assert!(!hits.is_empty());
        assert!(hits[0].excerpt.contains("config validation"));
    }
}