Skip to main content

kaizen/retro/
inputs.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Build [`Inputs`] from SQLite + workspace filesystem.
3
4use crate::core::data_source::DataSource;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::retro::types::{Inputs, RetroAggregates, SkillFileOnDisk};
7use crate::store::Store;
8use crate::sync::outbound::OutboundEvent;
9use anyhow::Result;
10use std::collections::{HashMap, HashSet};
11use std::fs;
12use std::path::Path;
13
/// Length, in days, of the skill/rule usage lookback. Use sites multiply it by
/// `86_400_000` (milliseconds per day) and subtract the result from the window end
/// to get the earliest timestamp considered "recent" usage.
const USAGE_LOOKBACK_MIN_DAYS: u64 = 30;
15
16/// Load retro inputs after the store has been refreshed (e.g. `scan_all_agents`).
17pub fn load_inputs(
18    store: &Store,
19    workspace_root: &Path,
20    workspace_key: &str,
21    window_start_ms: u64,
22    window_end_ms: u64,
23) -> Result<Inputs> {
24    let events = store.retro_events_in_window(workspace_key, window_start_ms, window_end_ms)?;
25    let files_touched =
26        store.files_touched_in_window(workspace_key, window_start_ms, window_end_ms)?;
27    let skills_used = store.skills_used_in_window(workspace_key, window_start_ms, window_end_ms)?;
28    let tool_spans = store.tool_spans_in_window(workspace_key, window_start_ms, window_end_ms)?;
29
30    let lookback_start = window_end_ms.saturating_sub(USAGE_LOOKBACK_MIN_DAYS * 86_400_000);
31    let recent_slugs_list = store.skills_used_since(workspace_key, lookback_start)?;
32    let skills_used_recent_slugs: HashSet<String> = recent_slugs_list.into_iter().collect();
33
34    let rules_recent_list = store.rules_used_since(workspace_key, lookback_start)?;
35    let rules_used_recent_slugs: HashSet<String> = rules_recent_list.into_iter().collect();
36
37    let skill_files_on_disk = scan_skill_files(workspace_root, window_end_ms)?;
38    let rule_files_on_disk = scan_rule_files(workspace_root, window_end_ms)?;
39    let file_facts = latest_file_facts(store, workspace_key)?;
40
41    let aggregates = build_aggregates(&events);
42    let eval_scores = store
43        .list_evals_in_window(window_start_ms, window_end_ms)
44        .unwrap_or_default()
45        .into_iter()
46        .map(|r| (r.session_id, r.score))
47        .collect();
48    let prompt_fingerprints = store
49        .sessions_with_prompt_fingerprint(workspace_key, window_start_ms, window_end_ms)
50        .unwrap_or_default();
51
52    Ok(Inputs {
53        window_start_ms,
54        window_end_ms,
55        events,
56        files_touched,
57        skills_used,
58        tool_spans,
59        skills_used_recent_slugs,
60        usage_lookback_ms: USAGE_LOOKBACK_MIN_DAYS * 86_400_000,
61        skill_files_on_disk,
62        rule_files_on_disk,
63        rules_used_recent_slugs,
64        file_facts,
65        aggregates,
66        eval_scores,
67        prompt_fingerprints,
68    })
69}
70
71/// Same as [`load_inputs`], with optional `remote_events` + local merge when `DataSource` is not local.
72#[allow(clippy::too_many_arguments)]
73pub fn load_inputs_for_data_source(
74    store: &Store,
75    workspace_root: &Path,
76    workspace_key: &str,
77    start_ms: u64,
78    end_ms: u64,
79    source: DataSource,
80    team_id: Option<&str>,
81    workspace_hash: Option<&str>,
82) -> Result<Inputs> {
83    match source {
84        DataSource::Local => load_inputs(store, workspace_root, workspace_key, start_ms, end_ms),
85        DataSource::Provider => {
86            if let (Some(t), Some(wh)) = (team_id, workspace_hash) {
87                load_inputs_from_remote_cache(
88                    store,
89                    workspace_root,
90                    workspace_key,
91                    start_ms,
92                    end_ms,
93                    t,
94                    wh,
95                )
96            } else {
97                load_inputs(store, workspace_root, workspace_key, start_ms, end_ms)
98            }
99        }
100        DataSource::Mixed => {
101            let mut i = load_inputs(store, workspace_root, workspace_key, start_ms, end_ms)?;
102            if let (Some(t), Some(wh)) = (team_id, workspace_hash) {
103                for raw in store.list_remote_event_jsons(t, wh)? {
104                    let o: OutboundEvent = serde_json::from_str(&raw)?;
105                    if o.ts_ms < start_ms || o.ts_ms > end_ms {
106                        continue;
107                    }
108                    i.events
109                        .push(session_event_from_outbound(&o, workspace_key));
110                }
111                i.events.sort_by(|(a, ea), (b, eb)| {
112                    ea.ts_ms
113                        .cmp(&eb.ts_ms)
114                        .then_with(|| a.id.cmp(&b.id))
115                        .then_with(|| ea.seq.cmp(&eb.seq))
116                });
117                i.aggregates = build_aggregates(&i.events);
118            }
119            Ok(i)
120        }
121    }
122}
123
124fn event_kind_from_outbound(s: &str) -> EventKind {
125    match s {
126        "tool_call" => EventKind::ToolCall,
127        "tool_result" => EventKind::ToolResult,
128        "message" => EventKind::Message,
129        "error" => EventKind::Error,
130        "cost" => EventKind::Cost,
131        "hook" => EventKind::Hook,
132        _ => EventKind::Message,
133    }
134}
135
136fn event_source_from_outbound(s: &str) -> EventSource {
137    match s {
138        "tail" => EventSource::Tail,
139        "proxy" => EventSource::Proxy,
140        "hook" => EventSource::Hook,
141        _ => EventSource::Hook,
142    }
143}
144
145fn session_event_from_outbound(o: &OutboundEvent, workspace_key: &str) -> (SessionRecord, Event) {
146    let sid = format!("remote:{}", o.session_id_hash);
147    let session = SessionRecord {
148        id: sid.clone(),
149        agent: o.agent.clone(),
150        model: Some(o.model.clone()),
151        workspace: workspace_key.to_string(),
152        started_at_ms: o.ts_ms,
153        ended_at_ms: None,
154        status: SessionStatus::Done,
155        trace_path: String::new(),
156        start_commit: None,
157        end_commit: None,
158        branch: None,
159        dirty_start: None,
160        dirty_end: None,
161        repo_binding_source: None,
162        prompt_fingerprint: None,
163    };
164    let event = Event {
165        session_id: sid,
166        seq: o.event_seq,
167        ts_ms: o.ts_ms,
168        ts_exact: true,
169        kind: event_kind_from_outbound(&o.kind),
170        source: event_source_from_outbound(&o.source),
171        tool: o.tool.clone(),
172        tool_call_id: o.tool_call_id.clone(),
173        tokens_in: o.tokens_in,
174        tokens_out: o.tokens_out,
175        reasoning_tokens: o.reasoning_tokens,
176        cost_usd_e6: o.cost_usd_e6,
177        payload: o.payload.clone(),
178    };
179    (session, event)
180}
181
182fn load_inputs_from_remote_cache(
183    store: &Store,
184    workspace_root: &Path,
185    workspace_key: &str,
186    start_ms: u64,
187    end_ms: u64,
188    team_id: &str,
189    workspace_hash: &str,
190) -> Result<Inputs> {
191    let mut events = Vec::new();
192    for raw in store.list_remote_event_jsons(team_id, workspace_hash)? {
193        let o: OutboundEvent = serde_json::from_str(&raw)?;
194        if o.ts_ms < start_ms || o.ts_ms > end_ms {
195            continue;
196        }
197        events.push(session_event_from_outbound(&o, workspace_key));
198    }
199    events.sort_by(|(a, ea), (b, eb)| {
200        ea.ts_ms
201            .cmp(&eb.ts_ms)
202            .then_with(|| a.id.cmp(&b.id))
203            .then_with(|| ea.seq.cmp(&eb.seq))
204    });
205    let skill_files_on_disk = scan_skill_files(workspace_root, end_ms)?;
206    let rule_files_on_disk = scan_rule_files(workspace_root, end_ms)?;
207    let lookback_start = end_ms.saturating_sub(USAGE_LOOKBACK_MIN_DAYS * 86_400_000);
208    let recent_slugs_list = store.skills_used_since(workspace_key, lookback_start)?;
209    let skills_used_recent_slugs: HashSet<String> = recent_slugs_list.into_iter().collect();
210    let rules_recent_list = store.rules_used_since(workspace_key, lookback_start)?;
211    let rules_used_recent_slugs: HashSet<String> = rules_recent_list.into_iter().collect();
212    let file_facts = latest_file_facts(store, workspace_key)?;
213    let aggregates = build_aggregates(&events);
214    Ok(Inputs {
215        window_start_ms: start_ms,
216        window_end_ms: end_ms,
217        events,
218        files_touched: vec![],
219        skills_used: vec![],
220        tool_spans: vec![],
221        skills_used_recent_slugs,
222        usage_lookback_ms: USAGE_LOOKBACK_MIN_DAYS * 86_400_000,
223        skill_files_on_disk,
224        rule_files_on_disk,
225        rules_used_recent_slugs,
226        file_facts,
227        aggregates,
228        eval_scores: vec![],
229        prompt_fingerprints: vec![],
230    })
231}
232
233fn latest_file_facts(
234    store: &Store,
235    workspace: &str,
236) -> Result<HashMap<String, crate::metrics::types::FileFact>> {
237    let Some(snapshot) = store.latest_repo_snapshot(workspace)? else {
238        return Ok(HashMap::new());
239    };
240    let facts = store.file_facts_for_snapshot(&snapshot.id)?;
241    Ok(facts
242        .into_iter()
243        .map(|fact| (fact.path.clone(), fact))
244        .collect())
245}
246
247/// `.cursor/skills/<slug>/SKILL.md` on disk.
248pub fn scan_skill_files(workspace_root: &Path, now_ms: u64) -> Result<Vec<SkillFileOnDisk>> {
249    let skills_dir = workspace_root.join(".cursor/skills");
250    if !skills_dir.is_dir() {
251        return Ok(vec![]);
252    }
253    let mut out = Vec::new();
254    for entry in fs::read_dir(&skills_dir)? {
255        let entry = entry?;
256        if !entry.file_type()?.is_dir() {
257            continue;
258        }
259        let slug = entry.file_name().to_string_lossy().to_string();
260        let skill_md = entry.path().join("SKILL.md");
261        if !skill_md.is_file() {
262            continue;
263        }
264        let meta = fs::metadata(&skill_md)?;
265        let mtime_ms = meta
266            .modified()
267            .ok()
268            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
269            .map(|d| d.as_millis() as u64)
270            .unwrap_or(now_ms);
271        out.push(SkillFileOnDisk {
272            slug,
273            size_bytes: meta.len(),
274            mtime_ms,
275        });
276    }
277    out.sort_by(|a, b| a.slug.cmp(&b.slug));
278    Ok(out)
279}
280
281/// `.cursor/rules/*.mdc` files (stem = rule id).
282pub fn scan_rule_files(workspace_root: &Path, now_ms: u64) -> Result<Vec<SkillFileOnDisk>> {
283    let rules_dir = workspace_root.join(".cursor/rules");
284    if !rules_dir.is_dir() {
285        return Ok(vec![]);
286    }
287    let mut out = Vec::new();
288    for entry in fs::read_dir(&rules_dir)? {
289        let entry = entry?;
290        let path = entry.path();
291        if !path.is_file() {
292            continue;
293        }
294        if !path
295            .extension()
296            .and_then(|x| x.to_str())
297            .is_some_and(|e| e.eq_ignore_ascii_case("mdc"))
298        {
299            continue;
300        }
301        let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
302            continue;
303        };
304        let slug = stem.to_string();
305        let meta = fs::metadata(&path)?;
306        let mtime_ms = meta
307            .modified()
308            .ok()
309            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
310            .map(|d| d.as_millis() as u64)
311            .unwrap_or(now_ms);
312        out.push(SkillFileOnDisk {
313            slug,
314            size_bytes: meta.len(),
315            mtime_ms,
316        });
317    }
318    out.sort_by(|a, b| a.slug.cmp(&b.slug));
319    Ok(out)
320}
321
322fn build_aggregates(events: &[(SessionRecord, crate::core::event::Event)]) -> RetroAggregates {
323    let mut agg = RetroAggregates::default();
324    let mut model_once = HashSet::new();
325    for (s, e) in events {
326        agg.unique_session_ids.insert(s.id.clone());
327        if model_once.insert(s.id.clone()) {
328            let mkey = s.model.clone().unwrap_or_else(|| "unknown".into());
329            *agg.model_session_counts.entry(mkey).or_default() += 1;
330        }
331        if let Some(ref t) = e.tool {
332            *agg.tool_event_counts.entry(t.clone()).or_default() += 1;
333            if let Some(c) = e.cost_usd_e6 {
334                *agg.tool_cost_usd_e6.entry(t.clone()).or_default() += c;
335            }
336        }
337        if let Some(c) = e.cost_usd_e6 {
338            agg.total_cost_usd_e6 += c;
339        }
340    }
341    agg
342}
343
/// Collect bet ids from existing markdown reports in `reports_dir`.
///
/// Every `*.md` file is scanned line by line; a `### ` heading contributes the text inside
/// its last parenthesised group when that text starts with `H` and contains `:`
/// (e.g. `### Trim unused skills (H1:skills)` yields `H1:skills`).
///
/// Returns an empty set when `reports_dir` is not a directory. Files that cannot be read
/// as UTF-8 text are skipped (best-effort: one bad report must not hide the others).
///
/// # Errors
/// Propagates I/O errors from listing the directory or reading its entries.
pub fn prior_bet_fingerprints(reports_dir: &Path) -> Result<HashSet<String>> {
    let mut out = HashSet::new();
    if !reports_dir.is_dir() {
        return Ok(out);
    }
    for entry in fs::read_dir(reports_dir)? {
        let report_path = entry?.path();
        if report_path.extension().and_then(|x| x.to_str()) != Some("md") {
            continue;
        }
        // Deliberately best-effort: an unreadable report contributes no ids.
        let raw = fs::read_to_string(&report_path).unwrap_or_default();
        out.extend(
            raw.lines()
                .filter_map(bet_id_in_heading)
                .map(str::to_owned),
        );
    }
    Ok(out)
}

/// Extract the bet id from a single `### ` heading line, if it carries one.
///
/// The id is the trimmed text between the last `)` and the nearest `(` before it, so a
/// stray `(` after the id does not hide it.
fn bet_id_in_heading(line: &str) -> Option<&str> {
    let rest = line.trim().strip_prefix("### ")?;
    let close = rest.rfind(')')?;
    // Search for the opener only to the left of the closer so the pair always matches up.
    let open = rest[..close].rfind('(')?;
    let id = rest[open + 1..close].trim();
    (id.starts_with('H') && id.contains(':')).then_some(id)
}