1use crate::core::data_source::DataSource;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::metrics::types::ToolSpanView;
7use crate::retro::types::{Inputs, RetroAggregates, SkillFileOnDisk, SpanTreeStats};
8use crate::store::Store;
9use crate::sync::outbound::OutboundEvent;
10use anyhow::Result;
11use std::collections::{HashMap, HashSet};
12use std::fs;
13use std::path::Path;
14
15const USAGE_LOOKBACK_MIN_DAYS: u64 = 30;
16
17pub fn load_inputs(
19 store: &Store,
20 workspace_root: &Path,
21 workspace_key: &str,
22 window_start_ms: u64,
23 window_end_ms: u64,
24) -> Result<Inputs> {
25 let events = store.retro_events_in_window(workspace_key, window_start_ms, window_end_ms)?;
26 let files_touched =
27 store.files_touched_in_window(workspace_key, window_start_ms, window_end_ms)?;
28 let skills_used = store.skills_used_in_window(workspace_key, window_start_ms, window_end_ms)?;
29 let tool_spans = store.tool_spans_in_window(workspace_key, window_start_ms, window_end_ms)?;
30
31 let lookback_start = window_end_ms.saturating_sub(USAGE_LOOKBACK_MIN_DAYS * 86_400_000);
32 let recent_slugs_list = store.skills_used_since(workspace_key, lookback_start)?;
33 let skills_used_recent_slugs: HashSet<String> = recent_slugs_list.into_iter().collect();
34
35 let rules_recent_list = store.rules_used_since(workspace_key, lookback_start)?;
36 let rules_used_recent_slugs: HashSet<String> = rules_recent_list.into_iter().collect();
37
38 let skill_files_on_disk = scan_skill_files(workspace_root, window_end_ms)?;
39 let rule_files_on_disk = scan_rule_files(workspace_root, window_end_ms)?;
40 let file_facts = latest_file_facts(store, workspace_key)?;
41
42 let mut aggregates = build_aggregates(&events);
43 aggregates.span_tree_stats = compute_span_tree_stats(&tool_spans);
44 let eval_scores = store
45 .list_evals_in_window(window_start_ms, window_end_ms)
46 .unwrap_or_default()
47 .into_iter()
48 .map(|r| (r.session_id, r.score))
49 .collect();
50 let prompt_fingerprints = store
51 .sessions_with_prompt_fingerprint(workspace_key, window_start_ms, window_end_ms)
52 .unwrap_or_default();
53
54 let feedback = store
55 .list_feedback_in_window(window_start_ms, window_end_ms)
56 .unwrap_or_default();
57 let session_outcomes = store
58 .list_session_outcomes_in_window(workspace_key, window_start_ms, window_end_ms)
59 .unwrap_or_default();
60 let session_sample_aggs = store
61 .list_session_sample_aggs_in_window(workspace_key, window_start_ms, window_end_ms)
62 .unwrap_or_default();
63 Ok(Inputs {
64 window_start_ms,
65 window_end_ms,
66 events,
67 files_touched,
68 skills_used,
69 tool_spans,
70 skills_used_recent_slugs,
71 usage_lookback_ms: USAGE_LOOKBACK_MIN_DAYS * 86_400_000,
72 skill_files_on_disk,
73 rule_files_on_disk,
74 rules_used_recent_slugs,
75 file_facts,
76 aggregates,
77 eval_scores,
78 prompt_fingerprints,
79 feedback,
80 session_outcomes,
81 session_sample_aggs,
82 })
83}
84
85#[allow(clippy::too_many_arguments)]
87pub fn load_inputs_for_data_source(
88 store: &Store,
89 workspace_root: &Path,
90 workspace_key: &str,
91 start_ms: u64,
92 end_ms: u64,
93 source: DataSource,
94 team_id: Option<&str>,
95 workspace_hash: Option<&str>,
96) -> Result<Inputs> {
97 match source {
98 DataSource::Local => load_inputs(store, workspace_root, workspace_key, start_ms, end_ms),
99 DataSource::Provider => {
100 if let (Some(t), Some(wh)) = (team_id, workspace_hash) {
101 load_inputs_from_remote_cache(
102 store,
103 workspace_root,
104 workspace_key,
105 start_ms,
106 end_ms,
107 t,
108 wh,
109 )
110 } else {
111 load_inputs(store, workspace_root, workspace_key, start_ms, end_ms)
112 }
113 }
114 DataSource::Mixed => {
115 let mut i = load_inputs(store, workspace_root, workspace_key, start_ms, end_ms)?;
116 if let (Some(t), Some(wh)) = (team_id, workspace_hash) {
117 for raw in store.list_remote_event_jsons(t, wh)? {
118 let o: OutboundEvent = serde_json::from_str(&raw)?;
119 if o.ts_ms < start_ms || o.ts_ms > end_ms {
120 continue;
121 }
122 i.events
123 .push(session_event_from_outbound(&o, workspace_key));
124 }
125 i.events.sort_by(|(a, ea), (b, eb)| {
126 ea.ts_ms
127 .cmp(&eb.ts_ms)
128 .then_with(|| a.id.cmp(&b.id))
129 .then_with(|| ea.seq.cmp(&eb.seq))
130 });
131 let mut agg = build_aggregates(&i.events);
132 agg.span_tree_stats = compute_span_tree_stats(&i.tool_spans);
133 i.aggregates = agg;
134 }
135 Ok(i)
136 }
137 }
138}
139
140fn event_kind_from_outbound(s: &str) -> EventKind {
141 match s {
142 "tool_call" => EventKind::ToolCall,
143 "tool_result" => EventKind::ToolResult,
144 "message" => EventKind::Message,
145 "error" => EventKind::Error,
146 "cost" => EventKind::Cost,
147 "hook" => EventKind::Hook,
148 "lifecycle" => EventKind::Lifecycle,
149 _ => EventKind::Message,
150 }
151}
152
153fn event_source_from_outbound(s: &str) -> EventSource {
154 match s {
155 "tail" => EventSource::Tail,
156 "proxy" => EventSource::Proxy,
157 "hook" => EventSource::Hook,
158 _ => EventSource::Hook,
159 }
160}
161
162fn session_event_from_outbound(o: &OutboundEvent, workspace_key: &str) -> (SessionRecord, Event) {
163 let sid = format!("remote:{}", o.session_id_hash);
164 let session = SessionRecord {
165 id: sid.clone(),
166 agent: o.agent.clone(),
167 model: Some(o.model.clone()),
168 workspace: workspace_key.to_string(),
169 started_at_ms: o.ts_ms,
170 ended_at_ms: None,
171 status: SessionStatus::Done,
172 trace_path: String::new(),
173 start_commit: None,
174 end_commit: None,
175 branch: None,
176 dirty_start: None,
177 dirty_end: None,
178 repo_binding_source: None,
179 prompt_fingerprint: None,
180 parent_session_id: None,
181 agent_version: None,
182 os: None,
183 arch: None,
184 repo_file_count: None,
185 repo_total_loc: None,
186 };
187 let event = Event {
188 session_id: sid,
189 seq: o.event_seq,
190 ts_ms: o.ts_ms,
191 ts_exact: true,
192 kind: event_kind_from_outbound(&o.kind),
193 source: event_source_from_outbound(&o.source),
194 tool: o.tool.clone(),
195 tool_call_id: o.tool_call_id.clone(),
196 tokens_in: o.tokens_in,
197 tokens_out: o.tokens_out,
198 reasoning_tokens: o.reasoning_tokens,
199 cost_usd_e6: o.cost_usd_e6,
200 stop_reason: None,
201 latency_ms: None,
202 ttft_ms: None,
203 retry_count: None,
204 context_used_tokens: None,
205 context_max_tokens: None,
206 cache_creation_tokens: None,
207 cache_read_tokens: None,
208 system_prompt_tokens: None,
209 payload: o.payload.clone(),
210 };
211 (session, event)
212}
213
214fn load_inputs_from_remote_cache(
215 store: &Store,
216 workspace_root: &Path,
217 workspace_key: &str,
218 start_ms: u64,
219 end_ms: u64,
220 team_id: &str,
221 workspace_hash: &str,
222) -> Result<Inputs> {
223 let mut events = Vec::new();
224 for raw in store.list_remote_event_jsons(team_id, workspace_hash)? {
225 let o: OutboundEvent = serde_json::from_str(&raw)?;
226 if o.ts_ms < start_ms || o.ts_ms > end_ms {
227 continue;
228 }
229 events.push(session_event_from_outbound(&o, workspace_key));
230 }
231 events.sort_by(|(a, ea), (b, eb)| {
232 ea.ts_ms
233 .cmp(&eb.ts_ms)
234 .then_with(|| a.id.cmp(&b.id))
235 .then_with(|| ea.seq.cmp(&eb.seq))
236 });
237 let skill_files_on_disk = scan_skill_files(workspace_root, end_ms)?;
238 let rule_files_on_disk = scan_rule_files(workspace_root, end_ms)?;
239 let lookback_start = end_ms.saturating_sub(USAGE_LOOKBACK_MIN_DAYS * 86_400_000);
240 let recent_slugs_list = store.skills_used_since(workspace_key, lookback_start)?;
241 let skills_used_recent_slugs: HashSet<String> = recent_slugs_list.into_iter().collect();
242 let rules_recent_list = store.rules_used_since(workspace_key, lookback_start)?;
243 let rules_used_recent_slugs: HashSet<String> = rules_recent_list.into_iter().collect();
244 let file_facts = latest_file_facts(store, workspace_key)?;
245 let aggregates = build_aggregates(&events);
246 Ok(Inputs {
247 window_start_ms: start_ms,
248 window_end_ms: end_ms,
249 events,
250 files_touched: vec![],
251 skills_used: vec![],
252 tool_spans: vec![],
253 skills_used_recent_slugs,
254 usage_lookback_ms: USAGE_LOOKBACK_MIN_DAYS * 86_400_000,
255 skill_files_on_disk,
256 rule_files_on_disk,
257 rules_used_recent_slugs,
258 file_facts,
259 aggregates,
260 eval_scores: vec![],
261 prompt_fingerprints: vec![],
262 feedback: vec![],
263 session_outcomes: store
264 .list_session_outcomes_in_window(workspace_key, start_ms, end_ms)
265 .unwrap_or_default(),
266 session_sample_aggs: store
267 .list_session_sample_aggs_in_window(workspace_key, start_ms, end_ms)
268 .unwrap_or_default(),
269 })
270}
271
272fn latest_file_facts(
273 store: &Store,
274 workspace: &str,
275) -> Result<HashMap<String, crate::metrics::types::FileFact>> {
276 let Some(snapshot) = store.latest_repo_snapshot(workspace)? else {
277 return Ok(HashMap::new());
278 };
279 let facts = store.file_facts_for_snapshot(&snapshot.id)?;
280 Ok(facts
281 .into_iter()
282 .map(|fact| (fact.path.clone(), fact))
283 .collect())
284}
285
286pub fn scan_skill_files(workspace_root: &Path, now_ms: u64) -> Result<Vec<SkillFileOnDisk>> {
288 let skills_dir = workspace_root.join(".cursor/skills");
289 if !skills_dir.is_dir() {
290 return Ok(vec![]);
291 }
292 let mut out = Vec::new();
293 for entry in fs::read_dir(&skills_dir)? {
294 let entry = entry?;
295 if !entry.file_type()?.is_dir() {
296 continue;
297 }
298 let slug = entry.file_name().to_string_lossy().to_string();
299 let skill_md = entry.path().join("SKILL.md");
300 if !skill_md.is_file() {
301 continue;
302 }
303 let meta = fs::metadata(&skill_md)?;
304 let mtime_ms = meta
305 .modified()
306 .ok()
307 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
308 .map(|d| d.as_millis() as u64)
309 .unwrap_or(now_ms);
310 out.push(SkillFileOnDisk {
311 slug,
312 size_bytes: meta.len(),
313 mtime_ms,
314 });
315 }
316 out.sort_by(|a, b| a.slug.cmp(&b.slug));
317 Ok(out)
318}
319
320pub fn scan_rule_files(workspace_root: &Path, now_ms: u64) -> Result<Vec<SkillFileOnDisk>> {
322 let rules_dir = workspace_root.join(".cursor/rules");
323 if !rules_dir.is_dir() {
324 return Ok(vec![]);
325 }
326 let mut out = Vec::new();
327 for entry in fs::read_dir(&rules_dir)? {
328 let entry = entry?;
329 let path = entry.path();
330 if !path.is_file() {
331 continue;
332 }
333 if !path
334 .extension()
335 .and_then(|x| x.to_str())
336 .is_some_and(|e| e.eq_ignore_ascii_case("mdc"))
337 {
338 continue;
339 }
340 let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
341 continue;
342 };
343 let slug = stem.to_string();
344 let meta = fs::metadata(&path)?;
345 let mtime_ms = meta
346 .modified()
347 .ok()
348 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
349 .map(|d| d.as_millis() as u64)
350 .unwrap_or(now_ms);
351 out.push(SkillFileOnDisk {
352 slug,
353 size_bytes: meta.len(),
354 mtime_ms,
355 });
356 }
357 out.sort_by(|a, b| a.slug.cmp(&b.slug));
358 Ok(out)
359}
360
361fn compute_span_tree_stats(spans: &[ToolSpanView]) -> Option<SpanTreeStats> {
362 if spans.is_empty() {
363 return None;
364 }
365 use std::collections::HashMap;
366 let max_depth = spans.iter().map(|s| s.depth).max().unwrap_or(0);
367 let deepest = spans
368 .iter()
369 .filter(|s| s.depth == max_depth)
370 .max_by_key(|s| s.subtree_cost_usd_e6.unwrap_or(0))?;
371 let mut children_counts: HashMap<&str, u32> = HashMap::new();
372 for s in spans {
373 if let Some(ref pid) = s.parent_span_id {
374 *children_counts.entry(pid.as_str()).or_default() += 1;
375 }
376 }
377 let max_fan_out = children_counts.values().copied().max().unwrap_or(0);
378 Some(SpanTreeStats {
379 max_depth,
380 max_fan_out,
381 deepest_span_id: deepest.span_id.clone(),
382 })
383}
384
385fn build_aggregates(events: &[(SessionRecord, crate::core::event::Event)]) -> RetroAggregates {
386 let mut agg = RetroAggregates::default();
387 let mut model_once = HashSet::new();
388 for (s, e) in events {
389 agg.unique_session_ids.insert(s.id.clone());
390 if model_once.insert(s.id.clone()) {
391 let mkey = s.model.clone().unwrap_or_else(|| "unknown".into());
392 *agg.model_session_counts.entry(mkey).or_default() += 1;
393 }
394 if let Some(ref t) = e.tool {
395 *agg.tool_event_counts.entry(t.clone()).or_default() += 1;
396 if let Some(c) = e.cost_usd_e6 {
397 *agg.tool_cost_usd_e6.entry(t.clone()).or_default() += c;
398 }
399 }
400 if let Some(c) = e.cost_usd_e6 {
401 agg.total_cost_usd_e6 += c;
402 }
403 }
404 agg
405}
406
/// Collects the bet fingerprints already recorded in earlier retro reports.
///
/// Scans every `*.md` file directly inside `reports_dir` for `### ` headings that end in a
/// parenthesised id, e.g. `### Trim prompt (H1:abc)`. Only ids that start with `H` and
/// contain a `:` are kept. Report files that cannot be read as UTF-8 text are skipped, and a
/// missing directory yields an empty set.
///
/// # Errors
/// Returns an error if `reports_dir` cannot be listed or a directory entry cannot be read.
pub fn prior_bet_fingerprints(reports_dir: &Path) -> Result<HashSet<String>> {
    let mut fingerprints = HashSet::new();
    if !reports_dir.is_dir() {
        return Ok(fingerprints);
    }
    for entry in fs::read_dir(reports_dir)? {
        let path = entry?.path();
        let is_markdown = path.extension().and_then(|ext| ext.to_str()) == Some("md");
        if !is_markdown {
            continue;
        }
        // Best-effort: an unreadable report simply contributes no fingerprints.
        let text = fs::read_to_string(&path).unwrap_or_default();
        fingerprints.extend(text.lines().filter_map(bet_id_from_heading).map(String::from));
    }
    Ok(fingerprints)
}

/// Extracts the bet id from a `### Title (H…:…)` heading line, if it carries one.
///
/// The id is the trimmed text between the last `(` and the last `)` on the line.
fn bet_id_from_heading(line: &str) -> Option<&str> {
    let heading = line.trim().strip_prefix("### ")?;
    let open = heading.rfind('(')?;
    let close = heading.rfind(')')?;
    if close <= open + 1 {
        return None;
    }
    let id = heading[open + 1..close].trim();
    if id.starts_with('H') && id.contains(':') {
        Some(id)
    } else {
        None
    }
}