Skip to main content

kaizen/shell/
ingest.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! `kaizen ingest` — hook ingestion (stdin or explicit payload for MCP).
3
4use crate::core::config;
5use crate::store::Store;
6use crate::{collect, core::event::SessionRecord};
7use anyhow::Result;
8use std::path::PathBuf;
9
10/// Hook source, aligned with the `kaizen ingest hook --source` CLI.
11#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize)]
12#[serde(rename_all = "lowercase")]
13pub enum IngestSource {
14    Cursor,
15    Claude,
16}
17
18impl IngestSource {
19    pub fn parse(s: &str) -> Option<Self> {
20        match s.to_lowercase().as_str() {
21            "cursor" => Some(Self::Cursor),
22            "claude" => Some(Self::Claude),
23            _ => None,
24        }
25    }
26
27    pub fn agent(self) -> &'static str {
28        match self {
29            Self::Cursor => "cursor",
30            Self::Claude => "claude",
31        }
32    }
33}
34
35/// Process hook JSON (same as stdin for `kaizen ingest hook`). On success, returns empty string (CLI prints nothing).
36pub fn ingest_hook_string(
37    source: IngestSource,
38    input: &str,
39    workspace: Option<PathBuf>,
40) -> Result<String> {
41    ingest_hook_text(source, input, workspace)?;
42    Ok(String::new())
43}
44
45/// Process hook JSON (same as stdin for `kaizen ingest hook`).
46pub fn ingest_hook_text(
47    source: IngestSource,
48    input: &str,
49    workspace: Option<PathBuf>,
50) -> Result<()> {
51    let event = match source {
52        IngestSource::Cursor => collect::hooks::cursor::parse_cursor_hook(input)?,
53        IngestSource::Claude => collect::hooks::claude::parse_claude_hook(input)?,
54    };
55    let ws = workspace.unwrap_or_else(|| std::env::current_dir().expect("cwd"));
56    let cfg = config::load(&ws)?;
57    let sync_ctx = crate::sync::ingest_ctx(&cfg, ws.clone());
58    let db_path = ws.join(".kaizen/kaizen.db");
59    let store = Store::open(&db_path)?;
60    let now_ms = std::time::SystemTime::now()
61        .duration_since(std::time::UNIX_EPOCH)
62        .map(|d| d.as_millis() as u64)
63        .unwrap_or(0);
64    let ts = if event.ts_ms == 0 {
65        now_ms
66    } else {
67        event.ts_ms
68    };
69    let mut event = event;
70    event.ts_ms = ts;
71    let ev = collect::hooks::normalize::hook_to_event(&event, 0);
72    if let Some(status) = collect::hooks::normalize::hook_to_status(&event.kind) {
73        if matches!(event.kind, collect::hooks::EventKind::SessionStart) {
74            let model = collect::model_from_json::from_value(&event.payload);
75            let record = SessionRecord {
76                id: event.session_id.clone(),
77                agent: source.agent().to_string(),
78                model,
79                workspace: ws.to_string_lossy().to_string(),
80                started_at_ms: event.ts_ms,
81                ended_at_ms: None,
82                status: status.clone(),
83                trace_path: String::new(),
84                start_commit: None,
85                end_commit: None,
86                branch: None,
87                dirty_start: None,
88                dirty_end: None,
89                repo_binding_source: None,
90            };
91            store.upsert_session(&record)?;
92        } else {
93            store.ensure_session_stub(
94                &event.session_id,
95                source.agent(),
96                &ws.to_string_lossy(),
97                event.ts_ms,
98            )?;
99            store.update_session_status(&event.session_id, status)?;
100        }
101    } else {
102        store.ensure_session_stub(
103            &event.session_id,
104            source.agent(),
105            &ws.to_string_lossy(),
106            event.ts_ms,
107        )?;
108    }
109    store.append_event_with_sync(&ev, sync_ctx.as_ref())?;
110    Ok(())
111}
112
113#[cfg(test)]
114mod tests {
115    use super::*;
116    use tempfile::TempDir;
117
118    fn ws_with_kaizen_dir() -> TempDir {
119        let dir = TempDir::new().unwrap();
120        std::fs::create_dir_all(dir.path().join(".kaizen")).unwrap();
121        dir
122    }
123
124    #[test]
125    fn session_start_records_source_as_agent_not_unknown() {
126        let dir = ws_with_kaizen_dir();
127        let payload =
128            r#"{"hook_event_name":"SessionStart","session_id":"s-agent-1","source":"startup"}"#;
129        ingest_hook_text(
130            IngestSource::Claude,
131            payload,
132            Some(dir.path().to_path_buf()),
133        )
134        .unwrap();
135
136        let db = Store::open(&dir.path().join(".kaizen/kaizen.db")).unwrap();
137        let sessions = db
138            .list_sessions(dir.path().to_string_lossy().as_ref())
139            .unwrap();
140        assert_eq!(sessions.len(), 1);
141        assert_eq!(sessions[0].agent, "claude");
142    }
143
144    #[test]
145    fn missing_timestamp_falls_back_to_now() {
146        let dir = ws_with_kaizen_dir();
147        // No timestamp_ms field — Claude Code never sends one.
148        let payload =
149            r#"{"hook_event_name":"SessionStart","session_id":"s-ts","source":"startup"}"#;
150        ingest_hook_text(
151            IngestSource::Claude,
152            payload,
153            Some(dir.path().to_path_buf()),
154        )
155        .unwrap();
156
157        let db = Store::open(&dir.path().join(".kaizen/kaizen.db")).unwrap();
158        let sessions = db
159            .list_sessions(dir.path().to_string_lossy().as_ref())
160            .unwrap();
161        assert!(sessions[0].started_at_ms > 0, "started_at_ms must not be 0");
162    }
163
164    #[test]
165    fn post_tool_use_without_session_start_auto_provisions_stub() {
166        let dir = ws_with_kaizen_dir();
167        // Hooks installed mid-session: first event is PostToolUse, no SessionStart.
168        let payload = r#"{"event":"PostToolUse","session_id":"s-stub","tool_name":"Read","tool_input":{"file_path":"/tmp/x"},"tool_response":{"content":"hi"}}"#;
169        ingest_hook_text(
170            IngestSource::Cursor,
171            payload,
172            Some(dir.path().to_path_buf()),
173        )
174        .unwrap();
175
176        let db = Store::open(&dir.path().join(".kaizen/kaizen.db")).unwrap();
177        let sessions = db
178            .list_sessions(dir.path().to_string_lossy().as_ref())
179            .unwrap();
180        assert_eq!(sessions.len(), 1);
181        assert_eq!(sessions[0].agent, "cursor");
182        assert_eq!(sessions[0].id, "s-stub");
183    }
184}