Skip to main content

kaizen/store/sqlite/
event_projector.rs

1use super::events::*;
2use super::*;
3
4impl Store {
5    pub fn flush_projector_session(&self, session_id: &str, now_ms: u64) -> Result<()> {
6        if projector_legacy_mode() {
7            rebuild_tool_spans_for_session(&self.conn, session_id)?;
8            self.invalidate_span_tree_cache();
9            return Ok(());
10        }
11        let deltas = self
12            .projector
13            .borrow_mut()
14            .flush_session(session_id, now_ms);
15        if self.apply_projector_events(&deltas)? {
16            self.invalidate_span_tree_cache();
17        }
18        Ok(())
19    }
20
21    pub(super) fn replay_projector_session(&self, session_id: &str) -> Result<()> {
22        clear_session_spans(&self.conn, session_id)?;
23        self.projector.borrow_mut().reset_session(session_id);
24        let events = self.list_events_for_session(session_id)?;
25        let mut changed = false;
26        for event in &events {
27            let deltas = self.projector.borrow_mut().apply(event);
28            changed |= self.apply_projector_events(&deltas)?;
29        }
30        if self
31            .get_session(session_id)?
32            .is_some_and(|session| session.status == SessionStatus::Done)
33        {
34            let now_ms = events.last().map(|event| event.ts_ms).unwrap_or(0);
35            let deltas = self
36                .projector
37                .borrow_mut()
38                .flush_session(session_id, now_ms);
39            changed |= self.apply_projector_events(&deltas)?;
40        }
41        if changed {
42            self.invalidate_span_tree_cache();
43        }
44        Ok(())
45    }
46
47    pub(super) fn apply_projector_events(&self, deltas: &[ProjectorEvent]) -> Result<bool> {
48        let mut changed = false;
49        for delta in deltas {
50            match delta {
51                ProjectorEvent::SpanClosed(span, sample) => {
52                    upsert_tool_span_record(&self.conn, span)?;
53                    tracing::debug!(
54                        session_id = %sample.session_id,
55                        span_id = %sample.span_id,
56                        tool = ?sample.tool,
57                        lead_time_ms = ?sample.lead_time_ms,
58                        tokens_in = ?sample.tokens_in,
59                        tokens_out = ?sample.tokens_out,
60                        reasoning_tokens = ?sample.reasoning_tokens,
61                        cost_usd_e6 = ?sample.cost_usd_e6,
62                        paths = ?sample.paths,
63                        "tool span closed"
64                    );
65                    changed = true;
66                }
67                ProjectorEvent::SpanPatched(span) => {
68                    upsert_tool_span_record(&self.conn, span)?;
69                    changed = true;
70                }
71                ProjectorEvent::FileTouched { session, path } => {
72                    self.conn.execute(
73                        "INSERT OR IGNORE INTO files_touched (session_id, path) VALUES (?1, ?2)",
74                        params![session, path],
75                    )?;
76                    changed = true;
77                }
78                ProjectorEvent::SkillUsed { session, skill } => {
79                    self.conn.execute(
80                        "INSERT OR IGNORE INTO skills_used (session_id, skill) VALUES (?1, ?2)",
81                        params![session, skill],
82                    )?;
83                    changed = true;
84                }
85                ProjectorEvent::RuleUsed { session, rule } => {
86                    self.conn.execute(
87                        "INSERT OR IGNORE INTO rules_used (session_id, rule) VALUES (?1, ?2)",
88                        params![session, rule],
89                    )?;
90                    changed = true;
91                }
92            }
93        }
94        Ok(changed)
95    }
96}