Skip to main content

kaizen/retro/
inputs.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Build [`Inputs`] from SQLite + workspace filesystem.
3
4use crate::core::data_source::DataSource;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::retro::types::{Inputs, RetroAggregates, SkillFileOnDisk};
7use crate::store::Store;
8use crate::sync::outbound::OutboundEvent;
9use anyhow::Result;
10use std::collections::{HashMap, HashSet};
11use std::fs;
12use std::path::Path;
13
/// Number of days before the window end that the "recently used" skill and rule
/// queries look back; multiplied by 86_400_000 wherever a millisecond span is needed.
const USAGE_LOOKBACK_MIN_DAYS: u64 = 30;
15
16/// Load retro inputs after the store has been refreshed (e.g. `scan_all_agents`).
17pub fn load_inputs(
18    store: &Store,
19    workspace_root: &Path,
20    workspace_key: &str,
21    window_start_ms: u64,
22    window_end_ms: u64,
23) -> Result<Inputs> {
24    let events = store.retro_events_in_window(workspace_key, window_start_ms, window_end_ms)?;
25    let files_touched =
26        store.files_touched_in_window(workspace_key, window_start_ms, window_end_ms)?;
27    let skills_used = store.skills_used_in_window(workspace_key, window_start_ms, window_end_ms)?;
28    let tool_spans = store.tool_spans_in_window(workspace_key, window_start_ms, window_end_ms)?;
29
30    let lookback_start = window_end_ms.saturating_sub(USAGE_LOOKBACK_MIN_DAYS * 86_400_000);
31    let recent_slugs_list = store.skills_used_since(workspace_key, lookback_start)?;
32    let skills_used_recent_slugs: HashSet<String> = recent_slugs_list.into_iter().collect();
33
34    let rules_recent_list = store.rules_used_since(workspace_key, lookback_start)?;
35    let rules_used_recent_slugs: HashSet<String> = rules_recent_list.into_iter().collect();
36
37    let skill_files_on_disk = scan_skill_files(workspace_root, window_end_ms)?;
38    let rule_files_on_disk = scan_rule_files(workspace_root, window_end_ms)?;
39    let file_facts = latest_file_facts(store, workspace_key)?;
40
41    let aggregates = build_aggregates(&events);
42    let eval_scores = store
43        .list_evals_in_window(window_start_ms, window_end_ms)
44        .unwrap_or_default()
45        .into_iter()
46        .map(|r| (r.session_id, r.score))
47        .collect();
48    let prompt_fingerprints = store
49        .sessions_with_prompt_fingerprint(workspace_key, window_start_ms, window_end_ms)
50        .unwrap_or_default();
51
52    let feedback = store
53        .list_feedback_in_window(window_start_ms, window_end_ms)
54        .unwrap_or_default();
55    Ok(Inputs {
56        window_start_ms,
57        window_end_ms,
58        events,
59        files_touched,
60        skills_used,
61        tool_spans,
62        skills_used_recent_slugs,
63        usage_lookback_ms: USAGE_LOOKBACK_MIN_DAYS * 86_400_000,
64        skill_files_on_disk,
65        rule_files_on_disk,
66        rules_used_recent_slugs,
67        file_facts,
68        aggregates,
69        eval_scores,
70        prompt_fingerprints,
71        feedback,
72    })
73}
74
75/// Same as [`load_inputs`], with optional `remote_events` + local merge when `DataSource` is not local.
76#[allow(clippy::too_many_arguments)]
77pub fn load_inputs_for_data_source(
78    store: &Store,
79    workspace_root: &Path,
80    workspace_key: &str,
81    start_ms: u64,
82    end_ms: u64,
83    source: DataSource,
84    team_id: Option<&str>,
85    workspace_hash: Option<&str>,
86) -> Result<Inputs> {
87    match source {
88        DataSource::Local => load_inputs(store, workspace_root, workspace_key, start_ms, end_ms),
89        DataSource::Provider => {
90            if let (Some(t), Some(wh)) = (team_id, workspace_hash) {
91                load_inputs_from_remote_cache(
92                    store,
93                    workspace_root,
94                    workspace_key,
95                    start_ms,
96                    end_ms,
97                    t,
98                    wh,
99                )
100            } else {
101                load_inputs(store, workspace_root, workspace_key, start_ms, end_ms)
102            }
103        }
104        DataSource::Mixed => {
105            let mut i = load_inputs(store, workspace_root, workspace_key, start_ms, end_ms)?;
106            if let (Some(t), Some(wh)) = (team_id, workspace_hash) {
107                for raw in store.list_remote_event_jsons(t, wh)? {
108                    let o: OutboundEvent = serde_json::from_str(&raw)?;
109                    if o.ts_ms < start_ms || o.ts_ms > end_ms {
110                        continue;
111                    }
112                    i.events
113                        .push(session_event_from_outbound(&o, workspace_key));
114                }
115                i.events.sort_by(|(a, ea), (b, eb)| {
116                    ea.ts_ms
117                        .cmp(&eb.ts_ms)
118                        .then_with(|| a.id.cmp(&b.id))
119                        .then_with(|| ea.seq.cmp(&eb.seq))
120                });
121                i.aggregates = build_aggregates(&i.events);
122            }
123            Ok(i)
124        }
125    }
126}
127
128fn event_kind_from_outbound(s: &str) -> EventKind {
129    match s {
130        "tool_call" => EventKind::ToolCall,
131        "tool_result" => EventKind::ToolResult,
132        "message" => EventKind::Message,
133        "error" => EventKind::Error,
134        "cost" => EventKind::Cost,
135        "hook" => EventKind::Hook,
136        _ => EventKind::Message,
137    }
138}
139
140fn event_source_from_outbound(s: &str) -> EventSource {
141    match s {
142        "tail" => EventSource::Tail,
143        "proxy" => EventSource::Proxy,
144        "hook" => EventSource::Hook,
145        _ => EventSource::Hook,
146    }
147}
148
149fn session_event_from_outbound(o: &OutboundEvent, workspace_key: &str) -> (SessionRecord, Event) {
150    let sid = format!("remote:{}", o.session_id_hash);
151    let session = SessionRecord {
152        id: sid.clone(),
153        agent: o.agent.clone(),
154        model: Some(o.model.clone()),
155        workspace: workspace_key.to_string(),
156        started_at_ms: o.ts_ms,
157        ended_at_ms: None,
158        status: SessionStatus::Done,
159        trace_path: String::new(),
160        start_commit: None,
161        end_commit: None,
162        branch: None,
163        dirty_start: None,
164        dirty_end: None,
165        repo_binding_source: None,
166        prompt_fingerprint: None,
167    };
168    let event = Event {
169        session_id: sid,
170        seq: o.event_seq,
171        ts_ms: o.ts_ms,
172        ts_exact: true,
173        kind: event_kind_from_outbound(&o.kind),
174        source: event_source_from_outbound(&o.source),
175        tool: o.tool.clone(),
176        tool_call_id: o.tool_call_id.clone(),
177        tokens_in: o.tokens_in,
178        tokens_out: o.tokens_out,
179        reasoning_tokens: o.reasoning_tokens,
180        cost_usd_e6: o.cost_usd_e6,
181        payload: o.payload.clone(),
182    };
183    (session, event)
184}
185
186fn load_inputs_from_remote_cache(
187    store: &Store,
188    workspace_root: &Path,
189    workspace_key: &str,
190    start_ms: u64,
191    end_ms: u64,
192    team_id: &str,
193    workspace_hash: &str,
194) -> Result<Inputs> {
195    let mut events = Vec::new();
196    for raw in store.list_remote_event_jsons(team_id, workspace_hash)? {
197        let o: OutboundEvent = serde_json::from_str(&raw)?;
198        if o.ts_ms < start_ms || o.ts_ms > end_ms {
199            continue;
200        }
201        events.push(session_event_from_outbound(&o, workspace_key));
202    }
203    events.sort_by(|(a, ea), (b, eb)| {
204        ea.ts_ms
205            .cmp(&eb.ts_ms)
206            .then_with(|| a.id.cmp(&b.id))
207            .then_with(|| ea.seq.cmp(&eb.seq))
208    });
209    let skill_files_on_disk = scan_skill_files(workspace_root, end_ms)?;
210    let rule_files_on_disk = scan_rule_files(workspace_root, end_ms)?;
211    let lookback_start = end_ms.saturating_sub(USAGE_LOOKBACK_MIN_DAYS * 86_400_000);
212    let recent_slugs_list = store.skills_used_since(workspace_key, lookback_start)?;
213    let skills_used_recent_slugs: HashSet<String> = recent_slugs_list.into_iter().collect();
214    let rules_recent_list = store.rules_used_since(workspace_key, lookback_start)?;
215    let rules_used_recent_slugs: HashSet<String> = rules_recent_list.into_iter().collect();
216    let file_facts = latest_file_facts(store, workspace_key)?;
217    let aggregates = build_aggregates(&events);
218    Ok(Inputs {
219        window_start_ms: start_ms,
220        window_end_ms: end_ms,
221        events,
222        files_touched: vec![],
223        skills_used: vec![],
224        tool_spans: vec![],
225        skills_used_recent_slugs,
226        usage_lookback_ms: USAGE_LOOKBACK_MIN_DAYS * 86_400_000,
227        skill_files_on_disk,
228        rule_files_on_disk,
229        rules_used_recent_slugs,
230        file_facts,
231        aggregates,
232        eval_scores: vec![],
233        prompt_fingerprints: vec![],
234        feedback: vec![],
235    })
236}
237
238fn latest_file_facts(
239    store: &Store,
240    workspace: &str,
241) -> Result<HashMap<String, crate::metrics::types::FileFact>> {
242    let Some(snapshot) = store.latest_repo_snapshot(workspace)? else {
243        return Ok(HashMap::new());
244    };
245    let facts = store.file_facts_for_snapshot(&snapshot.id)?;
246    Ok(facts
247        .into_iter()
248        .map(|fact| (fact.path.clone(), fact))
249        .collect())
250}
251
252/// `.cursor/skills/<slug>/SKILL.md` on disk.
253pub fn scan_skill_files(workspace_root: &Path, now_ms: u64) -> Result<Vec<SkillFileOnDisk>> {
254    let skills_dir = workspace_root.join(".cursor/skills");
255    if !skills_dir.is_dir() {
256        return Ok(vec![]);
257    }
258    let mut out = Vec::new();
259    for entry in fs::read_dir(&skills_dir)? {
260        let entry = entry?;
261        if !entry.file_type()?.is_dir() {
262            continue;
263        }
264        let slug = entry.file_name().to_string_lossy().to_string();
265        let skill_md = entry.path().join("SKILL.md");
266        if !skill_md.is_file() {
267            continue;
268        }
269        let meta = fs::metadata(&skill_md)?;
270        let mtime_ms = meta
271            .modified()
272            .ok()
273            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
274            .map(|d| d.as_millis() as u64)
275            .unwrap_or(now_ms);
276        out.push(SkillFileOnDisk {
277            slug,
278            size_bytes: meta.len(),
279            mtime_ms,
280        });
281    }
282    out.sort_by(|a, b| a.slug.cmp(&b.slug));
283    Ok(out)
284}
285
286/// `.cursor/rules/*.mdc` files (stem = rule id).
287pub fn scan_rule_files(workspace_root: &Path, now_ms: u64) -> Result<Vec<SkillFileOnDisk>> {
288    let rules_dir = workspace_root.join(".cursor/rules");
289    if !rules_dir.is_dir() {
290        return Ok(vec![]);
291    }
292    let mut out = Vec::new();
293    for entry in fs::read_dir(&rules_dir)? {
294        let entry = entry?;
295        let path = entry.path();
296        if !path.is_file() {
297            continue;
298        }
299        if !path
300            .extension()
301            .and_then(|x| x.to_str())
302            .is_some_and(|e| e.eq_ignore_ascii_case("mdc"))
303        {
304            continue;
305        }
306        let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
307            continue;
308        };
309        let slug = stem.to_string();
310        let meta = fs::metadata(&path)?;
311        let mtime_ms = meta
312            .modified()
313            .ok()
314            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
315            .map(|d| d.as_millis() as u64)
316            .unwrap_or(now_ms);
317        out.push(SkillFileOnDisk {
318            slug,
319            size_bytes: meta.len(),
320            mtime_ms,
321        });
322    }
323    out.sort_by(|a, b| a.slug.cmp(&b.slug));
324    Ok(out)
325}
326
327fn build_aggregates(events: &[(SessionRecord, crate::core::event::Event)]) -> RetroAggregates {
328    let mut agg = RetroAggregates::default();
329    let mut model_once = HashSet::new();
330    for (s, e) in events {
331        agg.unique_session_ids.insert(s.id.clone());
332        if model_once.insert(s.id.clone()) {
333            let mkey = s.model.clone().unwrap_or_else(|| "unknown".into());
334            *agg.model_session_counts.entry(mkey).or_default() += 1;
335        }
336        if let Some(ref t) = e.tool {
337            *agg.tool_event_counts.entry(t.clone()).or_default() += 1;
338            if let Some(c) = e.cost_usd_e6 {
339                *agg.tool_cost_usd_e6.entry(t.clone()).or_default() += c;
340            }
341        }
342        if let Some(c) = e.cost_usd_e6 {
343            agg.total_cost_usd_e6 += c;
344        }
345    }
346    agg
347}
348
/// Collect bet ids already published in the markdown reports under `reports_dir`.
///
/// Every `*.md` entry in the directory is scanned line by line. A line that, once
/// trimmed, starts with `### ` contributes the trimmed text between its last `(` and
/// its last `)`, provided that text starts with `H` and contains a `:`. Files that
/// cannot be read as UTF-8 text are treated as empty. A missing directory yields an
/// empty set.
///
/// # Errors
/// Propagates I/O errors from listing the directory.
pub fn prior_bet_fingerprints(reports_dir: &Path) -> Result<HashSet<String>> {
    /// Extract the bet id from a single `### ` heading line, if it carries one.
    fn heading_bet_id(line: &str) -> Option<&str> {
        let title = line.trim().strip_prefix("### ")?;
        let open = title.rfind('(')?;
        let close = title.rfind(')')?;
        // `get` yields `None` when the last ')' sits before the last '('.
        let candidate = title.get(open + 1..close)?.trim();
        (candidate.starts_with('H') && candidate.contains(':')).then_some(candidate)
    }

    let mut ids = HashSet::new();
    if !reports_dir.is_dir() {
        return Ok(ids);
    }

    for dir_entry in fs::read_dir(reports_dir)? {
        let report_path = dir_entry?.path();
        let is_markdown = report_path
            .extension()
            .and_then(|ext| ext.to_str())
            .is_some_and(|ext| ext == "md");
        if !is_markdown {
            continue;
        }
        // Unreadable reports are skipped silently by treating them as empty.
        let text = fs::read_to_string(&report_path).unwrap_or_default();
        ids.extend(text.lines().filter_map(heading_bet_id).map(str::to_string));
    }

    Ok(ids)
}