Skip to main content

kaizen/shell/
ingest.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! `kaizen ingest` — hook ingestion (stdin or explicit payload for MCP).
3
4use crate::core::config;
5use crate::store::Store;
6use crate::{collect, core::event::SessionRecord, prompt};
7use anyhow::Result;
8use serde_json::Value;
9use std::path::PathBuf;
10
11mod identity;
12mod prompt_change;
13mod sidecars;
14#[cfg(test)]
15mod tests;
16
17use prompt_change::maybe_emit_prompt_changed;
18use sidecars::post_ingest_detached;
19
20/// Hook source, aligned with the `kaizen ingest hook --source` CLI.
21#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize)]
22#[serde(rename_all = "lowercase")]
23pub enum IngestSource {
24    Cursor,
25    Claude,
26    Openclaw,
27    Vibe,
28}
29
30impl IngestSource {
31    pub fn parse(s: &str) -> Option<Self> {
32        match s.to_lowercase().as_str() {
33            "cursor" => Some(Self::Cursor),
34            "claude" => Some(Self::Claude),
35            "openclaw" => Some(Self::Openclaw),
36            "vibe" => Some(Self::Vibe),
37            _ => None,
38        }
39    }
40
41    pub fn agent(self) -> &'static str {
42        match self {
43            Self::Cursor => "cursor",
44            Self::Claude => "claude",
45            Self::Openclaw => "openclaw",
46            Self::Vibe => "vibe",
47        }
48    }
49}
50
51/// Process hook JSON (same as stdin for `kaizen ingest hook`). On success, returns empty string (CLI prints nothing).
52pub fn ingest_hook_string(
53    source: IngestSource,
54    input: &str,
55    workspace: Option<PathBuf>,
56) -> Result<String> {
57    ingest_hook_text(source, input, workspace)?;
58    Ok(String::new())
59}
60
61/// Process hook JSON (same as stdin for `kaizen ingest hook`).
62pub fn ingest_hook_text(
63    source: IngestSource,
64    input: &str,
65    workspace: Option<PathBuf>,
66) -> Result<()> {
67    let ws = workspace.unwrap_or_else(|| std::env::current_dir().expect("cwd"));
68    let db_path = crate::core::workspace::db_path(&ws)?;
69    let store = Store::open(&db_path)?;
70    ingest_hook_with_store(source, input, &ws, &store)
71}
72
73pub(crate) fn ingest_hook_with_store(
74    source: IngestSource,
75    input: &str,
76    ws: &std::path::Path,
77    store: &Store,
78) -> Result<()> {
79    let event = match source {
80        IngestSource::Cursor => collect::hooks::cursor::parse_cursor_hook(input)?,
81        IngestSource::Claude => collect::hooks::claude::parse_claude_hook(input)?,
82        IngestSource::Openclaw => collect::hooks::openclaw::parse_openclaw_hook(input)?,
83        IngestSource::Vibe => collect::hooks::vibe::parse_vibe_hook(input)?,
84    };
85    let cfg = config::load(ws)?;
86    let sync_ctx = crate::sync::ingest_ctx(&cfg, ws.to_path_buf());
87    let now_ms = std::time::SystemTime::now()
88        .duration_since(std::time::UNIX_EPOCH)
89        .map(|d| d.as_millis() as u64)
90        .unwrap_or(0);
91    let ts = if event.ts_ms == 0 {
92        now_ms
93    } else {
94        event.ts_ms
95    };
96    let mut event = event;
97    event.ts_ms = ts;
98    let identity = identity::from_payload(source, &event.payload);
99    let seq = store.next_event_seq(&event.session_id)?;
100    let ev = collect::hooks::normalize::hook_to_event(&event, seq);
101    if let Some(status) = collect::hooks::normalize::hook_to_status(&event.kind) {
102        if matches!(event.kind, collect::hooks::EventKind::SessionStart) {
103            let snap = prompt::snapshot::capture(ws, now_ms).ok();
104            let fingerprint = snap.as_ref().map(|s| s.fingerprint.clone());
105            if let Some(ref s) = snap {
106                let _ = store.upsert_prompt_snapshot(s);
107            }
108            let model = collect::model_from_json::from_value(&event.payload);
109            let env = session_env_fields(&event.payload);
110            let record = SessionRecord {
111                id: event.session_id.clone(),
112                agent: identity.agent.clone(),
113                model: identity.model.clone().or(model),
114                workspace: ws.to_string_lossy().to_string(),
115                started_at_ms: event.ts_ms,
116                ended_at_ms: None,
117                status: status.clone(),
118                trace_path: identity.trace_path.clone().unwrap_or_default(),
119                start_commit: None,
120                end_commit: None,
121                branch: None,
122                dirty_start: None,
123                dirty_end: None,
124                repo_binding_source: None,
125                prompt_fingerprint: fingerprint,
126                parent_session_id: None,
127                agent_version: env.0,
128                os: env.1,
129                arch: env.2,
130                repo_file_count: None,
131                repo_total_loc: None,
132            };
133            store.upsert_session(&record)?;
134        } else {
135            store.ensure_session_stub(
136                &event.session_id,
137                &identity.agent,
138                &ws.to_string_lossy(),
139                event.ts_ms,
140            )?;
141            if matches!(event.kind, collect::hooks::EventKind::Stop) {
142                maybe_emit_prompt_changed(
143                    store,
144                    &event.session_id,
145                    ws,
146                    now_ms,
147                    &ev,
148                    sync_ctx.as_ref(),
149                )?;
150            }
151            store.update_session_status(&event.session_id, status)?;
152        }
153    } else {
154        store.ensure_session_stub(
155            &event.session_id,
156            &identity.agent,
157            &ws.to_string_lossy(),
158            event.ts_ms,
159        )?;
160    }
161    store.enrich_session_identity(
162        &event.session_id,
163        identity.agent_update.as_deref(),
164        identity.model.as_deref(),
165        identity.trace_path.as_deref(),
166    )?;
167    store.append_event_with_sync(&ev, sync_ctx.as_ref())?;
168    if matches!(event.kind, collect::hooks::EventKind::Stop) {
169        store.flush_search()?;
170    }
171    post_ingest_detached(&event, &cfg, ws)?;
172    Ok(())
173}
174
175fn session_env_fields(payload: &Value) -> (Option<String>, Option<String>, Option<String>) {
176    let ver = [
177        "cursor_version",
178        "claude_version",
179        "agent_version",
180        "version",
181    ]
182    .into_iter()
183    .find_map(|k| {
184        payload
185            .get(k)
186            .and_then(|v| v.as_str())
187            .map(|s| s.to_string())
188    });
189    let os = payload
190        .get("os")
191        .and_then(|v| v.as_str())
192        .map(|s| s.to_string());
193    let arch = payload
194        .get("arch")
195        .and_then(|v| v.as_str())
196        .map(|s| s.to_string());
197    (ver, os, arch)
198}