Skip to main content

kaizen/store/sqlite/
event_write.rs

1use super::events::*;
2use super::rows::*;
3use super::*;
4
5impl Store {
6    /// Next `seq` for a new event in this session (0 when there are no events yet).
7    pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
8        let n: i64 = self.conn.query_row(
9            "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
10            [session_id],
11            |r| r.get(0),
12        )?;
13        Ok(n as u64)
14    }
15
16    pub fn append_event(&self, e: &Event) -> Result<()> {
17        self.append_event_with_sync(e, None)
18    }
19
20    /// Append event; when `ctx` is set and sync is configured, enqueue one redacted outbox row.
21    pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
22        self.append_event_inner(e, ctx, true)
23    }
24
25    pub(super) fn append_event_deferred(
26        &self,
27        e: &Event,
28        ctx: Option<&SyncIngestContext>,
29    ) -> Result<()> {
30        self.append_event_inner(e, ctx, false)
31    }
32
33    fn append_event_inner(
34        &self,
35        e: &Event,
36        ctx: Option<&SyncIngestContext>,
37        refresh_session: bool,
38    ) -> Result<()> {
39        let last_before = if projector_legacy_mode() {
40            None
41        } else {
42            self.last_event_seq_for_session(&e.session_id)?
43        };
44        if !projector_legacy_mode() {
45            self.sync_projector_session(&e.session_id, last_before)?;
46        }
47        let payload = serde_json::to_string(&e.payload)?;
48        self.conn.execute(
49            "INSERT INTO events (
50                session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
51                tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
52                stop_reason, latency_ms, ttft_ms, retry_count,
53                context_used_tokens, context_max_tokens,
54                cache_creation_tokens, cache_read_tokens, system_prompt_tokens
55             ) VALUES (
56                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
57                ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
58             )
59             ON CONFLICT(session_id, seq) DO NOTHING",
60            params![
61                e.session_id,
62                e.seq as i64,
63                e.ts_ms as i64,
64                bool_to_i64(e.ts_exact),
65                format!("{:?}", e.kind),
66                format!("{:?}", e.source),
67                e.tool,
68                e.tool_call_id,
69                e.tokens_in.map(|v| v as i64),
70                e.tokens_out.map(|v| v as i64),
71                e.reasoning_tokens.map(|v| v as i64),
72                e.cost_usd_e6,
73                payload,
74                e.stop_reason,
75                e.latency_ms.map(|v| v as i64),
76                e.ttft_ms.map(|v| v as i64),
77                e.retry_count.map(|v| v as i64),
78                e.context_used_tokens.map(|v| v as i64),
79                e.context_max_tokens.map(|v| v as i64),
80                e.cache_creation_tokens.map(|v| v as i64),
81                e.cache_read_tokens.map(|v| v as i64),
82                e.system_prompt_tokens.map(|v| v as i64),
83            ],
84        )?;
85        if self.conn.changes() == 0 {
86            return Ok(());
87        }
88        if projector_legacy_mode() {
89            index_event_derived(&self.conn, e)?;
90            rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
91            self.invalidate_span_tree_cache();
92        } else if last_before.is_some_and(|last| e.seq <= last) {
93            self.replay_projector_session(&e.session_id)?;
94        } else {
95            let deltas = self.projector.borrow_mut().apply(e);
96            self.apply_projector_events(&deltas)?;
97            let expired = self
98                .projector
99                .borrow_mut()
100                .flush_expired(e.ts_ms, DEFAULT_ORPHAN_TTL_MS);
101            self.apply_projector_events(&expired)?;
102            if is_stop_event(e) {
103                let flushed = self
104                    .projector
105                    .borrow_mut()
106                    .flush_session(&e.session_id, e.ts_ms);
107                self.apply_projector_events(&flushed)?;
108            }
109            self.invalidate_span_tree_cache();
110        }
111        self.append_search_event(e);
112        self.index_extension_event(e)?;
113        if refresh_session {
114            self.apply_live_extension_event(e)?;
115        }
116        let Some(ctx) = ctx else {
117            return Ok(());
118        };
119        let sync = &ctx.sync;
120        if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
121            return Ok(());
122        }
123        let Some(salt) = try_team_salt(sync) else {
124            tracing::warn!(
125                "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
126            );
127            return Ok(());
128        };
129        if sync.sample_rate < 1.0 {
130            let u: f64 = rand::random();
131            if u > sync.sample_rate {
132                return Ok(());
133            }
134        }
135        let Some(session) = self.get_session(&e.session_id)? else {
136            tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
137            return Ok(());
138        };
139        let mut outbound = outbound_event_from_row(e, &session, &salt);
140        redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
141        let row = serde_json::to_string(&outbound)?;
142        self.append_outbox_row(&e.session_id, "events", &row)?;
143        enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
144        Ok(())
145    }
146
147    pub(super) fn append_search_event(&self, e: &Event) {
148        if let Err(err) = self.try_append_search_event(e) {
149            tracing::warn!(session_id = %e.session_id, seq = e.seq, "search index skipped: {err:#}");
150            let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
151        }
152    }
153
154    pub(super) fn try_append_search_event(&self, e: &Event) -> Result<()> {
155        let Some(session) = self.get_session(&e.session_id)? else {
156            return Ok(());
157        };
158        let workspace = PathBuf::from(&session.workspace);
159        let cfg = crate::core::config::load(&workspace).unwrap_or_default();
160        let salt = try_team_salt(&cfg.sync).unwrap_or([0; 32]);
161        let Some(doc) = crate::search::extract_doc(e, &session, &workspace, &salt) else {
162            return Ok(());
163        };
164        let mut slot = self.search_writer.borrow_mut();
165        if slot.is_none() {
166            *slot = Some(crate::search::PendingWriter::open(&self.root)?);
167        }
168        slot.as_mut().expect("writer").add(&doc)
169    }
170
171    pub fn flush_search(&self) -> Result<()> {
172        if let Some(writer) = self.search_writer.borrow_mut().as_mut() {
173            writer.commit()?;
174        }
175        Ok(())
176    }
177}