1use super::events::*;
2use super::rows::*;
3use super::*;
4
5impl Store {
6 pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
8 let n: i64 = self.conn.query_row(
9 "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
10 [session_id],
11 |r| r.get(0),
12 )?;
13 Ok(n as u64)
14 }
15
16 pub fn append_event(&self, e: &Event) -> Result<()> {
17 self.append_event_with_sync(e, None)
18 }
19
20 pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
22 let last_before = if projector_legacy_mode() {
23 None
24 } else {
25 self.last_event_seq_for_session(&e.session_id)?
26 };
27 if !projector_legacy_mode() {
28 self.sync_projector_session(&e.session_id, last_before)?;
29 }
30 let payload = serde_json::to_string(&e.payload)?;
31 self.conn.execute(
32 "INSERT INTO events (
33 session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
34 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
35 stop_reason, latency_ms, ttft_ms, retry_count,
36 context_used_tokens, context_max_tokens,
37 cache_creation_tokens, cache_read_tokens, system_prompt_tokens
38 ) VALUES (
39 ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
40 ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
41 )
42 ON CONFLICT(session_id, seq) DO UPDATE SET
43 ts_ms = excluded.ts_ms,
44 ts_exact = excluded.ts_exact,
45 kind = excluded.kind,
46 source = excluded.source,
47 tool = excluded.tool,
48 tool_call_id = excluded.tool_call_id,
49 tokens_in = excluded.tokens_in,
50 tokens_out = excluded.tokens_out,
51 reasoning_tokens = excluded.reasoning_tokens,
52 cost_usd_e6 = excluded.cost_usd_e6,
53 payload = excluded.payload,
54 stop_reason = excluded.stop_reason,
55 latency_ms = excluded.latency_ms,
56 ttft_ms = excluded.ttft_ms,
57 retry_count = excluded.retry_count,
58 context_used_tokens = excluded.context_used_tokens,
59 context_max_tokens = excluded.context_max_tokens,
60 cache_creation_tokens = excluded.cache_creation_tokens,
61 cache_read_tokens = excluded.cache_read_tokens,
62 system_prompt_tokens = excluded.system_prompt_tokens",
63 params![
64 e.session_id,
65 e.seq as i64,
66 e.ts_ms as i64,
67 bool_to_i64(e.ts_exact),
68 format!("{:?}", e.kind),
69 format!("{:?}", e.source),
70 e.tool,
71 e.tool_call_id,
72 e.tokens_in.map(|v| v as i64),
73 e.tokens_out.map(|v| v as i64),
74 e.reasoning_tokens.map(|v| v as i64),
75 e.cost_usd_e6,
76 payload,
77 e.stop_reason,
78 e.latency_ms.map(|v| v as i64),
79 e.ttft_ms.map(|v| v as i64),
80 e.retry_count.map(|v| v as i64),
81 e.context_used_tokens.map(|v| v as i64),
82 e.context_max_tokens.map(|v| v as i64),
83 e.cache_creation_tokens.map(|v| v as i64),
84 e.cache_read_tokens.map(|v| v as i64),
85 e.system_prompt_tokens.map(|v| v as i64),
86 ],
87 )?;
88 if self.conn.changes() == 0 {
89 return Ok(());
90 }
91 if projector_legacy_mode() {
92 index_event_derived(&self.conn, e)?;
93 rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
94 self.invalidate_span_tree_cache();
95 } else if last_before.is_some_and(|last| e.seq <= last) {
96 self.replay_projector_session(&e.session_id)?;
97 } else {
98 let deltas = self.projector.borrow_mut().apply(e);
99 self.apply_projector_events(&deltas)?;
100 let expired = self
101 .projector
102 .borrow_mut()
103 .flush_expired(e.ts_ms, DEFAULT_ORPHAN_TTL_MS);
104 self.apply_projector_events(&expired)?;
105 if is_stop_event(e) {
106 let flushed = self
107 .projector
108 .borrow_mut()
109 .flush_session(&e.session_id, e.ts_ms);
110 self.apply_projector_events(&flushed)?;
111 }
112 self.invalidate_span_tree_cache();
113 }
114 self.append_search_event(e);
115 self.refresh_extension_rows(e)?;
116 let Some(ctx) = ctx else {
117 return Ok(());
118 };
119 let sync = &ctx.sync;
120 if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
121 return Ok(());
122 }
123 let Some(salt) = try_team_salt(sync) else {
124 tracing::warn!(
125 "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
126 );
127 return Ok(());
128 };
129 if sync.sample_rate < 1.0 {
130 let u: f64 = rand::random();
131 if u > sync.sample_rate {
132 return Ok(());
133 }
134 }
135 let Some(session) = self.get_session(&e.session_id)? else {
136 tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
137 return Ok(());
138 };
139 let mut outbound = outbound_event_from_row(e, &session, &salt);
140 redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
141 let row = serde_json::to_string(&outbound)?;
142 self.append_outbox_row(&e.session_id, "events", &row)?;
143 enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
144 Ok(())
145 }
146
147 fn refresh_extension_rows(&self, e: &Event) -> Result<()> {
148 crate::extensions::hash_chain::store_event_hash(self, e)?;
149 crate::extensions::aggregates::upsert_session(self, &e.session_id)?;
150 if let Err(err) = crate::extensions::diffs::refresh_session(self, &e.session_id, false) {
151 tracing::warn!(session_id = %e.session_id, "step diff attribution skipped: {err:#}");
152 }
153 Ok(())
154 }
155
156 pub(super) fn append_search_event(&self, e: &Event) {
157 if let Err(err) = self.try_append_search_event(e) {
158 tracing::warn!(session_id = %e.session_id, seq = e.seq, "search index skipped: {err:#}");
159 let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
160 }
161 }
162
163 pub(super) fn try_append_search_event(&self, e: &Event) -> Result<()> {
164 let Some(session) = self.get_session(&e.session_id)? else {
165 return Ok(());
166 };
167 let workspace = PathBuf::from(&session.workspace);
168 let cfg = crate::core::config::load(&workspace).unwrap_or_default();
169 let salt = try_team_salt(&cfg.sync).unwrap_or([0; 32]);
170 let Some(doc) = crate::search::extract_doc(e, &session, &workspace, &salt) else {
171 return Ok(());
172 };
173 let mut slot = self.search_writer.borrow_mut();
174 if slot.is_none() {
175 *slot = Some(crate::search::PendingWriter::open(&self.root)?);
176 }
177 slot.as_mut().expect("writer").add(&doc)
178 }
179
180 pub fn flush_search(&self) -> Result<()> {
181 if let Some(writer) = self.search_writer.borrow_mut().as_mut() {
182 writer.commit()?;
183 }
184 Ok(())
185 }
186}