Skip to main content

kaizen/store/sqlite/
event_write.rs

1use super::events::*;
2use super::rows::*;
3use super::*;
4
5impl Store {
6    /// Next `seq` for a new event in this session (0 when there are no events yet).
7    pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
8        let n: i64 = self.conn.query_row(
9            "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
10            [session_id],
11            |r| r.get(0),
12        )?;
13        Ok(n as u64)
14    }
15
16    pub fn append_event(&self, e: &Event) -> Result<()> {
17        self.append_event_with_sync(e, None)
18    }
19
20    /// Append event; when `ctx` is set and sync is configured, enqueue one redacted outbox row.
21    pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
22        let last_before = if projector_legacy_mode() {
23            None
24        } else {
25            self.last_event_seq_for_session(&e.session_id)?
26        };
27        let payload = serde_json::to_string(&e.payload)?;
28        self.conn.execute(
29            "INSERT INTO events (
30                session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
31                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
32                stop_reason, latency_ms, ttft_ms, retry_count,
33                context_used_tokens, context_max_tokens,
34                cache_creation_tokens, cache_read_tokens, system_prompt_tokens
35             ) VALUES (
36                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
37                ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
38             )
39             ON CONFLICT(session_id, seq) DO UPDATE SET
40                ts_ms = excluded.ts_ms,
41                ts_exact = excluded.ts_exact,
42                kind = excluded.kind,
43                source = excluded.source,
44                tool = excluded.tool,
45                tool_call_id = excluded.tool_call_id,
46                tokens_in = excluded.tokens_in,
47                tokens_out = excluded.tokens_out,
48                reasoning_tokens = excluded.reasoning_tokens,
49                cost_usd_e6 = excluded.cost_usd_e6,
50                payload = excluded.payload,
51                stop_reason = excluded.stop_reason,
52                latency_ms = excluded.latency_ms,
53                ttft_ms = excluded.ttft_ms,
54                retry_count = excluded.retry_count,
55                context_used_tokens = excluded.context_used_tokens,
56                context_max_tokens = excluded.context_max_tokens,
57                cache_creation_tokens = excluded.cache_creation_tokens,
58                cache_read_tokens = excluded.cache_read_tokens,
59                system_prompt_tokens = excluded.system_prompt_tokens",
60            params![
61                e.session_id,
62                e.seq as i64,
63                e.ts_ms as i64,
64                bool_to_i64(e.ts_exact),
65                format!("{:?}", e.kind),
66                format!("{:?}", e.source),
67                e.tool,
68                e.tool_call_id,
69                e.tokens_in.map(|v| v as i64),
70                e.tokens_out.map(|v| v as i64),
71                e.reasoning_tokens.map(|v| v as i64),
72                e.cost_usd_e6,
73                payload,
74                e.stop_reason,
75                e.latency_ms.map(|v| v as i64),
76                e.ttft_ms.map(|v| v as i64),
77                e.retry_count.map(|v| v as i64),
78                e.context_used_tokens.map(|v| v as i64),
79                e.context_max_tokens.map(|v| v as i64),
80                e.cache_creation_tokens.map(|v| v as i64),
81                e.cache_read_tokens.map(|v| v as i64),
82                e.system_prompt_tokens.map(|v| v as i64),
83            ],
84        )?;
85        if self.conn.changes() == 0 {
86            return Ok(());
87        }
88        self.append_hot_event(e)?;
89        if projector_legacy_mode() {
90            index_event_derived(&self.conn, e)?;
91            rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
92            self.invalidate_span_tree_cache();
93        } else if last_before.is_some_and(|last| e.seq <= last) {
94            self.replay_projector_session(&e.session_id)?;
95        } else {
96            let deltas = self.projector.borrow_mut().apply(e);
97            self.apply_projector_events(&deltas)?;
98            let expired = self
99                .projector
100                .borrow_mut()
101                .flush_expired(e.ts_ms, DEFAULT_ORPHAN_TTL_MS);
102            self.apply_projector_events(&expired)?;
103            if is_stop_event(e) {
104                let flushed = self
105                    .projector
106                    .borrow_mut()
107                    .flush_session(&e.session_id, e.ts_ms);
108                self.apply_projector_events(&flushed)?;
109            }
110            self.invalidate_span_tree_cache();
111        }
112        self.append_search_event(e);
113        let Some(ctx) = ctx else {
114            return Ok(());
115        };
116        let sync = &ctx.sync;
117        if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
118            return Ok(());
119        }
120        let Some(salt) = try_team_salt(sync) else {
121            tracing::warn!(
122                "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
123            );
124            return Ok(());
125        };
126        if sync.sample_rate < 1.0 {
127            let u: f64 = rand::random();
128            if u > sync.sample_rate {
129                return Ok(());
130            }
131        }
132        let Some(session) = self.get_session(&e.session_id)? else {
133            tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
134            return Ok(());
135        };
136        let mut outbound = outbound_event_from_row(e, &session, &salt);
137        redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
138        let row = serde_json::to_string(&outbound)?;
139        self.outbox()?.append(&e.session_id, "events", &row)?;
140        enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
141        Ok(())
142    }
143
144    pub(super) fn append_hot_event(&self, e: &Event) -> Result<()> {
145        if std::env::var("KAIZEN_HOT_LOG").as_deref() == Ok("0") {
146            return Ok(());
147        }
148        let mut slot = self.hot_log.borrow_mut();
149        if slot.is_none() {
150            *slot = Some(HotLog::open(&self.root)?);
151        }
152        if let Some(log) = slot.as_mut() {
153            log.append(e)?;
154        }
155        Ok(())
156    }
157
158    pub(super) fn append_search_event(&self, e: &Event) {
159        if let Err(err) = self.try_append_search_event(e) {
160            tracing::warn!(session_id = %e.session_id, seq = e.seq, "search index skipped: {err:#}");
161            let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
162        }
163    }
164
165    pub(super) fn try_append_search_event(&self, e: &Event) -> Result<()> {
166        let Some(session) = self.get_session(&e.session_id)? else {
167            return Ok(());
168        };
169        let workspace = PathBuf::from(&session.workspace);
170        let cfg = crate::core::config::load(&workspace).unwrap_or_default();
171        let salt = try_team_salt(&cfg.sync).unwrap_or([0; 32]);
172        let Some(doc) = crate::search::extract_doc(e, &session, &workspace, &salt) else {
173            return Ok(());
174        };
175        let mut slot = self.search_writer.borrow_mut();
176        if slot.is_none() {
177            *slot = Some(crate::search::PendingWriter::open(&self.root)?);
178        }
179        slot.as_mut().expect("writer").add(&doc)
180    }
181
182    pub fn flush_search(&self) -> Result<()> {
183        if let Some(writer) = self.search_writer.borrow_mut().as_mut() {
184            writer.commit()?;
185        }
186        Ok(())
187    }
188}