Skip to main content

kaizen/shell/
ingest.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! `kaizen ingest` — hook ingestion (stdin or explicit payload for MCP).
3
4use crate::core::config;
5use crate::store::Store;
6use crate::{collect, core::event::SessionRecord, prompt};
7use anyhow::Result;
8use std::path::PathBuf;
9
10/// Hook source, aligned with the `kaizen ingest hook --source` CLI.
11#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize)]
12#[serde(rename_all = "lowercase")]
13pub enum IngestSource {
14    Cursor,
15    Claude,
16    Openclaw,
17}
18
19impl IngestSource {
20    pub fn parse(s: &str) -> Option<Self> {
21        match s.to_lowercase().as_str() {
22            "cursor" => Some(Self::Cursor),
23            "claude" => Some(Self::Claude),
24            "openclaw" => Some(Self::Openclaw),
25            _ => None,
26        }
27    }
28
29    pub fn agent(self) -> &'static str {
30        match self {
31            Self::Cursor => "cursor",
32            Self::Claude => "claude",
33            Self::Openclaw => "openclaw",
34        }
35    }
36}
37
38/// Process hook JSON (same as stdin for `kaizen ingest hook`). On success, returns empty string (CLI prints nothing).
39pub fn ingest_hook_string(
40    source: IngestSource,
41    input: &str,
42    workspace: Option<PathBuf>,
43) -> Result<String> {
44    ingest_hook_text(source, input, workspace)?;
45    Ok(String::new())
46}
47
48/// Process hook JSON (same as stdin for `kaizen ingest hook`).
49pub fn ingest_hook_text(
50    source: IngestSource,
51    input: &str,
52    workspace: Option<PathBuf>,
53) -> Result<()> {
54    let event = match source {
55        IngestSource::Cursor => collect::hooks::cursor::parse_cursor_hook(input)?,
56        IngestSource::Claude => collect::hooks::claude::parse_claude_hook(input)?,
57        IngestSource::Openclaw => collect::hooks::openclaw::parse_openclaw_hook(input)?,
58    };
59    let ws = workspace.unwrap_or_else(|| std::env::current_dir().expect("cwd"));
60    let cfg = config::load(&ws)?;
61    let sync_ctx = crate::sync::ingest_ctx(&cfg, ws.clone());
62    let db_path = ws.join(".kaizen/kaizen.db");
63    let store = Store::open(&db_path)?;
64    let now_ms = std::time::SystemTime::now()
65        .duration_since(std::time::UNIX_EPOCH)
66        .map(|d| d.as_millis() as u64)
67        .unwrap_or(0);
68    let ts = if event.ts_ms == 0 {
69        now_ms
70    } else {
71        event.ts_ms
72    };
73    let mut event = event;
74    event.ts_ms = ts;
75    let ev = collect::hooks::normalize::hook_to_event(&event, 0);
76    if let Some(status) = collect::hooks::normalize::hook_to_status(&event.kind) {
77        if matches!(event.kind, collect::hooks::EventKind::SessionStart) {
78            let snap = prompt::snapshot::capture(&ws, now_ms).ok();
79            let fingerprint = snap.as_ref().map(|s| s.fingerprint.clone());
80            if let Some(ref s) = snap {
81                let _ = store.upsert_prompt_snapshot(s);
82            }
83            let model = collect::model_from_json::from_value(&event.payload);
84            let record = SessionRecord {
85                id: event.session_id.clone(),
86                agent: source.agent().to_string(),
87                model,
88                workspace: ws.to_string_lossy().to_string(),
89                started_at_ms: event.ts_ms,
90                ended_at_ms: None,
91                status: status.clone(),
92                trace_path: String::new(),
93                start_commit: None,
94                end_commit: None,
95                branch: None,
96                dirty_start: None,
97                dirty_end: None,
98                repo_binding_source: None,
99                prompt_fingerprint: fingerprint,
100            };
101            store.upsert_session(&record)?;
102        } else {
103            store.ensure_session_stub(
104                &event.session_id,
105                source.agent(),
106                &ws.to_string_lossy(),
107                event.ts_ms,
108            )?;
109            if matches!(event.kind, collect::hooks::EventKind::Stop) {
110                maybe_emit_prompt_changed(
111                    &store,
112                    &event.session_id,
113                    &ws,
114                    now_ms,
115                    &ev,
116                    sync_ctx.as_ref(),
117                )?;
118            }
119            store.update_session_status(&event.session_id, status)?;
120        }
121    } else {
122        store.ensure_session_stub(
123            &event.session_id,
124            source.agent(),
125            &ws.to_string_lossy(),
126            event.ts_ms,
127        )?;
128    }
129    store.append_event_with_sync(&ev, sync_ctx.as_ref())?;
130    Ok(())
131}
132
133fn maybe_emit_prompt_changed(
134    store: &Store,
135    session_id: &str,
136    ws: &std::path::Path,
137    now_ms: u64,
138    trigger_ev: &crate::core::event::Event,
139    sync_ctx: Option<&crate::sync::context::SyncIngestContext>,
140) -> Result<()> {
141    let Some(session) = store.get_session(session_id)? else {
142        return Ok(());
143    };
144    let Some(from_fp) = session.prompt_fingerprint else {
145        return Ok(());
146    };
147    let snap = prompt::snapshot::capture(ws, now_ms).ok();
148    let Some(snap) = snap else { return Ok(()) };
149    if snap.fingerprint == from_fp {
150        return Ok(());
151    }
152    let _ = store.upsert_prompt_snapshot(&snap);
153    let changed_ev = crate::core::event::Event {
154        session_id: session_id.to_string(),
155        seq: trigger_ev.seq + 1,
156        ts_ms: now_ms,
157        ts_exact: true,
158        kind: crate::core::event::EventKind::Hook,
159        source: crate::core::event::EventSource::Hook,
160        tool: None,
161        tool_call_id: None,
162        tokens_in: None,
163        tokens_out: None,
164        reasoning_tokens: None,
165        cost_usd_e6: None,
166        payload: serde_json::json!({
167            "kind": "prompt_changed",
168            "from_fingerprint": from_fp,
169            "to_fingerprint": snap.fingerprint,
170        }),
171    };
172    store.append_event_with_sync(&changed_ev, sync_ctx)?;
173    Ok(())
174}
175
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a throwaway workspace that already contains a `.kaizen/` directory.
    fn ws_with_kaizen_dir() -> TempDir {
        let dir = TempDir::new().unwrap();
        std::fs::create_dir_all(dir.path().join(".kaizen")).unwrap();
        dir
    }

    /// Feeds one hook payload through `ingest_hook_text` for the workspace `dir`.
    fn ingest_into(dir: &TempDir, source: IngestSource, payload: &str) {
        ingest_hook_text(source, payload, Some(dir.path().to_path_buf())).unwrap();
    }

    /// Opens the database that `ingest_hook_text` writes for the workspace `dir`.
    fn open_db(dir: &TempDir) -> Store {
        Store::open(&dir.path().join(".kaizen/kaizen.db")).unwrap()
    }

    #[test]
    fn session_start_records_source_as_agent_not_unknown() {
        let dir = ws_with_kaizen_dir();
        ingest_into(
            &dir,
            IngestSource::Claude,
            r#"{"hook_event_name":"SessionStart","session_id":"s-agent-1","source":"startup"}"#,
        );

        let db = open_db(&dir);
        let sessions = db
            .list_sessions(dir.path().to_string_lossy().as_ref())
            .unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].agent, "claude");
    }

    #[test]
    fn missing_timestamp_falls_back_to_now() {
        let dir = ws_with_kaizen_dir();
        // No timestamp_ms field — Claude Code never sends one.
        ingest_into(
            &dir,
            IngestSource::Claude,
            r#"{"hook_event_name":"SessionStart","session_id":"s-ts","source":"startup"}"#,
        );

        let db = open_db(&dir);
        let sessions = db
            .list_sessions(dir.path().to_string_lossy().as_ref())
            .unwrap();
        assert!(sessions[0].started_at_ms > 0, "started_at_ms must not be 0");
    }

    #[test]
    fn post_tool_use_without_session_start_auto_provisions_stub() {
        let dir = ws_with_kaizen_dir();
        // Hooks installed mid-session: first event is PostToolUse, no SessionStart.
        ingest_into(
            &dir,
            IngestSource::Cursor,
            r#"{"event":"PostToolUse","session_id":"s-stub","tool_name":"Read","tool_input":{"file_path":"/tmp/x"},"tool_response":{"content":"hi"}}"#,
        );

        let db = open_db(&dir);
        let sessions = db
            .list_sessions(dir.path().to_string_lossy().as_ref())
            .unwrap();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].agent, "cursor");
        assert_eq!(sessions[0].id, "s-stub");
    }
}