use anyhow::{Context, Result};
use std::collections::HashMap;
use std::path::Path;
use std::sync::Mutex;
use tantivy::collector::TopDocs;
use tantivy::query::QueryParser;
use tantivy::schema::*;
use tantivy::{Index, IndexWriter, ReloadPolicy, TantivyDocument, doc};
/// Full-text index over document contents, persisted as a Tantivy index on disk.
///
/// Every indexed document carries a numeric `id`, its searchable `content`, and the
/// `file_path` it was taken from. Writes issued through one instance are serialized
/// by `writer_lock`.
pub struct BM25Search {
    /// Handle to the Tantivy index opened or created in `index_path`.
    index: Index,
    /// `u64` identifier field; it is the value reported back for each search hit.
    id_field: Field,
    /// Text field that incoming queries are parsed against.
    content_field: Field,
    /// Untokenized path field, matched exactly when deleting a file's documents.
    file_path_field: Field,
    /// Directory containing the index files (and Tantivy's lock files).
    index_path: std::path::PathBuf,
    /// In-process mutex held for the whole lifetime of each writer this instance creates.
    writer_lock: Mutex<()>,
}
/// A single BM25 search hit.
#[derive(Debug, Clone, PartialEq)]
pub struct BM25Result {
    /// Value of the document's stored `id` field.
    pub id: u64,
    /// Relevance score assigned by the index for this query; higher is more relevant.
    pub score: f32,
}
impl BM25Search {
pub fn new<P: AsRef<Path>>(index_path: P) -> Result<Self> {
let index_path = index_path.as_ref().to_path_buf();
let mut schema_builder = Schema::builder();
let id_field = schema_builder.add_u64_field("id", STORED | INDEXED);
let content_field = schema_builder.add_text_field("content", TEXT);
let file_path_field = schema_builder.add_text_field("file_path", STRING | STORED);
let schema = schema_builder.build();
std::fs::create_dir_all(&index_path).context("Failed to create BM25 index directory")?;
let index = if index_path.join("meta.json").exists() {
Index::open_in_dir(&index_path).context("Failed to open existing BM25 index")?
} else {
Index::create_in_dir(&index_path, schema.clone())
.context("Failed to create BM25 index")?
};
Ok(Self {
index,
id_field,
content_field,
file_path_field,
index_path,
writer_lock: Mutex::new(()),
})
}
fn is_lock_stale(lock_path: &Path) -> bool {
if !lock_path.exists() {
return false;
}
if let Ok(metadata) = std::fs::metadata(lock_path)
&& let Ok(modified) = metadata.modified()
&& let Ok(elapsed) = modified.elapsed()
{
return elapsed.as_secs() > 300;
}
false
}
fn try_cleanup_stale_locks(index_path: &Path) -> Result<bool> {
let writer_lock = index_path.join(".tantivy-writer.lock");
let meta_lock = index_path.join(".tantivy-meta.lock");
let writer_stale = Self::is_lock_stale(&writer_lock);
let meta_stale = Self::is_lock_stale(&meta_lock);
if !writer_stale && !meta_stale {
return Ok(false); }
if writer_stale && writer_lock.exists() {
tracing::warn!(
"Removing stale Tantivy writer lock file (>5min old): {:?}",
writer_lock
);
std::fs::remove_file(&writer_lock)
.context("Failed to remove stale writer lock file")?;
}
if meta_stale && meta_lock.exists() {
tracing::warn!(
"Removing stale Tantivy meta lock file (>5min old): {:?}",
meta_lock
);
std::fs::remove_file(&meta_lock).context("Failed to remove stale meta lock file")?;
}
Ok(true) }
pub fn add_documents(&self, documents: Vec<(u64, String, String)>) -> Result<()> {
let _guard = self
.writer_lock
.lock()
.map_err(|e| anyhow::anyhow!("Failed to acquire writer lock: {}", e))?;
let mut index_writer: IndexWriter<TantivyDocument> = match self.index.writer(50_000_000) {
Ok(writer) => writer,
Err(e) => {
let error_msg = format!("{}", e);
if error_msg.contains("lock") || error_msg.contains("Lock") {
tracing::warn!(
"Index writer creation failed (possibly locked), checking for stale locks..."
);
match Self::try_cleanup_stale_locks(&self.index_path) {
Ok(true) => {
tracing::info!("Stale locks cleaned up, retrying writer creation...");
self.index.writer(50_000_000).context(
"Failed to create index writer after cleaning stale locks",
)?
}
Ok(false) => {
return Err(anyhow::anyhow!(
"BM25 index is currently being used by another process. Please wait and try again later."
));
}
Err(cleanup_err) => {
return Err(anyhow::anyhow!(
"Failed to create index writer (locked) and failed to cleanup stale locks: {}. Original error: {}",
cleanup_err,
e
));
}
}
} else {
return Err(e).context("Failed to create index writer");
}
}
};
for (id, content, file_path) in documents {
let doc = doc!(
self.id_field => id,
self.content_field => content,
self.file_path_field => file_path,
);
index_writer
.add_document(doc)
.context("Failed to add document")?;
}
index_writer
.commit()
.context("Failed to commit documents")?;
Ok(())
}
pub fn search(&self, query_text: &str, limit: usize) -> Result<Vec<BM25Result>> {
let reader = self
.index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.try_into()
.context("Failed to create index reader")?;
let searcher = reader.searcher();
let query_parser = QueryParser::for_index(&self.index, vec![self.content_field]);
let (query, _errors) = query_parser.parse_query_lenient(query_text);
let top_docs = searcher
.search(&query, &TopDocs::with_limit(limit))
.context("Failed to execute search")?;
let mut results = Vec::new();
for (score, doc_address) in top_docs {
let retrieved_doc: TantivyDocument = searcher
.doc(doc_address)
.context("Failed to retrieve document")?;
if let Some(id_value) = retrieved_doc.get_first(self.id_field)
&& let Some(id) = id_value.as_u64()
{
results.push(BM25Result { id, score });
}
}
Ok(results)
}
pub fn delete_by_id(&self, id: u64) -> Result<()> {
let _guard = self
.writer_lock
.lock()
.map_err(|e| anyhow::anyhow!("Failed to acquire writer lock: {}", e))?;
let mut index_writer: IndexWriter<TantivyDocument> = self
.index
.writer(50_000_000)
.context("Failed to create index writer")?;
let term = Term::from_field_u64(self.id_field, id);
index_writer.delete_term(term);
index_writer.commit().context("Failed to commit deletion")?;
Ok(())
}
pub fn delete_by_file_path(&self, file_path: &str) -> Result<usize> {
let _guard = self
.writer_lock
.lock()
.map_err(|e| anyhow::anyhow!("Failed to acquire writer lock: {}", e))?;
let mut index_writer: IndexWriter<TantivyDocument> = self
.index
.writer(50_000_000)
.context("Failed to create index writer")?;
let term = Term::from_field_text(self.file_path_field, file_path);
index_writer.delete_term(term);
index_writer
.commit()
.context("Failed to commit file_path deletion")?;
Ok(0)
}
pub fn clear(&self) -> Result<()> {
let _guard = self
.writer_lock
.lock()
.map_err(|e| anyhow::anyhow!("Failed to acquire writer lock: {}", e))?;
let mut index_writer: IndexWriter<TantivyDocument> = self
.index
.writer(50_000_000)
.context("Failed to create index writer")?;
index_writer
.delete_all_documents()
.context("Failed to delete all documents")?;
index_writer.commit().context("Failed to commit clear")?;
Ok(())
}
pub fn get_stats(&self) -> Result<BM25Stats> {
let reader = self
.index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.try_into()
.context("Failed to create index reader")?;
let searcher = reader.searcher();
let total_docs = searcher.num_docs() as usize;
Ok(BM25Stats {
total_documents: total_docs,
})
}
}
/// Summary statistics for a BM25 index.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BM25Stats {
    /// Number of documents visible to a freshly opened searcher.
    pub total_documents: usize,
}
/// Smoothing constant `k` in the reciprocal-rank-fusion term `1 / (k + rank)`,
/// where `rank` is 1-based.
///
/// Larger values shrink the gap between the contributions of top-ranked and
/// lower-ranked items.
pub const RRF_K_CONSTANT: f32 = 60.0;
/// Fuses vector-search hits with BM25 hits by reciprocal rank fusion.
///
/// Each input is read as a ranked list in the order given, with the first entry at
/// rank 1. Fusion is delegated to [`reciprocal_rank_fusion_generic`].
///
/// `k` is the maximum number of fused `(id, score)` pairs returned. It is not the
/// RRF smoothing constant, which is [`RRF_K_CONSTANT`].
pub fn reciprocal_rank_fusion(
    vector_results: Vec<(u64, f32)>,
    bm25_results: Vec<BM25Result>,
    k: usize,
) -> Vec<(u64, f32)> {
    let keyword_ranking: Vec<(u64, f32)> = bm25_results
        .iter()
        .map(|hit| (hit.id, hit.score))
        .collect();
    let rankings = [vector_results, keyword_ranking];
    reciprocal_rank_fusion_generic(rankings, k)
}
/// Fuses several ranked lists into one ranking by reciprocal rank fusion, using
/// [`RRF_K_CONSTANT`] as the smoothing constant.
///
/// This is [`reciprocal_rank_fusion_with_constant`] with `rrf_k = RRF_K_CONSTANT`; see
/// that function for the exact scoring and ordering rules.
pub fn reciprocal_rank_fusion_generic<T, I, L>(ranked_lists: I, limit: usize) -> Vec<(T, f32)>
where
    T: Eq + std::hash::Hash + Clone,
    I: IntoIterator<Item = L>,
    L: IntoIterator<Item = (T, f32)>,
{
    reciprocal_rank_fusion_with_constant(ranked_lists, limit, RRF_K_CONSTANT)
}

/// Fuses ranked lists by reciprocal rank fusion with a caller-chosen smoothing constant.
///
/// Scoring:
/// - Each list is read in the order given, with position 0 at rank 1.
/// - The per-item scores in the lists are ignored.
/// - An id's fused score is the sum, over the lists containing it, of
///   `1 / (rrf_k + rank)`.
/// - An id repeated within one list contributes only once for that list, at its first
///   (best) rank.
///
/// Ordering: results are sorted by fused score, highest first. Ties keep the order in
/// which ids were first encountered, so the output (including what survives truncation
/// to `limit`) is deterministic.
///
/// `rrf_k` should be non-negative so that every denominator is positive.
pub fn reciprocal_rank_fusion_with_constant<T, I, L>(
    ranked_lists: I,
    limit: usize,
    rrf_k: f32,
) -> Vec<(T, f32)>
where
    T: Eq + std::hash::Hash + Clone,
    I: IntoIterator<Item = L>,
    L: IntoIterator<Item = (T, f32)>,
{
    // Slot of each id in `fused`; `fused` keeps ids in first-seen order.
    let mut slot_of: HashMap<T, usize> = HashMap::new();
    // (id, fused score, index of the last list that contributed to this id)
    let mut fused: Vec<(T, f32, usize)> = Vec::new();
    for (list_idx, list) in ranked_lists.into_iter().enumerate() {
        for (rank, (id, _score)) in list.into_iter().enumerate() {
            let contribution = 1.0 / (rrf_k + (rank + 1) as f32);
            match slot_of.get(&id) {
                Some(&slot) => {
                    let entry = &mut fused[slot];
                    // Skip repeats of an id within the same list.
                    if entry.2 != list_idx {
                        entry.1 += contribution;
                        entry.2 = list_idx;
                    }
                }
                None => {
                    slot_of.insert(id.clone(), fused.len());
                    fused.push((id, contribution, list_idx));
                }
            }
        }
    }
    // `sort_by` is stable: equal scores keep first-seen order.
    fused.sort_by(|a, b| b.1.total_cmp(&a.1));
    fused.truncate(limit);
    fused.into_iter().map(|(id, score, _)| (id, score)).collect()
}