1use anyhow::{anyhow, Result};
11use std::path::Path;
12use tantivy::{
13 collector::TopDocs,
14 directory::MmapDirectory,
15 merge_policy::NoMergePolicy,
16 query::QueryParser,
17 schema::{Field, NumericOptions, Schema, Value, STORED, STRING, TEXT},
18 Index, IndexReader, IndexSettings, IndexWriter, TantivyDocument, Term,
19};
20
21use crate::chunker::ChunkKind;
22
/// A single hit produced by [`FtsStore::search`] or [`FtsStore::search_exact`].
#[derive(Clone, Debug)]
pub struct FtsResult {
    /// The chunk identifier that was passed to `FtsStore::add_chunk` for this document.
    pub chunk_id: u32,
    /// Relevance score tantivy assigned to this hit.
    pub score: f32,
}
31
32pub struct FtsStore {
37 index: Index,
38 reader: IndexReader,
39 writer: Option<IndexWriter>,
40 #[allow(dead_code)]
41 schema: Schema,
42 chunk_id_field: Field,
44 content_field: Field,
45 path_field: Field,
46 signature_field: Field,
47 kind_field: Field,
48}
49
50impl FtsStore {
51 pub fn new(db_path: &Path) -> Result<Self> {
56 let fts_path = db_path.join("fts");
57 std::fs::create_dir_all(&fts_path)?;
58
59 let mut schema_builder = Schema::builder();
61
62 let chunk_id_field = schema_builder.add_u64_field(
64 "chunk_id",
65 NumericOptions::default().set_indexed().set_stored(),
66 );
67
68 let content_field = schema_builder.add_text_field("content", TEXT);
70
71 let path_field = schema_builder.add_text_field("path", STRING | STORED);
73
74 let signature_field = schema_builder.add_text_field("signature", TEXT);
76
77 let kind_field = schema_builder.add_text_field("kind", STRING | STORED);
79
80 let schema = schema_builder.build();
81
82 let index = Self::open_or_create_index_with_retry(&fts_path, &schema)?;
84
85 let reader = index.reader()?;
87
88 Ok(Self {
89 index,
90 reader,
91 writer: None, schema,
93 chunk_id_field,
94 content_field,
95 path_field,
96 signature_field,
97 kind_field,
98 })
99 }
100
101 pub fn new_with_writer(db_path: &Path) -> Result<Self> {
106 let mut store = Self::new(db_path)?;
107 store.ensure_writer()?;
108 Ok(store)
109 }
110
111 fn open_or_create_index_with_retry(fts_path: &Path, schema: &Schema) -> Result<Index> {
113 let max_retries = 3;
114 let mut last_error: Option<String> = None;
115
116 for attempt in 0..max_retries {
117 if attempt > 0 {
118 std::thread::sleep(std::time::Duration::from_millis(100 * (1 << attempt)));
120 }
121
122 let result: Result<Index, _> = if fts_path.join("meta.json").exists() {
123 Index::open_in_dir(fts_path).map_err(|e| e.to_string())
124 } else {
125 MmapDirectory::open(fts_path)
126 .map_err(|e| e.to_string())
127 .and_then(|dir| {
128 Index::create(dir, schema.clone(), IndexSettings::default())
129 .map_err(|e| e.to_string())
130 })
131 };
132
133 match result {
134 Ok(index) => return Ok(index),
135 Err(e) => {
136 last_error = Some(e);
137 if attempt < max_retries - 1 {
139 Self::try_clear_lock_files(fts_path);
140 }
141 }
142 }
143 }
144
145 Err(anyhow!(
146 "Failed to open FTS index after {} retries: {}",
147 max_retries,
148 last_error.unwrap_or_default()
149 ))
150 }
151
152 fn create_writer_with_retry(index: &Index) -> Result<IndexWriter> {
155 let max_retries = 5; let mut last_error: Option<String> = None;
157
158 for attempt in 0..max_retries {
159 if attempt > 0 {
160 std::thread::sleep(std::time::Duration::from_millis(200 * (1 << attempt)));
163 }
164
165 match index.writer(50_000_000) {
177 Ok(writer) => {
178 writer.set_merge_policy(Box::new(NoMergePolicy));
179 return Ok(writer);
180 }
181 Err(e) => {
182 last_error = Some(e.to_string());
183 }
184 }
185 }
186
187 Err(anyhow!(
188 "Failed to create FTS writer after {} retries: {}",
189 max_retries,
190 last_error.unwrap_or_default()
191 ))
192 }
193
194 fn try_clear_lock_files(fts_path: &Path) {
196 let lock_files = [".tantivy-writer.lock", ".tantivy-meta.lock"];
198 for lock_file in &lock_files {
199 let lock_path = fts_path.join(lock_file);
200 if lock_path.exists() {
201 let _ = std::fs::remove_file(&lock_path);
202 }
203 }
204 }
205
206 fn ensure_writer(&mut self) -> Result<()> {
208 if self.writer.is_none() {
209 let writer = Self::create_writer_with_retry(&self.index)?;
211 self.writer = Some(writer);
212 }
213 Ok(())
214 }
215
216 pub fn add_chunk(
221 &mut self,
222 chunk_id: u32,
223 content: &str,
224 path: &str,
225 signature: Option<&str>,
226 kind: &str,
227 ) -> Result<()> {
228 self.ensure_writer()?;
229
230 let chunk_id_field = self.chunk_id_field;
232 let content_field = self.content_field;
233 let path_field = self.path_field;
234 let signature_field = self.signature_field;
235 let kind_field = self.kind_field;
236
237 let mut doc = TantivyDocument::new();
238 doc.add_u64(chunk_id_field, chunk_id as u64);
239 doc.add_text(content_field, content);
240 doc.add_text(path_field, path);
241 doc.add_text(kind_field, kind);
242 if let Some(sig) = signature {
243 doc.add_text(signature_field, sig);
244 }
245
246 let writer = self.writer.as_mut().unwrap();
247 match writer.add_document(doc) {
248 Ok(_) => Ok(()),
249 Err(e) => {
250 let error_str = e.to_string();
251 if error_str.contains("writer was killed")
252 || error_str.contains("index writer was killed")
253 {
254 tracing::debug!(
255 "FTS writer was killed, recreating and retrying add_chunk for chunk {}",
256 chunk_id
257 );
258
259 self.writer = None;
261 self.ensure_writer()?;
262
263 let mut retry_doc = TantivyDocument::new();
265 retry_doc.add_u64(chunk_id_field, chunk_id as u64);
266 retry_doc.add_text(content_field, content);
267 retry_doc.add_text(path_field, path);
268 retry_doc.add_text(kind_field, kind);
269 if let Some(sig) = signature {
270 retry_doc.add_text(signature_field, sig);
271 }
272
273 let writer = self.writer.as_mut().unwrap();
274 writer.add_document(retry_doc).map_err(|e| {
275 anyhow!("FTS add_document failed after writer recovery: {}", e)
276 })?;
277 Ok(())
278 } else {
279 Err(anyhow!("FTS add_document failed: {}", error_str))
280 }
281 }
282 }
283 }
284
285 pub fn delete_chunk(&mut self, chunk_id: u32) -> Result<()> {
287 self.ensure_writer()?;
288 let chunk_id_field = self.chunk_id_field;
289 let writer = self.writer.as_mut().unwrap();
290 let term = Term::from_field_u64(chunk_id_field, chunk_id as u64);
291 writer.delete_term(term);
292 Ok(())
293 }
294
295 #[allow(dead_code)] pub fn delete_by_path(&mut self, path: &str) -> Result<()> {
298 self.ensure_writer()?;
299 let path_field = self.path_field;
300 let writer = self.writer.as_mut().unwrap();
301 let term = Term::from_field_text(path_field, path);
302 writer.delete_term(term);
303 Ok(())
304 }
305
306 pub fn commit(&mut self) -> Result<()> {
312 if self.writer.is_none() {
313 return Ok(());
314 }
315
316 let max_retries = 5;
317 let mut last_error: Option<String> = None;
318
319 for attempt in 0..max_retries {
320 if attempt > 0 {
321 std::thread::sleep(std::time::Duration::from_millis(100 * (1 << attempt)));
323 }
324
325 let writer = self.writer.as_mut().unwrap();
326 match writer.commit() {
327 Ok(_) => {
328 if let Err(e) = self.reader.reload() {
330 tracing::debug!("Reader reload warning: {}", e);
332 }
333 return Ok(());
334 }
335 Err(e) => {
336 let error_str = e.to_string();
337 last_error = Some(error_str.clone());
338
339 if error_str.contains("writer was killed")
341 || error_str.contains("index writer was killed")
342 {
343 tracing::debug!(
344 "FTS writer was killed during commit (attempt {}/{}). \
345 Recreating writer. Data since last commit may be lost.",
346 attempt + 1,
347 max_retries
348 );
349 self.writer = None;
350 self.ensure_writer()?;
351 if let Some(ref mut w) = self.writer {
354 w.commit()
355 .map_err(|e| anyhow!("FTS commit after recovery failed: {}", e))?;
356 }
357 if let Err(e) = self.reader.reload() {
358 tracing::debug!("Reader reload warning: {}", e);
359 }
360 return Ok(());
361 }
362
363 if error_str.contains("Access is denied")
365 || error_str.contains("PermissionDenied")
366 || error_str.contains("IoError")
367 {
368 tracing::debug!(
369 "FTS commit retry {}/{}: {}",
370 attempt + 1,
371 max_retries,
372 error_str
373 );
374 } else {
376 return Err(anyhow!("FTS commit failed: {}", error_str));
378 }
379 }
380 }
381 }
382
383 Err(anyhow!(
385 "FTS commit failed after {} retries: {}",
386 max_retries,
387 last_error.unwrap_or_default()
388 ))
389 }
390
391 pub fn search(
395 &self,
396 query: &str,
397 limit: usize,
398 target_kind: Option<ChunkKind>,
399 ) -> Result<Vec<FtsResult>> {
400 let searcher = self.reader.searcher();
401
402 let mut query_parser = QueryParser::for_index(
404 &self.index,
405 vec![self.content_field, self.signature_field, self.kind_field],
406 );
407
408 query_parser.set_field_boost(self.signature_field, 2.0);
410
411 if let Some(ref _kind) = target_kind {
413 query_parser.set_field_boost(self.kind_field, 3.0); }
415
416 let parsed_query = match query_parser.parse_query(query) {
418 Ok(q) => q,
419 Err(_) => {
420 let escaped = query.replace(
422 [
423 ':', '(', ')', '[', ']', '{', '}', '^', '"', '~', '*', '?', '\\', '/',
424 ],
425 " ",
426 );
427 query_parser.parse_query(&escaped)?
428 }
429 };
430
431 let top_docs = searcher.search(&parsed_query, &TopDocs::with_limit(limit))?;
433
434 let mut results = Vec::with_capacity(top_docs.len());
436 for (score, doc_address) in top_docs {
437 let doc: TantivyDocument = searcher.doc(doc_address)?;
438
439 if let Some(chunk_id) = doc.get_first(self.chunk_id_field) {
440 if let Some(id) = chunk_id.as_u64() {
441 results.push(FtsResult {
442 chunk_id: id as u32,
443 score,
444 });
445 }
446 }
447 }
448
449 Ok(results)
450 }
451
452 pub fn search_exact(
461 &self,
462 identifier: &str,
463 limit: usize,
464 target_kind: Option<ChunkKind>,
465 ) -> Result<Vec<FtsResult>> {
466 use tantivy::query::{BooleanQuery, BoostQuery, Occur, TermQuery};
467 use tantivy::schema::IndexRecordOption;
468
469 let searcher = self.reader.searcher();
470
471 let term = Term::from_field_text(self.signature_field, identifier);
473 let term_query = TermQuery::new(term, IndexRecordOption::Basic);
474
475 let content_term = Term::from_field_text(self.content_field, identifier);
477 let content_query = TermQuery::new(content_term, IndexRecordOption::Basic);
478
479 let boosted_sig = BoostQuery::new(Box::new(term_query), 3.0);
481
482 let combined = if let Some(ref kind) = target_kind {
484 let kind_str = format!("{:?}", kind);
488 let kind_term = Term::from_field_text(self.kind_field, &kind_str);
489 let kind_query = TermQuery::new(kind_term, IndexRecordOption::Basic);
490
491 let sig_or_content =
494 BooleanQuery::union(vec![Box::new(boosted_sig), Box::new(content_query)]);
495 let mut and_queries: Vec<(Occur, Box<dyn tantivy::query::Query>)> = vec![];
496 and_queries.push((Occur::Must, Box::new(sig_or_content)));
497 and_queries.push((Occur::Must, Box::new(kind_query)));
498 BooleanQuery::new(and_queries)
499 } else {
500 BooleanQuery::union(vec![Box::new(boosted_sig), Box::new(content_query)])
502 };
503
504 let top_docs = searcher.search(&combined, &TopDocs::with_limit(limit))?;
505
506 let mut results = Vec::with_capacity(top_docs.len());
508 for (score, doc_address) in top_docs {
509 let doc: TantivyDocument = searcher.doc(doc_address)?;
510
511 if let Some(chunk_id) = doc.get_first(self.chunk_id_field) {
512 if let Some(id) = chunk_id.as_u64() {
513 results.push(FtsResult {
514 chunk_id: id as u32,
515 score,
516 });
517 }
518 }
519 }
520
521 Ok(results)
522 }
523
524 pub fn stats(&self) -> Result<FtsStats> {
526 let searcher = self.reader.searcher();
527 let num_docs = searcher.num_docs() as usize;
528
529 Ok(FtsStats {
530 num_documents: num_docs,
531 })
532 }
533
534 #[allow(dead_code)] pub fn clear(&mut self) -> Result<()> {
537 self.ensure_writer()?;
538 let writer = self.writer.as_mut().unwrap();
539 writer.delete_all_documents()?;
540 writer.commit()?;
541 self.reader.reload()?;
542 Ok(())
543 }
544}
545
/// Summary statistics for the full-text index, as produced by [`FtsStore::stats`].
#[derive(Clone, Debug)]
// Presumably not read by any caller yet, so the lint is silenced — verify.
#[allow(dead_code)]
pub struct FtsStats {
    /// Document count reported by the searcher at the time `stats` was called.
    #[allow(dead_code)]
    pub num_documents: usize,
}
553
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Indexes three chunks and checks that each one is the top hit for a query aimed at it.
    #[test]
    fn test_fts_basic() -> Result<()> {
        let tmp = tempdir()?;
        let mut store = FtsStore::new(tmp.path())?;

        let fixtures: [(u32, &str, &str, &str, &str); 3] = [
            (
                1,
                "fn hello_world() { println!(\"Hello!\"); }",
                "src/main.rs",
                "hello_world",
                "function",
            ),
            (
                2,
                "struct UserConfig { name: String, age: u32 }",
                "src/config.rs",
                "UserConfig",
                "struct",
            ),
            (
                3,
                "fn process_data(data: Vec<u8>) -> Result<()>",
                "src/processor.rs",
                "process_data",
                "function",
            ),
        ];
        for (id, content, path, signature, kind) in fixtures {
            store.add_chunk(id, content, path, Some(signature), kind)?;
        }
        store.commit()?;

        for (query, expected_top) in [("hello", 1), ("UserConfig", 2), ("process data", 3)] {
            let hits = store.search(query, 10, None)?;
            assert_eq!(
                hits.first().map(|hit| hit.chunk_id),
                Some(expected_top),
                "unexpected top hit for query {:?}",
                query
            );
        }

        Ok(())
    }

    /// Deleting a chunk and committing removes it from subsequent search results.
    #[test]
    fn test_fts_delete() -> Result<()> {
        let tmp = tempdir()?;
        let mut store = FtsStore::new(tmp.path())?;

        for (id, content, path) in [
            (1, "test content one", "file1.rs"),
            (2, "test content two", "file2.rs"),
        ] {
            store.add_chunk(id, content, path, None, "block")?;
        }
        store.commit()?;
        assert_eq!(store.search("test content", 10, None)?.len(), 2);

        store.delete_chunk(1)?;
        store.commit()?;

        let remaining: Vec<u32> = store
            .search("test content", 10, None)?
            .iter()
            .map(|hit| hit.chunk_id)
            .collect();
        assert_eq!(remaining, vec![2]);

        Ok(())
    }
}