// kaizen/retro/inputs.rs
// SPDX-License-Identifier: AGPL-3.0-or-later
//! Build [`Inputs`] from SQLite + workspace filesystem.

use crate::core::data_source::DataSource;
use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
use crate::metrics::types::ToolSpanView;
use crate::retro::types::{Inputs, RetroAggregates, SkillFileOnDisk, SpanTreeStats};
use crate::store::Store;
use crate::sync::outbound::OutboundEvent;
use anyhow::Result;
use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::Path;

/// Lookback horizon, in days, for deciding which skill/rule slugs count as
/// "recently used" (multiplied by 86_400_000 to get milliseconds below).
const USAGE_LOOKBACK_MIN_DAYS: u64 = 30;
16
17/// Load retro inputs after the store has been refreshed (e.g. `maybe_scan_all_agents` with `refresh: true` or `scan_all_agents`).
18pub fn load_inputs(
19    store: &Store,
20    workspace_root: &Path,
21    workspace_key: &str,
22    window_start_ms: u64,
23    window_end_ms: u64,
24) -> Result<Inputs> {
25    let events = store.retro_events_in_window(workspace_key, window_start_ms, window_end_ms)?;
26    let files_touched =
27        store.files_touched_in_window(workspace_key, window_start_ms, window_end_ms)?;
28    let skills_used = store.skills_used_in_window(workspace_key, window_start_ms, window_end_ms)?;
29    let tool_spans = store.tool_spans_in_window(workspace_key, window_start_ms, window_end_ms)?;
30
31    let lookback_start = window_end_ms.saturating_sub(USAGE_LOOKBACK_MIN_DAYS * 86_400_000);
32    let recent_slugs_list = store.skills_used_since(workspace_key, lookback_start)?;
33    let skills_used_recent_slugs: HashSet<String> = recent_slugs_list.into_iter().collect();
34
35    let rules_recent_list = store.rules_used_since(workspace_key, lookback_start)?;
36    let rules_used_recent_slugs: HashSet<String> = rules_recent_list.into_iter().collect();
37
38    let skill_files_on_disk = scan_skill_files(workspace_root, window_end_ms)?;
39    let rule_files_on_disk = scan_rule_files(workspace_root, window_end_ms)?;
40    let file_facts = latest_file_facts(store, workspace_key)?;
41
42    let mut aggregates = build_aggregates(&events);
43    aggregates.span_tree_stats = compute_span_tree_stats(&tool_spans);
44    let eval_scores = store
45        .list_evals_in_window(window_start_ms, window_end_ms)
46        .unwrap_or_default()
47        .into_iter()
48        .map(|r| (r.session_id, r.score))
49        .collect();
50    let prompt_fingerprints = store
51        .sessions_with_prompt_fingerprint(workspace_key, window_start_ms, window_end_ms)
52        .unwrap_or_default();
53
54    let feedback = store
55        .list_feedback_in_window(window_start_ms, window_end_ms)
56        .unwrap_or_default();
57    let session_outcomes = store
58        .list_session_outcomes_in_window(workspace_key, window_start_ms, window_end_ms)
59        .unwrap_or_default();
60    let session_sample_aggs = store
61        .list_session_sample_aggs_in_window(workspace_key, window_start_ms, window_end_ms)
62        .unwrap_or_default();
63    Ok(Inputs {
64        window_start_ms,
65        window_end_ms,
66        events,
67        files_touched,
68        skills_used,
69        tool_spans,
70        skills_used_recent_slugs,
71        usage_lookback_ms: USAGE_LOOKBACK_MIN_DAYS * 86_400_000,
72        skill_files_on_disk,
73        rule_files_on_disk,
74        rules_used_recent_slugs,
75        file_facts,
76        aggregates,
77        eval_scores,
78        prompt_fingerprints,
79        feedback,
80        session_outcomes,
81        session_sample_aggs,
82    })
83}
84
85/// Same as [`load_inputs`], with optional `remote_events` + local merge when `DataSource` is not local.
86#[allow(clippy::too_many_arguments)]
87pub fn load_inputs_for_data_source(
88    store: &Store,
89    workspace_root: &Path,
90    workspace_key: &str,
91    start_ms: u64,
92    end_ms: u64,
93    source: DataSource,
94    team_id: Option<&str>,
95    workspace_hash: Option<&str>,
96) -> Result<Inputs> {
97    match source {
98        DataSource::Local => load_inputs(store, workspace_root, workspace_key, start_ms, end_ms),
99        DataSource::Provider => {
100            if let (Some(t), Some(wh)) = (team_id, workspace_hash) {
101                load_inputs_from_remote_cache(
102                    store,
103                    workspace_root,
104                    workspace_key,
105                    start_ms,
106                    end_ms,
107                    t,
108                    wh,
109                )
110            } else {
111                load_inputs(store, workspace_root, workspace_key, start_ms, end_ms)
112            }
113        }
114        DataSource::Mixed => {
115            let mut i = load_inputs(store, workspace_root, workspace_key, start_ms, end_ms)?;
116            if let (Some(t), Some(wh)) = (team_id, workspace_hash) {
117                for raw in store.list_remote_event_jsons(t, wh)? {
118                    let o: OutboundEvent = serde_json::from_str(&raw)?;
119                    if o.ts_ms < start_ms || o.ts_ms > end_ms {
120                        continue;
121                    }
122                    i.events
123                        .push(session_event_from_outbound(&o, workspace_key));
124                }
125                i.events.sort_by(|(a, ea), (b, eb)| {
126                    ea.ts_ms
127                        .cmp(&eb.ts_ms)
128                        .then_with(|| a.id.cmp(&b.id))
129                        .then_with(|| ea.seq.cmp(&eb.seq))
130                });
131                let mut agg = build_aggregates(&i.events);
132                agg.span_tree_stats = compute_span_tree_stats(&i.tool_spans);
133                i.aggregates = agg;
134            }
135            Ok(i)
136        }
137    }
138}
139
/// Map an outbound wire-format kind string onto the local [`EventKind`] enum.
/// Unrecognized kinds fall back to `Message` rather than failing.
fn event_kind_from_outbound(s: &str) -> EventKind {
    match s {
        "tool_call" => EventKind::ToolCall,
        "tool_result" => EventKind::ToolResult,
        "message" => EventKind::Message,
        "error" => EventKind::Error,
        "cost" => EventKind::Cost,
        "hook" => EventKind::Hook,
        "lifecycle" => EventKind::Lifecycle,
        // Forward-compat: anything unknown is treated as a plain message.
        _ => EventKind::Message,
    }
}
152
153fn event_source_from_outbound(s: &str) -> EventSource {
154    match s {
155        "tail" => EventSource::Tail,
156        "proxy" => EventSource::Proxy,
157        "hook" => EventSource::Hook,
158        _ => EventSource::Hook,
159    }
160}
161
/// Convert a synced [`OutboundEvent`] into the local `(SessionRecord, Event)`
/// pair consumed by retro aggregation.
///
/// The synthesized session id is prefixed with `remote:` so it can never
/// collide with locally recorded session ids. Fields the outbound record does
/// not populate here (git state, latency, context/cache token counts, ...) are
/// left `None`/empty.
fn session_event_from_outbound(o: &OutboundEvent, workspace_key: &str) -> (SessionRecord, Event) {
    let sid = format!("remote:{}", o.session_id_hash);
    let session = SessionRecord {
        id: sid.clone(),
        agent: o.agent.clone(),
        model: Some(o.model.clone()),
        workspace: workspace_key.to_string(),
        // The event timestamp stands in for the session start time.
        started_at_ms: o.ts_ms,
        ended_at_ms: None,
        // Status is hard-coded to Done for remote-sourced sessions.
        status: SessionStatus::Done,
        trace_path: String::new(),
        start_commit: None,
        end_commit: None,
        branch: None,
        dirty_start: None,
        dirty_end: None,
        repo_binding_source: None,
        prompt_fingerprint: None,
        parent_session_id: None,
        agent_version: None,
        os: None,
        arch: None,
        repo_file_count: None,
        repo_total_loc: None,
    };
    let event = Event {
        session_id: sid,
        seq: o.event_seq,
        ts_ms: o.ts_ms,
        // Remote timestamps are treated as exact, not inferred.
        ts_exact: true,
        kind: event_kind_from_outbound(&o.kind),
        source: event_source_from_outbound(&o.source),
        tool: o.tool.clone(),
        tool_call_id: o.tool_call_id.clone(),
        tokens_in: o.tokens_in,
        tokens_out: o.tokens_out,
        reasoning_tokens: o.reasoning_tokens,
        cost_usd_e6: o.cost_usd_e6,
        stop_reason: None,
        latency_ms: None,
        ttft_ms: None,
        retry_count: None,
        context_used_tokens: None,
        context_max_tokens: None,
        cache_creation_tokens: None,
        cache_read_tokens: None,
        system_prompt_tokens: None,
        payload: o.payload.clone(),
    };
    (session, event)
}
213
214fn load_inputs_from_remote_cache(
215    store: &Store,
216    workspace_root: &Path,
217    workspace_key: &str,
218    start_ms: u64,
219    end_ms: u64,
220    team_id: &str,
221    workspace_hash: &str,
222) -> Result<Inputs> {
223    let mut events = Vec::new();
224    for raw in store.list_remote_event_jsons(team_id, workspace_hash)? {
225        let o: OutboundEvent = serde_json::from_str(&raw)?;
226        if o.ts_ms < start_ms || o.ts_ms > end_ms {
227            continue;
228        }
229        events.push(session_event_from_outbound(&o, workspace_key));
230    }
231    events.sort_by(|(a, ea), (b, eb)| {
232        ea.ts_ms
233            .cmp(&eb.ts_ms)
234            .then_with(|| a.id.cmp(&b.id))
235            .then_with(|| ea.seq.cmp(&eb.seq))
236    });
237    let skill_files_on_disk = scan_skill_files(workspace_root, end_ms)?;
238    let rule_files_on_disk = scan_rule_files(workspace_root, end_ms)?;
239    let lookback_start = end_ms.saturating_sub(USAGE_LOOKBACK_MIN_DAYS * 86_400_000);
240    let recent_slugs_list = store.skills_used_since(workspace_key, lookback_start)?;
241    let skills_used_recent_slugs: HashSet<String> = recent_slugs_list.into_iter().collect();
242    let rules_recent_list = store.rules_used_since(workspace_key, lookback_start)?;
243    let rules_used_recent_slugs: HashSet<String> = rules_recent_list.into_iter().collect();
244    let file_facts = latest_file_facts(store, workspace_key)?;
245    let aggregates = build_aggregates(&events);
246    Ok(Inputs {
247        window_start_ms: start_ms,
248        window_end_ms: end_ms,
249        events,
250        files_touched: vec![],
251        skills_used: vec![],
252        tool_spans: vec![],
253        skills_used_recent_slugs,
254        usage_lookback_ms: USAGE_LOOKBACK_MIN_DAYS * 86_400_000,
255        skill_files_on_disk,
256        rule_files_on_disk,
257        rules_used_recent_slugs,
258        file_facts,
259        aggregates,
260        eval_scores: vec![],
261        prompt_fingerprints: vec![],
262        feedback: vec![],
263        session_outcomes: store
264            .list_session_outcomes_in_window(workspace_key, start_ms, end_ms)
265            .unwrap_or_default(),
266        session_sample_aggs: store
267            .list_session_sample_aggs_in_window(workspace_key, start_ms, end_ms)
268            .unwrap_or_default(),
269    })
270}
271
272fn latest_file_facts(
273    store: &Store,
274    workspace: &str,
275) -> Result<HashMap<String, crate::metrics::types::FileFact>> {
276    let Some(snapshot) = store.latest_repo_snapshot(workspace)? else {
277        return Ok(HashMap::new());
278    };
279    let facts = store.file_facts_for_snapshot(&snapshot.id)?;
280    Ok(facts
281        .into_iter()
282        .map(|fact| (fact.path.clone(), fact))
283        .collect())
284}
285
286/// `.cursor/skills/<slug>/SKILL.md` on disk.
287pub fn scan_skill_files(workspace_root: &Path, now_ms: u64) -> Result<Vec<SkillFileOnDisk>> {
288    let skills_dir = workspace_root.join(".cursor/skills");
289    if !skills_dir.is_dir() {
290        return Ok(vec![]);
291    }
292    let mut out = Vec::new();
293    for entry in fs::read_dir(&skills_dir)? {
294        let entry = entry?;
295        if !entry.file_type()?.is_dir() {
296            continue;
297        }
298        let slug = entry.file_name().to_string_lossy().to_string();
299        let skill_md = entry.path().join("SKILL.md");
300        if !skill_md.is_file() {
301            continue;
302        }
303        let meta = fs::metadata(&skill_md)?;
304        let mtime_ms = meta
305            .modified()
306            .ok()
307            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
308            .map(|d| d.as_millis() as u64)
309            .unwrap_or(now_ms);
310        out.push(SkillFileOnDisk {
311            slug,
312            size_bytes: meta.len(),
313            mtime_ms,
314        });
315    }
316    out.sort_by(|a, b| a.slug.cmp(&b.slug));
317    Ok(out)
318}
319
320/// `.cursor/rules/*.mdc` files (stem = rule id).
321pub fn scan_rule_files(workspace_root: &Path, now_ms: u64) -> Result<Vec<SkillFileOnDisk>> {
322    let rules_dir = workspace_root.join(".cursor/rules");
323    if !rules_dir.is_dir() {
324        return Ok(vec![]);
325    }
326    let mut out = Vec::new();
327    for entry in fs::read_dir(&rules_dir)? {
328        let entry = entry?;
329        let path = entry.path();
330        if !path.is_file() {
331            continue;
332        }
333        if !path
334            .extension()
335            .and_then(|x| x.to_str())
336            .is_some_and(|e| e.eq_ignore_ascii_case("mdc"))
337        {
338            continue;
339        }
340        let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
341            continue;
342        };
343        let slug = stem.to_string();
344        let meta = fs::metadata(&path)?;
345        let mtime_ms = meta
346            .modified()
347            .ok()
348            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
349            .map(|d| d.as_millis() as u64)
350            .unwrap_or(now_ms);
351        out.push(SkillFileOnDisk {
352            slug,
353            size_bytes: meta.len(),
354            mtime_ms,
355        });
356    }
357    out.sort_by(|a, b| a.slug.cmp(&b.slug));
358    Ok(out)
359}
360
361fn compute_span_tree_stats(spans: &[ToolSpanView]) -> Option<SpanTreeStats> {
362    if spans.is_empty() {
363        return None;
364    }
365    use std::collections::HashMap;
366    let max_depth = spans.iter().map(|s| s.depth).max().unwrap_or(0);
367    let deepest = spans
368        .iter()
369        .filter(|s| s.depth == max_depth)
370        .max_by_key(|s| s.subtree_cost_usd_e6.unwrap_or(0))?;
371    let mut children_counts: HashMap<&str, u32> = HashMap::new();
372    for s in spans {
373        if let Some(ref pid) = s.parent_span_id {
374            *children_counts.entry(pid.as_str()).or_default() += 1;
375        }
376    }
377    let max_fan_out = children_counts.values().copied().max().unwrap_or(0);
378    Some(SpanTreeStats {
379        max_depth,
380        max_fan_out,
381        deepest_span_id: deepest.span_id.clone(),
382    })
383}
384
385fn build_aggregates(events: &[(SessionRecord, crate::core::event::Event)]) -> RetroAggregates {
386    let mut agg = RetroAggregates::default();
387    let mut model_once = HashSet::new();
388    for (s, e) in events {
389        agg.unique_session_ids.insert(s.id.clone());
390        if model_once.insert(s.id.clone()) {
391            let mkey = s.model.clone().unwrap_or_else(|| "unknown".into());
392            *agg.model_session_counts.entry(mkey).or_default() += 1;
393        }
394        if let Some(ref t) = e.tool {
395            *agg.tool_event_counts.entry(t.clone()).or_default() += 1;
396            if let Some(c) = e.cost_usd_e6 {
397                *agg.tool_cost_usd_e6.entry(t.clone()).or_default() += c;
398            }
399        }
400        if let Some(c) = e.cost_usd_e6 {
401            agg.total_cost_usd_e6 += c;
402        }
403    }
404    agg
405}
406
407/// Collect bet ids from existing markdown reports (`###` headings embed id in body or title line).
408pub fn prior_bet_fingerprints(reports_dir: &Path) -> Result<HashSet<String>> {
409    let mut out = HashSet::new();
410    if !reports_dir.is_dir() {
411        return Ok(out);
412    }
413    for entry in fs::read_dir(reports_dir)? {
414        let entry = entry?;
415        let p = entry.path();
416        if p.extension().and_then(|x| x.to_str()) != Some("md") {
417            continue;
418        }
419        let raw = fs::read_to_string(&p).unwrap_or_default();
420        for line in raw.lines() {
421            let l = line.trim();
422            let Some(rest) = l.strip_prefix("### ") else {
423                continue;
424            };
425            let Some(open) = rest.rfind('(') else {
426                continue;
427            };
428            let Some(close) = rest.rfind(')') else {
429                continue;
430            };
431            if close <= open + 1 {
432                continue;
433            }
434            let id = rest[open + 1..close].trim();
435            if id.starts_with('H') && id.contains(':') {
436                out.insert(id.to_string());
437            }
438        }
439    }
440    Ok(out)
441}