Skip to main content

kaizen/shell/
ingest.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! `kaizen ingest` — hook ingestion (stdin or explicit payload for MCP).
3
4use crate::core::config;
5use crate::store::Store;
6use crate::{collect, core::event::SessionRecord, prompt};
7use anyhow::Result;
8use serde_json::Value;
9use std::path::PathBuf;
10
11mod prompt_change;
12mod sidecars;
13#[cfg(test)]
14mod tests;
15
16use prompt_change::maybe_emit_prompt_changed;
17use sidecars::post_ingest_detached;
18
19/// Hook source, aligned with the `kaizen ingest hook --source` CLI.
20#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize)]
21#[serde(rename_all = "lowercase")]
22pub enum IngestSource {
23    Cursor,
24    Claude,
25    Openclaw,
26    Vibe,
27}
28
29impl IngestSource {
30    pub fn parse(s: &str) -> Option<Self> {
31        match s.to_lowercase().as_str() {
32            "cursor" => Some(Self::Cursor),
33            "claude" => Some(Self::Claude),
34            "openclaw" => Some(Self::Openclaw),
35            "vibe" => Some(Self::Vibe),
36            _ => None,
37        }
38    }
39
40    pub fn agent(self) -> &'static str {
41        match self {
42            Self::Cursor => "cursor",
43            Self::Claude => "claude",
44            Self::Openclaw => "openclaw",
45            Self::Vibe => "vibe",
46        }
47    }
48}
49
50/// Process hook JSON (same as stdin for `kaizen ingest hook`). On success, returns empty string (CLI prints nothing).
51pub fn ingest_hook_string(
52    source: IngestSource,
53    input: &str,
54    workspace: Option<PathBuf>,
55) -> Result<String> {
56    ingest_hook_text(source, input, workspace)?;
57    Ok(String::new())
58}
59
60/// Process hook JSON (same as stdin for `kaizen ingest hook`).
61pub fn ingest_hook_text(
62    source: IngestSource,
63    input: &str,
64    workspace: Option<PathBuf>,
65) -> Result<()> {
66    let ws = workspace.unwrap_or_else(|| std::env::current_dir().expect("cwd"));
67    let db_path = crate::core::workspace::db_path(&ws)?;
68    let store = Store::open(&db_path)?;
69    ingest_hook_with_store(source, input, &ws, &store)
70}
71
72pub(crate) fn ingest_hook_with_store(
73    source: IngestSource,
74    input: &str,
75    ws: &std::path::Path,
76    store: &Store,
77) -> Result<()> {
78    let event = match source {
79        IngestSource::Cursor => collect::hooks::cursor::parse_cursor_hook(input)?,
80        IngestSource::Claude => collect::hooks::claude::parse_claude_hook(input)?,
81        IngestSource::Openclaw => collect::hooks::openclaw::parse_openclaw_hook(input)?,
82        IngestSource::Vibe => collect::hooks::vibe::parse_vibe_hook(input)?,
83    };
84    let cfg = config::load(ws)?;
85    let sync_ctx = crate::sync::ingest_ctx(&cfg, ws.to_path_buf());
86    let now_ms = std::time::SystemTime::now()
87        .duration_since(std::time::UNIX_EPOCH)
88        .map(|d| d.as_millis() as u64)
89        .unwrap_or(0);
90    let ts = if event.ts_ms == 0 {
91        now_ms
92    } else {
93        event.ts_ms
94    };
95    let mut event = event;
96    event.ts_ms = ts;
97    let seq = store.next_event_seq(&event.session_id)?;
98    let ev = collect::hooks::normalize::hook_to_event(&event, seq);
99    if let Some(status) = collect::hooks::normalize::hook_to_status(&event.kind) {
100        if matches!(event.kind, collect::hooks::EventKind::SessionStart) {
101            let snap = prompt::snapshot::capture(ws, now_ms).ok();
102            let fingerprint = snap.as_ref().map(|s| s.fingerprint.clone());
103            if let Some(ref s) = snap {
104                let _ = store.upsert_prompt_snapshot(s);
105            }
106            let model = collect::model_from_json::from_value(&event.payload);
107            let env = session_env_fields(&event.payload);
108            let record = SessionRecord {
109                id: event.session_id.clone(),
110                agent: source.agent().to_string(),
111                model,
112                workspace: ws.to_string_lossy().to_string(),
113                started_at_ms: event.ts_ms,
114                ended_at_ms: None,
115                status: status.clone(),
116                trace_path: String::new(),
117                start_commit: None,
118                end_commit: None,
119                branch: None,
120                dirty_start: None,
121                dirty_end: None,
122                repo_binding_source: None,
123                prompt_fingerprint: fingerprint,
124                parent_session_id: None,
125                agent_version: env.0,
126                os: env.1,
127                arch: env.2,
128                repo_file_count: None,
129                repo_total_loc: None,
130            };
131            store.upsert_session(&record)?;
132        } else {
133            store.ensure_session_stub(
134                &event.session_id,
135                source.agent(),
136                &ws.to_string_lossy(),
137                event.ts_ms,
138            )?;
139            if matches!(event.kind, collect::hooks::EventKind::Stop) {
140                maybe_emit_prompt_changed(
141                    store,
142                    &event.session_id,
143                    ws,
144                    now_ms,
145                    &ev,
146                    sync_ctx.as_ref(),
147                )?;
148            }
149            store.update_session_status(&event.session_id, status)?;
150        }
151    } else {
152        store.ensure_session_stub(
153            &event.session_id,
154            source.agent(),
155            &ws.to_string_lossy(),
156            event.ts_ms,
157        )?;
158    }
159    store.append_event_with_sync(&ev, sync_ctx.as_ref())?;
160    if matches!(event.kind, collect::hooks::EventKind::Stop) {
161        store.flush_search()?;
162    }
163    post_ingest_detached(&event, &cfg, ws)?;
164    Ok(())
165}
166
167fn session_env_fields(payload: &Value) -> (Option<String>, Option<String>, Option<String>) {
168    let ver = [
169        "cursor_version",
170        "claude_version",
171        "agent_version",
172        "version",
173    ]
174    .into_iter()
175    .find_map(|k| {
176        payload
177            .get(k)
178            .and_then(|v| v.as_str())
179            .map(|s| s.to_string())
180    });
181    let os = payload
182        .get("os")
183        .and_then(|v| v.as_str())
184        .map(|s| s.to_string());
185    let arch = payload
186        .get("arch")
187        .and_then(|v| v.as_str())
188        .map(|s| s.to_string());
189    (ver, os, arch)
190}