Skip to main content

kaizen/store/sqlite/
event_write.rs

1use super::events::*;
2use super::rows::*;
3use super::*;
4
5impl Store {
6    /// Next `seq` for a new event in this session (0 when there are no events yet).
7    pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
8        let n: i64 = self.conn.query_row(
9            "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
10            [session_id],
11            |r| r.get(0),
12        )?;
13        Ok(n as u64)
14    }
15
16    pub fn append_event(&self, e: &Event) -> Result<()> {
17        self.append_event_with_sync(e, None)
18    }
19
20    /// Append event; when `ctx` is set and sync is configured, enqueue one redacted outbox row.
21    pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
22        let last_before = if projector_legacy_mode() {
23            None
24        } else {
25            self.last_event_seq_for_session(&e.session_id)?
26        };
27        let payload = serde_json::to_string(&e.payload)?;
28        self.conn.execute(
29            "INSERT INTO events (
30                session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
31                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
32                stop_reason, latency_ms, ttft_ms, retry_count,
33                context_used_tokens, context_max_tokens,
34                cache_creation_tokens, cache_read_tokens, system_prompt_tokens
35             ) VALUES (
36                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
37                ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
38             )
39             ON CONFLICT(session_id, seq) DO UPDATE SET
40                ts_ms = excluded.ts_ms,
41                ts_exact = excluded.ts_exact,
42                kind = excluded.kind,
43                source = excluded.source,
44                tool = excluded.tool,
45                tool_call_id = excluded.tool_call_id,
46                tokens_in = excluded.tokens_in,
47                tokens_out = excluded.tokens_out,
48                reasoning_tokens = excluded.reasoning_tokens,
49                cost_usd_e6 = excluded.cost_usd_e6,
50                payload = excluded.payload,
51                stop_reason = excluded.stop_reason,
52                latency_ms = excluded.latency_ms,
53                ttft_ms = excluded.ttft_ms,
54                retry_count = excluded.retry_count,
55                context_used_tokens = excluded.context_used_tokens,
56                context_max_tokens = excluded.context_max_tokens,
57                cache_creation_tokens = excluded.cache_creation_tokens,
58                cache_read_tokens = excluded.cache_read_tokens,
59                system_prompt_tokens = excluded.system_prompt_tokens",
60            params![
61                e.session_id,
62                e.seq as i64,
63                e.ts_ms as i64,
64                bool_to_i64(e.ts_exact),
65                format!("{:?}", e.kind),
66                format!("{:?}", e.source),
67                e.tool,
68                e.tool_call_id,
69                e.tokens_in.map(|v| v as i64),
70                e.tokens_out.map(|v| v as i64),
71                e.reasoning_tokens.map(|v| v as i64),
72                e.cost_usd_e6,
73                payload,
74                e.stop_reason,
75                e.latency_ms.map(|v| v as i64),
76                e.ttft_ms.map(|v| v as i64),
77                e.retry_count.map(|v| v as i64),
78                e.context_used_tokens.map(|v| v as i64),
79                e.context_max_tokens.map(|v| v as i64),
80                e.cache_creation_tokens.map(|v| v as i64),
81                e.cache_read_tokens.map(|v| v as i64),
82                e.system_prompt_tokens.map(|v| v as i64),
83            ],
84        )?;
85        if self.conn.changes() == 0 {
86            return Ok(());
87        }
88        self.append_hot_event(e)?;
89        if projector_legacy_mode() {
90            index_event_derived(&self.conn, e)?;
91            rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
92            self.invalidate_span_tree_cache();
93        } else if last_before.is_some_and(|last| e.seq <= last) {
94            self.replay_projector_session(&e.session_id)?;
95        } else {
96            let deltas = self.projector.borrow_mut().apply(e);
97            self.apply_projector_events(&deltas)?;
98            let expired = self
99                .projector
100                .borrow_mut()
101                .flush_expired(e.ts_ms, DEFAULT_ORPHAN_TTL_MS);
102            self.apply_projector_events(&expired)?;
103            if is_stop_event(e) {
104                let flushed = self
105                    .projector
106                    .borrow_mut()
107                    .flush_session(&e.session_id, e.ts_ms);
108                self.apply_projector_events(&flushed)?;
109            }
110            self.invalidate_span_tree_cache();
111        }
112        self.append_search_event(e);
113        self.refresh_extension_rows(e)?;
114        let Some(ctx) = ctx else {
115            return Ok(());
116        };
117        let sync = &ctx.sync;
118        if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
119            return Ok(());
120        }
121        let Some(salt) = try_team_salt(sync) else {
122            tracing::warn!(
123                "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
124            );
125            return Ok(());
126        };
127        if sync.sample_rate < 1.0 {
128            let u: f64 = rand::random();
129            if u > sync.sample_rate {
130                return Ok(());
131            }
132        }
133        let Some(session) = self.get_session(&e.session_id)? else {
134            tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
135            return Ok(());
136        };
137        let mut outbound = outbound_event_from_row(e, &session, &salt);
138        redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
139        let row = serde_json::to_string(&outbound)?;
140        self.outbox()?.append(&e.session_id, "events", &row)?;
141        enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
142        Ok(())
143    }
144
145    fn refresh_extension_rows(&self, e: &Event) -> Result<()> {
146        crate::extensions::hash_chain::store_event_hash(self, e)?;
147        crate::extensions::aggregates::upsert_session(self, &e.session_id)?;
148        if let Err(err) = crate::extensions::diffs::refresh_session(self, &e.session_id, false) {
149            tracing::warn!(session_id = %e.session_id, "step diff attribution skipped: {err:#}");
150        }
151        Ok(())
152    }
153
154    pub(super) fn append_hot_event(&self, e: &Event) -> Result<()> {
155        if std::env::var("KAIZEN_HOT_LOG").as_deref() == Ok("0") {
156            return Ok(());
157        }
158        let mut slot = self.hot_log.borrow_mut();
159        if slot.is_none() {
160            *slot = Some(HotLog::open(&self.root)?);
161        }
162        if let Some(log) = slot.as_mut() {
163            log.append(e)?;
164        }
165        Ok(())
166    }
167
168    pub(super) fn append_search_event(&self, e: &Event) {
169        if let Err(err) = self.try_append_search_event(e) {
170            tracing::warn!(session_id = %e.session_id, seq = e.seq, "search index skipped: {err:#}");
171            let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
172        }
173    }
174
175    pub(super) fn try_append_search_event(&self, e: &Event) -> Result<()> {
176        let Some(session) = self.get_session(&e.session_id)? else {
177            return Ok(());
178        };
179        let workspace = PathBuf::from(&session.workspace);
180        let cfg = crate::core::config::load(&workspace).unwrap_or_default();
181        let salt = try_team_salt(&cfg.sync).unwrap_or([0; 32]);
182        let Some(doc) = crate::search::extract_doc(e, &session, &workspace, &salt) else {
183            return Ok(());
184        };
185        let mut slot = self.search_writer.borrow_mut();
186        if slot.is_none() {
187            *slot = Some(crate::search::PendingWriter::open(&self.root)?);
188        }
189        slot.as_mut().expect("writer").add(&doc)
190    }
191
192    pub fn flush_search(&self) -> Result<()> {
193        if let Some(writer) = self.search_writer.borrow_mut().as_mut() {
194            writer.commit()?;
195        }
196        Ok(())
197    }
198}