Skip to main content

kaizen/retro/
inputs.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Build [`Inputs`] from SQLite + workspace filesystem.
3
4use crate::core::data_source::DataSource;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::retro::types::{Inputs, RetroAggregates, SkillFileOnDisk};
7use crate::store::Store;
8use crate::sync::outbound::OutboundEvent;
9use anyhow::Result;
10use std::collections::{HashMap, HashSet};
11use std::fs;
12use std::path::Path;
13
/// Length, in days, of the usage lookback that ends at the retro window's end; skills and rules used
/// since its start populate the "recently used" slug sets in [`Inputs`].
const USAGE_LOOKBACK_MIN_DAYS: u64 = 30;
15
16/// Load retro inputs after the store has been refreshed (e.g. `scan_all_agents`).
17pub fn load_inputs(
18    store: &Store,
19    workspace_root: &Path,
20    workspace_key: &str,
21    window_start_ms: u64,
22    window_end_ms: u64,
23) -> Result<Inputs> {
24    let events = store.retro_events_in_window(workspace_key, window_start_ms, window_end_ms)?;
25    let files_touched =
26        store.files_touched_in_window(workspace_key, window_start_ms, window_end_ms)?;
27    let skills_used = store.skills_used_in_window(workspace_key, window_start_ms, window_end_ms)?;
28    let tool_spans = store.tool_spans_in_window(workspace_key, window_start_ms, window_end_ms)?;
29
30    let lookback_start = window_end_ms.saturating_sub(USAGE_LOOKBACK_MIN_DAYS * 86_400_000);
31    let recent_slugs_list = store.skills_used_since(workspace_key, lookback_start)?;
32    let skills_used_recent_slugs: HashSet<String> = recent_slugs_list.into_iter().collect();
33
34    let rules_recent_list = store.rules_used_since(workspace_key, lookback_start)?;
35    let rules_used_recent_slugs: HashSet<String> = rules_recent_list.into_iter().collect();
36
37    let skill_files_on_disk = scan_skill_files(workspace_root, window_end_ms)?;
38    let rule_files_on_disk = scan_rule_files(workspace_root, window_end_ms)?;
39    let file_facts = latest_file_facts(store, workspace_key)?;
40
41    let aggregates = build_aggregates(&events);
42    let eval_scores = store
43        .list_evals_in_window(window_start_ms, window_end_ms)
44        .unwrap_or_default()
45        .into_iter()
46        .map(|r| (r.session_id, r.score))
47        .collect();
48
49    Ok(Inputs {
50        window_start_ms,
51        window_end_ms,
52        events,
53        files_touched,
54        skills_used,
55        tool_spans,
56        skills_used_recent_slugs,
57        usage_lookback_ms: USAGE_LOOKBACK_MIN_DAYS * 86_400_000,
58        skill_files_on_disk,
59        rule_files_on_disk,
60        rules_used_recent_slugs,
61        file_facts,
62        aggregates,
63        eval_scores,
64    })
65}
66
67/// Same as [`load_inputs`], with optional `remote_events` + local merge when `DataSource` is not local.
68#[allow(clippy::too_many_arguments)]
69pub fn load_inputs_for_data_source(
70    store: &Store,
71    workspace_root: &Path,
72    workspace_key: &str,
73    start_ms: u64,
74    end_ms: u64,
75    source: DataSource,
76    team_id: Option<&str>,
77    workspace_hash: Option<&str>,
78) -> Result<Inputs> {
79    match source {
80        DataSource::Local => load_inputs(store, workspace_root, workspace_key, start_ms, end_ms),
81        DataSource::Provider => {
82            if let (Some(t), Some(wh)) = (team_id, workspace_hash) {
83                load_inputs_from_remote_cache(
84                    store,
85                    workspace_root,
86                    workspace_key,
87                    start_ms,
88                    end_ms,
89                    t,
90                    wh,
91                )
92            } else {
93                load_inputs(store, workspace_root, workspace_key, start_ms, end_ms)
94            }
95        }
96        DataSource::Mixed => {
97            let mut i = load_inputs(store, workspace_root, workspace_key, start_ms, end_ms)?;
98            if let (Some(t), Some(wh)) = (team_id, workspace_hash) {
99                for raw in store.list_remote_event_jsons(t, wh)? {
100                    let o: OutboundEvent = serde_json::from_str(&raw)?;
101                    if o.ts_ms < start_ms || o.ts_ms > end_ms {
102                        continue;
103                    }
104                    i.events
105                        .push(session_event_from_outbound(&o, workspace_key));
106                }
107                i.events.sort_by(|(a, ea), (b, eb)| {
108                    ea.ts_ms
109                        .cmp(&eb.ts_ms)
110                        .then_with(|| a.id.cmp(&b.id))
111                        .then_with(|| ea.seq.cmp(&eb.seq))
112                });
113                i.aggregates = build_aggregates(&i.events);
114            }
115            Ok(i)
116        }
117    }
118}
119
120fn event_kind_from_outbound(s: &str) -> EventKind {
121    match s {
122        "tool_call" => EventKind::ToolCall,
123        "tool_result" => EventKind::ToolResult,
124        "message" => EventKind::Message,
125        "error" => EventKind::Error,
126        "cost" => EventKind::Cost,
127        "hook" => EventKind::Hook,
128        _ => EventKind::Message,
129    }
130}
131
132fn event_source_from_outbound(s: &str) -> EventSource {
133    match s {
134        "tail" => EventSource::Tail,
135        "proxy" => EventSource::Proxy,
136        "hook" => EventSource::Hook,
137        _ => EventSource::Hook,
138    }
139}
140
141fn session_event_from_outbound(o: &OutboundEvent, workspace_key: &str) -> (SessionRecord, Event) {
142    let sid = format!("remote:{}", o.session_id_hash);
143    let session = SessionRecord {
144        id: sid.clone(),
145        agent: o.agent.clone(),
146        model: Some(o.model.clone()),
147        workspace: workspace_key.to_string(),
148        started_at_ms: o.ts_ms,
149        ended_at_ms: None,
150        status: SessionStatus::Done,
151        trace_path: String::new(),
152        start_commit: None,
153        end_commit: None,
154        branch: None,
155        dirty_start: None,
156        dirty_end: None,
157        repo_binding_source: None,
158    };
159    let event = Event {
160        session_id: sid,
161        seq: o.event_seq,
162        ts_ms: o.ts_ms,
163        ts_exact: true,
164        kind: event_kind_from_outbound(&o.kind),
165        source: event_source_from_outbound(&o.source),
166        tool: o.tool.clone(),
167        tool_call_id: o.tool_call_id.clone(),
168        tokens_in: o.tokens_in,
169        tokens_out: o.tokens_out,
170        reasoning_tokens: o.reasoning_tokens,
171        cost_usd_e6: o.cost_usd_e6,
172        payload: o.payload.clone(),
173    };
174    (session, event)
175}
176
177fn load_inputs_from_remote_cache(
178    store: &Store,
179    workspace_root: &Path,
180    workspace_key: &str,
181    start_ms: u64,
182    end_ms: u64,
183    team_id: &str,
184    workspace_hash: &str,
185) -> Result<Inputs> {
186    let mut events = Vec::new();
187    for raw in store.list_remote_event_jsons(team_id, workspace_hash)? {
188        let o: OutboundEvent = serde_json::from_str(&raw)?;
189        if o.ts_ms < start_ms || o.ts_ms > end_ms {
190            continue;
191        }
192        events.push(session_event_from_outbound(&o, workspace_key));
193    }
194    events.sort_by(|(a, ea), (b, eb)| {
195        ea.ts_ms
196            .cmp(&eb.ts_ms)
197            .then_with(|| a.id.cmp(&b.id))
198            .then_with(|| ea.seq.cmp(&eb.seq))
199    });
200    let skill_files_on_disk = scan_skill_files(workspace_root, end_ms)?;
201    let rule_files_on_disk = scan_rule_files(workspace_root, end_ms)?;
202    let lookback_start = end_ms.saturating_sub(USAGE_LOOKBACK_MIN_DAYS * 86_400_000);
203    let recent_slugs_list = store.skills_used_since(workspace_key, lookback_start)?;
204    let skills_used_recent_slugs: HashSet<String> = recent_slugs_list.into_iter().collect();
205    let rules_recent_list = store.rules_used_since(workspace_key, lookback_start)?;
206    let rules_used_recent_slugs: HashSet<String> = rules_recent_list.into_iter().collect();
207    let file_facts = latest_file_facts(store, workspace_key)?;
208    let aggregates = build_aggregates(&events);
209    Ok(Inputs {
210        window_start_ms: start_ms,
211        window_end_ms: end_ms,
212        events,
213        files_touched: vec![],
214        skills_used: vec![],
215        tool_spans: vec![],
216        skills_used_recent_slugs,
217        usage_lookback_ms: USAGE_LOOKBACK_MIN_DAYS * 86_400_000,
218        skill_files_on_disk,
219        rule_files_on_disk,
220        rules_used_recent_slugs,
221        file_facts,
222        aggregates,
223        eval_scores: vec![],
224    })
225}
226
227fn latest_file_facts(
228    store: &Store,
229    workspace: &str,
230) -> Result<HashMap<String, crate::metrics::types::FileFact>> {
231    let Some(snapshot) = store.latest_repo_snapshot(workspace)? else {
232        return Ok(HashMap::new());
233    };
234    let facts = store.file_facts_for_snapshot(&snapshot.id)?;
235    Ok(facts
236        .into_iter()
237        .map(|fact| (fact.path.clone(), fact))
238        .collect())
239}
240
241/// `.cursor/skills/<slug>/SKILL.md` on disk.
242pub fn scan_skill_files(workspace_root: &Path, now_ms: u64) -> Result<Vec<SkillFileOnDisk>> {
243    let skills_dir = workspace_root.join(".cursor/skills");
244    if !skills_dir.is_dir() {
245        return Ok(vec![]);
246    }
247    let mut out = Vec::new();
248    for entry in fs::read_dir(&skills_dir)? {
249        let entry = entry?;
250        if !entry.file_type()?.is_dir() {
251            continue;
252        }
253        let slug = entry.file_name().to_string_lossy().to_string();
254        let skill_md = entry.path().join("SKILL.md");
255        if !skill_md.is_file() {
256            continue;
257        }
258        let meta = fs::metadata(&skill_md)?;
259        let mtime_ms = meta
260            .modified()
261            .ok()
262            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
263            .map(|d| d.as_millis() as u64)
264            .unwrap_or(now_ms);
265        out.push(SkillFileOnDisk {
266            slug,
267            size_bytes: meta.len(),
268            mtime_ms,
269        });
270    }
271    out.sort_by(|a, b| a.slug.cmp(&b.slug));
272    Ok(out)
273}
274
275/// `.cursor/rules/*.mdc` files (stem = rule id).
276pub fn scan_rule_files(workspace_root: &Path, now_ms: u64) -> Result<Vec<SkillFileOnDisk>> {
277    let rules_dir = workspace_root.join(".cursor/rules");
278    if !rules_dir.is_dir() {
279        return Ok(vec![]);
280    }
281    let mut out = Vec::new();
282    for entry in fs::read_dir(&rules_dir)? {
283        let entry = entry?;
284        let path = entry.path();
285        if !path.is_file() {
286            continue;
287        }
288        if !path
289            .extension()
290            .and_then(|x| x.to_str())
291            .is_some_and(|e| e.eq_ignore_ascii_case("mdc"))
292        {
293            continue;
294        }
295        let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
296            continue;
297        };
298        let slug = stem.to_string();
299        let meta = fs::metadata(&path)?;
300        let mtime_ms = meta
301            .modified()
302            .ok()
303            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
304            .map(|d| d.as_millis() as u64)
305            .unwrap_or(now_ms);
306        out.push(SkillFileOnDisk {
307            slug,
308            size_bytes: meta.len(),
309            mtime_ms,
310        });
311    }
312    out.sort_by(|a, b| a.slug.cmp(&b.slug));
313    Ok(out)
314}
315
316fn build_aggregates(events: &[(SessionRecord, crate::core::event::Event)]) -> RetroAggregates {
317    let mut agg = RetroAggregates::default();
318    let mut model_once = HashSet::new();
319    for (s, e) in events {
320        agg.unique_session_ids.insert(s.id.clone());
321        if model_once.insert(s.id.clone()) {
322            let mkey = s.model.clone().unwrap_or_else(|| "unknown".into());
323            *agg.model_session_counts.entry(mkey).or_default() += 1;
324        }
325        if let Some(ref t) = e.tool {
326            *agg.tool_event_counts.entry(t.clone()).or_default() += 1;
327            if let Some(c) = e.cost_usd_e6 {
328                *agg.tool_cost_usd_e6.entry(t.clone()).or_default() += c;
329            }
330        }
331        if let Some(c) = e.cost_usd_e6 {
332            agg.total_cost_usd_e6 += c;
333        }
334    }
335    agg
336}
337
338/// Collect bet ids from existing markdown reports (`###` headings embed id in body or title line).
339pub fn prior_bet_fingerprints(reports_dir: &Path) -> Result<HashSet<String>> {
340    let mut out = HashSet::new();
341    if !reports_dir.is_dir() {
342        return Ok(out);
343    }
344    for entry in fs::read_dir(reports_dir)? {
345        let entry = entry?;
346        let p = entry.path();
347        if p.extension().and_then(|x| x.to_str()) != Some("md") {
348            continue;
349        }
350        let raw = fs::read_to_string(&p).unwrap_or_default();
351        for line in raw.lines() {
352            let l = line.trim();
353            let Some(rest) = l.strip_prefix("### ") else {
354                continue;
355            };
356            let Some(open) = rest.rfind('(') else {
357                continue;
358            };
359            let Some(close) = rest.rfind(')') else {
360                continue;
361            };
362            if close <= open + 1 {
363                continue;
364            }
365            let id = rest[open + 1..close].trim();
366            if id.starts_with('H') && id.contains(':') {
367                out.insert(id.to_string());
368            }
369        }
370    }
371    Ok(out)
372}