1use crate::collect::hooks::EventKind;
5use crate::core::config;
6use crate::store::Store;
7use crate::{collect, core::event::SessionRecord, prompt};
8use anyhow::Result;
9use serde_json::Value;
10use std::ffi::OsString;
11use std::path::PathBuf;
12
/// Identifies which agent integration produced a hook payload.
///
/// With `rename_all = "lowercase"`, serde reads and writes each variant as its
/// lowercase name (`"cursor"`, `"claude"`, `"openclaw"`, `"vibe"`).
#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum IngestSource {
    /// Payloads handled by `collect::hooks::cursor::parse_cursor_hook`.
    Cursor,
    /// Payloads handled by `collect::hooks::claude::parse_claude_hook`.
    Claude,
    /// Payloads handled by `collect::hooks::openclaw::parse_openclaw_hook`.
    Openclaw,
    /// Payloads handled by `collect::hooks::vibe::parse_vibe_hook`.
    Vibe,
}
22
23impl IngestSource {
24 pub fn parse(s: &str) -> Option<Self> {
25 match s.to_lowercase().as_str() {
26 "cursor" => Some(Self::Cursor),
27 "claude" => Some(Self::Claude),
28 "openclaw" => Some(Self::Openclaw),
29 "vibe" => Some(Self::Vibe),
30 _ => None,
31 }
32 }
33
34 pub fn agent(self) -> &'static str {
35 match self {
36 Self::Cursor => "cursor",
37 Self::Claude => "claude",
38 Self::Openclaw => "openclaw",
39 Self::Vibe => "vibe",
40 }
41 }
42}
43
44pub fn ingest_hook_string(
46 source: IngestSource,
47 input: &str,
48 workspace: Option<PathBuf>,
49) -> Result<String> {
50 ingest_hook_text(source, input, workspace)?;
51 Ok(String::new())
52}
53
54pub fn ingest_hook_text(
56 source: IngestSource,
57 input: &str,
58 workspace: Option<PathBuf>,
59) -> Result<()> {
60 let event = match source {
61 IngestSource::Cursor => collect::hooks::cursor::parse_cursor_hook(input)?,
62 IngestSource::Claude => collect::hooks::claude::parse_claude_hook(input)?,
63 IngestSource::Openclaw => collect::hooks::openclaw::parse_openclaw_hook(input)?,
64 IngestSource::Vibe => collect::hooks::vibe::parse_vibe_hook(input)?,
65 };
66 let ws = workspace.unwrap_or_else(|| std::env::current_dir().expect("cwd"));
67 let cfg = config::load(&ws)?;
68 let sync_ctx = crate::sync::ingest_ctx(&cfg, ws.clone());
69 let db_path = crate::core::workspace::db_path(&ws)?;
70 let store = Store::open(&db_path)?;
71 let now_ms = std::time::SystemTime::now()
72 .duration_since(std::time::UNIX_EPOCH)
73 .map(|d| d.as_millis() as u64)
74 .unwrap_or(0);
75 let ts = if event.ts_ms == 0 {
76 now_ms
77 } else {
78 event.ts_ms
79 };
80 let mut event = event;
81 event.ts_ms = ts;
82 let seq = store.next_event_seq(&event.session_id)?;
83 let ev = collect::hooks::normalize::hook_to_event(&event, seq);
84 if let Some(status) = collect::hooks::normalize::hook_to_status(&event.kind) {
85 if matches!(event.kind, collect::hooks::EventKind::SessionStart) {
86 let snap = prompt::snapshot::capture(&ws, now_ms).ok();
87 let fingerprint = snap.as_ref().map(|s| s.fingerprint.clone());
88 if let Some(ref s) = snap {
89 let _ = store.upsert_prompt_snapshot(s);
90 }
91 let model = collect::model_from_json::from_value(&event.payload);
92 let env = session_env_fields(&event.payload);
93 let record = SessionRecord {
94 id: event.session_id.clone(),
95 agent: source.agent().to_string(),
96 model,
97 workspace: ws.to_string_lossy().to_string(),
98 started_at_ms: event.ts_ms,
99 ended_at_ms: None,
100 status: status.clone(),
101 trace_path: String::new(),
102 start_commit: None,
103 end_commit: None,
104 branch: None,
105 dirty_start: None,
106 dirty_end: None,
107 repo_binding_source: None,
108 prompt_fingerprint: fingerprint,
109 parent_session_id: None,
110 agent_version: env.0,
111 os: env.1,
112 arch: env.2,
113 repo_file_count: None,
114 repo_total_loc: None,
115 };
116 store.upsert_session(&record)?;
117 } else {
118 store.ensure_session_stub(
119 &event.session_id,
120 source.agent(),
121 &ws.to_string_lossy(),
122 event.ts_ms,
123 )?;
124 if matches!(event.kind, collect::hooks::EventKind::Stop) {
125 maybe_emit_prompt_changed(
126 &store,
127 &event.session_id,
128 &ws,
129 now_ms,
130 &ev,
131 sync_ctx.as_ref(),
132 )?;
133 }
134 store.update_session_status(&event.session_id, status)?;
135 }
136 } else {
137 store.ensure_session_stub(
138 &event.session_id,
139 source.agent(),
140 &ws.to_string_lossy(),
141 event.ts_ms,
142 )?;
143 }
144 store.append_event_with_sync(&ev, sync_ctx.as_ref())?;
145 post_ingest_detached(&event, &cfg, &ws)?;
146 Ok(())
147}
148
149fn post_ingest_detached(
151 event: &collect::hooks::HookEvent,
152 cfg: &config::Config,
153 ws: &std::path::Path,
154) -> Result<()> {
155 if matches!(event.kind, EventKind::Stop) {
156 if cfg.collect.outcomes.enabled {
157 spawn_outcome_measure(ws, &event.session_id);
158 }
159 if cfg.collect.system_sampler.enabled {
160 touch_sampler_stop_file(ws, &event.session_id);
161 }
162 }
163 if matches!(event.kind, EventKind::SessionStart)
164 && cfg.collect.system_sampler.enabled
165 && let Some(pid) = payload_pid(&event.payload)
166 {
167 spawn_sampler_run(ws, &event.session_id, pid);
168 }
169 Ok(())
170}
171
172fn payload_pid(v: &Value) -> Option<u32> {
173 v.get("pid")
174 .and_then(|x| x.as_u64().map(|n| n as u32))
175 .or_else(|| {
176 v.get("pid")
177 .and_then(|x| x.as_i64())
178 .and_then(|i| u32::try_from(i).ok())
179 })
180}
181
182fn spawn_outcome_measure(ws: &std::path::Path, session_id: &str) {
183 let args = vec![
184 OsString::from("outcomes"),
185 OsString::from("measure"),
186 OsString::from("--workspace"),
187 ws.as_os_str().to_owned(),
188 OsString::from("--session"),
189 OsString::from(session_id),
190 ];
191 if let Err(e) = super::kaizen_child::spawn_kaizen_detached(&args) {
192 tracing::warn!(?e, "kaizen outcomes measure");
193 }
194}
195
196fn spawn_sampler_run(ws: &std::path::Path, session_id: &str, pid: u32) {
197 let args = vec![
198 OsString::from("__sampler-run"),
199 OsString::from("--workspace"),
200 ws.as_os_str().to_owned(),
201 OsString::from("--session"),
202 OsString::from(session_id),
203 OsString::from("--pid"),
204 OsString::from(pid.to_string()),
205 ];
206 if let Err(e) = super::kaizen_child::spawn_kaizen_detached(&args) {
207 tracing::warn!(?e, "kaizen sampler");
208 }
209}
210
211fn touch_sampler_stop_file(ws: &std::path::Path, session_id: &str) {
212 let dir = match crate::core::paths::project_data_dir(ws) {
213 Ok(d) => d.join("sampler-stop"),
214 Err(e) => {
215 tracing::warn!(?e, "sampler-stop: no data dir");
216 return;
217 }
218 };
219 if let Err(e) = std::fs::create_dir_all(&dir) {
220 tracing::warn!(?e, "sampler-stop mkdir");
221 return;
222 }
223 let path = dir.join(session_id);
224 if let Err(e) = std::fs::File::create(&path) {
225 tracing::warn!(?e, "sampler-stop touch");
226 }
227}
228
229fn session_env_fields(payload: &Value) -> (Option<String>, Option<String>, Option<String>) {
230 let ver = [
231 "cursor_version",
232 "claude_version",
233 "agent_version",
234 "version",
235 ]
236 .into_iter()
237 .find_map(|k| {
238 payload
239 .get(k)
240 .and_then(|v| v.as_str())
241 .map(|s| s.to_string())
242 });
243 let os = payload
244 .get("os")
245 .and_then(|v| v.as_str())
246 .map(|s| s.to_string());
247 let arch = payload
248 .get("arch")
249 .and_then(|v| v.as_str())
250 .map(|s| s.to_string());
251 (ver, os, arch)
252}
253
254fn maybe_emit_prompt_changed(
255 store: &Store,
256 session_id: &str,
257 ws: &std::path::Path,
258 now_ms: u64,
259 trigger_ev: &crate::core::event::Event,
260 sync_ctx: Option<&crate::sync::context::SyncIngestContext>,
261) -> Result<()> {
262 let Some(session) = store.get_session(session_id)? else {
263 return Ok(());
264 };
265 let Some(from_fp) = session.prompt_fingerprint else {
266 return Ok(());
267 };
268 let snap = prompt::snapshot::capture(ws, now_ms).ok();
269 let Some(snap) = snap else { return Ok(()) };
270 if snap.fingerprint == from_fp {
271 return Ok(());
272 }
273 let _ = store.upsert_prompt_snapshot(&snap);
274 let changed_ev = crate::core::event::Event {
275 session_id: session_id.to_string(),
276 seq: trigger_ev.seq + 1,
277 ts_ms: now_ms,
278 ts_exact: true,
279 kind: crate::core::event::EventKind::Hook,
280 source: crate::core::event::EventSource::Hook,
281 tool: None,
282 tool_call_id: None,
283 tokens_in: None,
284 tokens_out: None,
285 reasoning_tokens: None,
286 cost_usd_e6: None,
287 stop_reason: None,
288 latency_ms: None,
289 ttft_ms: None,
290 retry_count: None,
291 context_used_tokens: None,
292 context_max_tokens: None,
293 cache_creation_tokens: None,
294 cache_read_tokens: None,
295 system_prompt_tokens: None,
296 payload: serde_json::json!({
297 "kind": "prompt_changed",
298 "from_fingerprint": from_fp,
299 "to_fingerprint": snap.fingerprint,
300 }),
301 };
302 store.append_event_with_sync(&changed_ev, sync_ctx)?;
303 Ok(())
304}
305
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::paths::test_lock;
    use tempfile::TempDir;

    /// Creates a throwaway home and workspace directory and points
    /// `KAIZEN_HOME` at the home. Callers are expected to hold the global
    /// test lock.
    fn setup_ws() -> (TempDir, TempDir) {
        let home = TempDir::new().unwrap();
        let ws = TempDir::new().unwrap();
        // SAFETY: every test in this module holds `test_lock::global()` while
        // the environment is modified. NOTE(review): presumably all other
        // env-touching tests take the same lock — confirm.
        unsafe { std::env::set_var("KAIZEN_HOME", home.path()) };
        (home, ws)
    }

    /// Ingests `payload` into a fresh workspace and returns the temp dirs
    /// (home, workspace) together with the opened store.
    fn ingest_fresh(source: IngestSource, payload: &str) -> (TempDir, TempDir, Store) {
        let (home, ws) = setup_ws();
        ingest_hook_text(source, payload, Some(ws.path().to_path_buf())).unwrap();
        let db = Store::open(&crate::core::workspace::db_path(ws.path()).unwrap()).unwrap();
        (home, ws, db)
    }

    /// Removes the `KAIZEN_HOME` override set by `setup_ws`.
    fn clear_home() {
        // SAFETY: called only while the global test lock is held (see `setup_ws`).
        unsafe { std::env::remove_var("KAIZEN_HOME") };
    }

    #[test]
    fn session_start_records_source_as_agent_not_unknown() {
        let _guard = test_lock::global().lock().unwrap();
        let (_home, ws, db) = ingest_fresh(
            IngestSource::Claude,
            r#"{"hook_event_name":"SessionStart","session_id":"s-agent-1","source":"startup"}"#,
        );
        let sessions = db
            .list_sessions(ws.path().to_string_lossy().as_ref())
            .unwrap();
        clear_home();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].agent, "claude");
    }

    #[test]
    fn missing_timestamp_falls_back_to_now() {
        let _guard = test_lock::global().lock().unwrap();
        let (_home, ws, db) = ingest_fresh(
            IngestSource::Claude,
            r#"{"hook_event_name":"SessionStart","session_id":"s-ts","source":"startup"}"#,
        );
        let sessions = db
            .list_sessions(ws.path().to_string_lossy().as_ref())
            .unwrap();
        clear_home();
        assert!(sessions[0].started_at_ms > 0, "started_at_ms must not be 0");
    }

    #[test]
    fn post_tool_use_without_session_start_auto_provisions_stub() {
        let _guard = test_lock::global().lock().unwrap();
        let (_home, ws, db) = ingest_fresh(
            IngestSource::Cursor,
            r#"{"event":"PostToolUse","session_id":"s-stub","tool_name":"Read","tool_input":{"file_path":"/tmp/x"},"tool_response":{"content":"hi"}}"#,
        );
        let sessions = db
            .list_sessions(ws.path().to_string_lossy().as_ref())
            .unwrap();
        clear_home();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].agent, "cursor");
        assert_eq!(sessions[0].id, "s-stub");
    }
}