Skip to main content

kaizen/shell/
ingest.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! `kaizen ingest` — hook ingestion (stdin or explicit payload for MCP).
3
4use crate::collect::hooks::EventKind;
5use crate::core::config;
6use crate::store::Store;
7use crate::{collect, core::event::SessionRecord, prompt};
8use anyhow::Result;
9use serde_json::Value;
10use std::ffi::OsString;
11use std::path::PathBuf;
12
13/// Hook source, aligned with the `kaizen ingest hook --source` CLI.
14#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize)]
15#[serde(rename_all = "lowercase")]
16pub enum IngestSource {
17    Cursor,
18    Claude,
19    Openclaw,
20    Vibe,
21}
22
23impl IngestSource {
24    pub fn parse(s: &str) -> Option<Self> {
25        match s.to_lowercase().as_str() {
26            "cursor" => Some(Self::Cursor),
27            "claude" => Some(Self::Claude),
28            "openclaw" => Some(Self::Openclaw),
29            "vibe" => Some(Self::Vibe),
30            _ => None,
31        }
32    }
33
34    pub fn agent(self) -> &'static str {
35        match self {
36            Self::Cursor => "cursor",
37            Self::Claude => "claude",
38            Self::Openclaw => "openclaw",
39            Self::Vibe => "vibe",
40        }
41    }
42}
43
44/// Process hook JSON (same as stdin for `kaizen ingest hook`). On success, returns empty string (CLI prints nothing).
45pub fn ingest_hook_string(
46    source: IngestSource,
47    input: &str,
48    workspace: Option<PathBuf>,
49) -> Result<String> {
50    ingest_hook_text(source, input, workspace)?;
51    Ok(String::new())
52}
53
54/// Process hook JSON (same as stdin for `kaizen ingest hook`).
55pub fn ingest_hook_text(
56    source: IngestSource,
57    input: &str,
58    workspace: Option<PathBuf>,
59) -> Result<()> {
60    let event = match source {
61        IngestSource::Cursor => collect::hooks::cursor::parse_cursor_hook(input)?,
62        IngestSource::Claude => collect::hooks::claude::parse_claude_hook(input)?,
63        IngestSource::Openclaw => collect::hooks::openclaw::parse_openclaw_hook(input)?,
64        IngestSource::Vibe => collect::hooks::vibe::parse_vibe_hook(input)?,
65    };
66    let ws = workspace.unwrap_or_else(|| std::env::current_dir().expect("cwd"));
67    let cfg = config::load(&ws)?;
68    let sync_ctx = crate::sync::ingest_ctx(&cfg, ws.clone());
69    let db_path = crate::core::workspace::db_path(&ws)?;
70    let store = Store::open(&db_path)?;
71    let now_ms = std::time::SystemTime::now()
72        .duration_since(std::time::UNIX_EPOCH)
73        .map(|d| d.as_millis() as u64)
74        .unwrap_or(0);
75    let ts = if event.ts_ms == 0 {
76        now_ms
77    } else {
78        event.ts_ms
79    };
80    let mut event = event;
81    event.ts_ms = ts;
82    let seq = store.next_event_seq(&event.session_id)?;
83    let ev = collect::hooks::normalize::hook_to_event(&event, seq);
84    if let Some(status) = collect::hooks::normalize::hook_to_status(&event.kind) {
85        if matches!(event.kind, collect::hooks::EventKind::SessionStart) {
86            let snap = prompt::snapshot::capture(&ws, now_ms).ok();
87            let fingerprint = snap.as_ref().map(|s| s.fingerprint.clone());
88            if let Some(ref s) = snap {
89                let _ = store.upsert_prompt_snapshot(s);
90            }
91            let model = collect::model_from_json::from_value(&event.payload);
92            let env = session_env_fields(&event.payload);
93            let record = SessionRecord {
94                id: event.session_id.clone(),
95                agent: source.agent().to_string(),
96                model,
97                workspace: ws.to_string_lossy().to_string(),
98                started_at_ms: event.ts_ms,
99                ended_at_ms: None,
100                status: status.clone(),
101                trace_path: String::new(),
102                start_commit: None,
103                end_commit: None,
104                branch: None,
105                dirty_start: None,
106                dirty_end: None,
107                repo_binding_source: None,
108                prompt_fingerprint: fingerprint,
109                parent_session_id: None,
110                agent_version: env.0,
111                os: env.1,
112                arch: env.2,
113                repo_file_count: None,
114                repo_total_loc: None,
115            };
116            store.upsert_session(&record)?;
117        } else {
118            store.ensure_session_stub(
119                &event.session_id,
120                source.agent(),
121                &ws.to_string_lossy(),
122                event.ts_ms,
123            )?;
124            if matches!(event.kind, collect::hooks::EventKind::Stop) {
125                maybe_emit_prompt_changed(
126                    &store,
127                    &event.session_id,
128                    &ws,
129                    now_ms,
130                    &ev,
131                    sync_ctx.as_ref(),
132                )?;
133            }
134            store.update_session_status(&event.session_id, status)?;
135        }
136    } else {
137        store.ensure_session_stub(
138            &event.session_id,
139            source.agent(),
140            &ws.to_string_lossy(),
141            event.ts_ms,
142        )?;
143    }
144    store.append_event_with_sync(&ev, sync_ctx.as_ref())?;
145    post_ingest_detached(&event, &cfg, &ws)?;
146    Ok(())
147}
148
149/// Non-blocking sidecars: outcome worker, sampler child, stop file (hooks stay short).
150fn post_ingest_detached(
151    event: &collect::hooks::HookEvent,
152    cfg: &config::Config,
153    ws: &std::path::Path,
154) -> Result<()> {
155    if matches!(event.kind, EventKind::Stop) {
156        if cfg.collect.outcomes.enabled {
157            spawn_outcome_measure(ws, &event.session_id);
158        }
159        if cfg.collect.system_sampler.enabled {
160            touch_sampler_stop_file(ws, &event.session_id);
161        }
162    }
163    if matches!(event.kind, EventKind::SessionStart)
164        && cfg.collect.system_sampler.enabled
165        && let Some(pid) = payload_pid(&event.payload)
166    {
167        spawn_sampler_run(ws, &event.session_id, pid);
168    }
169    Ok(())
170}
171
172fn payload_pid(v: &Value) -> Option<u32> {
173    v.get("pid")
174        .and_then(|x| x.as_u64().map(|n| n as u32))
175        .or_else(|| {
176            v.get("pid")
177                .and_then(|x| x.as_i64())
178                .and_then(|i| u32::try_from(i).ok())
179        })
180}
181
182fn spawn_outcome_measure(ws: &std::path::Path, session_id: &str) {
183    let args = vec![
184        OsString::from("outcomes"),
185        OsString::from("measure"),
186        OsString::from("--workspace"),
187        ws.as_os_str().to_owned(),
188        OsString::from("--session"),
189        OsString::from(session_id),
190    ];
191    if let Err(e) = super::kaizen_child::spawn_kaizen_detached(&args) {
192        tracing::warn!(?e, "kaizen outcomes measure");
193    }
194}
195
196fn spawn_sampler_run(ws: &std::path::Path, session_id: &str, pid: u32) {
197    let args = vec![
198        OsString::from("__sampler-run"),
199        OsString::from("--workspace"),
200        ws.as_os_str().to_owned(),
201        OsString::from("--session"),
202        OsString::from(session_id),
203        OsString::from("--pid"),
204        OsString::from(pid.to_string()),
205    ];
206    if let Err(e) = super::kaizen_child::spawn_kaizen_detached(&args) {
207        tracing::warn!(?e, "kaizen sampler");
208    }
209}
210
211fn touch_sampler_stop_file(ws: &std::path::Path, session_id: &str) {
212    let dir = match crate::core::paths::project_data_dir(ws) {
213        Ok(d) => d.join("sampler-stop"),
214        Err(e) => {
215            tracing::warn!(?e, "sampler-stop: no data dir");
216            return;
217        }
218    };
219    if let Err(e) = std::fs::create_dir_all(&dir) {
220        tracing::warn!(?e, "sampler-stop mkdir");
221        return;
222    }
223    let path = dir.join(session_id);
224    if let Err(e) = std::fs::File::create(&path) {
225        tracing::warn!(?e, "sampler-stop touch");
226    }
227}
228
229fn session_env_fields(payload: &Value) -> (Option<String>, Option<String>, Option<String>) {
230    let ver = [
231        "cursor_version",
232        "claude_version",
233        "agent_version",
234        "version",
235    ]
236    .into_iter()
237    .find_map(|k| {
238        payload
239            .get(k)
240            .and_then(|v| v.as_str())
241            .map(|s| s.to_string())
242    });
243    let os = payload
244        .get("os")
245        .and_then(|v| v.as_str())
246        .map(|s| s.to_string());
247    let arch = payload
248        .get("arch")
249        .and_then(|v| v.as_str())
250        .map(|s| s.to_string());
251    (ver, os, arch)
252}
253
254fn maybe_emit_prompt_changed(
255    store: &Store,
256    session_id: &str,
257    ws: &std::path::Path,
258    now_ms: u64,
259    trigger_ev: &crate::core::event::Event,
260    sync_ctx: Option<&crate::sync::context::SyncIngestContext>,
261) -> Result<()> {
262    let Some(session) = store.get_session(session_id)? else {
263        return Ok(());
264    };
265    let Some(from_fp) = session.prompt_fingerprint else {
266        return Ok(());
267    };
268    let snap = prompt::snapshot::capture(ws, now_ms).ok();
269    let Some(snap) = snap else { return Ok(()) };
270    if snap.fingerprint == from_fp {
271        return Ok(());
272    }
273    let _ = store.upsert_prompt_snapshot(&snap);
274    let changed_ev = crate::core::event::Event {
275        session_id: session_id.to_string(),
276        seq: trigger_ev.seq + 1,
277        ts_ms: now_ms,
278        ts_exact: true,
279        kind: crate::core::event::EventKind::Hook,
280        source: crate::core::event::EventSource::Hook,
281        tool: None,
282        tool_call_id: None,
283        tokens_in: None,
284        tokens_out: None,
285        reasoning_tokens: None,
286        cost_usd_e6: None,
287        stop_reason: None,
288        latency_ms: None,
289        ttft_ms: None,
290        retry_count: None,
291        context_used_tokens: None,
292        context_max_tokens: None,
293        cache_creation_tokens: None,
294        cache_read_tokens: None,
295        system_prompt_tokens: None,
296        payload: serde_json::json!({
297            "kind": "prompt_changed",
298            "from_fingerprint": from_fp,
299            "to_fingerprint": snap.fingerprint,
300        }),
301    };
302    store.append_event_with_sync(&changed_ev, sync_ctx)?;
303    Ok(())
304}
305
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::paths::test_lock;
    use tempfile::TempDir;

    /// Keeps the temporary `KAIZEN_HOME` directory alive and unsets the variable on drop,
    /// so cleanup also happens when a test panics before reaching its assertions.
    struct HomeGuard {
        _dir: TempDir,
    }

    impl Drop for HomeGuard {
        fn drop(&mut self) {
            // SAFETY: each test in this module takes `test_lock::global()` before calling
            // `setup_ws` and declares that lock guard first, so it is still held here.
            // Assumes any other code touching the environment uses the same lock.
            unsafe { std::env::remove_var("KAIZEN_HOME") };
        }
    }

    /// Creates a temp home and a temp workspace, and points `KAIZEN_HOME` at the home.
    /// Callers must hold `test_lock::global()`.
    fn setup_ws() -> (HomeGuard, TempDir) {
        let home = TempDir::new().unwrap();
        let ws = TempDir::new().unwrap();
        // SAFETY: the caller holds `test_lock::global()`, which serializes the tests in
        // this module that mutate the environment.
        unsafe { std::env::set_var("KAIZEN_HOME", home.path()) };
        (HomeGuard { _dir: home }, ws)
    }

    #[test]
    fn session_start_records_source_as_agent_not_unknown() {
        // Recover from poisoning so one failed test does not fail every later one.
        let _guard = test_lock::global()
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let (_home, ws) = setup_ws();
        let payload =
            r#"{"hook_event_name":"SessionStart","session_id":"s-agent-1","source":"startup"}"#;
        ingest_hook_text(IngestSource::Claude, payload, Some(ws.path().to_path_buf())).unwrap();
        let db = Store::open(&crate::core::workspace::db_path(ws.path()).unwrap()).unwrap();
        let sessions = db
            .list_sessions(ws.path().to_string_lossy().as_ref())
            .unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].agent, "claude");
    }

    #[test]
    fn missing_timestamp_falls_back_to_now() {
        let _guard = test_lock::global()
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let (_home, ws) = setup_ws();
        let payload =
            r#"{"hook_event_name":"SessionStart","session_id":"s-ts","source":"startup"}"#;
        ingest_hook_text(IngestSource::Claude, payload, Some(ws.path().to_path_buf())).unwrap();
        let db = Store::open(&crate::core::workspace::db_path(ws.path()).unwrap()).unwrap();
        let sessions = db
            .list_sessions(ws.path().to_string_lossy().as_ref())
            .unwrap();
        assert_eq!(sessions.len(), 1);
        assert!(sessions[0].started_at_ms > 0, "started_at_ms must not be 0");
    }

    #[test]
    fn post_tool_use_without_session_start_auto_provisions_stub() {
        let _guard = test_lock::global()
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let (_home, ws) = setup_ws();
        let payload = r#"{"event":"PostToolUse","session_id":"s-stub","tool_name":"Read","tool_input":{"file_path":"/tmp/x"},"tool_response":{"content":"hi"}}"#;
        ingest_hook_text(IngestSource::Cursor, payload, Some(ws.path().to_path_buf())).unwrap();
        let db = Store::open(&crate::core::workspace::db_path(ws.path()).unwrap()).unwrap();
        let sessions = db
            .list_sessions(ws.path().to_string_lossy().as_ref())
            .unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].agent, "cursor");
        assert_eq!(sessions[0].id, "s-stub");
    }
}
365}