Skip to main content

codesearch/fts/
tantivy_store.rs

1//! Tantivy-based full-text search store
2//!
3//! Provides BM25 full-text search for hybrid search with RRF fusion.
4//!
5//! # Architecture Note
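//! Always open the store with `FtsStore::new()` (or `FtsStore::new_with_writer()`, which
//! also creates the writer up front). Both return the same read/write store, so only one
//! connection type ever exists, avoiding Windows file-locking conflicts between separate
//! reader and writer handles. The writer is created lazily on the first write operation.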
9
10use anyhow::{anyhow, Result};
11use std::path::Path;
12use tantivy::{
13    collector::TopDocs,
14    directory::MmapDirectory,
15    merge_policy::NoMergePolicy,
16    query::QueryParser,
17    schema::{Field, NumericOptions, Schema, Value, STORED, STRING, TEXT},
18    Index, IndexReader, IndexSettings, IndexWriter, TantivyDocument, Term,
19};
20
21use crate::chunker::ChunkKind;
22
/// One hit returned by a full-text search.
#[derive(Clone, Debug)]
pub struct FtsResult {
    /// Identifier of the chunk that matched.
    pub chunk_id: u32,
    /// Relevance score reported by Tantivy (BM25-based); higher ranks first.
    pub score: f32,
}
31
32/// Full-text search store using Tantivy
33///
34/// Single connection type that supports both read and write operations.
35/// Writer is lazy-initialized on first write to avoid unnecessary locks.
36pub struct FtsStore {
37    index: Index,
38    reader: IndexReader,
39    writer: Option<IndexWriter>,
40    #[allow(dead_code)]
41    schema: Schema,
42    // Field handles
43    chunk_id_field: Field,
44    content_field: Field,
45    path_field: Field,
46    signature_field: Field,
47    kind_field: Field,
48}
49
50impl FtsStore {
51    /// Create or open an FTS index at the given path.
52    ///
53    /// Opens in a mode that supports both reading and writing.
54    /// Writer is lazy-initialized on first write operation.
55    pub fn new(db_path: &Path) -> Result<Self> {
56        let fts_path = db_path.join("fts");
57        std::fs::create_dir_all(&fts_path)?;
58
59        // Build schema
60        let mut schema_builder = Schema::builder();
61
62        // Chunk ID - stored and indexed for retrieval and deletion
63        let chunk_id_field = schema_builder.add_u64_field(
64            "chunk_id",
65            NumericOptions::default().set_indexed().set_stored(),
66        );
67
68        // Content - full text indexed for BM25 search
69        let content_field = schema_builder.add_text_field("content", TEXT);
70
71        // Path - stored and string indexed for filtering
72        let path_field = schema_builder.add_text_field("path", STRING | STORED);
73
74        // Signature - indexed for function/method name search
75        let signature_field = schema_builder.add_text_field("signature", TEXT);
76
77        // Kind - stored for filtering (function, class, etc)
78        let kind_field = schema_builder.add_text_field("kind", STRING | STORED);
79
80        let schema = schema_builder.build();
81
82        // Open or create index with retry logic for Windows file locking
83        let index = Self::open_or_create_index_with_retry(&fts_path, &schema)?;
84
85        // Create reader for searching
86        let reader = index.reader()?;
87
88        Ok(Self {
89            index,
90            reader,
91            writer: None, // Lazy-initialized on first write
92            schema,
93            chunk_id_field,
94            content_field,
95            path_field,
96            signature_field,
97            kind_field,
98        })
99    }
100
101    /// Create or open an FTS index with writer ready for indexing.
102    ///
103    /// Use this when you know you'll be writing immediately (e.g., during indexing).
104    /// For search-only or mixed workloads, use `new()` instead.
105    pub fn new_with_writer(db_path: &Path) -> Result<Self> {
106        let mut store = Self::new(db_path)?;
107        store.ensure_writer()?;
108        Ok(store)
109    }
110
111    /// Open or create index with retry logic for Windows file locking issues
112    fn open_or_create_index_with_retry(fts_path: &Path, schema: &Schema) -> Result<Index> {
113        let max_retries = 3;
114        let mut last_error: Option<String> = None;
115
116        for attempt in 0..max_retries {
117            if attempt > 0 {
118                // Wait before retry (exponential backoff)
119                std::thread::sleep(std::time::Duration::from_millis(100 * (1 << attempt)));
120            }
121
122            let result: Result<Index, _> = if fts_path.join("meta.json").exists() {
123                Index::open_in_dir(fts_path).map_err(|e| e.to_string())
124            } else {
125                MmapDirectory::open(fts_path)
126                    .map_err(|e| e.to_string())
127                    .and_then(|dir| {
128                        Index::create(dir, schema.clone(), IndexSettings::default())
129                            .map_err(|e| e.to_string())
130                    })
131            };
132
133            match result {
134                Ok(index) => return Ok(index),
135                Err(e) => {
136                    last_error = Some(e);
137                    // On Windows, try to clear lock files if permission denied
138                    if attempt < max_retries - 1 {
139                        Self::try_clear_lock_files(fts_path);
140                    }
141                }
142            }
143        }
144
145        Err(anyhow!(
146            "Failed to open FTS index after {} retries: {}",
147            max_retries,
148            last_error.unwrap_or_default()
149        ))
150    }
151
152    /// Create writer with retry logic for Windows file locking issues
153    /// Increased retry count and initial wait to handle slow file handle release
154    fn create_writer_with_retry(index: &Index) -> Result<IndexWriter> {
155        let max_retries = 5; // Increased from 3 to handle Windows timing issues
156        let mut last_error: Option<String> = None;
157
158        for attempt in 0..max_retries {
159            if attempt > 0 {
160                // Wait before retry (exponential backoff)
161                // Increased initial wait from 100ms to 200ms for better Windows compatibility
162                std::thread::sleep(std::time::Duration::from_millis(200 * (1 << attempt)));
163            }
164
165            // 50MB writer heap (tantivy default).
166            //
167            // CRITICAL: Set NoMergePolicy to prevent tantivy from spawning background
168            // merge threads. On Windows, these threads encounter I/O errors (antivirus
169            // interference, file locking on mmap'd segment files) which panic the merge
170            // thread and kill the IndexWriter — causing the intermittent
171            // "An index writer was killed" error (~1/5 indexing runs).
172            //
173            // With NoMergePolicy, all segment management is explicit: we accumulate
174            // segments during indexing and they're consolidated at commit points.
175            // This trades slightly more segments for 100% reliability.
176            match index.writer(50_000_000) {
177                Ok(writer) => {
178                    writer.set_merge_policy(Box::new(NoMergePolicy));
179                    return Ok(writer);
180                }
181                Err(e) => {
182                    last_error = Some(e.to_string());
183                }
184            }
185        }
186
187        Err(anyhow!(
188            "Failed to create FTS writer after {} retries: {}",
189            max_retries,
190            last_error.unwrap_or_default()
191        ))
192    }
193
194    /// Try to clear stale lock files on Windows
195    fn try_clear_lock_files(fts_path: &Path) {
196        // Try to remove stale lock files
197        let lock_files = [".tantivy-writer.lock", ".tantivy-meta.lock"];
198        for lock_file in &lock_files {
199            let lock_path = fts_path.join(lock_file);
200            if lock_path.exists() {
201                let _ = std::fs::remove_file(&lock_path);
202            }
203        }
204    }
205
206    /// Ensure writer is initialized for indexing
207    fn ensure_writer(&mut self) -> Result<()> {
208        if self.writer.is_none() {
209            // Use retry logic for Windows file locking issues
210            let writer = Self::create_writer_with_retry(&self.index)?;
211            self.writer = Some(writer);
212        }
213        Ok(())
214    }
215
216    /// Add a chunk to the FTS index
217    ///
218    /// Includes writer recovery: if the writer was killed (e.g., by a background
219    /// merge thread panic), it will be recreated and the operation retried once.
220    pub fn add_chunk(
221        &mut self,
222        chunk_id: u32,
223        content: &str,
224        path: &str,
225        signature: Option<&str>,
226        kind: &str,
227    ) -> Result<()> {
228        self.ensure_writer()?;
229
230        // Copy field handles before mutable borrow
231        let chunk_id_field = self.chunk_id_field;
232        let content_field = self.content_field;
233        let path_field = self.path_field;
234        let signature_field = self.signature_field;
235        let kind_field = self.kind_field;
236
237        let mut doc = TantivyDocument::new();
238        doc.add_u64(chunk_id_field, chunk_id as u64);
239        doc.add_text(content_field, content);
240        doc.add_text(path_field, path);
241        doc.add_text(kind_field, kind);
242        if let Some(sig) = signature {
243            doc.add_text(signature_field, sig);
244        }
245
246        let writer = self.writer.as_mut().unwrap();
247        match writer.add_document(doc) {
248            Ok(_) => Ok(()),
249            Err(e) => {
250                let error_str = e.to_string();
251                if error_str.contains("writer was killed")
252                    || error_str.contains("index writer was killed")
253                {
254                    tracing::debug!(
255                        "FTS writer was killed, recreating and retrying add_chunk for chunk {}",
256                        chunk_id
257                    );
258
259                    // Drop the dead writer and recreate
260                    self.writer = None;
261                    self.ensure_writer()?;
262
263                    // Rebuild the document for retry
264                    let mut retry_doc = TantivyDocument::new();
265                    retry_doc.add_u64(chunk_id_field, chunk_id as u64);
266                    retry_doc.add_text(content_field, content);
267                    retry_doc.add_text(path_field, path);
268                    retry_doc.add_text(kind_field, kind);
269                    if let Some(sig) = signature {
270                        retry_doc.add_text(signature_field, sig);
271                    }
272
273                    let writer = self.writer.as_mut().unwrap();
274                    writer.add_document(retry_doc).map_err(|e| {
275                        anyhow!("FTS add_document failed after writer recovery: {}", e)
276                    })?;
277                    Ok(())
278                } else {
279                    Err(anyhow!("FTS add_document failed: {}", error_str))
280                }
281            }
282        }
283    }
284
285    /// Delete a chunk by ID
286    pub fn delete_chunk(&mut self, chunk_id: u32) -> Result<()> {
287        self.ensure_writer()?;
288        let chunk_id_field = self.chunk_id_field;
289        let writer = self.writer.as_mut().unwrap();
290        let term = Term::from_field_u64(chunk_id_field, chunk_id as u64);
291        writer.delete_term(term);
292        Ok(())
293    }
294
295    /// Delete all chunks for a file path
296    #[allow(dead_code)] // Reserved for file-level deletion
297    pub fn delete_by_path(&mut self, path: &str) -> Result<()> {
298        self.ensure_writer()?;
299        let path_field = self.path_field;
300        let writer = self.writer.as_mut().unwrap();
301        let term = Term::from_field_text(path_field, path);
302        writer.delete_term(term);
303        Ok(())
304    }
305
306    /// Commit pending changes with retry logic for Windows file locking.
307    ///
308    /// If the writer was killed (background merge panic), it is recreated.
309    /// Data since the last successful commit will be lost in that case, but
310    /// indexing can continue rather than aborting entirely.
311    pub fn commit(&mut self) -> Result<()> {
312        if self.writer.is_none() {
313            return Ok(());
314        }
315
316        let max_retries = 5;
317        let mut last_error: Option<String> = None;
318
319        for attempt in 0..max_retries {
320            if attempt > 0 {
321                // Wait before retry (exponential backoff: 100ms, 200ms, 400ms, 800ms)
322                std::thread::sleep(std::time::Duration::from_millis(100 * (1 << attempt)));
323            }
324
325            let writer = self.writer.as_mut().unwrap();
326            match writer.commit() {
327                Ok(_) => {
328                    // Reload reader to see changes
329                    if let Err(e) = self.reader.reload() {
330                        // Non-fatal: reader will eventually catch up
331                        tracing::debug!("Reader reload warning: {}", e);
332                    }
333                    return Ok(());
334                }
335                Err(e) => {
336                    let error_str = e.to_string();
337                    last_error = Some(error_str.clone());
338
339                    // Writer was killed by background thread panic — recreate it
340                    if error_str.contains("writer was killed")
341                        || error_str.contains("index writer was killed")
342                    {
343                        tracing::debug!(
344                            "FTS writer was killed during commit (attempt {}/{}). \
345                             Recreating writer. Data since last commit may be lost.",
346                            attempt + 1,
347                            max_retries
348                        );
349                        self.writer = None;
350                        self.ensure_writer()?;
351                        // After recreating, the pending data is gone, so commit
352                        // the new (empty) writer to ensure a clean state
353                        if let Some(ref mut w) = self.writer {
354                            w.commit()
355                                .map_err(|e| anyhow!("FTS commit after recovery failed: {}", e))?;
356                        }
357                        if let Err(e) = self.reader.reload() {
358                            tracing::debug!("Reader reload warning: {}", e);
359                        }
360                        return Ok(());
361                    }
362
363                    // File locking error — retry with backoff
364                    if error_str.contains("Access is denied")
365                        || error_str.contains("PermissionDenied")
366                        || error_str.contains("IoError")
367                    {
368                        tracing::debug!(
369                            "FTS commit retry {}/{}: {}",
370                            attempt + 1,
371                            max_retries,
372                            error_str
373                        );
374                        // Continue to retry
375                    } else {
376                        // Non-recoverable error, fail immediately
377                        return Err(anyhow!("FTS commit failed: {}", error_str));
378                    }
379                }
380            }
381        }
382
383        // All retries exhausted
384        Err(anyhow!(
385            "FTS commit failed after {} retries: {}",
386            max_retries,
387            last_error.unwrap_or_default()
388        ))
389    }
390
391    /// Search using BM25
392    ///
393    /// If `target_kind` is provided, boosts results matching that ChunkKind (e.g., "class", "function").
394    pub fn search(
395        &self,
396        query: &str,
397        limit: usize,
398        target_kind: Option<ChunkKind>,
399    ) -> Result<Vec<FtsResult>> {
400        let searcher = self.reader.searcher();
401
402        // Parse query against content, signature, and kind fields
403        let mut query_parser = QueryParser::for_index(
404            &self.index,
405            vec![self.content_field, self.signature_field, self.kind_field],
406        );
407
408        // Boost signature field for better matching of function names, class names, etc.
409        query_parser.set_field_boost(self.signature_field, 2.0);
410
411        // Boost kind field when structural intent is detected
412        if let Some(ref _kind) = target_kind {
413            query_parser.set_field_boost(self.kind_field, 3.0); // High boost for kind field
414        }
415
416        // Parse query, fall back to match-all on error
417        let parsed_query = match query_parser.parse_query(query) {
418            Ok(q) => q,
419            Err(_) => {
420                // Try escaping special characters
421                let escaped = query.replace(
422                    [
423                        ':', '(', ')', '[', ']', '{', '}', '^', '"', '~', '*', '?', '\\', '/',
424                    ],
425                    " ",
426                );
427                query_parser.parse_query(&escaped)?
428            }
429        };
430
431        // Execute search
432        let top_docs = searcher.search(&parsed_query, &TopDocs::with_limit(limit))?;
433
434        // Convert to results
435        let mut results = Vec::with_capacity(top_docs.len());
436        for (score, doc_address) in top_docs {
437            let doc: TantivyDocument = searcher.doc(doc_address)?;
438
439            if let Some(chunk_id) = doc.get_first(self.chunk_id_field) {
440                if let Some(id) = chunk_id.as_u64() {
441                    results.push(FtsResult {
442                        chunk_id: id as u32,
443                        score,
444                    });
445                }
446            }
447        }
448
449        Ok(results)
450    }
451
452    /// Search for exact identifier matches (boosted)
453    ///
454    /// Used for improving exact name matching (e.g., "BaseRestClient", "UserService").
455    ///
456    /// If `target_kind` is provided, uses selective boosting:
457    /// - When both identifier AND kind are present, applies MUST constraint: items must match
458    ///   the identifier in the signature field AND the kind (prevents boosting ALL items of that kind)
459    /// - Otherwise, uses standard boost on the kind field
460    pub fn search_exact(
461        &self,
462        identifier: &str,
463        limit: usize,
464        target_kind: Option<ChunkKind>,
465    ) -> Result<Vec<FtsResult>> {
466        use tantivy::query::{BooleanQuery, BoostQuery, Occur, TermQuery};
467        use tantivy::schema::IndexRecordOption;
468
469        let searcher = self.reader.searcher();
470
471        // Search signature field with exact term
472        let term = Term::from_field_text(self.signature_field, identifier);
473        let term_query = TermQuery::new(term, IndexRecordOption::Basic);
474
475        // Also search content field for the identifier as a phrase
476        let content_term = Term::from_field_text(self.content_field, identifier);
477        let content_query = TermQuery::new(content_term, IndexRecordOption::Basic);
478
479        // Boost signature matches 3x over content matches
480        let boosted_sig = BoostQuery::new(Box::new(term_query), 3.0);
481
482        // Build query based on whether we have both identifier and kind
483        let combined = if let Some(ref kind) = target_kind {
484            // SELECTIVE MODE: When both identifier AND kind are detected,
485            // create a MUST query that requires BOTH conditions.
486            // This prevents boosting ALL items of that kind (e.g., all enums when searching for "ChunkKind enum")
487            let kind_str = format!("{:?}", kind);
488            let kind_term = Term::from_field_text(self.kind_field, &kind_str);
489            let kind_query = TermQuery::new(kind_term, IndexRecordOption::Basic);
490
491            // Combine: (signature OR content) AND kind
492            // This means: only items that match the identifier AND are of the target kind get boosted
493            let sig_or_content =
494                BooleanQuery::union(vec![Box::new(boosted_sig), Box::new(content_query)]);
495            let mut and_queries: Vec<(Occur, Box<dyn tantivy::query::Query>)> = vec![];
496            and_queries.push((Occur::Must, Box::new(sig_or_content)));
497            and_queries.push((Occur::Must, Box::new(kind_query)));
498            BooleanQuery::new(and_queries)
499        } else {
500            // STANDARD MODE: Just search for the identifier in signature and content
501            BooleanQuery::union(vec![Box::new(boosted_sig), Box::new(content_query)])
502        };
503
504        let top_docs = searcher.search(&combined, &TopDocs::with_limit(limit))?;
505
506        // Convert to results
507        let mut results = Vec::with_capacity(top_docs.len());
508        for (score, doc_address) in top_docs {
509            let doc: TantivyDocument = searcher.doc(doc_address)?;
510
511            if let Some(chunk_id) = doc.get_first(self.chunk_id_field) {
512                if let Some(id) = chunk_id.as_u64() {
513                    results.push(FtsResult {
514                        chunk_id: id as u32,
515                        score,
516                    });
517                }
518            }
519        }
520
521        Ok(results)
522    }
523
524    /// Get statistics about the index
525    pub fn stats(&self) -> Result<FtsStats> {
526        let searcher = self.reader.searcher();
527        let num_docs = searcher.num_docs() as usize;
528
529        Ok(FtsStats {
530            num_documents: num_docs,
531        })
532    }
533
534    /// Clear the entire index
535    #[allow(dead_code)] // Reserved for index reset
536    pub fn clear(&mut self) -> Result<()> {
537        self.ensure_writer()?;
538        let writer = self.writer.as_mut().unwrap();
539        writer.delete_all_documents()?;
540        writer.commit()?;
541        self.reader.reload()?;
542        Ok(())
543    }
544}
545
/// Summary statistics for the FTS index.
#[derive(Clone, Debug)]
#[allow(dead_code)] // Public API for debugging/monitoring; may have no in-crate readers
pub struct FtsStats {
    /// Document count reported by the searcher (`Searcher::num_docs`).
    #[allow(dead_code)] // Public API for debugging/monitoring; may have no in-crate readers
    pub num_documents: usize,
}
553
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Chunk IDs of `results`, in rank order.
    fn ranked_ids(results: &[FtsResult]) -> Vec<u32> {
        results.iter().map(|hit| hit.chunk_id).collect()
    }

    #[test]
    fn test_fts_basic() -> Result<()> {
        let tmp = tempdir()?;
        let mut store = FtsStore::new(tmp.path())?;

        // (id, content, path, signature, kind)
        let fixtures: [(u32, &str, &str, &str, &str); 3] = [
            (
                1,
                "fn hello_world() { println!(\"Hello!\"); }",
                "src/main.rs",
                "hello_world",
                "function",
            ),
            (
                2,
                "struct UserConfig { name: String, age: u32 }",
                "src/config.rs",
                "UserConfig",
                "struct",
            ),
            (
                3,
                "fn process_data(data: Vec<u8>) -> Result<()>",
                "src/processor.rs",
                "process_data",
                "function",
            ),
        ];
        for &(id, content, path, signature, kind) in fixtures.iter() {
            store.add_chunk(id, content, path, Some(signature), kind)?;
        }
        store.commit()?;

        // Each query must rank the expected chunk first.
        let expectations = [("hello", 1u32), ("UserConfig", 2), ("process data", 3)];
        for &(query, expected_top) in expectations.iter() {
            let results = store.search(query, 10, None)?;
            assert_eq!(
                ranked_ids(&results).first().copied(),
                Some(expected_top),
                "query {:?}",
                query
            );
        }

        Ok(())
    }

    #[test]
    fn test_fts_delete() -> Result<()> {
        let tmp = tempdir()?;
        let mut store = FtsStore::new(tmp.path())?;

        store.add_chunk(1, "test content one", "file1.rs", None, "block")?;
        store.add_chunk(2, "test content two", "file2.rs", None, "block")?;
        store.commit()?;

        // Both chunks match before deletion.
        assert_eq!(store.search("test content", 10, None)?.len(), 2);

        store.delete_chunk(1)?;
        store.commit()?;

        // Only the surviving chunk remains.
        let remaining = ranked_ids(&store.search("test content", 10, None)?);
        assert_eq!(remaining, vec![2]);

        Ok(())
    }
}