Skip to main content

kaizen/store/sqlite/
event_write.rs

1use super::events::*;
2use super::rows::*;
3use super::*;
4
5impl Store {
6    /// Next `seq` for a new event in this session (0 when there are no events yet).
7    pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
8        let n: i64 = self.conn.query_row(
9            "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
10            [session_id],
11            |r| r.get(0),
12        )?;
13        Ok(n as u64)
14    }
15
16    pub fn append_event(&self, e: &Event) -> Result<()> {
17        self.append_event_with_sync(e, None)
18    }
19
20    /// Append event; when `ctx` is set and sync is configured, enqueue one redacted outbox row.
21    pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
22        let last_before = if projector_legacy_mode() {
23            None
24        } else {
25            self.last_event_seq_for_session(&e.session_id)?
26        };
27        if !projector_legacy_mode() {
28            self.sync_projector_session(&e.session_id, last_before)?;
29        }
30        let payload = serde_json::to_string(&e.payload)?;
31        self.conn.execute(
32            "INSERT INTO events (
33                session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
34                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
35                stop_reason, latency_ms, ttft_ms, retry_count,
36                context_used_tokens, context_max_tokens,
37                cache_creation_tokens, cache_read_tokens, system_prompt_tokens
38             ) VALUES (
39                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
40                ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
41             )
42             ON CONFLICT(session_id, seq) DO UPDATE SET
43                ts_ms = excluded.ts_ms,
44                ts_exact = excluded.ts_exact,
45                kind = excluded.kind,
46                source = excluded.source,
47                tool = excluded.tool,
48                tool_call_id = excluded.tool_call_id,
49                tokens_in = excluded.tokens_in,
50                tokens_out = excluded.tokens_out,
51                reasoning_tokens = excluded.reasoning_tokens,
52                cost_usd_e6 = excluded.cost_usd_e6,
53                payload = excluded.payload,
54                stop_reason = excluded.stop_reason,
55                latency_ms = excluded.latency_ms,
56                ttft_ms = excluded.ttft_ms,
57                retry_count = excluded.retry_count,
58                context_used_tokens = excluded.context_used_tokens,
59                context_max_tokens = excluded.context_max_tokens,
60                cache_creation_tokens = excluded.cache_creation_tokens,
61                cache_read_tokens = excluded.cache_read_tokens,
62                system_prompt_tokens = excluded.system_prompt_tokens",
63            params![
64                e.session_id,
65                e.seq as i64,
66                e.ts_ms as i64,
67                bool_to_i64(e.ts_exact),
68                format!("{:?}", e.kind),
69                format!("{:?}", e.source),
70                e.tool,
71                e.tool_call_id,
72                e.tokens_in.map(|v| v as i64),
73                e.tokens_out.map(|v| v as i64),
74                e.reasoning_tokens.map(|v| v as i64),
75                e.cost_usd_e6,
76                payload,
77                e.stop_reason,
78                e.latency_ms.map(|v| v as i64),
79                e.ttft_ms.map(|v| v as i64),
80                e.retry_count.map(|v| v as i64),
81                e.context_used_tokens.map(|v| v as i64),
82                e.context_max_tokens.map(|v| v as i64),
83                e.cache_creation_tokens.map(|v| v as i64),
84                e.cache_read_tokens.map(|v| v as i64),
85                e.system_prompt_tokens.map(|v| v as i64),
86            ],
87        )?;
88        if self.conn.changes() == 0 {
89            return Ok(());
90        }
91        if projector_legacy_mode() {
92            index_event_derived(&self.conn, e)?;
93            rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
94            self.invalidate_span_tree_cache();
95        } else if last_before.is_some_and(|last| e.seq <= last) {
96            self.replay_projector_session(&e.session_id)?;
97        } else {
98            let deltas = self.projector.borrow_mut().apply(e);
99            self.apply_projector_events(&deltas)?;
100            let expired = self
101                .projector
102                .borrow_mut()
103                .flush_expired(e.ts_ms, DEFAULT_ORPHAN_TTL_MS);
104            self.apply_projector_events(&expired)?;
105            if is_stop_event(e) {
106                let flushed = self
107                    .projector
108                    .borrow_mut()
109                    .flush_session(&e.session_id, e.ts_ms);
110                self.apply_projector_events(&flushed)?;
111            }
112            self.invalidate_span_tree_cache();
113        }
114        self.append_search_event(e);
115        self.refresh_extension_rows(e)?;
116        let Some(ctx) = ctx else {
117            return Ok(());
118        };
119        let sync = &ctx.sync;
120        if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
121            return Ok(());
122        }
123        let Some(salt) = try_team_salt(sync) else {
124            tracing::warn!(
125                "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
126            );
127            return Ok(());
128        };
129        if sync.sample_rate < 1.0 {
130            let u: f64 = rand::random();
131            if u > sync.sample_rate {
132                return Ok(());
133            }
134        }
135        let Some(session) = self.get_session(&e.session_id)? else {
136            tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
137            return Ok(());
138        };
139        let mut outbound = outbound_event_from_row(e, &session, &salt);
140        redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
141        let row = serde_json::to_string(&outbound)?;
142        self.append_outbox_row(&e.session_id, "events", &row)?;
143        enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
144        Ok(())
145    }
146
147    fn refresh_extension_rows(&self, e: &Event) -> Result<()> {
148        crate::extensions::hash_chain::store_event_hash(self, e)?;
149        crate::extensions::aggregates::upsert_session(self, &e.session_id)?;
150        if let Err(err) = crate::extensions::diffs::refresh_session(self, &e.session_id, false) {
151            tracing::warn!(session_id = %e.session_id, "step diff attribution skipped: {err:#}");
152        }
153        Ok(())
154    }
155
156    pub(super) fn append_search_event(&self, e: &Event) {
157        if let Err(err) = self.try_append_search_event(e) {
158            tracing::warn!(session_id = %e.session_id, seq = e.seq, "search index skipped: {err:#}");
159            let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
160        }
161    }
162
163    pub(super) fn try_append_search_event(&self, e: &Event) -> Result<()> {
164        let Some(session) = self.get_session(&e.session_id)? else {
165            return Ok(());
166        };
167        let workspace = PathBuf::from(&session.workspace);
168        let cfg = crate::core::config::load(&workspace).unwrap_or_default();
169        let salt = try_team_salt(&cfg.sync).unwrap_or([0; 32]);
170        let Some(doc) = crate::search::extract_doc(e, &session, &workspace, &salt) else {
171            return Ok(());
172        };
173        let mut slot = self.search_writer.borrow_mut();
174        if slot.is_none() {
175            *slot = Some(crate::search::PendingWriter::open(&self.root)?);
176        }
177        slot.as_mut().expect("writer").add(&doc)
178    }
179
180    pub fn flush_search(&self) -> Result<()> {
181        if let Some(writer) = self.search_writer.borrow_mut().as_mut() {
182            writer.commit()?;
183        }
184        Ok(())
185    }
186}