Skip to main content

kaizen/store/sqlite/
event_projector.rs

1use super::events::*;
2use super::*;
3
/// Count passed to `list_latest_events_for_session` when a session's projector
/// state is re-hydrated (presumably the maximum number of most-recent events to
/// load — confirm against that helper).
4const PROJECTOR_HYDRATE_EVENTS: usize = 512;
5
6impl Store {
7    pub(super) fn sync_projector_session(
8        &self,
9        session_id: &str,
10        last_seq: Option<u64>,
11    ) -> Result<()> {
12        if self.projector.borrow().last_seq(session_id) == last_seq {
13            return Ok(());
14        }
15        self.hydrate_projector_session(session_id)
16    }
17
18    fn hydrate_projector_session(&self, session_id: &str) -> Result<()> {
19        self.projector.borrow_mut().reset_session(session_id);
20        for event in self.list_latest_events_for_session(session_id, PROJECTOR_HYDRATE_EVENTS)? {
21            self.projector.borrow_mut().apply(&event);
22        }
23        Ok(())
24    }
25
26    pub fn flush_projector_session(&self, session_id: &str, now_ms: u64) -> Result<()> {
27        if projector_legacy_mode() {
28            rebuild_tool_spans_for_session(&self.conn, session_id)?;
29            self.invalidate_span_tree_cache();
30            return Ok(());
31        }
32        let deltas = self
33            .projector
34            .borrow_mut()
35            .flush_session(session_id, now_ms);
36        if self.apply_projector_events(&deltas)? {
37            self.invalidate_span_tree_cache();
38        }
39        Ok(())
40    }
41
42    pub(super) fn replay_projector_session(&self, session_id: &str) -> Result<()> {
43        clear_session_spans(&self.conn, session_id)?;
44        self.projector.borrow_mut().reset_session(session_id);
45        let events = self.list_events_for_session(session_id)?;
46        let mut changed = false;
47        for event in &events {
48            let deltas = self.projector.borrow_mut().apply(event);
49            changed |= self.apply_projector_events(&deltas)?;
50        }
51        if self
52            .get_session(session_id)?
53            .is_some_and(|session| session.status == SessionStatus::Done)
54        {
55            let now_ms = events.last().map(|event| event.ts_ms).unwrap_or(0);
56            let deltas = self
57                .projector
58                .borrow_mut()
59                .flush_session(session_id, now_ms);
60            changed |= self.apply_projector_events(&deltas)?;
61        }
62        if changed {
63            self.invalidate_span_tree_cache();
64        }
65        Ok(())
66    }
67
68    pub(super) fn apply_projector_events(&self, deltas: &[ProjectorEvent]) -> Result<bool> {
69        let mut changed = false;
70        for delta in deltas {
71            match delta {
72                ProjectorEvent::SpanClosed(span, sample) => {
73                    let persisted = upsert_tool_span_record(&self.conn, span)?;
74                    crate::extensions::diffs::upsert_tool_span(self, &persisted)?;
75                    tracing::debug!(
76                        session_id = %sample.session_id,
77                        span_id = %sample.span_id,
78                        tool = ?sample.tool,
79                        lead_time_ms = ?sample.lead_time_ms,
80                        tokens_in = ?sample.tokens_in,
81                        tokens_out = ?sample.tokens_out,
82                        reasoning_tokens = ?sample.reasoning_tokens,
83                        cost_usd_e6 = ?sample.cost_usd_e6,
84                        paths = ?sample.paths,
85                        "tool span closed"
86                    );
87                    changed = true;
88                }
89                ProjectorEvent::FileTouched { session, path } => {
90                    self.conn.execute(
91                        "INSERT OR IGNORE INTO files_touched (session_id, path) VALUES (?1, ?2)",
92                        params![session, path],
93                    )?;
94                    changed = true;
95                }
96                ProjectorEvent::SkillUsed { session, skill } => {
97                    self.conn.execute(
98                        "INSERT OR IGNORE INTO skills_used (session_id, skill) VALUES (?1, ?2)",
99                        params![session, skill],
100                    )?;
101                    changed = true;
102                }
103                ProjectorEvent::RuleUsed { session, rule } => {
104                    self.conn.execute(
105                        "INSERT OR IGNORE INTO rules_used (session_id, rule) VALUES (?1, ?2)",
106                        params![session, rule],
107                    )?;
108                    changed = true;
109                }
110            }
111        }
112        Ok(changed)
113    }
114}