Skip to main content

kaizen/store/
projector.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Incremental projector for event-derived rows.
3
4use crate::core::event::Event;
5use crate::metrics::types::ToolSpanSample;
6use crate::store::tool_span_index::{SpanBuilder, ToolSpanRecord};
7use std::collections::{HashMap, HashSet};
8
9mod close;
10mod derived;
11mod open;
12#[cfg(test)]
13mod tests;
14
/// Default time-to-live for orphaned spans, in milliseconds (one hour).
///
/// NOTE(review): presumably meant to be passed as `ttl_ms` to
/// `Projector::flush_expired` — confirm against callers.
pub const DEFAULT_ORPHAN_TTL_MS: u64 = 1_000 * 60 * 60;
16
17#[derive(Debug, Clone, PartialEq, Eq)]
18pub struct OpenSpan {
19    pub(crate) inner: SpanBuilder,
20}
21
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct ClosedSpan {
24    pub record: ToolSpanRecord,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq)]
28pub enum ProjectorEvent {
29    SpanClosed(Box<ToolSpanRecord>, ToolSpanSample),
30    FileTouched { session: String, path: String },
31    SkillUsed { session: String, skill: String },
32    RuleUsed { session: String, rule: String },
33}
34
35#[derive(Debug, Default)]
36pub struct Projector {
37    open_spans: HashMap<String, HashMap<String, SpanBuilder>>,
38    open_order: HashMap<String, Vec<String>>,
39    file_touch: HashMap<String, HashSet<String>>,
40    skill_use: HashMap<String, HashSet<String>>,
41    rule_use: HashMap<String, HashSet<String>>,
42    last_seq: HashMap<String, u64>,
43}
44
45impl Projector {
46    pub fn apply(&mut self, evt: &Event) -> Vec<ProjectorEvent> {
47        let mut out = self.apply_derived(evt);
48        self.apply_span_event(evt, &mut out);
49        self.last_seq.insert(evt.session_id.clone(), evt.seq);
50        out
51    }
52
53    pub fn flush_session(&mut self, session_id: &str, _now_ms: u64) -> Vec<ProjectorEvent> {
54        self.take_session_spans(session_id)
55            .into_iter()
56            .flat_map(|span| self.close_span(span))
57            .collect()
58    }
59
60    pub fn flush_expired(&mut self, now_ms: u64, ttl_ms: u64) -> Vec<ProjectorEvent> {
61        let ids = self.expired_span_ids(now_ms, ttl_ms);
62        self.close_ids(ids)
63    }
64
65    pub fn reset_session(&mut self, session_id: &str) {
66        self.clear_open(session_id);
67        self.file_touch.remove(session_id);
68        self.skill_use.remove(session_id);
69        self.rule_use.remove(session_id);
70        self.last_seq.remove(session_id);
71    }
72
73    pub fn last_seq(&self, session_id: &str) -> Option<u64> {
74        self.last_seq.get(session_id).copied()
75    }
76}