1use super::events::*;
2use super::rows::*;
3use super::*;
4
5impl Store {
6 pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
8 let n: i64 = self.conn.query_row(
9 "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
10 [session_id],
11 |r| r.get(0),
12 )?;
13 Ok(n as u64)
14 }
15
16 pub fn append_event(&self, e: &Event) -> Result<()> {
17 self.append_event_with_sync(e, None)
18 }
19
20 pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
22 let last_before = if projector_legacy_mode() {
23 None
24 } else {
25 self.last_event_seq_for_session(&e.session_id)?
26 };
27 let payload = serde_json::to_string(&e.payload)?;
28 self.conn.execute(
29 "INSERT INTO events (
30 session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
31 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
32 stop_reason, latency_ms, ttft_ms, retry_count,
33 context_used_tokens, context_max_tokens,
34 cache_creation_tokens, cache_read_tokens, system_prompt_tokens
35 ) VALUES (
36 ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
37 ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
38 )
39 ON CONFLICT(session_id, seq) DO UPDATE SET
40 ts_ms = excluded.ts_ms,
41 ts_exact = excluded.ts_exact,
42 kind = excluded.kind,
43 source = excluded.source,
44 tool = excluded.tool,
45 tool_call_id = excluded.tool_call_id,
46 tokens_in = excluded.tokens_in,
47 tokens_out = excluded.tokens_out,
48 reasoning_tokens = excluded.reasoning_tokens,
49 cost_usd_e6 = excluded.cost_usd_e6,
50 payload = excluded.payload,
51 stop_reason = excluded.stop_reason,
52 latency_ms = excluded.latency_ms,
53 ttft_ms = excluded.ttft_ms,
54 retry_count = excluded.retry_count,
55 context_used_tokens = excluded.context_used_tokens,
56 context_max_tokens = excluded.context_max_tokens,
57 cache_creation_tokens = excluded.cache_creation_tokens,
58 cache_read_tokens = excluded.cache_read_tokens,
59 system_prompt_tokens = excluded.system_prompt_tokens",
60 params![
61 e.session_id,
62 e.seq as i64,
63 e.ts_ms as i64,
64 bool_to_i64(e.ts_exact),
65 format!("{:?}", e.kind),
66 format!("{:?}", e.source),
67 e.tool,
68 e.tool_call_id,
69 e.tokens_in.map(|v| v as i64),
70 e.tokens_out.map(|v| v as i64),
71 e.reasoning_tokens.map(|v| v as i64),
72 e.cost_usd_e6,
73 payload,
74 e.stop_reason,
75 e.latency_ms.map(|v| v as i64),
76 e.ttft_ms.map(|v| v as i64),
77 e.retry_count.map(|v| v as i64),
78 e.context_used_tokens.map(|v| v as i64),
79 e.context_max_tokens.map(|v| v as i64),
80 e.cache_creation_tokens.map(|v| v as i64),
81 e.cache_read_tokens.map(|v| v as i64),
82 e.system_prompt_tokens.map(|v| v as i64),
83 ],
84 )?;
85 if self.conn.changes() == 0 {
86 return Ok(());
87 }
88 self.append_hot_event(e)?;
89 if projector_legacy_mode() {
90 index_event_derived(&self.conn, e)?;
91 rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
92 self.invalidate_span_tree_cache();
93 } else if last_before.is_some_and(|last| e.seq <= last) {
94 self.replay_projector_session(&e.session_id)?;
95 } else {
96 let deltas = self.projector.borrow_mut().apply(e);
97 self.apply_projector_events(&deltas)?;
98 let expired = self
99 .projector
100 .borrow_mut()
101 .flush_expired(e.ts_ms, DEFAULT_ORPHAN_TTL_MS);
102 self.apply_projector_events(&expired)?;
103 if is_stop_event(e) {
104 let flushed = self
105 .projector
106 .borrow_mut()
107 .flush_session(&e.session_id, e.ts_ms);
108 self.apply_projector_events(&flushed)?;
109 }
110 self.invalidate_span_tree_cache();
111 }
112 self.append_search_event(e);
113 self.refresh_extension_rows(e)?;
114 let Some(ctx) = ctx else {
115 return Ok(());
116 };
117 let sync = &ctx.sync;
118 if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
119 return Ok(());
120 }
121 let Some(salt) = try_team_salt(sync) else {
122 tracing::warn!(
123 "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
124 );
125 return Ok(());
126 };
127 if sync.sample_rate < 1.0 {
128 let u: f64 = rand::random();
129 if u > sync.sample_rate {
130 return Ok(());
131 }
132 }
133 let Some(session) = self.get_session(&e.session_id)? else {
134 tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
135 return Ok(());
136 };
137 let mut outbound = outbound_event_from_row(e, &session, &salt);
138 redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
139 let row = serde_json::to_string(&outbound)?;
140 self.outbox()?.append(&e.session_id, "events", &row)?;
141 enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
142 Ok(())
143 }
144
145 fn refresh_extension_rows(&self, e: &Event) -> Result<()> {
146 crate::extensions::hash_chain::store_event_hash(self, e)?;
147 crate::extensions::aggregates::upsert_session(self, &e.session_id)?;
148 if let Err(err) = crate::extensions::diffs::refresh_session(self, &e.session_id, false) {
149 tracing::warn!(session_id = %e.session_id, "step diff attribution skipped: {err:#}");
150 }
151 Ok(())
152 }
153
154 pub(super) fn append_hot_event(&self, e: &Event) -> Result<()> {
155 if std::env::var("KAIZEN_HOT_LOG").as_deref() == Ok("0") {
156 return Ok(());
157 }
158 let mut slot = self.hot_log.borrow_mut();
159 if slot.is_none() {
160 *slot = Some(HotLog::open(&self.root)?);
161 }
162 if let Some(log) = slot.as_mut() {
163 log.append(e)?;
164 }
165 Ok(())
166 }
167
168 pub(super) fn append_search_event(&self, e: &Event) {
169 if let Err(err) = self.try_append_search_event(e) {
170 tracing::warn!(session_id = %e.session_id, seq = e.seq, "search index skipped: {err:#}");
171 let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
172 }
173 }
174
175 pub(super) fn try_append_search_event(&self, e: &Event) -> Result<()> {
176 let Some(session) = self.get_session(&e.session_id)? else {
177 return Ok(());
178 };
179 let workspace = PathBuf::from(&session.workspace);
180 let cfg = crate::core::config::load(&workspace).unwrap_or_default();
181 let salt = try_team_salt(&cfg.sync).unwrap_or([0; 32]);
182 let Some(doc) = crate::search::extract_doc(e, &session, &workspace, &salt) else {
183 return Ok(());
184 };
185 let mut slot = self.search_writer.borrow_mut();
186 if slot.is_none() {
187 *slot = Some(crate::search::PendingWriter::open(&self.root)?);
188 }
189 slot.as_mut().expect("writer").add(&doc)
190 }
191
192 pub fn flush_search(&self) -> Result<()> {
193 if let Some(writer) = self.search_writer.borrow_mut().as_mut() {
194 writer.commit()?;
195 }
196 Ok(())
197 }
198}