kaizen-cli 0.1.32

Distributable agent observability: real-time-tailable sessions, agile-style retros, and repo-level improvement (Cursor, Claude Code, Codex). Uses SQLite; redacts before any sync you enable.
Documentation
// SPDX-License-Identifier: AGPL-3.0-or-later
//! Tantivy writer with batching and one-process file lock.

use crate::search::SearchDoc;
use crate::search::schema::{SearchFields, build_schema, event_key};
use anyhow::{Context, Result};
use std::fs::{File, OpenOptions};
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use tantivy::schema::Term;
use tantivy::{Index, IndexWriter, TantivyDocument, doc};

/// Byte budget handed to `Index::writer` when the writer is opened.
const HEAP_BYTES: usize = 50_000_000;
/// Number of uncommitted documents at which `add` triggers a commit.
const BATCH_DOCS: usize = 1000;
/// Seconds since the last commit after which the next `add` triggers a commit.
const BATCH_SECS: u64 = 1;

/// Tantivy index writer that tracks how many documents are waiting for a
/// commit and when the last commit happened, while holding the writer lock file.
pub struct PendingWriter {
    /// Underlying Tantivy writer that receives deletes and new documents.
    writer: IndexWriter<TantivyDocument>,
    /// Field handles returned by `build_schema`, used to build terms and documents.
    fields: SearchFields,
    /// Count of operations queued since the last successful commit.
    pending: usize,
    /// Time of the last successful commit (or of construction, if none yet).
    last_commit: Instant,
    /// Handle of the locked `.writer.lock` file; kept only so the lock stays held.
    /// Declared last so it is dropped after `writer` (fields drop in declaration order).
    _lock: File,
}

impl PendingWriter {
    /// Prepares the index directory under `root`, takes the writer lock, and
    /// opens a Tantivy writer on the (possibly freshly created) index.
    ///
    /// # Errors
    /// Returns an error when the directory cannot be created, the lock cannot
    /// be taken, or the index or its writer cannot be opened.
    pub fn open(root: &Path) -> Result<Self> {
        let index_path = index_dir(root);
        std::fs::create_dir_all(&index_path)?;
        // Take the lock before touching the index itself.
        let _lock = lock_file(&index_path)?;
        let (schema, fields) = build_schema();
        let index = open_or_create(&index_path, schema)?;
        let writer = index.writer(HEAP_BYTES)?;
        let last_commit = Instant::now();
        Ok(Self {
            writer,
            fields,
            pending: 0,
            last_commit,
            _lock,
        })
    }

    /// Queues `doc` for indexing, first queuing a delete for any document that
    /// carries the same event key, and commits once a batch threshold is met.
    ///
    /// # Errors
    /// Returns an error when adding the document or the triggered commit fails.
    pub fn add(&mut self, doc: &SearchDoc) -> Result<()> {
        let key = event_key(&doc.session_id, doc.seq);
        let stale = Term::from_field_text(self.fields.event_key, &key);
        self.writer.delete_term(stale);

        let fresh = self.tantivy_doc(doc);
        self.writer.add_document(fresh)?;
        self.pending += 1;

        if !self.should_commit() {
            return Ok(());
        }
        self.commit()
    }

    /// Commits queued operations and resets the batch counters.
    ///
    /// Does nothing when no operations are pending.
    ///
    /// # Errors
    /// Returns an error when the underlying writer fails to commit.
    pub fn commit(&mut self) -> Result<()> {
        if self.pending > 0 {
            self.writer.commit()?;
            self.pending = 0;
            self.last_commit = Instant::now();
        }
        Ok(())
    }

    /// Reports whether the pending count or the time since the last commit
    /// has reached its batch limit.
    fn should_commit(&self) -> bool {
        let batch_full = self.pending >= BATCH_DOCS;
        batch_full || self.last_commit.elapsed() >= Duration::from_secs(BATCH_SECS)
    }

    /// Builds the Tantivy document for `d`: the scalar fields first, then one
    /// `path` value per entry in `d.paths` and one `skill` value per entry in
    /// `d.skills`.
    fn tantivy_doc(&self, d: &SearchDoc) -> TantivyDocument {
        let mut out = doc!(
            self.fields.session_id => d.session_id.clone(),
            self.fields.seq => d.seq as i64,
            self.fields.event_key => event_key(&d.session_id, d.seq),
            self.fields.ts_ms => d.ts_ms as i64,
            self.fields.agent => d.agent.clone(),
            self.fields.kind => d.kind.clone(),
            self.fields.text => d.text.clone(),
            self.fields.tokens_total => d.tokens_total,
        );
        for path in d.paths.iter() {
            out.add_text(self.fields.path, path);
        }
        for skill in d.skills.iter() {
            out.add_text(self.fields.skill, skill);
        }
        out
    }
}

/// Removes every indexed document whose session id is listed in `ids` from
/// the index under `root`, then commits.
///
/// An empty `ids` slice returns immediately without opening the index.
///
/// # Errors
/// Returns an error when the writer cannot be opened or the commit fails.
pub fn delete_sessions(root: &Path, ids: &[String]) -> Result<()> {
    if ids.is_empty() {
        return Ok(());
    }
    let mut pending_writer = PendingWriter::open(root)?;
    let session_field = pending_writer.fields.session_id;
    ids.iter().for_each(|id| {
        pending_writer
            .writer
            .delete_term(Term::from_field_text(session_field, id));
    });
    // `commit` skips when nothing is pending, so record the queued deletes.
    pending_writer.pending = ids.len();
    pending_writer.commit()
}

/// Returns the search index directory for `root`, i.e. `root` with a
/// `search` component appended.
pub fn index_dir(root: &Path) -> PathBuf {
    let mut location = root.to_path_buf();
    location.push("search");
    location
}

/// Opens (creating if missing, never truncating) `.writer.lock` inside `dir`
/// and takes a lock on it, returning the open handle.
///
/// # Errors
/// Returns an error when the file cannot be opened, or — with the context
/// "lock search writer" — when locking it fails.
fn lock_file(dir: &Path) -> Result<File> {
    let lock_path = dir.join(".writer.lock");
    let mut options = OpenOptions::new();
    options.create(true).truncate(false).read(true).write(true);
    let handle = options.open(lock_path)?;
    handle.lock().context("lock search writer")?;
    Ok(handle)
}

/// Opens the Tantivy index stored in `dir`, falling back to creating a new
/// index with `schema` when opening fails.
///
/// # Errors
/// Returns an error when the index can neither be opened nor created. The
/// error message includes the original open failure, so a damaged or
/// unreadable existing index is not reported only as a failed creation.
fn open_or_create(dir: &Path, schema: tantivy::schema::Schema) -> Result<Index> {
    match Index::open_in_dir(dir) {
        Ok(index) => Ok(index),
        // Keep the open error: if creation fails too, it is usually the real cause.
        Err(open_err) => Index::create_in_dir(dir, schema).with_context(|| {
            format!(
                "create search index in {} (open failed: {open_err})",
                dir.display()
            )
        }),
    }
}