Skip to main content

kaizen/store/sqlite/
event_projector.rs

1use super::events::*;
2use super::*;
3
4impl Store {
5    pub(super) fn sync_projector_session(
6        &self,
7        session_id: &str,
8        last_seq: Option<u64>,
9    ) -> Result<()> {
10        if self.projector.borrow().last_seq(session_id) == last_seq {
11            return Ok(());
12        }
13        self.replay_projector_session(session_id)
14    }
15
16    pub fn flush_projector_session(&self, session_id: &str, now_ms: u64) -> Result<()> {
17        if projector_legacy_mode() {
18            rebuild_tool_spans_for_session(&self.conn, session_id)?;
19            self.invalidate_span_tree_cache();
20            return Ok(());
21        }
22        let deltas = self
23            .projector
24            .borrow_mut()
25            .flush_session(session_id, now_ms);
26        if self.apply_projector_events(&deltas)? {
27            self.invalidate_span_tree_cache();
28        }
29        Ok(())
30    }
31
32    pub(super) fn replay_projector_session(&self, session_id: &str) -> Result<()> {
33        clear_session_spans(&self.conn, session_id)?;
34        self.projector.borrow_mut().reset_session(session_id);
35        let events = self.list_events_for_session(session_id)?;
36        let mut changed = false;
37        for event in &events {
38            let deltas = self.projector.borrow_mut().apply(event);
39            changed |= self.apply_projector_events(&deltas)?;
40        }
41        if self
42            .get_session(session_id)?
43            .is_some_and(|session| session.status == SessionStatus::Done)
44        {
45            let now_ms = events.last().map(|event| event.ts_ms).unwrap_or(0);
46            let deltas = self
47                .projector
48                .borrow_mut()
49                .flush_session(session_id, now_ms);
50            changed |= self.apply_projector_events(&deltas)?;
51        }
52        if changed {
53            self.invalidate_span_tree_cache();
54        }
55        Ok(())
56    }
57
58    pub(super) fn apply_projector_events(&self, deltas: &[ProjectorEvent]) -> Result<bool> {
59        let mut changed = false;
60        for delta in deltas {
61            match delta {
62                ProjectorEvent::SpanClosed(span, sample) => {
63                    upsert_tool_span_record(&self.conn, span)?;
64                    tracing::debug!(
65                        session_id = %sample.session_id,
66                        span_id = %sample.span_id,
67                        tool = ?sample.tool,
68                        lead_time_ms = ?sample.lead_time_ms,
69                        tokens_in = ?sample.tokens_in,
70                        tokens_out = ?sample.tokens_out,
71                        reasoning_tokens = ?sample.reasoning_tokens,
72                        cost_usd_e6 = ?sample.cost_usd_e6,
73                        paths = ?sample.paths,
74                        "tool span closed"
75                    );
76                    changed = true;
77                }
78                ProjectorEvent::FileTouched { session, path } => {
79                    self.conn.execute(
80                        "INSERT OR IGNORE INTO files_touched (session_id, path) VALUES (?1, ?2)",
81                        params![session, path],
82                    )?;
83                    changed = true;
84                }
85                ProjectorEvent::SkillUsed { session, skill } => {
86                    self.conn.execute(
87                        "INSERT OR IGNORE INTO skills_used (session_id, skill) VALUES (?1, ?2)",
88                        params![session, skill],
89                    )?;
90                    changed = true;
91                }
92                ProjectorEvent::RuleUsed { session, rule } => {
93                    self.conn.execute(
94                        "INSERT OR IGNORE INTO rules_used (session_id, rule) VALUES (?1, ?2)",
95                        params![session, rule],
96                    )?;
97                    changed = true;
98                }
99            }
100        }
101        Ok(changed)
102    }
103}