1use crate::search::SearchDoc;
5use crate::search::schema::{SearchFields, build_schema, event_key};
6use anyhow::{Context, Result};
7use std::fs::{File, OpenOptions};
8use std::path::{Path, PathBuf};
9use std::time::{Duration, Instant};
10use tantivy::schema::Term;
11use tantivy::{Index, IndexWriter, TantivyDocument, doc};
12
/// Byte budget handed to `Index::writer` when a `PendingWriter` is opened.
const HEAP_BYTES: usize = 50_000_000;
/// Number of queued documents at which `PendingWriter::add` triggers a commit.
const BATCH_DOCS: usize = 1_000;
/// Seconds since the previous commit after which the next `add` triggers a commit.
const BATCH_SECS: u64 = 1;
16
17pub struct PendingWriter {
18 writer: IndexWriter<TantivyDocument>,
19 fields: SearchFields,
20 pending: usize,
21 last_commit: Instant,
22 _lock: File,
23}
24
25impl PendingWriter {
26 pub fn open(root: &Path) -> Result<Self> {
27 let dir = index_dir(root);
28 std::fs::create_dir_all(&dir)?;
29 let lock = lock_file(&dir)?;
30 let (schema, fields) = build_schema();
31 let index = open_or_create(&dir, schema)?;
32 let writer = index.writer(HEAP_BYTES)?;
33 Ok(Self {
34 writer,
35 fields,
36 pending: 0,
37 last_commit: Instant::now(),
38 _lock: lock,
39 })
40 }
41
42 pub fn add(&mut self, doc: &SearchDoc) -> Result<()> {
43 self.writer.delete_term(Term::from_field_text(
44 self.fields.event_key,
45 &event_key(&doc.session_id, doc.seq),
46 ));
47 self.writer.add_document(self.tantivy_doc(doc))?;
48 self.pending += 1;
49 if self.should_commit() {
50 self.commit()?;
51 }
52 Ok(())
53 }
54
55 pub fn commit(&mut self) -> Result<()> {
56 if self.pending == 0 {
57 return Ok(());
58 }
59 self.writer.commit()?;
60 self.pending = 0;
61 self.last_commit = Instant::now();
62 Ok(())
63 }
64
65 fn should_commit(&self) -> bool {
66 self.pending >= BATCH_DOCS || self.last_commit.elapsed() >= Duration::from_secs(BATCH_SECS)
67 }
68
69 fn tantivy_doc(&self, d: &SearchDoc) -> TantivyDocument {
70 let mut doc = doc!(
71 self.fields.session_id => d.session_id.clone(),
72 self.fields.seq => d.seq as i64,
73 self.fields.event_key => event_key(&d.session_id, d.seq),
74 self.fields.ts_ms => d.ts_ms as i64,
75 self.fields.agent => d.agent.clone(),
76 self.fields.kind => d.kind.clone(),
77 self.fields.text => d.text.clone(),
78 self.fields.tokens_total => d.tokens_total,
79 );
80 d.paths
81 .iter()
82 .for_each(|p| doc.add_text(self.fields.path, p));
83 d.skills
84 .iter()
85 .for_each(|s| doc.add_text(self.fields.skill, s));
86 doc
87 }
88}
89
90pub fn delete_sessions(root: &Path, ids: &[String]) -> Result<()> {
91 if ids.is_empty() {
92 return Ok(());
93 }
94 let mut writer = PendingWriter::open(root)?;
95 for id in ids {
96 writer
97 .writer
98 .delete_term(Term::from_field_text(writer.fields.session_id, id));
99 }
100 writer.pending = ids.len();
101 writer.commit()
102}
103
/// Returns the directory that holds the search index: `<root>/search`.
pub fn index_dir(root: &Path) -> PathBuf {
    let mut dir = root.to_path_buf();
    dir.push("search");
    dir
}
107
108fn lock_file(dir: &Path) -> Result<File> {
109 let file = OpenOptions::new()
110 .create(true)
111 .truncate(false)
112 .read(true)
113 .write(true)
114 .open(dir.join(".writer.lock"))?;
115 file.lock().context("lock search writer")?;
116 Ok(file)
117}
118
119fn open_or_create(dir: &Path, schema: tantivy::schema::Schema) -> Result<Index> {
120 Ok(Index::open_in_dir(dir).or_else(|_| Index::create_in_dir(dir, schema))?)
121}