Skip to main content

kaizen/retro/
inputs.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Build [`Inputs`] from SQLite + workspace filesystem.
3
4use crate::core::data_source::DataSource;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::retro::types::{Inputs, RetroAggregates, SkillFileOnDisk};
7use crate::store::Store;
8use crate::sync::outbound::OutboundEvent;
9use anyhow::Result;
10use std::collections::{HashMap, HashSet};
11use std::fs;
12use std::path::Path;
13
/// How many days before the retro window's end count as "recent" skill/rule usage.
///
/// Callers convert this to milliseconds (`* 86_400_000`) and subtract it from the window end.
const USAGE_LOOKBACK_MIN_DAYS: u64 = 30;
15
16/// Load retro inputs after the store has been refreshed (e.g. `scan_all_agents`).
17pub fn load_inputs(
18    store: &Store,
19    workspace_root: &Path,
20    workspace_key: &str,
21    window_start_ms: u64,
22    window_end_ms: u64,
23) -> Result<Inputs> {
24    let events = store.retro_events_in_window(workspace_key, window_start_ms, window_end_ms)?;
25    let files_touched =
26        store.files_touched_in_window(workspace_key, window_start_ms, window_end_ms)?;
27    let skills_used = store.skills_used_in_window(workspace_key, window_start_ms, window_end_ms)?;
28    let tool_spans = store.tool_spans_in_window(workspace_key, window_start_ms, window_end_ms)?;
29
30    let lookback_start = window_end_ms.saturating_sub(USAGE_LOOKBACK_MIN_DAYS * 86_400_000);
31    let recent_slugs_list = store.skills_used_since(workspace_key, lookback_start)?;
32    let skills_used_recent_slugs: HashSet<String> = recent_slugs_list.into_iter().collect();
33
34    let rules_recent_list = store.rules_used_since(workspace_key, lookback_start)?;
35    let rules_used_recent_slugs: HashSet<String> = rules_recent_list.into_iter().collect();
36
37    let skill_files_on_disk = scan_skill_files(workspace_root, window_end_ms)?;
38    let rule_files_on_disk = scan_rule_files(workspace_root, window_end_ms)?;
39    let file_facts = latest_file_facts(store, workspace_key)?;
40
41    let aggregates = build_aggregates(&events);
42
43    Ok(Inputs {
44        window_start_ms,
45        window_end_ms,
46        events,
47        files_touched,
48        skills_used,
49        tool_spans,
50        skills_used_recent_slugs,
51        usage_lookback_ms: USAGE_LOOKBACK_MIN_DAYS * 86_400_000,
52        skill_files_on_disk,
53        rule_files_on_disk,
54        rules_used_recent_slugs,
55        file_facts,
56        aggregates,
57    })
58}
59
60/// Same as [`load_inputs`], with optional `remote_events` + local merge when `DataSource` is not local.
61#[allow(clippy::too_many_arguments)]
62pub fn load_inputs_for_data_source(
63    store: &Store,
64    workspace_root: &Path,
65    workspace_key: &str,
66    start_ms: u64,
67    end_ms: u64,
68    source: DataSource,
69    team_id: Option<&str>,
70    workspace_hash: Option<&str>,
71) -> Result<Inputs> {
72    match source {
73        DataSource::Local => load_inputs(store, workspace_root, workspace_key, start_ms, end_ms),
74        DataSource::Provider => {
75            if let (Some(t), Some(wh)) = (team_id, workspace_hash) {
76                load_inputs_from_remote_cache(
77                    store,
78                    workspace_root,
79                    workspace_key,
80                    start_ms,
81                    end_ms,
82                    t,
83                    wh,
84                )
85            } else {
86                load_inputs(store, workspace_root, workspace_key, start_ms, end_ms)
87            }
88        }
89        DataSource::Mixed => {
90            let mut i = load_inputs(store, workspace_root, workspace_key, start_ms, end_ms)?;
91            if let (Some(t), Some(wh)) = (team_id, workspace_hash) {
92                for raw in store.list_remote_event_jsons(t, wh)? {
93                    let o: OutboundEvent = serde_json::from_str(&raw)?;
94                    if o.ts_ms < start_ms || o.ts_ms > end_ms {
95                        continue;
96                    }
97                    i.events
98                        .push(session_event_from_outbound(&o, workspace_key));
99                }
100                i.events.sort_by(|(a, ea), (b, eb)| {
101                    ea.ts_ms
102                        .cmp(&eb.ts_ms)
103                        .then_with(|| a.id.cmp(&b.id))
104                        .then_with(|| ea.seq.cmp(&eb.seq))
105                });
106                i.aggregates = build_aggregates(&i.events);
107            }
108            Ok(i)
109        }
110    }
111}
112
113fn event_kind_from_outbound(s: &str) -> EventKind {
114    match s {
115        "tool_call" => EventKind::ToolCall,
116        "tool_result" => EventKind::ToolResult,
117        "message" => EventKind::Message,
118        "error" => EventKind::Error,
119        "cost" => EventKind::Cost,
120        "hook" => EventKind::Hook,
121        _ => EventKind::Message,
122    }
123}
124
125fn event_source_from_outbound(s: &str) -> EventSource {
126    match s {
127        "tail" => EventSource::Tail,
128        "proxy" => EventSource::Proxy,
129        "hook" => EventSource::Hook,
130        _ => EventSource::Hook,
131    }
132}
133
134fn session_event_from_outbound(o: &OutboundEvent, workspace_key: &str) -> (SessionRecord, Event) {
135    let sid = format!("remote:{}", o.session_id_hash);
136    let session = SessionRecord {
137        id: sid.clone(),
138        agent: o.agent.clone(),
139        model: Some(o.model.clone()),
140        workspace: workspace_key.to_string(),
141        started_at_ms: o.ts_ms,
142        ended_at_ms: None,
143        status: SessionStatus::Done,
144        trace_path: String::new(),
145        start_commit: None,
146        end_commit: None,
147        branch: None,
148        dirty_start: None,
149        dirty_end: None,
150        repo_binding_source: None,
151    };
152    let event = Event {
153        session_id: sid,
154        seq: o.event_seq,
155        ts_ms: o.ts_ms,
156        ts_exact: true,
157        kind: event_kind_from_outbound(&o.kind),
158        source: event_source_from_outbound(&o.source),
159        tool: o.tool.clone(),
160        tool_call_id: o.tool_call_id.clone(),
161        tokens_in: o.tokens_in,
162        tokens_out: o.tokens_out,
163        reasoning_tokens: o.reasoning_tokens,
164        cost_usd_e6: o.cost_usd_e6,
165        payload: o.payload.clone(),
166    };
167    (session, event)
168}
169
170fn load_inputs_from_remote_cache(
171    store: &Store,
172    workspace_root: &Path,
173    workspace_key: &str,
174    start_ms: u64,
175    end_ms: u64,
176    team_id: &str,
177    workspace_hash: &str,
178) -> Result<Inputs> {
179    let mut events = Vec::new();
180    for raw in store.list_remote_event_jsons(team_id, workspace_hash)? {
181        let o: OutboundEvent = serde_json::from_str(&raw)?;
182        if o.ts_ms < start_ms || o.ts_ms > end_ms {
183            continue;
184        }
185        events.push(session_event_from_outbound(&o, workspace_key));
186    }
187    events.sort_by(|(a, ea), (b, eb)| {
188        ea.ts_ms
189            .cmp(&eb.ts_ms)
190            .then_with(|| a.id.cmp(&b.id))
191            .then_with(|| ea.seq.cmp(&eb.seq))
192    });
193    let skill_files_on_disk = scan_skill_files(workspace_root, end_ms)?;
194    let rule_files_on_disk = scan_rule_files(workspace_root, end_ms)?;
195    let lookback_start = end_ms.saturating_sub(USAGE_LOOKBACK_MIN_DAYS * 86_400_000);
196    let recent_slugs_list = store.skills_used_since(workspace_key, lookback_start)?;
197    let skills_used_recent_slugs: HashSet<String> = recent_slugs_list.into_iter().collect();
198    let rules_recent_list = store.rules_used_since(workspace_key, lookback_start)?;
199    let rules_used_recent_slugs: HashSet<String> = rules_recent_list.into_iter().collect();
200    let file_facts = latest_file_facts(store, workspace_key)?;
201    let aggregates = build_aggregates(&events);
202    Ok(Inputs {
203        window_start_ms: start_ms,
204        window_end_ms: end_ms,
205        events,
206        files_touched: vec![],
207        skills_used: vec![],
208        tool_spans: vec![],
209        skills_used_recent_slugs,
210        usage_lookback_ms: USAGE_LOOKBACK_MIN_DAYS * 86_400_000,
211        skill_files_on_disk,
212        rule_files_on_disk,
213        rules_used_recent_slugs,
214        file_facts,
215        aggregates,
216    })
217}
218
219fn latest_file_facts(
220    store: &Store,
221    workspace: &str,
222) -> Result<HashMap<String, crate::metrics::types::FileFact>> {
223    let Some(snapshot) = store.latest_repo_snapshot(workspace)? else {
224        return Ok(HashMap::new());
225    };
226    let facts = store.file_facts_for_snapshot(&snapshot.id)?;
227    Ok(facts
228        .into_iter()
229        .map(|fact| (fact.path.clone(), fact))
230        .collect())
231}
232
233/// `.cursor/skills/<slug>/SKILL.md` on disk.
234pub fn scan_skill_files(workspace_root: &Path, now_ms: u64) -> Result<Vec<SkillFileOnDisk>> {
235    let skills_dir = workspace_root.join(".cursor/skills");
236    if !skills_dir.is_dir() {
237        return Ok(vec![]);
238    }
239    let mut out = Vec::new();
240    for entry in fs::read_dir(&skills_dir)? {
241        let entry = entry?;
242        if !entry.file_type()?.is_dir() {
243            continue;
244        }
245        let slug = entry.file_name().to_string_lossy().to_string();
246        let skill_md = entry.path().join("SKILL.md");
247        if !skill_md.is_file() {
248            continue;
249        }
250        let meta = fs::metadata(&skill_md)?;
251        let mtime_ms = meta
252            .modified()
253            .ok()
254            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
255            .map(|d| d.as_millis() as u64)
256            .unwrap_or(now_ms);
257        out.push(SkillFileOnDisk {
258            slug,
259            size_bytes: meta.len(),
260            mtime_ms,
261        });
262    }
263    out.sort_by(|a, b| a.slug.cmp(&b.slug));
264    Ok(out)
265}
266
267/// `.cursor/rules/*.mdc` files (stem = rule id).
268pub fn scan_rule_files(workspace_root: &Path, now_ms: u64) -> Result<Vec<SkillFileOnDisk>> {
269    let rules_dir = workspace_root.join(".cursor/rules");
270    if !rules_dir.is_dir() {
271        return Ok(vec![]);
272    }
273    let mut out = Vec::new();
274    for entry in fs::read_dir(&rules_dir)? {
275        let entry = entry?;
276        let path = entry.path();
277        if !path.is_file() {
278            continue;
279        }
280        if !path
281            .extension()
282            .and_then(|x| x.to_str())
283            .is_some_and(|e| e.eq_ignore_ascii_case("mdc"))
284        {
285            continue;
286        }
287        let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
288            continue;
289        };
290        let slug = stem.to_string();
291        let meta = fs::metadata(&path)?;
292        let mtime_ms = meta
293            .modified()
294            .ok()
295            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
296            .map(|d| d.as_millis() as u64)
297            .unwrap_or(now_ms);
298        out.push(SkillFileOnDisk {
299            slug,
300            size_bytes: meta.len(),
301            mtime_ms,
302        });
303    }
304    out.sort_by(|a, b| a.slug.cmp(&b.slug));
305    Ok(out)
306}
307
308fn build_aggregates(events: &[(SessionRecord, crate::core::event::Event)]) -> RetroAggregates {
309    let mut agg = RetroAggregates::default();
310    let mut model_once = HashSet::new();
311    for (s, e) in events {
312        agg.unique_session_ids.insert(s.id.clone());
313        if model_once.insert(s.id.clone()) {
314            let mkey = s.model.clone().unwrap_or_else(|| "unknown".into());
315            *agg.model_session_counts.entry(mkey).or_default() += 1;
316        }
317        if let Some(ref t) = e.tool {
318            *agg.tool_event_counts.entry(t.clone()).or_default() += 1;
319            if let Some(c) = e.cost_usd_e6 {
320                *agg.tool_cost_usd_e6.entry(t.clone()).or_default() += c;
321            }
322        }
323        if let Some(c) = e.cost_usd_e6 {
324            agg.total_cost_usd_e6 += c;
325        }
326    }
327    agg
328}
329
330/// Collect bet ids from existing markdown reports (`###` headings embed id in body or title line).
331pub fn prior_bet_fingerprints(reports_dir: &Path) -> Result<HashSet<String>> {
332    let mut out = HashSet::new();
333    if !reports_dir.is_dir() {
334        return Ok(out);
335    }
336    for entry in fs::read_dir(reports_dir)? {
337        let entry = entry?;
338        let p = entry.path();
339        if p.extension().and_then(|x| x.to_str()) != Some("md") {
340            continue;
341        }
342        let raw = fs::read_to_string(&p).unwrap_or_default();
343        for line in raw.lines() {
344            let l = line.trim();
345            let Some(rest) = l.strip_prefix("### ") else {
346                continue;
347            };
348            let Some(open) = rest.rfind('(') else {
349                continue;
350            };
351            let Some(close) = rest.rfind(')') else {
352                continue;
353            };
354            if close <= open + 1 {
355                continue;
356            }
357            let id = rest[open + 1..close].trim();
358            if id.starts_with('H') && id.contains(':') {
359                out.insert(id.to_string());
360            }
361        }
362    }
363    Ok(out)
364}