1use super::events::*;
2use super::rows::*;
3use super::*;
4
5impl Store {
6 pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
8 let n: i64 = self.conn.query_row(
9 "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
10 [session_id],
11 |r| r.get(0),
12 )?;
13 Ok(n as u64)
14 }
15
16 pub fn append_event(&self, e: &Event) -> Result<()> {
17 self.append_event_with_sync(e, None)
18 }
19
20 pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
22 let last_before = if projector_legacy_mode() {
23 None
24 } else {
25 self.last_event_seq_for_session(&e.session_id)?
26 };
27 let payload = serde_json::to_string(&e.payload)?;
28 self.conn.execute(
29 "INSERT INTO events (
30 session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
31 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
32 stop_reason, latency_ms, ttft_ms, retry_count,
33 context_used_tokens, context_max_tokens,
34 cache_creation_tokens, cache_read_tokens, system_prompt_tokens
35 ) VALUES (
36 ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
37 ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
38 )
39 ON CONFLICT(session_id, seq) DO UPDATE SET
40 ts_ms = excluded.ts_ms,
41 ts_exact = excluded.ts_exact,
42 kind = excluded.kind,
43 source = excluded.source,
44 tool = excluded.tool,
45 tool_call_id = excluded.tool_call_id,
46 tokens_in = excluded.tokens_in,
47 tokens_out = excluded.tokens_out,
48 reasoning_tokens = excluded.reasoning_tokens,
49 cost_usd_e6 = excluded.cost_usd_e6,
50 payload = excluded.payload,
51 stop_reason = excluded.stop_reason,
52 latency_ms = excluded.latency_ms,
53 ttft_ms = excluded.ttft_ms,
54 retry_count = excluded.retry_count,
55 context_used_tokens = excluded.context_used_tokens,
56 context_max_tokens = excluded.context_max_tokens,
57 cache_creation_tokens = excluded.cache_creation_tokens,
58 cache_read_tokens = excluded.cache_read_tokens,
59 system_prompt_tokens = excluded.system_prompt_tokens",
60 params![
61 e.session_id,
62 e.seq as i64,
63 e.ts_ms as i64,
64 bool_to_i64(e.ts_exact),
65 format!("{:?}", e.kind),
66 format!("{:?}", e.source),
67 e.tool,
68 e.tool_call_id,
69 e.tokens_in.map(|v| v as i64),
70 e.tokens_out.map(|v| v as i64),
71 e.reasoning_tokens.map(|v| v as i64),
72 e.cost_usd_e6,
73 payload,
74 e.stop_reason,
75 e.latency_ms.map(|v| v as i64),
76 e.ttft_ms.map(|v| v as i64),
77 e.retry_count.map(|v| v as i64),
78 e.context_used_tokens.map(|v| v as i64),
79 e.context_max_tokens.map(|v| v as i64),
80 e.cache_creation_tokens.map(|v| v as i64),
81 e.cache_read_tokens.map(|v| v as i64),
82 e.system_prompt_tokens.map(|v| v as i64),
83 ],
84 )?;
85 if self.conn.changes() == 0 {
86 return Ok(());
87 }
88 self.append_hot_event(e)?;
89 if projector_legacy_mode() {
90 index_event_derived(&self.conn, e)?;
91 rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
92 self.invalidate_span_tree_cache();
93 } else if last_before.is_some_and(|last| e.seq <= last) {
94 self.replay_projector_session(&e.session_id)?;
95 } else {
96 let deltas = self.projector.borrow_mut().apply(e);
97 self.apply_projector_events(&deltas)?;
98 let expired = self
99 .projector
100 .borrow_mut()
101 .flush_expired(e.ts_ms, DEFAULT_ORPHAN_TTL_MS);
102 self.apply_projector_events(&expired)?;
103 if is_stop_event(e) {
104 let flushed = self
105 .projector
106 .borrow_mut()
107 .flush_session(&e.session_id, e.ts_ms);
108 self.apply_projector_events(&flushed)?;
109 }
110 self.invalidate_span_tree_cache();
111 }
112 self.append_search_event(e);
113 let Some(ctx) = ctx else {
114 return Ok(());
115 };
116 let sync = &ctx.sync;
117 if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
118 return Ok(());
119 }
120 let Some(salt) = try_team_salt(sync) else {
121 tracing::warn!(
122 "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
123 );
124 return Ok(());
125 };
126 if sync.sample_rate < 1.0 {
127 let u: f64 = rand::random();
128 if u > sync.sample_rate {
129 return Ok(());
130 }
131 }
132 let Some(session) = self.get_session(&e.session_id)? else {
133 tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
134 return Ok(());
135 };
136 let mut outbound = outbound_event_from_row(e, &session, &salt);
137 redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
138 let row = serde_json::to_string(&outbound)?;
139 self.outbox()?.append(&e.session_id, "events", &row)?;
140 enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
141 Ok(())
142 }
143
144 pub(super) fn append_hot_event(&self, e: &Event) -> Result<()> {
145 if std::env::var("KAIZEN_HOT_LOG").as_deref() == Ok("0") {
146 return Ok(());
147 }
148 let mut slot = self.hot_log.borrow_mut();
149 if slot.is_none() {
150 *slot = Some(HotLog::open(&self.root)?);
151 }
152 if let Some(log) = slot.as_mut() {
153 log.append(e)?;
154 }
155 Ok(())
156 }
157
158 pub(super) fn append_search_event(&self, e: &Event) {
159 if let Err(err) = self.try_append_search_event(e) {
160 tracing::warn!(session_id = %e.session_id, seq = e.seq, "search index skipped: {err:#}");
161 let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
162 }
163 }
164
165 pub(super) fn try_append_search_event(&self, e: &Event) -> Result<()> {
166 let Some(session) = self.get_session(&e.session_id)? else {
167 return Ok(());
168 };
169 let workspace = PathBuf::from(&session.workspace);
170 let cfg = crate::core::config::load(&workspace).unwrap_or_default();
171 let salt = try_team_salt(&cfg.sync).unwrap_or([0; 32]);
172 let Some(doc) = crate::search::extract_doc(e, &session, &workspace, &salt) else {
173 return Ok(());
174 };
175 let mut slot = self.search_writer.borrow_mut();
176 if slot.is_none() {
177 *slot = Some(crate::search::PendingWriter::open(&self.root)?);
178 }
179 slot.as_mut().expect("writer").add(&doc)
180 }
181
182 pub fn flush_search(&self) -> Result<()> {
183 if let Some(writer) = self.search_writer.borrow_mut().as_mut() {
184 writer.commit()?;
185 }
186 Ok(())
187 }
188}