Skip to main content

kaizen/search/
writer.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Tantivy writer with batching and one-process file lock.
3
4use crate::search::SearchDoc;
5use crate::search::schema::{SearchFields, build_schema, event_key};
6use anyhow::{Context, Result};
7use std::fs::File;
8use std::path::{Path, PathBuf};
9use std::time::{Duration, Instant};
10use tantivy::indexer::IndexWriterOptions;
11use tantivy::schema::Term;
12use tantivy::{Index, IndexWriter, TantivyDocument, doc};
13
/// Memory budget, in bytes, passed to the index writer as its per-thread budget.
const HEAP_BYTES: usize = 50_000_000;
/// Number of buffered documents at which `PendingWriter::add` triggers a commit.
const BATCH_DOCS: usize = 256;
/// Age of the current batch, in seconds, at which the next `PendingWriter::add`
/// triggers a commit.
const BATCH_SECS: u64 = 60;
17
18pub struct PendingWriter {
19    writer: IndexWriter<TantivyDocument>,
20    fields: SearchFields,
21    pending: usize,
22    last_commit: Instant,
23    _lock: File,
24}
25
26impl PendingWriter {
27    pub fn open(root: &Path) -> Result<Self> {
28        let dir = crate::core::paths::descendant_dir_for_write(root, Path::new("search"))?;
29        reject_tree_symlinks(&dir)?;
30        let lock = lock_file(&dir)?;
31        let (schema, fields) = build_schema();
32        let index = open_or_create(&dir, schema)?;
33        let writer = index.writer_with_options(writer_options())?;
34        Ok(Self {
35            writer,
36            fields,
37            pending: 0,
38            last_commit: Instant::now(),
39            _lock: lock,
40        })
41    }
42
43    pub fn add(&mut self, doc: &SearchDoc) -> Result<()> {
44        if self.pending == 0 {
45            self.last_commit = Instant::now();
46        }
47        self.writer.delete_term(Term::from_field_text(
48            self.fields.event_key,
49            &event_key(&doc.session_id, doc.seq),
50        ));
51        self.writer.add_document(self.tantivy_doc(doc))?;
52        self.pending += 1;
53        if self.should_commit() {
54            self.commit()?;
55        }
56        Ok(())
57    }
58
59    pub fn commit(&mut self) -> Result<()> {
60        if self.pending == 0 {
61            return Ok(());
62        }
63        self.writer.commit()?;
64        self.pending = 0;
65        self.last_commit = Instant::now();
66        Ok(())
67    }
68
69    fn should_commit(&self) -> bool {
70        self.pending >= BATCH_DOCS || self.last_commit.elapsed() >= Duration::from_secs(BATCH_SECS)
71    }
72
73    fn tantivy_doc(&self, d: &SearchDoc) -> TantivyDocument {
74        let mut doc = doc!(
75            self.fields.session_id => d.session_id.clone(),
76            self.fields.seq => d.seq as i64,
77            self.fields.event_key => event_key(&d.session_id, d.seq),
78            self.fields.ts_ms => d.ts_ms as i64,
79            self.fields.agent => d.agent.clone(),
80            self.fields.kind => d.kind.clone(),
81            self.fields.text => d.text.clone(),
82            self.fields.tokens_total => d.tokens_total,
83        );
84        d.paths
85            .iter()
86            .for_each(|p| doc.add_text(self.fields.path, p));
87        d.skills
88            .iter()
89            .for_each(|s| doc.add_text(self.fields.skill, s));
90        doc
91    }
92}
93
94fn writer_options() -> IndexWriterOptions {
95    IndexWriterOptions::builder()
96        .memory_budget_per_thread(HEAP_BYTES)
97        .num_worker_threads(1)
98        .num_merge_threads(1)
99        .build()
100}
101
102pub fn delete_sessions(root: &Path, ids: &[String]) -> Result<()> {
103    if ids.is_empty() {
104        return Ok(());
105    }
106    let mut writer = PendingWriter::open(root)?;
107    for id in ids {
108        writer
109            .writer
110            .delete_term(Term::from_field_text(writer.fields.session_id, id));
111    }
112    writer.pending = ids.len();
113    writer.commit()
114}
115
/// Returns the path of the `search` directory directly under `root`.
///
/// This only builds the path; nothing is created or checked on disk.
pub fn index_dir(root: &Path) -> PathBuf {
    let mut dir = root.to_path_buf();
    dir.push("search");
    dir
}
119
120fn lock_file(dir: &Path) -> Result<File> {
121    let file = crate::core::safe_fs::read_write(&dir.join(".writer.lock"))?;
122    file.lock().context("lock search writer")?;
123    Ok(file)
124}
125
126fn reject_tree_symlinks(root: &Path) -> Result<()> {
127    for entry in std::fs::read_dir(root)? {
128        let path = entry?.path();
129        let metadata = std::fs::symlink_metadata(&path)?;
130        anyhow::ensure!(
131            !metadata.file_type().is_symlink(),
132            "search index rejects symlink"
133        );
134        if metadata.is_dir() {
135            reject_tree_symlinks(&path)?;
136        }
137    }
138    Ok(())
139}
140
141fn open_or_create(dir: &Path, schema: tantivy::schema::Schema) -> Result<Index> {
142    Ok(Index::open_in_dir(dir).or_else(|_| Index::create_in_dir(dir, schema))?)
143}
144
// Unit tests are kept in the sibling file `writer_tests.rs` and are compiled
// only for test builds.
#[cfg(test)]
#[path = "writer_tests.rs"]
mod tests;