Skip to main content

kaizen/search/
writer.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Tantivy writer with batching and one-process file lock.
3
4use crate::search::SearchDoc;
5use crate::search::schema::{SearchFields, build_schema, event_key};
6use anyhow::{Context, Result};
7use std::fs::{File, OpenOptions};
8use std::path::{Path, PathBuf};
9use std::time::{Duration, Instant};
10use tantivy::schema::Term;
11use tantivy::{Index, IndexWriter, TantivyDocument, doc};
12
/// Memory budget, in bytes, passed to `Index::writer` when the writer is opened.
const HEAP_BYTES: usize = 50_000_000;
/// Number of pending documents at which `add` triggers a commit.
const BATCH_DOCS: usize = 1_000;
/// Seconds since the last commit after which the next `add` triggers a commit.
const BATCH_SECS: u64 = 1;
16
17pub struct PendingWriter {
18    writer: IndexWriter<TantivyDocument>,
19    fields: SearchFields,
20    pending: usize,
21    last_commit: Instant,
22    _lock: File,
23}
24
25impl PendingWriter {
26    pub fn open(root: &Path) -> Result<Self> {
27        let dir = index_dir(root);
28        std::fs::create_dir_all(&dir)?;
29        let lock = lock_file(&dir)?;
30        let (schema, fields) = build_schema();
31        let index = open_or_create(&dir, schema)?;
32        let writer = index.writer(HEAP_BYTES)?;
33        Ok(Self {
34            writer,
35            fields,
36            pending: 0,
37            last_commit: Instant::now(),
38            _lock: lock,
39        })
40    }
41
42    pub fn add(&mut self, doc: &SearchDoc) -> Result<()> {
43        self.writer.delete_term(Term::from_field_text(
44            self.fields.event_key,
45            &event_key(&doc.session_id, doc.seq),
46        ));
47        self.writer.add_document(self.tantivy_doc(doc))?;
48        self.pending += 1;
49        if self.should_commit() {
50            self.commit()?;
51        }
52        Ok(())
53    }
54
55    pub fn commit(&mut self) -> Result<()> {
56        if self.pending == 0 {
57            return Ok(());
58        }
59        self.writer.commit()?;
60        self.pending = 0;
61        self.last_commit = Instant::now();
62        Ok(())
63    }
64
65    fn should_commit(&self) -> bool {
66        self.pending >= BATCH_DOCS || self.last_commit.elapsed() >= Duration::from_secs(BATCH_SECS)
67    }
68
69    fn tantivy_doc(&self, d: &SearchDoc) -> TantivyDocument {
70        let mut doc = doc!(
71            self.fields.session_id => d.session_id.clone(),
72            self.fields.seq => d.seq as i64,
73            self.fields.event_key => event_key(&d.session_id, d.seq),
74            self.fields.ts_ms => d.ts_ms as i64,
75            self.fields.agent => d.agent.clone(),
76            self.fields.kind => d.kind.clone(),
77            self.fields.text => d.text.clone(),
78            self.fields.tokens_total => d.tokens_total,
79        );
80        d.paths
81            .iter()
82            .for_each(|p| doc.add_text(self.fields.path, p));
83        d.skills
84            .iter()
85            .for_each(|s| doc.add_text(self.fields.skill, s));
86        doc
87    }
88}
89
90pub fn delete_sessions(root: &Path, ids: &[String]) -> Result<()> {
91    if ids.is_empty() {
92        return Ok(());
93    }
94    let mut writer = PendingWriter::open(root)?;
95    for id in ids {
96        writer
97            .writer
98            .delete_term(Term::from_field_text(writer.fields.session_id, id));
99    }
100    writer.pending = ids.len();
101    writer.commit()
102}
103
/// Returns the directory holding the search index: `<root>/search`.
pub fn index_dir(root: &Path) -> PathBuf {
    let mut location = root.to_path_buf();
    location.push("search");
    location
}
107
108fn lock_file(dir: &Path) -> Result<File> {
109    let file = OpenOptions::new()
110        .create(true)
111        .truncate(false)
112        .read(true)
113        .write(true)
114        .open(dir.join(".writer.lock"))?;
115    file.lock().context("lock search writer")?;
116    Ok(file)
117}
118
119fn open_or_create(dir: &Path, schema: tantivy::schema::Schema) -> Result<Index> {
120    Ok(Index::open_in_dir(dir).or_else(|_| Index::create_in_dir(dir, schema))?)
121}