1use crate::collect::hooks::EventKind;
5use crate::core::config;
6use crate::store::Store;
7use crate::{collect, core::event::SessionRecord, prompt};
8use anyhow::Result;
9use serde_json::Value;
10use std::ffi::OsString;
11use std::path::PathBuf;
12
13#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize)]
15#[serde(rename_all = "lowercase")]
16pub enum IngestSource {
17 Cursor,
18 Claude,
19 Openclaw,
20}
21
22impl IngestSource {
23 pub fn parse(s: &str) -> Option<Self> {
24 match s.to_lowercase().as_str() {
25 "cursor" => Some(Self::Cursor),
26 "claude" => Some(Self::Claude),
27 "openclaw" => Some(Self::Openclaw),
28 _ => None,
29 }
30 }
31
32 pub fn agent(self) -> &'static str {
33 match self {
34 Self::Cursor => "cursor",
35 Self::Claude => "claude",
36 Self::Openclaw => "openclaw",
37 }
38 }
39}
40
41pub fn ingest_hook_string(
43 source: IngestSource,
44 input: &str,
45 workspace: Option<PathBuf>,
46) -> Result<String> {
47 ingest_hook_text(source, input, workspace)?;
48 Ok(String::new())
49}
50
51pub fn ingest_hook_text(
53 source: IngestSource,
54 input: &str,
55 workspace: Option<PathBuf>,
56) -> Result<()> {
57 let event = match source {
58 IngestSource::Cursor => collect::hooks::cursor::parse_cursor_hook(input)?,
59 IngestSource::Claude => collect::hooks::claude::parse_claude_hook(input)?,
60 IngestSource::Openclaw => collect::hooks::openclaw::parse_openclaw_hook(input)?,
61 };
62 let ws = workspace.unwrap_or_else(|| std::env::current_dir().expect("cwd"));
63 let cfg = config::load(&ws)?;
64 let sync_ctx = crate::sync::ingest_ctx(&cfg, ws.clone());
65 let db_path = ws.join(".kaizen/kaizen.db");
66 let store = Store::open(&db_path)?;
67 let now_ms = std::time::SystemTime::now()
68 .duration_since(std::time::UNIX_EPOCH)
69 .map(|d| d.as_millis() as u64)
70 .unwrap_or(0);
71 let ts = if event.ts_ms == 0 {
72 now_ms
73 } else {
74 event.ts_ms
75 };
76 let mut event = event;
77 event.ts_ms = ts;
78 let ev = collect::hooks::normalize::hook_to_event(&event, 0);
79 if let Some(status) = collect::hooks::normalize::hook_to_status(&event.kind) {
80 if matches!(event.kind, collect::hooks::EventKind::SessionStart) {
81 let snap = prompt::snapshot::capture(&ws, now_ms).ok();
82 let fingerprint = snap.as_ref().map(|s| s.fingerprint.clone());
83 if let Some(ref s) = snap {
84 let _ = store.upsert_prompt_snapshot(s);
85 }
86 let model = collect::model_from_json::from_value(&event.payload);
87 let env = session_env_fields(&event.payload);
88 let record = SessionRecord {
89 id: event.session_id.clone(),
90 agent: source.agent().to_string(),
91 model,
92 workspace: ws.to_string_lossy().to_string(),
93 started_at_ms: event.ts_ms,
94 ended_at_ms: None,
95 status: status.clone(),
96 trace_path: String::new(),
97 start_commit: None,
98 end_commit: None,
99 branch: None,
100 dirty_start: None,
101 dirty_end: None,
102 repo_binding_source: None,
103 prompt_fingerprint: fingerprint,
104 parent_session_id: None,
105 agent_version: env.0,
106 os: env.1,
107 arch: env.2,
108 repo_file_count: None,
109 repo_total_loc: None,
110 };
111 store.upsert_session(&record)?;
112 } else {
113 store.ensure_session_stub(
114 &event.session_id,
115 source.agent(),
116 &ws.to_string_lossy(),
117 event.ts_ms,
118 )?;
119 if matches!(event.kind, collect::hooks::EventKind::Stop) {
120 maybe_emit_prompt_changed(
121 &store,
122 &event.session_id,
123 &ws,
124 now_ms,
125 &ev,
126 sync_ctx.as_ref(),
127 )?;
128 }
129 store.update_session_status(&event.session_id, status)?;
130 }
131 } else {
132 store.ensure_session_stub(
133 &event.session_id,
134 source.agent(),
135 &ws.to_string_lossy(),
136 event.ts_ms,
137 )?;
138 }
139 store.append_event_with_sync(&ev, sync_ctx.as_ref())?;
140 post_ingest_detached(&event, &cfg, &ws)?;
141 Ok(())
142}
143
144fn post_ingest_detached(
146 event: &collect::hooks::HookEvent,
147 cfg: &config::Config,
148 ws: &std::path::Path,
149) -> Result<()> {
150 if matches!(event.kind, EventKind::Stop) {
151 if cfg.collect.outcomes.enabled {
152 spawn_outcome_measure(ws, &event.session_id);
153 }
154 if cfg.collect.system_sampler.enabled {
155 touch_sampler_stop_file(ws, &event.session_id);
156 }
157 }
158 if matches!(event.kind, EventKind::SessionStart)
159 && cfg.collect.system_sampler.enabled
160 && let Some(pid) = payload_pid(&event.payload)
161 {
162 spawn_sampler_run(ws, &event.session_id, pid);
163 }
164 Ok(())
165}
166
167fn payload_pid(v: &Value) -> Option<u32> {
168 v.get("pid")
169 .and_then(|x| x.as_u64().map(|n| n as u32))
170 .or_else(|| {
171 v.get("pid")
172 .and_then(|x| x.as_i64())
173 .and_then(|i| u32::try_from(i).ok())
174 })
175}
176
177fn spawn_outcome_measure(ws: &std::path::Path, session_id: &str) {
178 let args = vec![
179 OsString::from("outcomes"),
180 OsString::from("measure"),
181 OsString::from("--workspace"),
182 ws.as_os_str().to_owned(),
183 OsString::from("--session"),
184 OsString::from(session_id),
185 ];
186 if let Err(e) = super::kaizen_child::spawn_kaizen_detached(&args) {
187 tracing::warn!(?e, "kaizen outcomes measure");
188 }
189}
190
191fn spawn_sampler_run(ws: &std::path::Path, session_id: &str, pid: u32) {
192 let args = vec![
193 OsString::from("__sampler-run"),
194 OsString::from("--workspace"),
195 ws.as_os_str().to_owned(),
196 OsString::from("--session"),
197 OsString::from(session_id),
198 OsString::from("--pid"),
199 OsString::from(pid.to_string()),
200 ];
201 if let Err(e) = super::kaizen_child::spawn_kaizen_detached(&args) {
202 tracing::warn!(?e, "kaizen sampler");
203 }
204}
205
206fn touch_sampler_stop_file(ws: &std::path::Path, session_id: &str) {
207 let dir = ws.join(".kaizen/sampler-stop");
208 if let Err(e) = std::fs::create_dir_all(&dir) {
209 tracing::warn!(?e, "sampler-stop mkdir");
210 return;
211 }
212 let path = dir.join(session_id);
213 if let Err(e) = std::fs::File::create(&path) {
214 tracing::warn!(?e, "sampler-stop touch");
215 }
216}
217
218fn session_env_fields(payload: &Value) -> (Option<String>, Option<String>, Option<String>) {
219 let ver = [
220 "cursor_version",
221 "claude_version",
222 "agent_version",
223 "version",
224 ]
225 .into_iter()
226 .find_map(|k| {
227 payload
228 .get(k)
229 .and_then(|v| v.as_str())
230 .map(|s| s.to_string())
231 });
232 let os = payload
233 .get("os")
234 .and_then(|v| v.as_str())
235 .map(|s| s.to_string());
236 let arch = payload
237 .get("arch")
238 .and_then(|v| v.as_str())
239 .map(|s| s.to_string());
240 (ver, os, arch)
241}
242
243fn maybe_emit_prompt_changed(
244 store: &Store,
245 session_id: &str,
246 ws: &std::path::Path,
247 now_ms: u64,
248 trigger_ev: &crate::core::event::Event,
249 sync_ctx: Option<&crate::sync::context::SyncIngestContext>,
250) -> Result<()> {
251 let Some(session) = store.get_session(session_id)? else {
252 return Ok(());
253 };
254 let Some(from_fp) = session.prompt_fingerprint else {
255 return Ok(());
256 };
257 let snap = prompt::snapshot::capture(ws, now_ms).ok();
258 let Some(snap) = snap else { return Ok(()) };
259 if snap.fingerprint == from_fp {
260 return Ok(());
261 }
262 let _ = store.upsert_prompt_snapshot(&snap);
263 let changed_ev = crate::core::event::Event {
264 session_id: session_id.to_string(),
265 seq: trigger_ev.seq + 1,
266 ts_ms: now_ms,
267 ts_exact: true,
268 kind: crate::core::event::EventKind::Hook,
269 source: crate::core::event::EventSource::Hook,
270 tool: None,
271 tool_call_id: None,
272 tokens_in: None,
273 tokens_out: None,
274 reasoning_tokens: None,
275 cost_usd_e6: None,
276 stop_reason: None,
277 latency_ms: None,
278 ttft_ms: None,
279 retry_count: None,
280 context_used_tokens: None,
281 context_max_tokens: None,
282 cache_creation_tokens: None,
283 cache_read_tokens: None,
284 system_prompt_tokens: None,
285 payload: serde_json::json!({
286 "kind": "prompt_changed",
287 "from_fingerprint": from_fp,
288 "to_fingerprint": snap.fingerprint,
289 }),
290 };
291 store.append_event_with_sync(&changed_ev, sync_ctx)?;
292 Ok(())
293}
294
295#[cfg(test)]
296mod tests {
297 use super::*;
298 use tempfile::TempDir;
299
300 fn ws_with_kaizen_dir() -> TempDir {
301 let dir = TempDir::new().unwrap();
302 std::fs::create_dir_all(dir.path().join(".kaizen")).unwrap();
303 dir
304 }
305
306 #[test]
307 fn session_start_records_source_as_agent_not_unknown() {
308 let dir = ws_with_kaizen_dir();
309 let payload =
310 r#"{"hook_event_name":"SessionStart","session_id":"s-agent-1","source":"startup"}"#;
311 ingest_hook_text(
312 IngestSource::Claude,
313 payload,
314 Some(dir.path().to_path_buf()),
315 )
316 .unwrap();
317
318 let db = Store::open(&dir.path().join(".kaizen/kaizen.db")).unwrap();
319 let sessions = db
320 .list_sessions(dir.path().to_string_lossy().as_ref())
321 .unwrap();
322 assert_eq!(sessions.len(), 1);
323 assert_eq!(sessions[0].agent, "claude");
324 }
325
326 #[test]
327 fn missing_timestamp_falls_back_to_now() {
328 let dir = ws_with_kaizen_dir();
329 let payload =
331 r#"{"hook_event_name":"SessionStart","session_id":"s-ts","source":"startup"}"#;
332 ingest_hook_text(
333 IngestSource::Claude,
334 payload,
335 Some(dir.path().to_path_buf()),
336 )
337 .unwrap();
338
339 let db = Store::open(&dir.path().join(".kaizen/kaizen.db")).unwrap();
340 let sessions = db
341 .list_sessions(dir.path().to_string_lossy().as_ref())
342 .unwrap();
343 assert!(sessions[0].started_at_ms > 0, "started_at_ms must not be 0");
344 }
345
346 #[test]
347 fn post_tool_use_without_session_start_auto_provisions_stub() {
348 let dir = ws_with_kaizen_dir();
349 let payload = r#"{"event":"PostToolUse","session_id":"s-stub","tool_name":"Read","tool_input":{"file_path":"/tmp/x"},"tool_response":{"content":"hi"}}"#;
351 ingest_hook_text(
352 IngestSource::Cursor,
353 payload,
354 Some(dir.path().to_path_buf()),
355 )
356 .unwrap();
357
358 let db = Store::open(&dir.path().join(".kaizen/kaizen.db")).unwrap();
359 let sessions = db
360 .list_sessions(dir.path().to_string_lossy().as_ref())
361 .unwrap();
362 assert_eq!(sessions.len(), 1);
363 assert_eq!(sessions[0].agent, "cursor");
364 assert_eq!(sessions[0].id, "s-stub");
365 }
366}