kaizen/store/sqlite/
event_projector.rs1use super::events::*;
2use super::*;
3
4impl Store {
5 pub(super) fn sync_projector_session(
6 &self,
7 session_id: &str,
8 last_seq: Option<u64>,
9 ) -> Result<()> {
10 if self.projector.borrow().last_seq(session_id) == last_seq {
11 return Ok(());
12 }
13 self.replay_projector_session(session_id)
14 }
15
16 pub fn flush_projector_session(&self, session_id: &str, now_ms: u64) -> Result<()> {
17 if projector_legacy_mode() {
18 rebuild_tool_spans_for_session(&self.conn, session_id)?;
19 self.invalidate_span_tree_cache();
20 return Ok(());
21 }
22 let deltas = self
23 .projector
24 .borrow_mut()
25 .flush_session(session_id, now_ms);
26 if self.apply_projector_events(&deltas)? {
27 self.invalidate_span_tree_cache();
28 }
29 Ok(())
30 }
31
32 pub(super) fn replay_projector_session(&self, session_id: &str) -> Result<()> {
33 clear_session_spans(&self.conn, session_id)?;
34 self.projector.borrow_mut().reset_session(session_id);
35 let events = self.list_events_for_session(session_id)?;
36 let mut changed = false;
37 for event in &events {
38 let deltas = self.projector.borrow_mut().apply(event);
39 changed |= self.apply_projector_events(&deltas)?;
40 }
41 if self
42 .get_session(session_id)?
43 .is_some_and(|session| session.status == SessionStatus::Done)
44 {
45 let now_ms = events.last().map(|event| event.ts_ms).unwrap_or(0);
46 let deltas = self
47 .projector
48 .borrow_mut()
49 .flush_session(session_id, now_ms);
50 changed |= self.apply_projector_events(&deltas)?;
51 }
52 if changed {
53 self.invalidate_span_tree_cache();
54 }
55 Ok(())
56 }
57
58 pub(super) fn apply_projector_events(&self, deltas: &[ProjectorEvent]) -> Result<bool> {
59 let mut changed = false;
60 for delta in deltas {
61 match delta {
62 ProjectorEvent::SpanClosed(span, sample) => {
63 upsert_tool_span_record(&self.conn, span)?;
64 tracing::debug!(
65 session_id = %sample.session_id,
66 span_id = %sample.span_id,
67 tool = ?sample.tool,
68 lead_time_ms = ?sample.lead_time_ms,
69 tokens_in = ?sample.tokens_in,
70 tokens_out = ?sample.tokens_out,
71 reasoning_tokens = ?sample.reasoning_tokens,
72 cost_usd_e6 = ?sample.cost_usd_e6,
73 paths = ?sample.paths,
74 "tool span closed"
75 );
76 changed = true;
77 }
78 ProjectorEvent::FileTouched { session, path } => {
79 self.conn.execute(
80 "INSERT OR IGNORE INTO files_touched (session_id, path) VALUES (?1, ?2)",
81 params![session, path],
82 )?;
83 changed = true;
84 }
85 ProjectorEvent::SkillUsed { session, skill } => {
86 self.conn.execute(
87 "INSERT OR IGNORE INTO skills_used (session_id, skill) VALUES (?1, ?2)",
88 params![session, skill],
89 )?;
90 changed = true;
91 }
92 ProjectorEvent::RuleUsed { session, rule } => {
93 self.conn.execute(
94 "INSERT OR IGNORE INTO rules_used (session_id, rule) VALUES (?1, ?2)",
95 params![session, rule],
96 )?;
97 changed = true;
98 }
99 }
100 }
101 Ok(changed)
102 }
103}