use std::fs;
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};
use tantivy::collector::TopDocs;
use tantivy::directory::MmapDirectory;
use tantivy::query::QueryParser;
use tantivy::schema::Value;
use tantivy::schema::{Schema, Field, STORED, STRING, TEXT};
use tantivy::{Index, IndexReader, TantivyDocument, ReloadPolicy};
use thiserror::Error;
#[derive(Error, Debug)]
pub enum FtsError {
#[error("Tantivy internal error: {0}")]
Tantivy(#[from] tantivy::TantivyError),
#[error("Failed to parse query: {0}")]
QueryParse(#[from] tantivy::query::QueryParserError),
#[error("Indexing queue is offline or full")]
QueueOffline,
#[error("I/O Error: {0}")]
Io(#[from] std::io::Error),
#[error("Tantivy open directory error: {0}")]
OpenDirectory(#[from] tantivy::directory::error::OpenDirectoryError),
#[error("Index writer error: {0}")]
Writer(String),
}
/// Configuration accepted by `Fts::new`.
#[derive(Debug, Clone)]
pub struct FtsConfig {
    /// Filesystem directory that holds the index files.
    pub index_path: String,
}
/// A document to be indexed.
#[derive(Debug, Clone)]
pub struct Content {
    /// Document key; indexed as a single untokenized term and stored so that
    /// search hits can be mapped back to it.
    pub id: String,
    /// Body text that full-text queries are matched against.
    pub content: String,
}
/// A mutation queued for the background index writer.
pub enum IndexCommand {
    /// Index the given document.
    Add(Content),
    /// Remove every document whose `id` term equals the given string.
    Delete(String),
}
/// Tantivy field handles for the index schema.
#[derive(Clone)]
struct SchemaFields {
    /// The document key field.
    id: Field,
    /// The full-text body field.
    content: Field,
}
/// Full-text search handle backed by a Tantivy index.
///
/// Queries are served through `reader`; additions and deletions are sent over
/// `command_sender` to a background writer. Every clone holds a clone of the
/// same sender, so all clones feed the same queue.
#[derive(Clone)]
pub struct Fts {
    index: Index,
    reader: IndexReader,
    fields: SchemaFields,
    /// Bounded queue feeding the index-writer thread.
    command_sender: mpsc::SyncSender<IndexCommand>,
}
impl Fts {
    /// Opens the index at `config.index_path` and starts the background
    /// `index-writer` thread.
    ///
    /// The directory and the index are created if they do not exist. The
    /// schema has an `id` field (untokenized, stored) and a `content` field
    /// (tokenized, not stored).
    ///
    /// # Errors
    ///
    /// - [`FtsError::Io`] if the index directory cannot be created.
    /// - [`FtsError::OpenDirectory`] if the directory cannot be opened.
    /// - [`FtsError::Tantivy`] if the index, the reader, or the index writer
    ///   cannot be opened (e.g. another writer already holds the index lock).
    /// - [`FtsError::Writer`] if the writer thread cannot be spawned.
    pub fn new(config: FtsConfig) -> Result<Self, FtsError> {
        // Memory budget, in bytes, handed to Tantivy's index writer.
        const WRITER_MEMORY_BUDGET: usize = 100_000_000;
        // Number of queued commands after which `send` blocks.
        const COMMAND_QUEUE_CAPACITY: usize = 20_000;

        let mut schema_builder = Schema::builder();
        let id = schema_builder.add_text_field("id", STRING | STORED);
        let content = schema_builder.add_text_field("content", TEXT);
        let schema = schema_builder.build();
        let fields = SchemaFields { id, content };

        fs::create_dir_all(&config.index_path)?;
        let mmap_directory = MmapDirectory::open(&config.index_path)?;
        let index = Index::open_or_create(mmap_directory, schema)?;
        let reader = index
            .reader_builder()
            .reload_policy(ReloadPolicy::OnCommitWithDelay)
            .try_into()?;

        // Acquire the writer on the caller's thread so that a failure (most
        // commonly the index lock being held elsewhere) is returned from `new`.
        // Creating it inside the thread would only log the error and end the
        // thread, and every later add/delete would then fail as `QueueOffline`.
        let writer: tantivy::IndexWriter = index.writer(WRITER_MEMORY_BUDGET)?;

        let (command_sender, command_receiver) = mpsc::sync_channel(COMMAND_QUEUE_CAPACITY);
        let writer_fields = fields.clone();
        thread::Builder::new()
            .name("index-writer".to_string())
            .spawn(move || run_index_writer(writer, writer_fields, command_receiver))
            .map_err(|e| FtsError::Writer(e.to_string()))?;

        Ok(Self {
            index,
            reader,
            fields,
            command_sender,
        })
    }

    /// Queues `content` to be added to the index.
    ///
    /// The document becomes searchable only after the writer thread commits
    /// and the reader reloads. Existing documents with the same `id` are not
    /// removed; use [`Fts::edit_doc`] to replace one.
    ///
    /// This uses a blocking `SyncSender::send`. While the queue is full, the
    /// calling thread blocks, including an async executor thread.
    ///
    /// # Errors
    ///
    /// [`FtsError::QueueOffline`] if the writer thread is no longer running.
    pub async fn add_doc(&self, content: Content) -> Result<(), FtsError> {
        self.command_sender
            .send(IndexCommand::Add(content))
            .map_err(|_| FtsError::QueueOffline)
    }

    /// Queues deletion of every document whose `id` equals `doc_id`.
    ///
    /// Has the same blocking behavior and visibility delay as [`Fts::add_doc`].
    ///
    /// # Errors
    ///
    /// [`FtsError::QueueOffline`] if the writer thread is no longer running.
    pub async fn del_doc(&self, doc_id: String) -> Result<(), FtsError> {
        self.command_sender
            .send(IndexCommand::Delete(doc_id))
            .map_err(|_| FtsError::QueueOffline)
    }

    /// Replaces the document `doc_id` with `new_content`.
    ///
    /// This queues a delete followed by an add as two separate commands.
    /// - A commit may land between them, briefly hiding the document from
    ///   searches.
    /// - If queuing the add fails, the delete has already been queued.
    ///
    /// # Errors
    ///
    /// [`FtsError::QueueOffline`] if either command cannot be queued.
    pub async fn edit_doc(&self, doc_id: String, new_content: Content) -> Result<(), FtsError> {
        self.del_doc(doc_id).await?;
        self.add_doc(new_content).await?;
        Ok(())
    }

    /// Runs `keyword` as a Tantivy query over the `content` field.
    ///
    /// Returns the stored `id`s of the hits ranked `offset..offset + limit`.
    /// Hits whose stored document cannot be loaded, or that carry no string
    /// `id`, are skipped; load failures are logged to stderr.
    ///
    /// # Errors
    ///
    /// - [`FtsError::QueryParse`] if `keyword` is not a valid query.
    /// - [`FtsError::Tantivy`] if the search itself fails.
    pub fn search(&self, keyword: &str, limit: usize, offset: usize) -> Result<Vec<String>, FtsError> {
        // `TopDocs::with_limit` panics when `limit` is 0, and an empty page
        // needs no index access at all.
        if limit == 0 {
            return Ok(Vec::new());
        }

        let searcher = self.reader.searcher();
        let query_parser = QueryParser::for_index(&self.index, vec![self.fields.content]);
        let query = query_parser.parse_query(keyword)?;
        let collector = TopDocs::with_limit(limit).and_offset(offset);
        let top_docs = searcher.search(&query, &collector)?;

        let mut results = Vec::with_capacity(top_docs.len());
        for (_score, doc_address) in top_docs {
            let retrieved_doc = match searcher.doc::<TantivyDocument>(doc_address) {
                Ok(doc) => doc,
                Err(e) => {
                    // Best effort: one unreadable stored document should not
                    // fail the whole page, but it should not vanish silently.
                    eprintln!("Failed to load document {:?}: {}", doc_address, e);
                    continue;
                }
            };
            if let Some(id_value) = retrieved_doc.get_first(self.fields.id) {
                if let Some(id_str) = id_value.as_str() {
                    results.push(id_str.to_string());
                }
            }
        }
        Ok(results)
    }
}

/// Body of the `index-writer` thread: applies queued commands to `writer` and
/// commits them in batches.
///
/// A commit is attempted after any loop iteration where either:
/// - `COMMIT_THRESHOLD` operations are pending, or
/// - some operation is pending and `COMMIT_INTERVAL` has passed since the last
///   successful commit.
///
/// The loop ends when every `SyncSender` has been dropped. Pending operations
/// are then committed once more before the thread exits. Errors are logged to
/// stderr; a failed commit is retried on a later iteration.
fn run_index_writer(
    mut writer: tantivy::IndexWriter,
    fields: SchemaFields,
    receiver: mpsc::Receiver<IndexCommand>,
) {
    const COMMIT_THRESHOLD: usize = 1_000;
    const COMMIT_INTERVAL: Duration = Duration::from_secs(10);
    const POLL_INTERVAL: Duration = Duration::from_millis(500);

    let mut uncommitted = 0usize;
    let mut last_commit = Instant::now();

    loop {
        match receiver.recv_timeout(POLL_INTERVAL) {
            Ok(IndexCommand::Add(content)) => {
                let mut doc = TantivyDocument::new();
                doc.add_text(fields.id, &content.id);
                doc.add_text(fields.content, &content.content);
                match writer.add_document(doc) {
                    Ok(_) => uncommitted += 1,
                    Err(e) => eprintln!("Failed to add document {}: {}", content.id, e),
                }
            }
            Ok(IndexCommand::Delete(doc_id)) => {
                let term = tantivy::Term::from_field_text(fields.id, &doc_id);
                writer.delete_term(term);
                uncommitted += 1;
            }
            Err(mpsc::RecvTimeoutError::Timeout) => {}
            Err(mpsc::RecvTimeoutError::Disconnected) => break,
        }

        // Evaluated on every iteration, not only on timeouts. Otherwise a steady
        // trickle of commands arriving faster than POLL_INTERVAL would postpone
        // the time-based commit until COMMIT_THRESHOLD operations accumulate.
        let commit_due = uncommitted >= COMMIT_THRESHOLD
            || (uncommitted > 0 && last_commit.elapsed() >= COMMIT_INTERVAL);
        if commit_due {
            match writer.commit() {
                Ok(_) => {
                    uncommitted = 0;
                    last_commit = Instant::now();
                }
                Err(e) => eprintln!("Failed to commit: {}", e),
            }
        }
    }

    if uncommitted > 0 {
        if let Err(e) = writer.commit() {
            eprintln!("Failed to commit pending operations on shutdown: {}", e);
        }
    }
}