1use crate::core::data_source::DataSource;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::retro::types::{Inputs, RetroAggregates, SkillFileOnDisk};
7use crate::store::Store;
8use crate::sync::outbound::OutboundEvent;
9use anyhow::Result;
10use std::collections::{HashMap, HashSet};
11use std::fs;
12use std::path::Path;
13
/// Number of days, counted back from the end of the report window, over which recently used
/// skill and rule slugs are collected. Converted to milliseconds (`* 86_400_000`) where used.
const USAGE_LOOKBACK_MIN_DAYS: u64 = 30;
15
16pub fn load_inputs(
18 store: &Store,
19 workspace_root: &Path,
20 workspace_key: &str,
21 window_start_ms: u64,
22 window_end_ms: u64,
23) -> Result<Inputs> {
24 let events = store.retro_events_in_window(workspace_key, window_start_ms, window_end_ms)?;
25 let files_touched =
26 store.files_touched_in_window(workspace_key, window_start_ms, window_end_ms)?;
27 let skills_used = store.skills_used_in_window(workspace_key, window_start_ms, window_end_ms)?;
28 let tool_spans = store.tool_spans_in_window(workspace_key, window_start_ms, window_end_ms)?;
29
30 let lookback_start = window_end_ms.saturating_sub(USAGE_LOOKBACK_MIN_DAYS * 86_400_000);
31 let recent_slugs_list = store.skills_used_since(workspace_key, lookback_start)?;
32 let skills_used_recent_slugs: HashSet<String> = recent_slugs_list.into_iter().collect();
33
34 let rules_recent_list = store.rules_used_since(workspace_key, lookback_start)?;
35 let rules_used_recent_slugs: HashSet<String> = rules_recent_list.into_iter().collect();
36
37 let skill_files_on_disk = scan_skill_files(workspace_root, window_end_ms)?;
38 let rule_files_on_disk = scan_rule_files(workspace_root, window_end_ms)?;
39 let file_facts = latest_file_facts(store, workspace_key)?;
40
41 let aggregates = build_aggregates(&events);
42 let eval_scores = store
43 .list_evals_in_window(window_start_ms, window_end_ms)
44 .unwrap_or_default()
45 .into_iter()
46 .map(|r| (r.session_id, r.score))
47 .collect();
48 let prompt_fingerprints = store
49 .sessions_with_prompt_fingerprint(workspace_key, window_start_ms, window_end_ms)
50 .unwrap_or_default();
51
52 Ok(Inputs {
53 window_start_ms,
54 window_end_ms,
55 events,
56 files_touched,
57 skills_used,
58 tool_spans,
59 skills_used_recent_slugs,
60 usage_lookback_ms: USAGE_LOOKBACK_MIN_DAYS * 86_400_000,
61 skill_files_on_disk,
62 rule_files_on_disk,
63 rules_used_recent_slugs,
64 file_facts,
65 aggregates,
66 eval_scores,
67 prompt_fingerprints,
68 })
69}
70
71#[allow(clippy::too_many_arguments)]
73pub fn load_inputs_for_data_source(
74 store: &Store,
75 workspace_root: &Path,
76 workspace_key: &str,
77 start_ms: u64,
78 end_ms: u64,
79 source: DataSource,
80 team_id: Option<&str>,
81 workspace_hash: Option<&str>,
82) -> Result<Inputs> {
83 match source {
84 DataSource::Local => load_inputs(store, workspace_root, workspace_key, start_ms, end_ms),
85 DataSource::Provider => {
86 if let (Some(t), Some(wh)) = (team_id, workspace_hash) {
87 load_inputs_from_remote_cache(
88 store,
89 workspace_root,
90 workspace_key,
91 start_ms,
92 end_ms,
93 t,
94 wh,
95 )
96 } else {
97 load_inputs(store, workspace_root, workspace_key, start_ms, end_ms)
98 }
99 }
100 DataSource::Mixed => {
101 let mut i = load_inputs(store, workspace_root, workspace_key, start_ms, end_ms)?;
102 if let (Some(t), Some(wh)) = (team_id, workspace_hash) {
103 for raw in store.list_remote_event_jsons(t, wh)? {
104 let o: OutboundEvent = serde_json::from_str(&raw)?;
105 if o.ts_ms < start_ms || o.ts_ms > end_ms {
106 continue;
107 }
108 i.events
109 .push(session_event_from_outbound(&o, workspace_key));
110 }
111 i.events.sort_by(|(a, ea), (b, eb)| {
112 ea.ts_ms
113 .cmp(&eb.ts_ms)
114 .then_with(|| a.id.cmp(&b.id))
115 .then_with(|| ea.seq.cmp(&eb.seq))
116 });
117 i.aggregates = build_aggregates(&i.events);
118 }
119 Ok(i)
120 }
121 }
122}
123
124fn event_kind_from_outbound(s: &str) -> EventKind {
125 match s {
126 "tool_call" => EventKind::ToolCall,
127 "tool_result" => EventKind::ToolResult,
128 "message" => EventKind::Message,
129 "error" => EventKind::Error,
130 "cost" => EventKind::Cost,
131 "hook" => EventKind::Hook,
132 _ => EventKind::Message,
133 }
134}
135
136fn event_source_from_outbound(s: &str) -> EventSource {
137 match s {
138 "tail" => EventSource::Tail,
139 "proxy" => EventSource::Proxy,
140 "hook" => EventSource::Hook,
141 _ => EventSource::Hook,
142 }
143}
144
145fn session_event_from_outbound(o: &OutboundEvent, workspace_key: &str) -> (SessionRecord, Event) {
146 let sid = format!("remote:{}", o.session_id_hash);
147 let session = SessionRecord {
148 id: sid.clone(),
149 agent: o.agent.clone(),
150 model: Some(o.model.clone()),
151 workspace: workspace_key.to_string(),
152 started_at_ms: o.ts_ms,
153 ended_at_ms: None,
154 status: SessionStatus::Done,
155 trace_path: String::new(),
156 start_commit: None,
157 end_commit: None,
158 branch: None,
159 dirty_start: None,
160 dirty_end: None,
161 repo_binding_source: None,
162 prompt_fingerprint: None,
163 };
164 let event = Event {
165 session_id: sid,
166 seq: o.event_seq,
167 ts_ms: o.ts_ms,
168 ts_exact: true,
169 kind: event_kind_from_outbound(&o.kind),
170 source: event_source_from_outbound(&o.source),
171 tool: o.tool.clone(),
172 tool_call_id: o.tool_call_id.clone(),
173 tokens_in: o.tokens_in,
174 tokens_out: o.tokens_out,
175 reasoning_tokens: o.reasoning_tokens,
176 cost_usd_e6: o.cost_usd_e6,
177 payload: o.payload.clone(),
178 };
179 (session, event)
180}
181
182fn load_inputs_from_remote_cache(
183 store: &Store,
184 workspace_root: &Path,
185 workspace_key: &str,
186 start_ms: u64,
187 end_ms: u64,
188 team_id: &str,
189 workspace_hash: &str,
190) -> Result<Inputs> {
191 let mut events = Vec::new();
192 for raw in store.list_remote_event_jsons(team_id, workspace_hash)? {
193 let o: OutboundEvent = serde_json::from_str(&raw)?;
194 if o.ts_ms < start_ms || o.ts_ms > end_ms {
195 continue;
196 }
197 events.push(session_event_from_outbound(&o, workspace_key));
198 }
199 events.sort_by(|(a, ea), (b, eb)| {
200 ea.ts_ms
201 .cmp(&eb.ts_ms)
202 .then_with(|| a.id.cmp(&b.id))
203 .then_with(|| ea.seq.cmp(&eb.seq))
204 });
205 let skill_files_on_disk = scan_skill_files(workspace_root, end_ms)?;
206 let rule_files_on_disk = scan_rule_files(workspace_root, end_ms)?;
207 let lookback_start = end_ms.saturating_sub(USAGE_LOOKBACK_MIN_DAYS * 86_400_000);
208 let recent_slugs_list = store.skills_used_since(workspace_key, lookback_start)?;
209 let skills_used_recent_slugs: HashSet<String> = recent_slugs_list.into_iter().collect();
210 let rules_recent_list = store.rules_used_since(workspace_key, lookback_start)?;
211 let rules_used_recent_slugs: HashSet<String> = rules_recent_list.into_iter().collect();
212 let file_facts = latest_file_facts(store, workspace_key)?;
213 let aggregates = build_aggregates(&events);
214 Ok(Inputs {
215 window_start_ms: start_ms,
216 window_end_ms: end_ms,
217 events,
218 files_touched: vec![],
219 skills_used: vec![],
220 tool_spans: vec![],
221 skills_used_recent_slugs,
222 usage_lookback_ms: USAGE_LOOKBACK_MIN_DAYS * 86_400_000,
223 skill_files_on_disk,
224 rule_files_on_disk,
225 rules_used_recent_slugs,
226 file_facts,
227 aggregates,
228 eval_scores: vec![],
229 prompt_fingerprints: vec![],
230 })
231}
232
233fn latest_file_facts(
234 store: &Store,
235 workspace: &str,
236) -> Result<HashMap<String, crate::metrics::types::FileFact>> {
237 let Some(snapshot) = store.latest_repo_snapshot(workspace)? else {
238 return Ok(HashMap::new());
239 };
240 let facts = store.file_facts_for_snapshot(&snapshot.id)?;
241 Ok(facts
242 .into_iter()
243 .map(|fact| (fact.path.clone(), fact))
244 .collect())
245}
246
247pub fn scan_skill_files(workspace_root: &Path, now_ms: u64) -> Result<Vec<SkillFileOnDisk>> {
249 let skills_dir = workspace_root.join(".cursor/skills");
250 if !skills_dir.is_dir() {
251 return Ok(vec![]);
252 }
253 let mut out = Vec::new();
254 for entry in fs::read_dir(&skills_dir)? {
255 let entry = entry?;
256 if !entry.file_type()?.is_dir() {
257 continue;
258 }
259 let slug = entry.file_name().to_string_lossy().to_string();
260 let skill_md = entry.path().join("SKILL.md");
261 if !skill_md.is_file() {
262 continue;
263 }
264 let meta = fs::metadata(&skill_md)?;
265 let mtime_ms = meta
266 .modified()
267 .ok()
268 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
269 .map(|d| d.as_millis() as u64)
270 .unwrap_or(now_ms);
271 out.push(SkillFileOnDisk {
272 slug,
273 size_bytes: meta.len(),
274 mtime_ms,
275 });
276 }
277 out.sort_by(|a, b| a.slug.cmp(&b.slug));
278 Ok(out)
279}
280
281pub fn scan_rule_files(workspace_root: &Path, now_ms: u64) -> Result<Vec<SkillFileOnDisk>> {
283 let rules_dir = workspace_root.join(".cursor/rules");
284 if !rules_dir.is_dir() {
285 return Ok(vec![]);
286 }
287 let mut out = Vec::new();
288 for entry in fs::read_dir(&rules_dir)? {
289 let entry = entry?;
290 let path = entry.path();
291 if !path.is_file() {
292 continue;
293 }
294 if !path
295 .extension()
296 .and_then(|x| x.to_str())
297 .is_some_and(|e| e.eq_ignore_ascii_case("mdc"))
298 {
299 continue;
300 }
301 let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
302 continue;
303 };
304 let slug = stem.to_string();
305 let meta = fs::metadata(&path)?;
306 let mtime_ms = meta
307 .modified()
308 .ok()
309 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
310 .map(|d| d.as_millis() as u64)
311 .unwrap_or(now_ms);
312 out.push(SkillFileOnDisk {
313 slug,
314 size_bytes: meta.len(),
315 mtime_ms,
316 });
317 }
318 out.sort_by(|a, b| a.slug.cmp(&b.slug));
319 Ok(out)
320}
321
322fn build_aggregates(events: &[(SessionRecord, crate::core::event::Event)]) -> RetroAggregates {
323 let mut agg = RetroAggregates::default();
324 let mut model_once = HashSet::new();
325 for (s, e) in events {
326 agg.unique_session_ids.insert(s.id.clone());
327 if model_once.insert(s.id.clone()) {
328 let mkey = s.model.clone().unwrap_or_else(|| "unknown".into());
329 *agg.model_session_counts.entry(mkey).or_default() += 1;
330 }
331 if let Some(ref t) = e.tool {
332 *agg.tool_event_counts.entry(t.clone()).or_default() += 1;
333 if let Some(c) = e.cost_usd_e6 {
334 *agg.tool_cost_usd_e6.entry(t.clone()).or_default() += c;
335 }
336 }
337 if let Some(c) = e.cost_usd_e6 {
338 agg.total_cost_usd_e6 += c;
339 }
340 }
341 agg
342}
343
344pub fn prior_bet_fingerprints(reports_dir: &Path) -> Result<HashSet<String>> {
346 let mut out = HashSet::new();
347 if !reports_dir.is_dir() {
348 return Ok(out);
349 }
350 for entry in fs::read_dir(reports_dir)? {
351 let entry = entry?;
352 let p = entry.path();
353 if p.extension().and_then(|x| x.to_str()) != Some("md") {
354 continue;
355 }
356 let raw = fs::read_to_string(&p).unwrap_or_default();
357 for line in raw.lines() {
358 let l = line.trim();
359 let Some(rest) = l.strip_prefix("### ") else {
360 continue;
361 };
362 let Some(open) = rest.rfind('(') else {
363 continue;
364 };
365 let Some(close) = rest.rfind(')') else {
366 continue;
367 };
368 if close <= open + 1 {
369 continue;
370 }
371 let id = rest[open + 1..close].trim();
372 if id.starts_with('H') && id.contains(':') {
373 out.insert(id.to_string());
374 }
375 }
376 }
377 Ok(out)
378}