1use crate::search::SearchDoc;
5use crate::search::schema::{SearchFields, build_schema, event_key};
6use anyhow::{Context, Result};
7use std::fs::File;
8use std::path::{Path, PathBuf};
9use std::time::{Duration, Instant};
10use tantivy::indexer::IndexWriterOptions;
11use tantivy::schema::Term;
12use tantivy::{Index, IndexWriter, TantivyDocument, doc};
13
/// Memory budget in bytes passed to `memory_budget_per_thread` when the index writer is built.
const HEAP_BYTES: usize = 50_000_000;
/// Number of pending documents at which `PendingWriter::add` commits the batch.
const BATCH_DOCS: usize = 256;
/// Seconds after which `PendingWriter::add` commits the batch regardless of its size.
const BATCH_SECS: u64 = 60;
17
/// Batching writer for the on-disk search index.
///
/// Documents added through `add` are committed once `BATCH_DOCS` of them are
/// pending or `BATCH_SECS` seconds have elapsed; both thresholds are only
/// checked from inside `add`. `commit` flushes a batch explicitly.
pub struct PendingWriter {
    /// Underlying tantivy writer that receives delete terms and documents.
    writer: IndexWriter<TantivyDocument>,
    /// Field handles returned by `build_schema`, used to build terms and documents.
    fields: SearchFields,
    /// Count of operations queued since the last commit; `commit` is a no-op while it is zero.
    pending: usize,
    /// Batch timer: reset after each commit and when the first document of a new batch is added.
    last_commit: Instant,
    /// Locked `.writer.lock` file. Never read; it is kept open so the lock is held
    /// for as long as this value exists. Declared after `writer`, so it is dropped after it.
    _lock: File,
}
25
26impl PendingWriter {
27 pub fn open(root: &Path) -> Result<Self> {
28 let dir = crate::core::paths::descendant_dir_for_write(root, Path::new("search"))?;
29 reject_tree_symlinks(&dir)?;
30 let lock = lock_file(&dir)?;
31 let (schema, fields) = build_schema();
32 let index = open_or_create(&dir, schema)?;
33 let writer = index.writer_with_options(writer_options())?;
34 Ok(Self {
35 writer,
36 fields,
37 pending: 0,
38 last_commit: Instant::now(),
39 _lock: lock,
40 })
41 }
42
43 pub fn add(&mut self, doc: &SearchDoc) -> Result<()> {
44 if self.pending == 0 {
45 self.last_commit = Instant::now();
46 }
47 self.writer.delete_term(Term::from_field_text(
48 self.fields.event_key,
49 &event_key(&doc.session_id, doc.seq),
50 ));
51 self.writer.add_document(self.tantivy_doc(doc))?;
52 self.pending += 1;
53 if self.should_commit() {
54 self.commit()?;
55 }
56 Ok(())
57 }
58
59 pub fn commit(&mut self) -> Result<()> {
60 if self.pending == 0 {
61 return Ok(());
62 }
63 self.writer.commit()?;
64 self.pending = 0;
65 self.last_commit = Instant::now();
66 Ok(())
67 }
68
69 fn should_commit(&self) -> bool {
70 self.pending >= BATCH_DOCS || self.last_commit.elapsed() >= Duration::from_secs(BATCH_SECS)
71 }
72
73 fn tantivy_doc(&self, d: &SearchDoc) -> TantivyDocument {
74 let mut doc = doc!(
75 self.fields.session_id => d.session_id.clone(),
76 self.fields.seq => d.seq as i64,
77 self.fields.event_key => event_key(&d.session_id, d.seq),
78 self.fields.ts_ms => d.ts_ms as i64,
79 self.fields.agent => d.agent.clone(),
80 self.fields.kind => d.kind.clone(),
81 self.fields.text => d.text.clone(),
82 self.fields.tokens_total => d.tokens_total,
83 );
84 d.paths
85 .iter()
86 .for_each(|p| doc.add_text(self.fields.path, p));
87 d.skills
88 .iter()
89 .for_each(|s| doc.add_text(self.fields.skill, s));
90 doc
91 }
92}
93
94fn writer_options() -> IndexWriterOptions {
95 IndexWriterOptions::builder()
96 .memory_budget_per_thread(HEAP_BYTES)
97 .num_worker_threads(1)
98 .num_merge_threads(1)
99 .build()
100}
101
102pub fn delete_sessions(root: &Path, ids: &[String]) -> Result<()> {
103 if ids.is_empty() {
104 return Ok(());
105 }
106 let mut writer = PendingWriter::open(root)?;
107 for id in ids {
108 writer
109 .writer
110 .delete_term(Term::from_field_text(writer.fields.session_id, id));
111 }
112 writer.pending = ids.len();
113 writer.commit()
114}
115
/// Returns the path of the search index directory, `search`, directly below `root`.
pub fn index_dir(root: &Path) -> PathBuf {
    let mut dir = root.to_path_buf();
    dir.push("search");
    dir
}
119
120fn lock_file(dir: &Path) -> Result<File> {
121 let file = crate::core::safe_fs::read_write(&dir.join(".writer.lock"))?;
122 file.lock().context("lock search writer")?;
123 Ok(file)
124}
125
126fn reject_tree_symlinks(root: &Path) -> Result<()> {
127 for entry in std::fs::read_dir(root)? {
128 let path = entry?.path();
129 let metadata = std::fs::symlink_metadata(&path)?;
130 anyhow::ensure!(
131 !metadata.file_type().is_symlink(),
132 "search index rejects symlink"
133 );
134 if metadata.is_dir() {
135 reject_tree_symlinks(&path)?;
136 }
137 }
138 Ok(())
139}
140
141fn open_or_create(dir: &Path, schema: tantivy::schema::Schema) -> Result<Index> {
142 Ok(Index::open_in_dir(dir).or_else(|_| Index::create_in_dir(dir, schema))?)
143}
144
145#[cfg(test)]
146#[path = "writer_tests.rs"]
147mod tests;