kaizen/store/sqlite/
event_projector.rs1use super::events::*;
2use super::*;
3
/// Event count handed to `list_latest_events_for_session` when the in-memory
/// projector state for a session is re-hydrated.
///
/// NOTE(review): presumably an upper bound on how many of the most recent
/// events are replayed — confirm against `list_latest_events_for_session`.
const PROJECTOR_HYDRATE_EVENTS: usize = 512;
5
6impl Store {
7 pub(super) fn sync_projector_session(
8 &self,
9 session_id: &str,
10 last_seq: Option<u64>,
11 ) -> Result<()> {
12 if self.projector.borrow().last_seq(session_id) == last_seq {
13 return Ok(());
14 }
15 self.hydrate_projector_session(session_id)
16 }
17
18 fn hydrate_projector_session(&self, session_id: &str) -> Result<()> {
19 self.projector.borrow_mut().reset_session(session_id);
20 for event in self.list_latest_events_for_session(session_id, PROJECTOR_HYDRATE_EVENTS)? {
21 self.projector.borrow_mut().apply(&event);
22 }
23 Ok(())
24 }
25
26 pub fn flush_projector_session(&self, session_id: &str, now_ms: u64) -> Result<()> {
27 if projector_legacy_mode() {
28 rebuild_tool_spans_for_session(&self.conn, session_id)?;
29 self.invalidate_span_tree_cache();
30 return Ok(());
31 }
32 let deltas = self
33 .projector
34 .borrow_mut()
35 .flush_session(session_id, now_ms);
36 if self.apply_projector_events(&deltas)? {
37 self.invalidate_span_tree_cache();
38 }
39 Ok(())
40 }
41
42 pub(super) fn replay_projector_session(&self, session_id: &str) -> Result<()> {
43 clear_session_spans(&self.conn, session_id)?;
44 self.projector.borrow_mut().reset_session(session_id);
45 let events = self.list_events_for_session(session_id)?;
46 let mut changed = false;
47 for event in &events {
48 let deltas = self.projector.borrow_mut().apply(event);
49 changed |= self.apply_projector_events(&deltas)?;
50 }
51 if self
52 .get_session(session_id)?
53 .is_some_and(|session| session.status == SessionStatus::Done)
54 {
55 let now_ms = events.last().map(|event| event.ts_ms).unwrap_or(0);
56 let deltas = self
57 .projector
58 .borrow_mut()
59 .flush_session(session_id, now_ms);
60 changed |= self.apply_projector_events(&deltas)?;
61 }
62 if changed {
63 self.invalidate_span_tree_cache();
64 }
65 Ok(())
66 }
67
68 pub(super) fn apply_projector_events(&self, deltas: &[ProjectorEvent]) -> Result<bool> {
69 let mut changed = false;
70 for delta in deltas {
71 match delta {
72 ProjectorEvent::SpanClosed(span, sample) => {
73 let persisted = upsert_tool_span_record(&self.conn, span)?;
74 crate::extensions::diffs::upsert_tool_span(self, &persisted)?;
75 tracing::debug!(
76 session_id = %sample.session_id,
77 span_id = %sample.span_id,
78 tool = ?sample.tool,
79 lead_time_ms = ?sample.lead_time_ms,
80 tokens_in = ?sample.tokens_in,
81 tokens_out = ?sample.tokens_out,
82 reasoning_tokens = ?sample.reasoning_tokens,
83 cost_usd_e6 = ?sample.cost_usd_e6,
84 paths = ?sample.paths,
85 "tool span closed"
86 );
87 changed = true;
88 }
89 ProjectorEvent::FileTouched { session, path } => {
90 self.conn.execute(
91 "INSERT OR IGNORE INTO files_touched (session_id, path) VALUES (?1, ?2)",
92 params![session, path],
93 )?;
94 changed = true;
95 }
96 ProjectorEvent::SkillUsed { session, skill } => {
97 self.conn.execute(
98 "INSERT OR IGNORE INTO skills_used (session_id, skill) VALUES (?1, ?2)",
99 params![session, skill],
100 )?;
101 changed = true;
102 }
103 ProjectorEvent::RuleUsed { session, rule } => {
104 self.conn.execute(
105 "INSERT OR IGNORE INTO rules_used (session_id, rule) VALUES (?1, ?2)",
106 params![session, rule],
107 )?;
108 changed = true;
109 }
110 }
111 }
112 Ok(changed)
113 }
114}