1use super::events::*;
2use super::rows::*;
3use super::*;
4
5impl Store {
6 pub fn next_event_seq(&self, session_id: &str) -> Result<u64> {
8 let n: i64 = self.conn.query_row(
9 "SELECT COALESCE(MAX(seq) + 1, 0) FROM events WHERE session_id = ?1",
10 [session_id],
11 |r| r.get(0),
12 )?;
13 Ok(n as u64)
14 }
15
16 pub fn append_event(&self, e: &Event) -> Result<()> {
17 self.append_event_with_sync(e, None)
18 }
19
20 pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> {
22 self.append_event_inner(e, ctx, true)
23 }
24
25 pub(super) fn append_event_deferred(
26 &self,
27 e: &Event,
28 ctx: Option<&SyncIngestContext>,
29 ) -> Result<()> {
30 self.append_event_inner(e, ctx, false)
31 }
32
33 fn append_event_inner(
34 &self,
35 e: &Event,
36 ctx: Option<&SyncIngestContext>,
37 refresh_session: bool,
38 ) -> Result<()> {
39 let last_before = if projector_legacy_mode() {
40 None
41 } else {
42 self.last_event_seq_for_session(&e.session_id)?
43 };
44 if !projector_legacy_mode() {
45 self.sync_projector_session(&e.session_id, last_before)?;
46 }
47 let payload = serde_json::to_string(&e.payload)?;
48 self.conn.execute(
49 "INSERT INTO events (
50 session_id, seq, ts_ms, ts_exact, kind, source, tool, tool_call_id,
51 tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, payload,
52 stop_reason, latency_ms, ttft_ms, retry_count,
53 context_used_tokens, context_max_tokens,
54 cache_creation_tokens, cache_read_tokens, system_prompt_tokens
55 ) VALUES (
56 ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13,
57 ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22
58 )
59 ON CONFLICT(session_id, seq) DO NOTHING",
60 params![
61 e.session_id,
62 e.seq as i64,
63 e.ts_ms as i64,
64 bool_to_i64(e.ts_exact),
65 format!("{:?}", e.kind),
66 format!("{:?}", e.source),
67 e.tool,
68 e.tool_call_id,
69 e.tokens_in.map(|v| v as i64),
70 e.tokens_out.map(|v| v as i64),
71 e.reasoning_tokens.map(|v| v as i64),
72 e.cost_usd_e6,
73 payload,
74 e.stop_reason,
75 e.latency_ms.map(|v| v as i64),
76 e.ttft_ms.map(|v| v as i64),
77 e.retry_count.map(|v| v as i64),
78 e.context_used_tokens.map(|v| v as i64),
79 e.context_max_tokens.map(|v| v as i64),
80 e.cache_creation_tokens.map(|v| v as i64),
81 e.cache_read_tokens.map(|v| v as i64),
82 e.system_prompt_tokens.map(|v| v as i64),
83 ],
84 )?;
85 if self.conn.changes() == 0 {
86 return Ok(());
87 }
88 if projector_legacy_mode() {
89 index_event_derived(&self.conn, e)?;
90 rebuild_tool_spans_for_session(&self.conn, &e.session_id)?;
91 self.invalidate_span_tree_cache();
92 } else if last_before.is_some_and(|last| e.seq <= last) {
93 self.replay_projector_session(&e.session_id)?;
94 } else {
95 let deltas = self.projector.borrow_mut().apply(e);
96 self.apply_projector_events(&deltas)?;
97 let expired = self
98 .projector
99 .borrow_mut()
100 .flush_expired(e.ts_ms, DEFAULT_ORPHAN_TTL_MS);
101 self.apply_projector_events(&expired)?;
102 if is_stop_event(e) {
103 let flushed = self
104 .projector
105 .borrow_mut()
106 .flush_session(&e.session_id, e.ts_ms);
107 self.apply_projector_events(&flushed)?;
108 }
109 self.invalidate_span_tree_cache();
110 }
111 self.append_search_event(e);
112 self.index_extension_event(e)?;
113 if refresh_session {
114 self.apply_live_extension_event(e)?;
115 }
116 let Some(ctx) = ctx else {
117 return Ok(());
118 };
119 let sync = &ctx.sync;
120 if sync.endpoint.is_empty() || sync.team_token.is_empty() || sync.team_id.is_empty() {
121 return Ok(());
122 }
123 let Some(salt) = try_team_salt(sync) else {
124 tracing::warn!(
125 "sync outbox skipped: set sync.team_salt_hex (64 hex chars) in ~/.kaizen/config.toml"
126 );
127 return Ok(());
128 };
129 if sync.sample_rate < 1.0 {
130 let u: f64 = rand::random();
131 if u > sync.sample_rate {
132 return Ok(());
133 }
134 }
135 let Some(session) = self.get_session(&e.session_id)? else {
136 tracing::warn!(session_id = %e.session_id, "sync outbox skipped: session not in DB");
137 return Ok(());
138 };
139 let mut outbound = outbound_event_from_row(e, &session, &salt);
140 redact_payload(&mut outbound.payload, ctx.workspace_root(), &salt);
141 let row = serde_json::to_string(&outbound)?;
142 self.append_outbox_row(&e.session_id, "events", &row)?;
143 enqueue_tool_spans_for_session(self, &e.session_id, ctx)?;
144 Ok(())
145 }
146
147 pub(super) fn append_search_event(&self, e: &Event) {
148 if let Err(err) = self.try_append_search_event(e) {
149 tracing::warn!(session_id = %e.session_id, seq = e.seq, "search index skipped: {err:#}");
150 let _ = self.sync_state_set_u64(SYNC_STATE_SEARCH_DIRTY_MS, now_ms());
151 }
152 }
153
154 pub(super) fn try_append_search_event(&self, e: &Event) -> Result<()> {
155 let Some(session) = self.get_session(&e.session_id)? else {
156 return Ok(());
157 };
158 let workspace = PathBuf::from(&session.workspace);
159 let cfg = crate::core::config::load(&workspace).unwrap_or_default();
160 let salt = try_team_salt(&cfg.sync).unwrap_or([0; 32]);
161 let Some(doc) = crate::search::extract_doc(e, &session, &workspace, &salt) else {
162 return Ok(());
163 };
164 let mut slot = self.search_writer.borrow_mut();
165 if slot.is_none() {
166 *slot = Some(crate::search::PendingWriter::open(&self.root)?);
167 }
168 slot.as_mut().expect("writer").add(&doc)
169 }
170
171 pub fn flush_search(&self) -> Result<()> {
172 if let Some(writer) = self.search_writer.borrow_mut().as_mut() {
173 writer.commit()?;
174 }
175 Ok(())
176 }
177}