kaizen/store/sqlite/
event_projector.rs1use super::events::*;
2use super::*;
3
4impl Store {
5 pub fn flush_projector_session(&self, session_id: &str, now_ms: u64) -> Result<()> {
6 if projector_legacy_mode() {
7 rebuild_tool_spans_for_session(&self.conn, session_id)?;
8 self.invalidate_span_tree_cache();
9 return Ok(());
10 }
11 let deltas = self
12 .projector
13 .borrow_mut()
14 .flush_session(session_id, now_ms);
15 if self.apply_projector_events(&deltas)? {
16 self.invalidate_span_tree_cache();
17 }
18 Ok(())
19 }
20
21 pub(super) fn replay_projector_session(&self, session_id: &str) -> Result<()> {
22 clear_session_spans(&self.conn, session_id)?;
23 self.projector.borrow_mut().reset_session(session_id);
24 let events = self.list_events_for_session(session_id)?;
25 let mut changed = false;
26 for event in &events {
27 let deltas = self.projector.borrow_mut().apply(event);
28 changed |= self.apply_projector_events(&deltas)?;
29 }
30 if self
31 .get_session(session_id)?
32 .is_some_and(|session| session.status == SessionStatus::Done)
33 {
34 let now_ms = events.last().map(|event| event.ts_ms).unwrap_or(0);
35 let deltas = self
36 .projector
37 .borrow_mut()
38 .flush_session(session_id, now_ms);
39 changed |= self.apply_projector_events(&deltas)?;
40 }
41 if changed {
42 self.invalidate_span_tree_cache();
43 }
44 Ok(())
45 }
46
47 pub(super) fn apply_projector_events(&self, deltas: &[ProjectorEvent]) -> Result<bool> {
48 let mut changed = false;
49 for delta in deltas {
50 match delta {
51 ProjectorEvent::SpanClosed(span, sample) => {
52 upsert_tool_span_record(&self.conn, span)?;
53 tracing::debug!(
54 session_id = %sample.session_id,
55 span_id = %sample.span_id,
56 tool = ?sample.tool,
57 lead_time_ms = ?sample.lead_time_ms,
58 tokens_in = ?sample.tokens_in,
59 tokens_out = ?sample.tokens_out,
60 reasoning_tokens = ?sample.reasoning_tokens,
61 cost_usd_e6 = ?sample.cost_usd_e6,
62 paths = ?sample.paths,
63 "tool span closed"
64 );
65 changed = true;
66 }
67 ProjectorEvent::SpanPatched(span) => {
68 upsert_tool_span_record(&self.conn, span)?;
69 changed = true;
70 }
71 ProjectorEvent::FileTouched { session, path } => {
72 self.conn.execute(
73 "INSERT OR IGNORE INTO files_touched (session_id, path) VALUES (?1, ?2)",
74 params![session, path],
75 )?;
76 changed = true;
77 }
78 ProjectorEvent::SkillUsed { session, skill } => {
79 self.conn.execute(
80 "INSERT OR IGNORE INTO skills_used (session_id, skill) VALUES (?1, ?2)",
81 params![session, skill],
82 )?;
83 changed = true;
84 }
85 ProjectorEvent::RuleUsed { session, rule } => {
86 self.conn.execute(
87 "INSERT OR IGNORE INTO rules_used (session_id, rule) VALUES (?1, ?2)",
88 params![session, rule],
89 )?;
90 changed = true;
91 }
92 }
93 }
94 Ok(changed)
95 }
96}