Skip to main content

kaizen/store/
projector.rs

// SPDX-License-Identifier: AGPL-3.0-or-later
//! Incremental projector that turns the event stream into derived rows
//! (tool spans and per-session file/skill/rule usage).

4use crate::core::event::{Event, EventKind};
5use crate::metrics::types::ToolSpanSample;
6use crate::store::event_index::{
7    paths_from_event_payload, rules_from_event_json, skills_from_event_json,
8};
9use crate::store::tool_span_index::{
10    SpanBuilder, ToolSpanRecord, find_open_same_tool, find_open_without_call, hook_kind, hook_tool,
11    match_span_id, pick_i64, pick_u32, span_start, synthetic_span_id,
12};
13use std::collections::{BTreeMap, HashMap, HashSet};
14
/// Default orphan time-to-live: one hour, expressed in milliseconds.
///
/// NOTE(review): presumably the `ttl_ms` callers pass to `Projector::flush_expired`; confirm.
pub const DEFAULT_ORPHAN_TTL_MS: u64 = 3_600_000; // 60 min * 60 s * 1_000 ms
16
17#[derive(Debug, Clone, PartialEq, Eq)]
18pub struct OpenSpan {
19    pub(crate) inner: SpanBuilder,
20}
21
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct ClosedSpan {
24    pub record: ToolSpanRecord,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq)]
28pub enum ProjectorEvent {
29    SpanClosed(ToolSpanRecord, ToolSpanSample),
30    SpanPatched(ToolSpanRecord),
31    FileTouched { session: String, path: String },
32    SkillUsed { session: String, skill: String },
33    RuleUsed { session: String, rule: String },
34}
35
36#[derive(Debug, Default)]
37pub struct Projector {
38    open_spans: HashMap<String, SpanBuilder>,
39    open_order: HashMap<String, Vec<String>>,
40    closed_spans: HashMap<String, BTreeMap<String, ToolSpanRecord>>,
41    file_touch: HashMap<String, HashSet<String>>,
42    skill_use: HashMap<String, HashSet<String>>,
43    rule_use: HashMap<String, HashSet<String>>,
44    last_seq: HashMap<String, u64>,
45}
46
47impl Projector {
48    pub fn apply(&mut self, evt: &Event) -> Vec<ProjectorEvent> {
49        let mut out = self.apply_derived(evt);
50        if !matches!(
51            evt.kind,
52            EventKind::ToolCall | EventKind::ToolResult | EventKind::Hook
53        ) {
54            self.last_seq.insert(evt.session_id.clone(), evt.seq);
55            return out;
56        }
57        match evt.kind {
58            EventKind::ToolCall => self.apply_tool_call(evt),
59            EventKind::ToolResult => out.extend(self.apply_tool_result(evt)),
60            EventKind::Hook => out.extend(self.apply_hook(evt)),
61            _ => {}
62        }
63        self.last_seq.insert(evt.session_id.clone(), evt.seq);
64        out
65    }
66
67    pub fn flush_session(&mut self, session_id: &str, _now_ms: u64) -> Vec<ProjectorEvent> {
68        let ids = self
69            .open_order
70            .remove(session_id)
71            .unwrap_or_default()
72            .into_iter()
73            .filter(|id| self.open_spans.contains_key(id))
74            .collect::<Vec<_>>();
75        let mut out = Vec::new();
76        for id in ids {
77            if let Some(span) = self.open_spans.remove(&id) {
78                out.extend(self.close_span(span));
79            }
80        }
81        out
82    }
83
84    pub fn flush_expired(&mut self, now_ms: u64, ttl_ms: u64) -> Vec<ProjectorEvent> {
85        let expired = self
86            .open_spans
87            .iter()
88            .filter_map(|(id, span)| {
89                let started = span_start(span)?;
90                (now_ms.saturating_sub(started) > ttl_ms).then(|| id.clone())
91            })
92            .collect::<Vec<_>>();
93        let mut out = Vec::new();
94        for id in expired {
95            let Some(span) = self.open_spans.remove(&id) else {
96                continue;
97            };
98            if let Some(order) = self.open_order.get_mut(&span.session_id) {
99                order.retain(|open_id| open_id != &id);
100            }
101            out.extend(self.close_span(span));
102        }
103        out
104    }
105
106    pub fn reset_session(&mut self, session_id: &str) {
107        if let Some(ids) = self.open_order.remove(session_id) {
108            for id in ids {
109                self.open_spans.remove(&id);
110            }
111        }
112        self.closed_spans.remove(session_id);
113        self.file_touch.remove(session_id);
114        self.skill_use.remove(session_id);
115        self.rule_use.remove(session_id);
116        self.last_seq.remove(session_id);
117    }
118
119    pub fn last_seq(&self, session_id: &str) -> Option<u64> {
120        self.last_seq.get(session_id).copied()
121    }
122
123    fn apply_derived(&mut self, evt: &Event) -> Vec<ProjectorEvent> {
124        let mut out = Vec::new();
125        let session = &evt.session_id;
126        for path in paths_from_event_payload(&evt.payload) {
127            if self
128                .file_touch
129                .entry(session.clone())
130                .or_default()
131                .insert(path.clone())
132            {
133                out.push(ProjectorEvent::FileTouched {
134                    session: session.clone(),
135                    path,
136                });
137            }
138        }
139        for skill in skills_from_event_json(&evt.payload) {
140            if self
141                .skill_use
142                .entry(session.clone())
143                .or_default()
144                .insert(skill.clone())
145            {
146                out.push(ProjectorEvent::SkillUsed {
147                    session: session.clone(),
148                    skill,
149                });
150            }
151        }
152        for rule in rules_from_event_json(&evt.payload) {
153            if self
154                .rule_use
155                .entry(session.clone())
156                .or_default()
157                .insert(rule.clone())
158            {
159                out.push(ProjectorEvent::RuleUsed {
160                    session: session.clone(),
161                    rule,
162                });
163            }
164        }
165        out
166    }
167
168    fn apply_tool_call(&mut self, event: &Event) {
169        let tool = event.tool.clone();
170        let existing = tool
171            .as_deref()
172            .and_then(|name| find_open_without_call(&self.open_spans, self.order(event), name));
173        let span_id = event
174            .tool_call_id
175            .clone()
176            .unwrap_or_else(|| existing.unwrap_or_else(|| synthetic_span_id(event)));
177        let span = self
178            .open_spans
179            .entry(span_id.clone())
180            .or_insert_with(|| SpanBuilder {
181                span_id: span_id.clone(),
182                session_id: event.session_id.clone(),
183                tool: tool.clone(),
184                tool_call_id: event.tool_call_id.clone(),
185                ..Default::default()
186            });
187        span.tool = tool;
188        span.tool_call_id = event.tool_call_id.clone();
189        span.call_start_ms = Some(event.ts_ms);
190        span.call_start_exact = event.ts_exact;
191        span.tokens_in = pick_u32(span.tokens_in, event.tokens_in);
192        span.tokens_out = pick_u32(span.tokens_out, event.tokens_out);
193        span.reasoning_tokens = pick_u32(span.reasoning_tokens, event.reasoning_tokens);
194        span.cost_usd_e6 = pick_i64(span.cost_usd_e6, event.cost_usd_e6);
195        span.paths.extend(paths_from_event_payload(&event.payload));
196        span.has_call = true;
197        self.push_open_order(&event.session_id, &span_id);
198    }
199
200    fn apply_tool_result(&mut self, event: &Event) -> Vec<ProjectorEvent> {
201        let Some(span_id) = match_span_id(event, &self.open_spans, self.order(event)) else {
202            return Vec::new();
203        };
204        let Some(span) = self.open_spans.get_mut(&span_id) else {
205            return Vec::new();
206        };
207        span.result_end_ms = Some(event.ts_ms);
208        span.result_end_exact = event.ts_exact;
209        span.tokens_in = pick_u32(span.tokens_in, event.tokens_in);
210        span.tokens_out = pick_u32(span.tokens_out, event.tokens_out);
211        span.reasoning_tokens = pick_u32(span.reasoning_tokens, event.reasoning_tokens);
212        span.cost_usd_e6 = pick_i64(span.cost_usd_e6, event.cost_usd_e6);
213        span.paths.extend(paths_from_event_payload(&event.payload));
214        span.has_end = true;
215        self.remove_open(&event.session_id, &span_id)
216            .map(|span| self.close_span(span))
217            .unwrap_or_default()
218    }
219
220    fn apply_hook(&mut self, event: &Event) -> Vec<ProjectorEvent> {
221        let Some(kind) = hook_kind(&event.payload) else {
222            return Vec::new();
223        };
224        let tool = hook_tool(&event.payload);
225        let span_id = event
226            .tool_call_id
227            .clone()
228            .or_else(|| {
229                tool.as_deref()
230                    .and_then(|name| find_open_same_tool(&self.open_spans, self.order(event), name))
231            })
232            .unwrap_or_else(|| synthetic_span_id(event));
233        let span = self
234            .open_spans
235            .entry(span_id.clone())
236            .or_insert_with(|| SpanBuilder {
237                span_id: span_id.clone(),
238                session_id: event.session_id.clone(),
239                tool: tool.clone(),
240                tool_call_id: event.tool_call_id.clone(),
241                ..Default::default()
242            });
243        span.tool = span.tool.clone().or(tool);
244        span.tool_call_id = span.tool_call_id.clone().or(event.tool_call_id.clone());
245        span.paths.extend(paths_from_event_payload(&event.payload));
246        match kind {
247            "pre" => span.hook_start_ms = Some(event.ts_ms),
248            "post" => {
249                span.hook_end_ms = Some(event.ts_ms);
250                span.has_end = true;
251            }
252            _ => {}
253        }
254        self.push_open_order(&event.session_id, &span_id);
255        if kind == "post" {
256            return self
257                .remove_open(&event.session_id, &span_id)
258                .map(|span| self.close_span(span))
259                .unwrap_or_default();
260        }
261        Vec::new()
262    }
263
264    fn close_span(&mut self, mut span: SpanBuilder) -> Vec<ProjectorEvent> {
265        let session_id = span.session_id.clone();
266        span.parent_span_id = None;
267        span.depth = 0;
268        span.subtree_cost_usd_e6 = span.cost_usd_e6;
269        span.subtree_token_count = span.tokens_in.map(|i| i + span.tokens_out.unwrap_or(0));
270        let mut record = ToolSpanRecord::from_builder(&span);
271        let before = self
272            .closed_spans
273            .get(&session_id)
274            .cloned()
275            .unwrap_or_default();
276        self.closed_spans
277            .entry(session_id.clone())
278            .or_default()
279            .insert(record.span_id.clone(), record.clone());
280        self.recompute_session_tree(&session_id);
281        record = self
282            .closed_spans
283            .get(&session_id)
284            .and_then(|spans| spans.get(&record.span_id))
285            .cloned()
286            .unwrap_or(record);
287        let sample = ToolSpanSample::from(&record);
288        let mut out = vec![ProjectorEvent::SpanClosed(record.clone(), sample)];
289        if let Some(after) = self.closed_spans.get(&session_id) {
290            for (id, span) in after {
291                if id == &record.span_id {
292                    continue;
293                }
294                if before.get(id) != Some(span) {
295                    out.push(ProjectorEvent::SpanPatched(span.clone()));
296                }
297            }
298        }
299        out
300    }
301
302    fn recompute_session_tree(&mut self, session_id: &str) {
303        let Some(map) = self.closed_spans.get_mut(session_id) else {
304            return;
305        };
306        let mut spans = map
307            .values()
308            .map(record_to_builder)
309            .collect::<Vec<SpanBuilder>>();
310        crate::store::tool_span_index::assign_parents(&mut spans);
311        crate::store::tool_span_index::compute_subtree_costs(&mut spans);
312        map.clear();
313        for span in spans {
314            let record = ToolSpanRecord::from_builder(&span);
315            map.insert(record.span_id.clone(), record);
316        }
317    }
318
319    fn order(&self, event: &Event) -> &[String] {
320        self.open_order
321            .get(&event.session_id)
322            .map(Vec::as_slice)
323            .unwrap_or(&[])
324    }
325
326    fn push_open_order(&mut self, session_id: &str, span_id: &str) {
327        let order = self.open_order.entry(session_id.to_string()).or_default();
328        if !order.iter().any(|id| id == span_id) {
329            order.push(span_id.to_string());
330        }
331    }
332
333    fn remove_open(&mut self, session_id: &str, span_id: &str) -> Option<SpanBuilder> {
334        if let Some(order) = self.open_order.get_mut(session_id) {
335            order.retain(|id| id != span_id);
336        }
337        self.open_spans.remove(span_id)
338    }
339}
340
341fn record_to_builder(record: &ToolSpanRecord) -> SpanBuilder {
342    let paths = record.paths.iter().cloned().collect();
343    SpanBuilder {
344        span_id: record.span_id.clone(),
345        session_id: record.session_id.clone(),
346        tool: record.tool.clone(),
347        tool_call_id: record.tool_call_id.clone(),
348        hook_start_ms: record.started_at_ms,
349        hook_end_ms: None,
350        call_start_ms: record.started_at_ms,
351        result_end_ms: record.ended_at_ms,
352        call_start_exact: record.lead_time_ms.is_some(),
353        result_end_exact: record.lead_time_ms.is_some(),
354        tokens_in: record.tokens_in,
355        tokens_out: record.tokens_out,
356        reasoning_tokens: record.reasoning_tokens,
357        cost_usd_e6: record.cost_usd_e6,
358        paths,
359        has_call: record.started_at_ms.is_some(),
360        has_end: record.ended_at_ms.is_some(),
361        parent_span_id: None,
362        depth: 0,
363        subtree_cost_usd_e6: None,
364        subtree_token_count: None,
365    }
366}
367
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::event::EventSource;
    use serde_json::json;

    /// Builds an event in session "s" with an empty payload and no usage data.
    fn event(seq: u64, ts_ms: u64, kind: EventKind, tool: Option<&str>) -> Event {
        Event {
            session_id: "s".into(),
            seq,
            ts_ms,
            ts_exact: true,
            kind,
            source: EventSource::Tail,
            tool: tool.map(str::to_string),
            tool_call_id: None,
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            stop_reason: None,
            latency_ms: None,
            ttft_ms: None,
            retry_count: None,
            context_used_tokens: None,
            context_max_tokens: None,
            cache_creation_tokens: None,
            cache_read_tokens: None,
            system_prompt_tokens: None,
            payload: json!({}),
        }
    }

    /// Keeps only the records carried by `SpanClosed` rows, in emission order.
    fn span_closed(events: Vec<ProjectorEvent>) -> Vec<ToolSpanRecord> {
        events
            .into_iter()
            .filter_map(|event| match event {
                ProjectorEvent::SpanClosed(span, _) => Some(span),
                _ => None,
            })
            .collect()
    }

    #[test]
    fn tool_call_result_without_id_closes_span() {
        let mut p = Projector::default();
        p.apply(&event(0, 10, EventKind::ToolCall, Some("bash")));
        let spans = span_closed(p.apply(&event(1, 15, EventKind::ToolResult, Some("bash"))));
        assert_eq!(spans.len(), 1);
        assert_eq!(spans[0].status, "done");
        assert_eq!(spans[0].lead_time_ms, Some(5));
    }

    #[test]
    fn hook_pre_post_matching_closes_span() {
        let mut pre = event(0, 10, EventKind::Hook, None);
        pre.payload = json!({"event": "PreToolUse", "tool_name": "Read"});
        let mut post = event(1, 17, EventKind::Hook, None);
        post.payload = json!({"event": "PostToolUse", "tool_name": "Read"});
        let mut p = Projector::default();
        p.apply(&pre);
        let spans = span_closed(p.apply(&post));
        assert_eq!(spans[0].tool.as_deref(), Some("Read"));
        assert_eq!(spans[0].lead_time_ms, Some(7));
    }

    #[test]
    fn flush_session_marks_open_span_orphaned() {
        let mut p = Projector::default();
        p.apply(&event(0, 10, EventKind::ToolCall, Some("bash")));
        let spans = span_closed(p.flush_session("s", 100));
        assert_eq!(spans[0].status, "orphaned");
        assert_eq!(spans[0].ended_at_ms, None);
    }

    #[test]
    fn flush_session_without_open_spans_is_noop() {
        let mut p = Projector::default();
        assert!(p.flush_session("missing", 0).is_empty());
    }

    #[test]
    fn flush_expired_marks_old_open_span_orphaned() {
        let mut p = Projector::default();
        p.apply(&event(0, 10, EventKind::ToolCall, Some("bash")));
        let spans = span_closed(p.flush_expired(20, 5));
        assert_eq!(spans[0].status, "orphaned");
    }

    #[test]
    fn flush_expired_keeps_recent_span_open() {
        let mut p = Projector::default();
        p.apply(&event(0, 10, EventKind::ToolCall, Some("bash")));
        assert!(p.flush_expired(11, 5).is_empty());
        let spans = span_closed(p.apply(&event(1, 12, EventKind::ToolResult, Some("bash"))));
        assert_eq!(spans.len(), 1);
        assert_eq!(spans[0].status, "done");
    }

    #[test]
    fn tool_result_without_call_is_ignored() {
        let mut p = Projector::default();
        assert!(p.apply(&event(0, 10, EventKind::ToolResult, Some("bash"))).is_empty());
        assert_eq!(p.last_seq("s"), Some(0));
    }

    #[test]
    fn derived_rows_dedup_per_session() {
        let mut e = event(0, 10, EventKind::Message, None);
        e.payload = json!({
            "path": "src/lib.rs",
            "text": ".cursor/skills/tdd/SKILL.md .cursor/rules/style.mdc"
        });
        let mut p = Projector::default();
        assert_eq!(p.apply(&e).len(), 3);
        assert!(p.apply(&e).is_empty());
    }

    #[test]
    fn parent_close_patches_existing_child() {
        let mut p = Projector::default();
        p.apply(&event(0, 0, EventKind::ToolCall, Some("parent")));
        p.apply(&event(1, 10, EventKind::ToolCall, Some("child")));
        p.apply(&event(2, 20, EventKind::ToolResult, Some("child")));
        let out = p.apply(&event(3, 30, EventKind::ToolResult, Some("parent")));
        assert!(
            out.iter()
                .any(|event| matches!(event, ProjectorEvent::SpanPatched(span) if span.depth == 1))
        );
    }

    #[test]
    fn reset_session_clears_accumulators() {
        let mut p = Projector::default();
        let mut e = event(0, 10, EventKind::Message, None);
        e.payload = json!({"path": "src/lib.rs"});
        assert_eq!(p.apply(&e).len(), 1);
        p.reset_session("s");
        assert_eq!(p.apply(&e).len(), 1);
    }

    #[test]
    fn last_seq_tracks_latest_applied_event() {
        let mut p = Projector::default();
        assert_eq!(p.last_seq("s"), None);
        p.apply(&event(4, 10, EventKind::Message, None));
        assert_eq!(p.last_seq("s"), Some(4));
        p.apply(&event(5, 11, EventKind::ToolCall, Some("bash")));
        assert_eq!(p.last_seq("s"), Some(5));
        p.reset_session("s");
        assert_eq!(p.last_seq("s"), None);
    }
}
485}