// codelens_engine/embedding/engine_impl/index.rs

use anyhow::{Context, Result};
use fastembed::TextEmbedding;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Mutex, MutexGuard};

use super::super::cache::{
    ReusableEmbeddingKey, reusable_embedding_key_for_chunk, reusable_embedding_key_for_symbol,
};
use super::super::ffi;
use super::super::prompt::{
    build_embedding_text, extract_leading_doc, is_test_only_symbol, split_identifier,
};
use super::super::runtime::{
    configured_embedding_text_cache_size, embed_batch_size, load_codesearch_model,
    max_embed_symbols,
};
use super::super::vec_store::{EMBEDDING_STORE_SCHEMA_VERSION, SqliteVecStore};
use super::super::{
    CHANGED_FILE_QUERY_CHUNK, EmbeddingEngine, EmbeddingFreshnessReport, EmbeddingIndexInfo,
    EmbeddingRuntimeInfo,
};
use crate::db::IndexDb;
use crate::embedding_store::EmbeddingChunk;
use crate::project::ProjectRoot;
use rusqlite::Connection;
25
/// Scope guard over an "indexing in progress" flag.
///
/// Whoever sets the flag wraps it in this guard; when the guard goes out of scope
/// (normal return, `?` early return, or unwinding) the flag is reset to `false`.
struct IndexingFlagGuard<'a>(&'a AtomicBool);

impl Drop for IndexingFlagGuard<'_> {
    /// Clears the wrapped flag with `Release` ordering.
    fn drop(&mut self) {
        let Self(flag) = self;
        flag.store(false, Ordering::Release);
    }
}
33
34impl EmbeddingEngine {
35    pub fn new(project: &ProjectRoot) -> Result<Self> {
36        let (model, dimension, model_name, runtime_info) = load_codesearch_model()?;
37
38        let db_dir = project.as_path().join(".codelens/index");
39        std::fs::create_dir_all(&db_dir)?;
40        let db_path = db_dir.join("embeddings.db");
41
42        let store = SqliteVecStore::new(&db_path, dimension, &model_name)?;
43
44        Ok(Self {
45            model: std::sync::Mutex::new(model),
46            store,
47            model_name,
48            runtime_info,
49            text_embed_cache: std::sync::Mutex::new(super::super::cache::TextEmbeddingCache::new(
50                configured_embedding_text_cache_size(),
51            )),
52            indexing: std::sync::atomic::AtomicBool::new(false),
53        })
54    }
55
56    pub fn model_name(&self) -> &str {
57        &self.model_name
58    }
59
60    pub fn runtime_info(&self) -> &EmbeddingRuntimeInfo {
61        &self.runtime_info
62    }
63
64    /// Returns true if a full reindex is currently in progress.
65    pub fn is_indexing(&self) -> bool {
66        self.indexing.load(std::sync::atomic::Ordering::Relaxed)
67    }
68
69    pub fn index_from_project(&self, project: &ProjectRoot) -> Result<usize> {
70        // Guard against concurrent full reindex (14s+ operation)
71        if self
72            .indexing
73            .compare_exchange(
74                false,
75                true,
76                std::sync::atomic::Ordering::AcqRel,
77                std::sync::atomic::Ordering::Relaxed,
78            )
79            .is_err()
80        {
81            anyhow::bail!(
82                "Embedding indexing already in progress — wait for the current run to complete before retrying."
83            );
84        }
85        let _guard = IndexingFlagGuard(&self.indexing);
86
87        let db_path = crate::db::index_db_path(project.as_path());
88        let symbol_db = IndexDb::open(&db_path)?;
89        let batch_size = embed_batch_size();
90        let max_symbols = max_embed_symbols();
91        let mut total_indexed = 0usize;
92        let mut total_seen = 0usize;
93        let mut model = None;
94        let mut existing_embeddings: HashMap<
95            String,
96            HashMap<ReusableEmbeddingKey, EmbeddingChunk>,
97        > = HashMap::new();
98        let mut current_db_files = HashSet::new();
99        let mut capped = false;
100
101        self.store
102            .for_each_file_embeddings(&mut |file_path, chunks| {
103                existing_embeddings.insert(
104                    file_path,
105                    chunks
106                        .into_iter()
107                        .map(|chunk| (reusable_embedding_key_for_chunk(&chunk), chunk))
108                        .collect(),
109                );
110                Ok(())
111            })?;
112
113        symbol_db.for_each_file_symbols_with_bytes(|file_path, symbols| {
114            current_db_files.insert(file_path.clone());
115            if capped {
116                return Ok(());
117            }
118
119            let source = std::fs::read_to_string(project.as_path().join(&file_path)).ok();
120            let relevant_symbols: Vec<_> = symbols
121                .into_iter()
122                .filter(|sym| !is_test_only_symbol(sym, source.as_deref()))
123                .collect();
124
125            if relevant_symbols.is_empty() {
126                self.store.delete_by_file(&[file_path.as_str()])?;
127                existing_embeddings.remove(&file_path);
128                return Ok(());
129            }
130
131            if total_seen + relevant_symbols.len() > max_symbols {
132                capped = true;
133                return Ok(());
134            }
135            total_seen += relevant_symbols.len();
136
137            let existing_for_file = existing_embeddings.remove(&file_path).unwrap_or_default();
138            total_indexed += self.reconcile_file_embeddings(
139                &file_path,
140                relevant_symbols,
141                source.as_deref(),
142                existing_for_file,
143                batch_size,
144                &mut model,
145            )?;
146            Ok(())
147        })?;
148
149        let removed_files: Vec<String> = existing_embeddings
150            .into_keys()
151            .filter(|file_path| !current_db_files.contains(file_path))
152            .collect();
153        if !removed_files.is_empty() {
154            let removed_refs: Vec<&str> = removed_files.iter().map(String::as_str).collect();
155            self.store.delete_by_file(&removed_refs)?;
156        }
157
158        Ok(total_indexed)
159    }
160
161    pub fn ensure_index_fresh_for_project(
162        &self,
163        project: &ProjectRoot,
164    ) -> Result<EmbeddingFreshnessReport> {
165        if self
166            .indexing
167            .compare_exchange(
168                false,
169                true,
170                std::sync::atomic::Ordering::AcqRel,
171                std::sync::atomic::Ordering::Relaxed,
172            )
173            .is_err()
174        {
175            anyhow::bail!(
176                "Embedding indexing already in progress — wait for the current run to complete before retrying."
177            );
178        }
179
180        let _guard = IndexingFlagGuard(&self.indexing);
181
182        let db_path = crate::db::index_db_path(project.as_path());
183        let symbol_db = IndexDb::open(&db_path)?;
184        let batch_size = embed_batch_size();
185        let mut report = EmbeddingFreshnessReport::default();
186        let mut existing_embeddings: HashMap<
187            String,
188            HashMap<ReusableEmbeddingKey, EmbeddingChunk>,
189        > = HashMap::new();
190        let mut current_db_files = HashSet::new();
191        let mut model = None;
192
193        self.store
194            .for_each_file_embeddings(&mut |file_path, chunks| {
195                existing_embeddings.insert(
196                    file_path,
197                    chunks
198                        .into_iter()
199                        .map(|chunk| (reusable_embedding_key_for_chunk(&chunk), chunk))
200                        .collect(),
201                );
202                Ok(())
203            })?;
204
205        if existing_embeddings.is_empty() {
206            return Ok(report);
207        }
208
209        symbol_db.for_each_file_symbols_with_bytes(|file_path, symbols| {
210            current_db_files.insert(file_path.clone());
211            let Some(existing_for_file) = existing_embeddings.get(&file_path) else {
212                report.skipped_new_files += 1;
213                return Ok(());
214            };
215
216            report.checked_files += 1;
217            let source = std::fs::read_to_string(project.as_path().join(&file_path)).ok();
218            let relevant_symbols: Vec<_> = symbols
219                .into_iter()
220                .filter(|sym| !is_test_only_symbol(sym, source.as_deref()))
221                .collect();
222
223            if relevant_symbols.is_empty() {
224                self.store.delete_by_file(&[file_path.as_str()])?;
225                existing_embeddings.remove(&file_path);
226                report.refreshed_files += 1;
227                return Ok(());
228            }
229
230            let current_keys = relevant_symbols
231                .iter()
232                .map(|sym| {
233                    let text = build_embedding_text(sym, source.as_deref());
234                    reusable_embedding_key_for_symbol(sym, &text)
235                })
236                .collect::<HashSet<_>>();
237            let stored_keys = existing_for_file.keys().cloned().collect::<HashSet<_>>();
238
239            if current_keys == stored_keys {
240                existing_embeddings.remove(&file_path);
241                report.unchanged_files += 1;
242                return Ok(());
243            }
244
245            let existing_for_file = existing_embeddings.remove(&file_path).unwrap_or_default();
246            report.indexed_symbols += self.reconcile_file_embeddings(
247                &file_path,
248                relevant_symbols,
249                source.as_deref(),
250                existing_for_file,
251                batch_size,
252                &mut model,
253            )?;
254            report.refreshed_files += 1;
255            Ok(())
256        })?;
257
258        let removed_files: Vec<String> = existing_embeddings
259            .into_keys()
260            .filter(|file_path| !current_db_files.contains(file_path))
261            .collect();
262        if !removed_files.is_empty() {
263            let removed_refs: Vec<&str> = removed_files.iter().map(String::as_str).collect();
264            report.removed_files = self.store.delete_by_file(&removed_refs)?;
265        }
266
267        Ok(report)
268    }
269
270    /// Extract NL→code bridge candidates from indexed symbols.
271    /// For each symbol with a docstring, produces a (docstring_first_line, symbol_name) pair.
272    /// The caller writes these to `.codelens/bridges.json` for project-specific NL bridging.
273    pub fn generate_bridge_candidates(
274        &self,
275        project: &ProjectRoot,
276    ) -> Result<Vec<(String, String)>> {
277        let db_path = crate::db::index_db_path(project.as_path());
278        let symbol_db = IndexDb::open(&db_path)?;
279        let mut bridges: Vec<(String, String)> = Vec::new();
280        let mut seen_nl = HashSet::new();
281
282        symbol_db.for_each_file_symbols_with_bytes(|file_path, symbols| {
283            let source = std::fs::read_to_string(project.as_path().join(&file_path)).ok();
284            for sym in &symbols {
285                if is_test_only_symbol(sym, source.as_deref()) {
286                    continue;
287                }
288                let doc = source.as_deref().and_then(|src| {
289                    extract_leading_doc(src, sym.start_byte as usize, sym.end_byte as usize)
290                });
291                let doc = match doc {
292                    Some(d) if !d.is_empty() => d,
293                    _ => continue,
294                };
295
296                // Build code term: symbol_name + split words
297                let split = split_identifier(&sym.name);
298                let code_term = if split != sym.name {
299                    format!("{} {}", sym.name, split)
300                } else {
301                    sym.name.clone()
302                };
303
304                // Extract short NL phrases (3-6 words) from the docstring.
305                // This produces multiple bridge entries per symbol, each matching
306                // common NL query patterns like "render template" or "parse url".
307                let first_line = doc.lines().next().unwrap_or("").trim().to_lowercase();
308                // Remove trailing period/punctuation
309                let clean = first_line.trim_end_matches(|c: char| c.is_ascii_punctuation());
310                let words: Vec<&str> = clean.split_whitespace().collect();
311                if words.len() < 2 {
312                    continue;
313                }
314
315                // Generate short N-gram keys (2-4 words from the start)
316                for window in 2..=words.len().min(4) {
317                    let key = words[..window].join(" ");
318                    if key.len() < 5 || key.len() > 60 {
319                        continue;
320                    }
321                    if seen_nl.insert(key.clone()) {
322                        bridges.push((key, code_term.clone()));
323                    }
324                }
325
326                // Also add split_identifier words as a bridge key
327                // so "render template" → render_template
328                if split != sym.name && !seen_nl.contains(&split.to_lowercase()) {
329                    let lowered = split.to_lowercase();
330                    if lowered.split_whitespace().count() >= 2 && seen_nl.insert(lowered.clone()) {
331                        bridges.push((lowered, code_term.clone()));
332                    }
333                }
334            }
335            Ok(())
336        })?;
337
338        Ok(bridges)
339    }
340
341    fn reconcile_file_embeddings<'a>(
342        &'a self,
343        file_path: &str,
344        symbols: Vec<crate::db::SymbolWithFile>,
345        source: Option<&str>,
346        mut existing_embeddings: HashMap<ReusableEmbeddingKey, EmbeddingChunk>,
347        batch_size: usize,
348        model: &mut Option<std::sync::MutexGuard<'a, TextEmbedding>>,
349    ) -> Result<usize> {
350        let mut reconciled_chunks = Vec::with_capacity(symbols.len());
351        let mut batch_texts: Vec<String> = Vec::with_capacity(batch_size);
352        let mut batch_meta: Vec<crate::db::SymbolWithFile> = Vec::with_capacity(batch_size);
353
354        for sym in symbols {
355            let text = build_embedding_text(&sym, source);
356            if let Some(existing) =
357                existing_embeddings.remove(&reusable_embedding_key_for_symbol(&sym, &text))
358            {
359                reconciled_chunks.push(EmbeddingChunk {
360                    file_path: sym.file_path.clone(),
361                    symbol_name: sym.name.clone(),
362                    kind: sym.kind.clone(),
363                    line: sym.line as usize,
364                    signature: sym.signature.clone(),
365                    name_path: sym.name_path.clone(),
366                    text,
367                    embedding: existing.embedding,
368                    doc_embedding: existing.doc_embedding,
369                });
370                continue;
371            }
372
373            batch_texts.push(text);
374            batch_meta.push(sym);
375
376            if batch_texts.len() >= batch_size {
377                if model.is_none() {
378                    *model = Some(
379                        self.model
380                            .lock()
381                            .map_err(|_| anyhow::anyhow!("model lock"))?,
382                    );
383                }
384                reconciled_chunks.extend(Self::embed_chunks(
385                    model.as_mut().expect("model lock initialized"),
386                    &batch_texts,
387                    &batch_meta,
388                )?);
389                batch_texts.clear();
390                batch_meta.clear();
391            }
392        }
393
394        if !batch_texts.is_empty() {
395            if model.is_none() {
396                *model = Some(
397                    self.model
398                        .lock()
399                        .map_err(|_| anyhow::anyhow!("model lock"))?,
400                );
401            }
402            reconciled_chunks.extend(Self::embed_chunks(
403                model.as_mut().expect("model lock initialized"),
404                &batch_texts,
405                &batch_meta,
406            )?);
407        }
408
409        self.store.delete_by_file(&[file_path])?;
410        if reconciled_chunks.is_empty() {
411            return Ok(0);
412        }
413        self.store.insert(&reconciled_chunks)
414    }
415
416    fn embed_chunks(
417        model: &mut TextEmbedding,
418        texts: &[String],
419        meta: &[crate::db::SymbolWithFile],
420    ) -> Result<Vec<EmbeddingChunk>> {
421        let batch_refs: Vec<&str> = texts.iter().map(|s| s.as_str()).collect();
422        let embeddings = model.embed(batch_refs, None).context("embedding failed")?;
423
424        Ok(meta
425            .iter()
426            .zip(embeddings)
427            .zip(texts.iter())
428            .map(|((sym, emb), text)| EmbeddingChunk {
429                file_path: sym.file_path.clone(),
430                symbol_name: sym.name.clone(),
431                kind: sym.kind.clone(),
432                line: sym.line as usize,
433                signature: sym.signature.clone(),
434                name_path: sym.name_path.clone(),
435                text: text.clone(),
436                embedding: emb,
437                doc_embedding: None,
438            })
439            .collect())
440    }
441
442    /// Embed one batch of texts and upsert immediately, then the caller drops the batch.
443    fn flush_batch(
444        model: &mut TextEmbedding,
445        store: &SqliteVecStore,
446        texts: &[String],
447        meta: &[crate::db::SymbolWithFile],
448    ) -> Result<usize> {
449        let chunks = Self::embed_chunks(model, texts, meta)?;
450        store.insert(&chunks)
451    }
452
453    /// Incrementally re-index only the given files.
454    pub fn index_changed_files(
455        &self,
456        project: &ProjectRoot,
457        changed_files: &[&str],
458    ) -> Result<usize> {
459        if changed_files.is_empty() {
460            return Ok(0);
461        }
462        let batch_size = embed_batch_size();
463        let mut existing_embeddings: HashMap<ReusableEmbeddingKey, EmbeddingChunk> = HashMap::new();
464        for file_chunk in changed_files.chunks(CHANGED_FILE_QUERY_CHUNK) {
465            for chunk in self.store.embeddings_for_files(file_chunk)? {
466                existing_embeddings.insert(reusable_embedding_key_for_chunk(&chunk), chunk);
467            }
468        }
469        self.store.delete_by_file(changed_files)?;
470
471        let db_path = crate::db::index_db_path(project.as_path());
472        let symbol_db = IndexDb::open(&db_path)?;
473
474        let mut total_indexed = 0usize;
475        let mut batch_texts: Vec<String> = Vec::with_capacity(batch_size);
476        let mut batch_meta: Vec<crate::db::SymbolWithFile> = Vec::with_capacity(batch_size);
477        let mut batch_reused: Vec<EmbeddingChunk> = Vec::with_capacity(batch_size);
478        let mut file_cache: std::collections::HashMap<String, Option<String>> =
479            std::collections::HashMap::new();
480        let mut model = None;
481
482        for file_chunk in changed_files.chunks(CHANGED_FILE_QUERY_CHUNK) {
483            let relevant = symbol_db.symbols_for_files(file_chunk)?;
484            for sym in relevant {
485                let source = file_cache.entry(sym.file_path.clone()).or_insert_with(|| {
486                    std::fs::read_to_string(project.as_path().join(&sym.file_path)).ok()
487                });
488                if is_test_only_symbol(&sym, source.as_deref()) {
489                    continue;
490                }
491                let text = build_embedding_text(&sym, source.as_deref());
492                if let Some(existing) =
493                    existing_embeddings.remove(&reusable_embedding_key_for_symbol(&sym, &text))
494                {
495                    batch_reused.push(EmbeddingChunk {
496                        file_path: sym.file_path.clone(),
497                        symbol_name: sym.name.clone(),
498                        kind: sym.kind.clone(),
499                        line: sym.line as usize,
500                        signature: sym.signature.clone(),
501                        name_path: sym.name_path.clone(),
502                        text,
503                        embedding: existing.embedding,
504                        doc_embedding: existing.doc_embedding,
505                    });
506                    if batch_reused.len() >= batch_size {
507                        total_indexed += self.store.insert(&batch_reused)?;
508                        batch_reused.clear();
509                    }
510                    continue;
511                }
512                batch_texts.push(text);
513                batch_meta.push(sym);
514
515                if batch_texts.len() >= batch_size {
516                    if model.is_none() {
517                        model = Some(
518                            self.model
519                                .lock()
520                                .map_err(|_| anyhow::anyhow!("model lock"))?,
521                        );
522                    }
523                    total_indexed += Self::flush_batch(
524                        model.as_mut().expect("model lock initialized"),
525                        &self.store,
526                        &batch_texts,
527                        &batch_meta,
528                    )?;
529                    batch_texts.clear();
530                    batch_meta.clear();
531                }
532            }
533        }
534
535        if !batch_reused.is_empty() {
536            total_indexed += self.store.insert(&batch_reused)?;
537        }
538
539        if !batch_texts.is_empty() {
540            if model.is_none() {
541                model = Some(
542                    self.model
543                        .lock()
544                        .map_err(|_| anyhow::anyhow!("model lock"))?,
545                );
546            }
547            total_indexed += Self::flush_batch(
548                model.as_mut().expect("model lock initialized"),
549                &self.store,
550                &batch_texts,
551                &batch_meta,
552            )?;
553        }
554
555        Ok(total_indexed)
556    }
557
558    /// Whether the embedding index has been populated.
559    pub fn is_indexed(&self) -> bool {
560        self.store.count().unwrap_or(0) > 0
561    }
562
563    pub fn index_info(&self) -> EmbeddingIndexInfo {
564        EmbeddingIndexInfo {
565            model_name: self.model_name.clone(),
566            indexed_symbols: self.store.count().unwrap_or(0),
567        }
568    }
569
570    pub fn inspect_existing_index(project: &ProjectRoot) -> Result<Option<EmbeddingIndexInfo>> {
571        let db_path = project.as_path().join(".codelens/index/embeddings.db");
572        if !db_path.exists() {
573            return Ok(None);
574        }
575
576        let conn = crate::db::open_derived_sqlite_with_recovery(
577            &db_path,
578            "embedding index",
579            || {
580                ffi::register_sqlite_vec()?;
581                let conn = Connection::open(&db_path)?;
582                // Read-only metadata probe (model name + symbol count); aligns
583                // mmap_size / cache_size with `vec_store.rs` so this transient
584                // connection benefits from the same OS page cache state. WAL /
585                // synchronous / wal_autocheckpoint deliberately omitted — this
586                // path never writes, and grabbing the schema-level lock for a
587                // mode change would race with a live store connection.
588                conn.execute_batch(
589                    "PRAGMA busy_timeout = 5000; PRAGMA mmap_size = 67108864; PRAGMA cache_size = -16000;",
590                )?;
591                conn.query_row("PRAGMA schema_version", [], |_row| Ok(()))?;
592                Ok(conn)
593            },
594        )?;
595
596        let model_name: Option<String> = conn
597            .query_row(
598                "SELECT value FROM meta WHERE key = 'model' LIMIT 1",
599                [],
600                |row| row.get(0),
601            )
602            .ok();
603        let schema_version: Option<i64> = conn
604            .query_row(
605                "SELECT CAST(value AS INTEGER) FROM meta WHERE key = 'schema_version' LIMIT 1",
606                [],
607                |row| row.get(0),
608            )
609            .ok();
610        if schema_version != Some(EMBEDDING_STORE_SCHEMA_VERSION) {
611            return Ok(None);
612        }
613        let indexed_symbols: usize = conn
614            .query_row("SELECT COUNT(*) FROM symbols", [], |row| {
615                row.get::<_, i64>(0)
616            })
617            .map(|count| count.max(0) as usize)
618            .unwrap_or(0);
619
620        Ok(model_name.map(|model_name| EmbeddingIndexInfo {
621            model_name,
622            indexed_symbols,
623        }))
624    }
625}