Skip to main content

kaizen/search/
reader.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Tantivy reader and result shaping.
3
4use crate::core::event::Event;
5use crate::search::extract::{redacted_event_text, snippet, tokens_total};
6use crate::search::schema::{SearchFields, build_schema};
7use crate::search::writer::index_dir;
8use anyhow::Result;
9use serde_json::Value as JsonValue;
10use std::path::Path;
11use tantivy::collector::TopDocs;
12use tantivy::query::QueryParser;
13use tantivy::schema::{TantivyDocument, Value};
14use tantivy::{Index, TantivyError};
15
/// Parameters for a single search against the index.
///
/// `query` carries the free-text part; the optional fields are turned into
/// additional query clauses, and `limit` bounds how many hits are collected.
#[derive(Debug, Clone)]
pub struct SearchQuery {
    /// Free-text query handed to the Tantivy query parser.
    pub query: String,
    /// When set, adds a `ts_ms:>=<value>` clause.
    pub since_ms: Option<u64>,
    /// When set, adds an `agent:<value>` clause.
    pub agent: Option<String>,
    /// When set, adds a `kind:<value>` clause.
    pub kind: Option<String>,
    /// Maximum number of top-scoring documents to collect.
    pub limit: usize,
}
24
25#[derive(Debug, Clone, serde::Serialize)]
26pub struct SearchHit {
27    pub session_id: String,
28    pub seq: u64,
29    pub ts_ms: u64,
30    pub agent: String,
31    pub kind: String,
32    pub score: f32,
33    pub snippet: String,
34    pub paths: Vec<String>,
35    pub skills: Vec<String>,
36    pub tokens_total: i64,
37}
38
39pub fn search<F>(
40    root: &Path,
41    opts: &SearchQuery,
42    workspace: &Path,
43    salt: &[u8; 32],
44    load: F,
45) -> Result<Vec<SearchHit>>
46where
47    F: Fn(&str, u64) -> Result<Option<Event>>,
48{
49    let (_, fields) = build_schema();
50    let index = Index::open_in_dir(index_dir(root))?;
51    let parser = QueryParser::for_index(&index, vec![fields.text]);
52    let query = parser.parse_query(&query_text(opts))?;
53    let reader = index.reader()?;
54    let searcher = reader.searcher();
55    let docs = searcher.search(&query, &TopDocs::with_limit(opts.limit).order_by_score())?;
56    let ctx = HitCtx {
57        fields,
58        opts,
59        workspace,
60        salt,
61    };
62    docs.into_iter()
63        .filter_map(|(score, addr)| doc_hit(&searcher, addr, score, &ctx, &load))
64        .collect()
65}
66
67pub fn is_missing_index(err: &anyhow::Error) -> bool {
68    err.downcast_ref::<TantivyError>().is_some()
69}
70
71fn query_text(opts: &SearchQuery) -> String {
72    [
73        opts.query.clone(),
74        opt("agent", &opts.agent),
75        opt("kind", &opts.kind),
76    ]
77    .into_iter()
78    .chain(opts.since_ms.map(|ms| format!("ts_ms:>={ms}")))
79    .filter(|s| !s.is_empty())
80    .collect::<Vec<_>>()
81    .join(" AND ")
82}
83
/// Formats an optional filter as a `field:value` clause.
///
/// Returns an empty string when `value` is `None`. The value is interpolated
/// verbatim, with no quoting or escaping.
fn opt(field: &str, value: &Option<String>) -> String {
    match value {
        Some(v) => format!("{field}:{v}"),
        None => String::new(),
    }
}
90
91struct HitCtx<'a> {
92    fields: SearchFields,
93    opts: &'a SearchQuery,
94    workspace: &'a Path,
95    salt: &'a [u8; 32],
96}
97
98fn doc_hit<F>(
99    searcher: &tantivy::Searcher,
100    addr: tantivy::DocAddress,
101    score: f32,
102    ctx: &HitCtx<'_>,
103    load: &F,
104) -> Option<Result<SearchHit>>
105where
106    F: Fn(&str, u64) -> Result<Option<Event>>,
107{
108    Some((|| {
109        let doc = searcher.doc::<TantivyDocument>(addr)?;
110        let session_id = str_field(&doc, ctx.fields.session_id)?;
111        let seq = i64_field(&doc, ctx.fields.seq)? as u64;
112        let event = load(&session_id, seq)?.unwrap_or_else(|| empty_event(&session_id, seq));
113        Ok(SearchHit {
114            session_id,
115            seq,
116            ts_ms: i64_field(&doc, ctx.fields.ts_ms)? as u64,
117            agent: str_field(&doc, ctx.fields.agent)?,
118            kind: str_field(&doc, ctx.fields.kind)?,
119            score,
120            snippet: snippet(
121                &redacted_event_text(&event, ctx.workspace, ctx.salt),
122                &ctx.opts.query,
123            ),
124            paths: crate::store::event_index::paths_from_event_payload(&event.payload),
125            skills: crate::store::event_index::skills_from_event_json(&event.payload),
126            tokens_total: tokens_total(&event),
127        })
128    })())
129}
130
131fn str_field(doc: &TantivyDocument, field: tantivy::schema::Field) -> Result<String> {
132    Ok(doc
133        .get_first(field)
134        .and_then(|v| v.as_str())
135        .unwrap_or_default()
136        .to_string())
137}
138
139fn i64_field(doc: &TantivyDocument, field: tantivy::schema::Field) -> Result<i64> {
140    Ok(doc
141        .get_first(field)
142        .and_then(|v| v.as_i64())
143        .unwrap_or_default())
144}
145
146fn empty_event(session_id: &str, seq: u64) -> Event {
147    Event {
148        session_id: session_id.to_string(),
149        seq,
150        ts_ms: 0,
151        ts_exact: false,
152        kind: crate::core::event::EventKind::Message,
153        source: crate::core::event::EventSource::Tail,
154        tool: None,
155        tool_call_id: None,
156        tokens_in: None,
157        tokens_out: None,
158        reasoning_tokens: None,
159        cost_usd_e6: None,
160        stop_reason: None,
161        latency_ms: None,
162        ttft_ms: None,
163        retry_count: None,
164        context_used_tokens: None,
165        context_max_tokens: None,
166        cache_creation_tokens: None,
167        cache_read_tokens: None,
168        system_prompt_tokens: None,
169        payload: JsonValue::Null,
170    }
171}