use crate::search::SearchDoc;
use crate::search::schema::{SearchFields, build_schema, event_key};
use anyhow::{Context, Result};
use std::fs::{File, OpenOptions};
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use tantivy::schema::Term;
use tantivy::{Index, IndexWriter, TantivyDocument, doc};
// Value handed to `Index::writer` when the index writer is created
// (tantivy documents that argument as the writer's memory budget in bytes).
const HEAP_BYTES: usize = 50_000_000;
// `add` commits once at least this many documents are pending.
const BATCH_DOCS: usize = 1000;
// `add` commits once at least this many seconds have passed since the last commit.
const BATCH_SECS: u64 = 1;
/// Batching wrapper around a tantivy `IndexWriter` that tracks how many
/// documents are waiting to be committed and when the last commit happened.
pub struct PendingWriter {
// Underlying tantivy writer that receives deletes, documents and commits.
writer: IndexWriter<TantivyDocument>,
// Field handles returned by `build_schema`, used to build documents and terms.
fields: SearchFields,
// Number of documents added since the last successful commit.
pending: usize,
// Time of the last successful commit (or of construction if none happened yet).
last_commit: Instant,
// Locked `.writer.lock` file handle, kept alive for the lifetime of the writer.
// Declared last: Rust drops fields in declaration order, so `writer` is dropped
// before this handle.
_lock: File,
}
impl PendingWriter {
    /// Opens the search index stored under `root` and returns a batching writer for it.
    ///
    /// The index directory is created when missing, the writer lock file is locked
    /// via `lock_file`, and only then is the index opened (or created) and a tantivy
    /// writer built with `HEAP_BYTES`.
    ///
    /// # Errors
    /// Fails if the directory cannot be created, the lock cannot be taken, or the
    /// index or its writer cannot be opened.
    pub fn open(root: &Path) -> Result<Self> {
        let dir = index_dir(root);
        std::fs::create_dir_all(&dir)?;
        // Take the lock before touching the index itself.
        let guard = lock_file(&dir)?;
        let (schema, fields) = build_schema();
        let writer = open_or_create(&dir, schema)?.writer(HEAP_BYTES)?;
        Ok(Self {
            writer,
            fields,
            pending: 0,
            last_commit: Instant::now(),
            _lock: guard,
        })
    }

    /// Queues `doc` for indexing, replacing any document with the same event key.
    ///
    /// A delete for the document's event key is issued first, then the new document
    /// is added. A commit is triggered when the batch thresholds are reached.
    ///
    /// # Errors
    /// Fails if the document cannot be added or the triggered commit fails.
    pub fn add(&mut self, doc: &SearchDoc) -> Result<()> {
        let key = Term::from_field_text(
            self.fields.event_key,
            &event_key(&doc.session_id, doc.seq),
        );
        self.writer.delete_term(key);
        let document = self.tantivy_doc(doc);
        self.writer.add_document(document)?;
        self.pending += 1;
        if !self.should_commit() {
            return Ok(());
        }
        self.commit()
    }

    /// Commits pending work to the index; does nothing when nothing is pending.
    ///
    /// On success the pending counter is reset and the commit time is refreshed.
    ///
    /// # Errors
    /// Fails if the underlying writer fails to commit.
    pub fn commit(&mut self) -> Result<()> {
        if self.pending > 0 {
            self.writer.commit()?;
            self.pending = 0;
            self.last_commit = Instant::now();
        }
        Ok(())
    }

    /// Returns `true` when the batch is full or the commit interval has elapsed.
    fn should_commit(&self) -> bool {
        if self.pending >= BATCH_DOCS {
            return true;
        }
        self.last_commit.elapsed() >= Duration::from_secs(BATCH_SECS)
    }

    /// Converts a `SearchDoc` into the tantivy document stored in the index.
    ///
    /// Scalar fields are written once; every entry of `paths` and `skills` is added
    /// as its own value of the `path` / `skill` field.
    fn tantivy_doc(&self, d: &SearchDoc) -> TantivyDocument {
        let f = &self.fields;
        let mut out = doc!(
            f.session_id => d.session_id.clone(),
            f.seq => d.seq as i64,
            f.event_key => event_key(&d.session_id, d.seq),
            f.ts_ms => d.ts_ms as i64,
            f.agent => d.agent.clone(),
            f.kind => d.kind.clone(),
            f.text => d.text.clone(),
            f.tokens_total => d.tokens_total,
        );
        for path in d.paths.iter() {
            out.add_text(f.path, path);
        }
        for skill in d.skills.iter() {
            out.add_text(f.skill, skill);
        }
        out
    }
}
/// Removes every indexed document belonging to the given session ids.
///
/// Returns immediately, without opening the index, when `ids` is empty. Otherwise
/// a writer is opened under `root`, one delete per session id is queued, and the
/// deletes are committed.
///
/// # Errors
/// Fails if the writer cannot be opened or the commit fails.
pub fn delete_sessions(root: &Path, ids: &[String]) -> Result<()> {
    if ids.is_empty() {
        return Ok(());
    }
    let mut pending = PendingWriter::open(root)?;
    let session_field = pending.fields.session_id;
    ids.iter()
        .map(|id| Term::from_field_text(session_field, id))
        .for_each(|term| {
            pending.writer.delete_term(term);
        });
    // `commit` is a no-op while the pending counter is zero, so record the deletes
    // as pending work to make sure the commit actually runs.
    pending.pending = ids.len();
    pending.commit()
}
/// Returns the directory that holds the search index: `root` with a `search`
/// component appended.
pub fn index_dir(root: &Path) -> PathBuf {
    let mut dir = root.to_path_buf();
    dir.push("search");
    dir
}
/// Opens (creating if needed) the `.writer.lock` file inside `dir`, locks it, and
/// returns the open handle.
///
/// The file is opened read/write and is never truncated.
///
/// # Errors
/// Fails if the file cannot be opened or if locking it fails (reported with the
/// context `lock search writer`).
fn lock_file(dir: &Path) -> Result<File> {
    let lock_path = dir.join(".writer.lock");
    let mut options = OpenOptions::new();
    options.read(true).write(true).create(true).truncate(false);
    let handle = options.open(lock_path)?;
    handle.lock().context("lock search writer")?;
    Ok(handle)
}
/// Opens the tantivy index stored in `dir`, falling back to creating a new index
/// with `schema` when opening fails.
///
/// # Errors
/// If opening fails and creating fails as well, the creation error is returned
/// with the original open error and the directory attached as context, so the
/// root cause (for example an existing index that cannot be read) is not hidden
/// behind the follow-up create failure.
fn open_or_create(dir: &Path, schema: tantivy::schema::Schema) -> Result<Index> {
    match Index::open_in_dir(dir) {
        Ok(index) => Ok(index),
        // Keep the open error: when the index exists but cannot be opened, the
        // create attempt fails too and would otherwise be the only error reported.
        Err(open_err) => Index::create_in_dir(dir, schema).with_context(|| {
            format!(
                "open search index at {} failed ({open_err}); creating a new index also failed",
                dir.display()
            )
        }),
    }
}