Skip to main content

kaizen/shell/
ingest.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! `kaizen ingest` — hook ingestion (stdin or explicit payload for MCP).
3
4use crate::collect::hooks::EventKind;
5use crate::core::config;
6use crate::store::Store;
7use crate::{collect, core::event::SessionRecord, prompt};
8use anyhow::Result;
9use serde_json::Value;
10use std::ffi::OsString;
11use std::path::PathBuf;
12
/// Hook source, aligned with the `kaizen ingest hook --source` CLI.
///
/// The serde representation is the lowercase variant name (`"cursor"`,
/// `"claude"`, `"openclaw"`), which are the same strings `IngestSource::agent`
/// returns and `IngestSource::parse` accepts.
#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum IngestSource {
    /// Payloads are parsed with `collect::hooks::cursor::parse_cursor_hook`.
    Cursor,
    /// Payloads are parsed with `collect::hooks::claude::parse_claude_hook`.
    Claude,
    /// Payloads are parsed with `collect::hooks::openclaw::parse_openclaw_hook`.
    Openclaw,
}
21
22impl IngestSource {
23    pub fn parse(s: &str) -> Option<Self> {
24        match s.to_lowercase().as_str() {
25            "cursor" => Some(Self::Cursor),
26            "claude" => Some(Self::Claude),
27            "openclaw" => Some(Self::Openclaw),
28            _ => None,
29        }
30    }
31
32    pub fn agent(self) -> &'static str {
33        match self {
34            Self::Cursor => "cursor",
35            Self::Claude => "claude",
36            Self::Openclaw => "openclaw",
37        }
38    }
39}
40
41/// Process hook JSON (same as stdin for `kaizen ingest hook`). On success, returns empty string (CLI prints nothing).
42pub fn ingest_hook_string(
43    source: IngestSource,
44    input: &str,
45    workspace: Option<PathBuf>,
46) -> Result<String> {
47    ingest_hook_text(source, input, workspace)?;
48    Ok(String::new())
49}
50
51/// Process hook JSON (same as stdin for `kaizen ingest hook`).
52pub fn ingest_hook_text(
53    source: IngestSource,
54    input: &str,
55    workspace: Option<PathBuf>,
56) -> Result<()> {
57    let event = match source {
58        IngestSource::Cursor => collect::hooks::cursor::parse_cursor_hook(input)?,
59        IngestSource::Claude => collect::hooks::claude::parse_claude_hook(input)?,
60        IngestSource::Openclaw => collect::hooks::openclaw::parse_openclaw_hook(input)?,
61    };
62    let ws = workspace.unwrap_or_else(|| std::env::current_dir().expect("cwd"));
63    let cfg = config::load(&ws)?;
64    let sync_ctx = crate::sync::ingest_ctx(&cfg, ws.clone());
65    let db_path = ws.join(".kaizen/kaizen.db");
66    let store = Store::open(&db_path)?;
67    let now_ms = std::time::SystemTime::now()
68        .duration_since(std::time::UNIX_EPOCH)
69        .map(|d| d.as_millis() as u64)
70        .unwrap_or(0);
71    let ts = if event.ts_ms == 0 {
72        now_ms
73    } else {
74        event.ts_ms
75    };
76    let mut event = event;
77    event.ts_ms = ts;
78    let ev = collect::hooks::normalize::hook_to_event(&event, 0);
79    if let Some(status) = collect::hooks::normalize::hook_to_status(&event.kind) {
80        if matches!(event.kind, collect::hooks::EventKind::SessionStart) {
81            let snap = prompt::snapshot::capture(&ws, now_ms).ok();
82            let fingerprint = snap.as_ref().map(|s| s.fingerprint.clone());
83            if let Some(ref s) = snap {
84                let _ = store.upsert_prompt_snapshot(s);
85            }
86            let model = collect::model_from_json::from_value(&event.payload);
87            let env = session_env_fields(&event.payload);
88            let record = SessionRecord {
89                id: event.session_id.clone(),
90                agent: source.agent().to_string(),
91                model,
92                workspace: ws.to_string_lossy().to_string(),
93                started_at_ms: event.ts_ms,
94                ended_at_ms: None,
95                status: status.clone(),
96                trace_path: String::new(),
97                start_commit: None,
98                end_commit: None,
99                branch: None,
100                dirty_start: None,
101                dirty_end: None,
102                repo_binding_source: None,
103                prompt_fingerprint: fingerprint,
104                parent_session_id: None,
105                agent_version: env.0,
106                os: env.1,
107                arch: env.2,
108                repo_file_count: None,
109                repo_total_loc: None,
110            };
111            store.upsert_session(&record)?;
112        } else {
113            store.ensure_session_stub(
114                &event.session_id,
115                source.agent(),
116                &ws.to_string_lossy(),
117                event.ts_ms,
118            )?;
119            if matches!(event.kind, collect::hooks::EventKind::Stop) {
120                maybe_emit_prompt_changed(
121                    &store,
122                    &event.session_id,
123                    &ws,
124                    now_ms,
125                    &ev,
126                    sync_ctx.as_ref(),
127                )?;
128            }
129            store.update_session_status(&event.session_id, status)?;
130        }
131    } else {
132        store.ensure_session_stub(
133            &event.session_id,
134            source.agent(),
135            &ws.to_string_lossy(),
136            event.ts_ms,
137        )?;
138    }
139    store.append_event_with_sync(&ev, sync_ctx.as_ref())?;
140    post_ingest_detached(&event, &cfg, &ws)?;
141    Ok(())
142}
143
144/// Non-blocking sidecars: outcome worker, sampler child, stop file (hooks stay short).
145fn post_ingest_detached(
146    event: &collect::hooks::HookEvent,
147    cfg: &config::Config,
148    ws: &std::path::Path,
149) -> Result<()> {
150    if matches!(event.kind, EventKind::Stop) {
151        if cfg.collect.outcomes.enabled {
152            spawn_outcome_measure(ws, &event.session_id);
153        }
154        if cfg.collect.system_sampler.enabled {
155            touch_sampler_stop_file(ws, &event.session_id);
156        }
157    }
158    if matches!(event.kind, EventKind::SessionStart)
159        && cfg.collect.system_sampler.enabled
160        && let Some(pid) = payload_pid(&event.payload)
161    {
162        spawn_sampler_run(ws, &event.session_id, pid);
163    }
164    Ok(())
165}
166
167fn payload_pid(v: &Value) -> Option<u32> {
168    v.get("pid")
169        .and_then(|x| x.as_u64().map(|n| n as u32))
170        .or_else(|| {
171            v.get("pid")
172                .and_then(|x| x.as_i64())
173                .and_then(|i| u32::try_from(i).ok())
174        })
175}
176
177fn spawn_outcome_measure(ws: &std::path::Path, session_id: &str) {
178    let args = vec![
179        OsString::from("outcomes"),
180        OsString::from("measure"),
181        OsString::from("--workspace"),
182        ws.as_os_str().to_owned(),
183        OsString::from("--session"),
184        OsString::from(session_id),
185    ];
186    if let Err(e) = super::kaizen_child::spawn_kaizen_detached(&args) {
187        tracing::warn!(?e, "kaizen outcomes measure");
188    }
189}
190
191fn spawn_sampler_run(ws: &std::path::Path, session_id: &str, pid: u32) {
192    let args = vec![
193        OsString::from("__sampler-run"),
194        OsString::from("--workspace"),
195        ws.as_os_str().to_owned(),
196        OsString::from("--session"),
197        OsString::from(session_id),
198        OsString::from("--pid"),
199        OsString::from(pid.to_string()),
200    ];
201    if let Err(e) = super::kaizen_child::spawn_kaizen_detached(&args) {
202        tracing::warn!(?e, "kaizen sampler");
203    }
204}
205
206fn touch_sampler_stop_file(ws: &std::path::Path, session_id: &str) {
207    let dir = ws.join(".kaizen/sampler-stop");
208    if let Err(e) = std::fs::create_dir_all(&dir) {
209        tracing::warn!(?e, "sampler-stop mkdir");
210        return;
211    }
212    let path = dir.join(session_id);
213    if let Err(e) = std::fs::File::create(&path) {
214        tracing::warn!(?e, "sampler-stop touch");
215    }
216}
217
218fn session_env_fields(payload: &Value) -> (Option<String>, Option<String>, Option<String>) {
219    let ver = [
220        "cursor_version",
221        "claude_version",
222        "agent_version",
223        "version",
224    ]
225    .into_iter()
226    .find_map(|k| {
227        payload
228            .get(k)
229            .and_then(|v| v.as_str())
230            .map(|s| s.to_string())
231    });
232    let os = payload
233        .get("os")
234        .and_then(|v| v.as_str())
235        .map(|s| s.to_string());
236    let arch = payload
237        .get("arch")
238        .and_then(|v| v.as_str())
239        .map(|s| s.to_string());
240    (ver, os, arch)
241}
242
243fn maybe_emit_prompt_changed(
244    store: &Store,
245    session_id: &str,
246    ws: &std::path::Path,
247    now_ms: u64,
248    trigger_ev: &crate::core::event::Event,
249    sync_ctx: Option<&crate::sync::context::SyncIngestContext>,
250) -> Result<()> {
251    let Some(session) = store.get_session(session_id)? else {
252        return Ok(());
253    };
254    let Some(from_fp) = session.prompt_fingerprint else {
255        return Ok(());
256    };
257    let snap = prompt::snapshot::capture(ws, now_ms).ok();
258    let Some(snap) = snap else { return Ok(()) };
259    if snap.fingerprint == from_fp {
260        return Ok(());
261    }
262    let _ = store.upsert_prompt_snapshot(&snap);
263    let changed_ev = crate::core::event::Event {
264        session_id: session_id.to_string(),
265        seq: trigger_ev.seq + 1,
266        ts_ms: now_ms,
267        ts_exact: true,
268        kind: crate::core::event::EventKind::Hook,
269        source: crate::core::event::EventSource::Hook,
270        tool: None,
271        tool_call_id: None,
272        tokens_in: None,
273        tokens_out: None,
274        reasoning_tokens: None,
275        cost_usd_e6: None,
276        stop_reason: None,
277        latency_ms: None,
278        ttft_ms: None,
279        retry_count: None,
280        context_used_tokens: None,
281        context_max_tokens: None,
282        cache_creation_tokens: None,
283        cache_read_tokens: None,
284        system_prompt_tokens: None,
285        payload: serde_json::json!({
286            "kind": "prompt_changed",
287            "from_fingerprint": from_fp,
288            "to_fingerprint": snap.fingerprint,
289        }),
290    };
291    store.append_event_with_sync(&changed_ev, sync_ctx)?;
292    Ok(())
293}
294
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Fresh temporary workspace that already contains a `.kaizen` directory.
    fn ws_with_kaizen_dir() -> TempDir {
        let ws = TempDir::new().unwrap();
        std::fs::create_dir_all(ws.path().join(".kaizen")).unwrap();
        ws
    }

    /// Ingest one hook payload into `ws`, panicking on any error.
    fn ingest(ws: &TempDir, source: IngestSource, payload: &str) {
        ingest_hook_text(source, payload, Some(ws.path().to_path_buf())).unwrap();
    }

    /// Re-open the store that `ingest_hook_text` wrote for `ws`.
    fn open_store(ws: &TempDir) -> Store {
        Store::open(&ws.path().join(".kaizen/kaizen.db")).unwrap()
    }

    #[test]
    fn session_start_records_source_as_agent_not_unknown() {
        let ws = ws_with_kaizen_dir();
        ingest(
            &ws,
            IngestSource::Claude,
            r#"{"hook_event_name":"SessionStart","session_id":"s-agent-1","source":"startup"}"#,
        );

        let db = open_store(&ws);
        let sessions = db
            .list_sessions(ws.path().to_string_lossy().as_ref())
            .unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].agent, "claude");
    }

    #[test]
    fn missing_timestamp_falls_back_to_now() {
        let ws = ws_with_kaizen_dir();
        // No timestamp_ms field — Claude Code never sends one.
        ingest(
            &ws,
            IngestSource::Claude,
            r#"{"hook_event_name":"SessionStart","session_id":"s-ts","source":"startup"}"#,
        );

        let db = open_store(&ws);
        let sessions = db
            .list_sessions(ws.path().to_string_lossy().as_ref())
            .unwrap();
        assert!(sessions[0].started_at_ms > 0, "started_at_ms must not be 0");
    }

    #[test]
    fn post_tool_use_without_session_start_auto_provisions_stub() {
        let ws = ws_with_kaizen_dir();
        // Hooks installed mid-session: first event is PostToolUse, no SessionStart.
        ingest(
            &ws,
            IngestSource::Cursor,
            r#"{"event":"PostToolUse","session_id":"s-stub","tool_name":"Read","tool_input":{"file_path":"/tmp/x"},"tool_response":{"content":"hi"}}"#,
        );

        let db = open_store(&ws);
        let sessions = db
            .list_sessions(ws.path().to_string_lossy().as_ref())
            .unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].agent, "cursor");
        assert_eq!(sessions[0].id, "s-stub");
    }
}
366}