1use crate::core::data_source::DataSource;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::retro::types::{Inputs, RetroAggregates, SkillFileOnDisk};
7use crate::store::Store;
8use crate::sync::outbound::OutboundEvent;
9use anyhow::Result;
10use std::collections::{HashMap, HashSet};
11use std::fs;
12use std::path::Path;
13
/// Number of days before the end of the window that are searched for "recently used" skill and
/// rule slugs. Converted to milliseconds with `* 86_400_000` at the points of use.
const USAGE_LOOKBACK_MIN_DAYS: u64 = 30;
15
16pub fn load_inputs(
18 store: &Store,
19 workspace_root: &Path,
20 workspace_key: &str,
21 window_start_ms: u64,
22 window_end_ms: u64,
23) -> Result<Inputs> {
24 let events = store.retro_events_in_window(workspace_key, window_start_ms, window_end_ms)?;
25 let files_touched =
26 store.files_touched_in_window(workspace_key, window_start_ms, window_end_ms)?;
27 let skills_used = store.skills_used_in_window(workspace_key, window_start_ms, window_end_ms)?;
28 let tool_spans = store.tool_spans_in_window(workspace_key, window_start_ms, window_end_ms)?;
29
30 let lookback_start = window_end_ms.saturating_sub(USAGE_LOOKBACK_MIN_DAYS * 86_400_000);
31 let recent_slugs_list = store.skills_used_since(workspace_key, lookback_start)?;
32 let skills_used_recent_slugs: HashSet<String> = recent_slugs_list.into_iter().collect();
33
34 let rules_recent_list = store.rules_used_since(workspace_key, lookback_start)?;
35 let rules_used_recent_slugs: HashSet<String> = rules_recent_list.into_iter().collect();
36
37 let skill_files_on_disk = scan_skill_files(workspace_root, window_end_ms)?;
38 let rule_files_on_disk = scan_rule_files(workspace_root, window_end_ms)?;
39 let file_facts = latest_file_facts(store, workspace_key)?;
40
41 let aggregates = build_aggregates(&events);
42 let eval_scores = store
43 .list_evals_in_window(window_start_ms, window_end_ms)
44 .unwrap_or_default()
45 .into_iter()
46 .map(|r| (r.session_id, r.score))
47 .collect();
48
49 Ok(Inputs {
50 window_start_ms,
51 window_end_ms,
52 events,
53 files_touched,
54 skills_used,
55 tool_spans,
56 skills_used_recent_slugs,
57 usage_lookback_ms: USAGE_LOOKBACK_MIN_DAYS * 86_400_000,
58 skill_files_on_disk,
59 rule_files_on_disk,
60 rules_used_recent_slugs,
61 file_facts,
62 aggregates,
63 eval_scores,
64 })
65}
66
67#[allow(clippy::too_many_arguments)]
69pub fn load_inputs_for_data_source(
70 store: &Store,
71 workspace_root: &Path,
72 workspace_key: &str,
73 start_ms: u64,
74 end_ms: u64,
75 source: DataSource,
76 team_id: Option<&str>,
77 workspace_hash: Option<&str>,
78) -> Result<Inputs> {
79 match source {
80 DataSource::Local => load_inputs(store, workspace_root, workspace_key, start_ms, end_ms),
81 DataSource::Provider => {
82 if let (Some(t), Some(wh)) = (team_id, workspace_hash) {
83 load_inputs_from_remote_cache(
84 store,
85 workspace_root,
86 workspace_key,
87 start_ms,
88 end_ms,
89 t,
90 wh,
91 )
92 } else {
93 load_inputs(store, workspace_root, workspace_key, start_ms, end_ms)
94 }
95 }
96 DataSource::Mixed => {
97 let mut i = load_inputs(store, workspace_root, workspace_key, start_ms, end_ms)?;
98 if let (Some(t), Some(wh)) = (team_id, workspace_hash) {
99 for raw in store.list_remote_event_jsons(t, wh)? {
100 let o: OutboundEvent = serde_json::from_str(&raw)?;
101 if o.ts_ms < start_ms || o.ts_ms > end_ms {
102 continue;
103 }
104 i.events
105 .push(session_event_from_outbound(&o, workspace_key));
106 }
107 i.events.sort_by(|(a, ea), (b, eb)| {
108 ea.ts_ms
109 .cmp(&eb.ts_ms)
110 .then_with(|| a.id.cmp(&b.id))
111 .then_with(|| ea.seq.cmp(&eb.seq))
112 });
113 i.aggregates = build_aggregates(&i.events);
114 }
115 Ok(i)
116 }
117 }
118}
119
120fn event_kind_from_outbound(s: &str) -> EventKind {
121 match s {
122 "tool_call" => EventKind::ToolCall,
123 "tool_result" => EventKind::ToolResult,
124 "message" => EventKind::Message,
125 "error" => EventKind::Error,
126 "cost" => EventKind::Cost,
127 "hook" => EventKind::Hook,
128 _ => EventKind::Message,
129 }
130}
131
132fn event_source_from_outbound(s: &str) -> EventSource {
133 match s {
134 "tail" => EventSource::Tail,
135 "proxy" => EventSource::Proxy,
136 "hook" => EventSource::Hook,
137 _ => EventSource::Hook,
138 }
139}
140
141fn session_event_from_outbound(o: &OutboundEvent, workspace_key: &str) -> (SessionRecord, Event) {
142 let sid = format!("remote:{}", o.session_id_hash);
143 let session = SessionRecord {
144 id: sid.clone(),
145 agent: o.agent.clone(),
146 model: Some(o.model.clone()),
147 workspace: workspace_key.to_string(),
148 started_at_ms: o.ts_ms,
149 ended_at_ms: None,
150 status: SessionStatus::Done,
151 trace_path: String::new(),
152 start_commit: None,
153 end_commit: None,
154 branch: None,
155 dirty_start: None,
156 dirty_end: None,
157 repo_binding_source: None,
158 };
159 let event = Event {
160 session_id: sid,
161 seq: o.event_seq,
162 ts_ms: o.ts_ms,
163 ts_exact: true,
164 kind: event_kind_from_outbound(&o.kind),
165 source: event_source_from_outbound(&o.source),
166 tool: o.tool.clone(),
167 tool_call_id: o.tool_call_id.clone(),
168 tokens_in: o.tokens_in,
169 tokens_out: o.tokens_out,
170 reasoning_tokens: o.reasoning_tokens,
171 cost_usd_e6: o.cost_usd_e6,
172 payload: o.payload.clone(),
173 };
174 (session, event)
175}
176
177fn load_inputs_from_remote_cache(
178 store: &Store,
179 workspace_root: &Path,
180 workspace_key: &str,
181 start_ms: u64,
182 end_ms: u64,
183 team_id: &str,
184 workspace_hash: &str,
185) -> Result<Inputs> {
186 let mut events = Vec::new();
187 for raw in store.list_remote_event_jsons(team_id, workspace_hash)? {
188 let o: OutboundEvent = serde_json::from_str(&raw)?;
189 if o.ts_ms < start_ms || o.ts_ms > end_ms {
190 continue;
191 }
192 events.push(session_event_from_outbound(&o, workspace_key));
193 }
194 events.sort_by(|(a, ea), (b, eb)| {
195 ea.ts_ms
196 .cmp(&eb.ts_ms)
197 .then_with(|| a.id.cmp(&b.id))
198 .then_with(|| ea.seq.cmp(&eb.seq))
199 });
200 let skill_files_on_disk = scan_skill_files(workspace_root, end_ms)?;
201 let rule_files_on_disk = scan_rule_files(workspace_root, end_ms)?;
202 let lookback_start = end_ms.saturating_sub(USAGE_LOOKBACK_MIN_DAYS * 86_400_000);
203 let recent_slugs_list = store.skills_used_since(workspace_key, lookback_start)?;
204 let skills_used_recent_slugs: HashSet<String> = recent_slugs_list.into_iter().collect();
205 let rules_recent_list = store.rules_used_since(workspace_key, lookback_start)?;
206 let rules_used_recent_slugs: HashSet<String> = rules_recent_list.into_iter().collect();
207 let file_facts = latest_file_facts(store, workspace_key)?;
208 let aggregates = build_aggregates(&events);
209 Ok(Inputs {
210 window_start_ms: start_ms,
211 window_end_ms: end_ms,
212 events,
213 files_touched: vec![],
214 skills_used: vec![],
215 tool_spans: vec![],
216 skills_used_recent_slugs,
217 usage_lookback_ms: USAGE_LOOKBACK_MIN_DAYS * 86_400_000,
218 skill_files_on_disk,
219 rule_files_on_disk,
220 rules_used_recent_slugs,
221 file_facts,
222 aggregates,
223 eval_scores: vec![],
224 })
225}
226
227fn latest_file_facts(
228 store: &Store,
229 workspace: &str,
230) -> Result<HashMap<String, crate::metrics::types::FileFact>> {
231 let Some(snapshot) = store.latest_repo_snapshot(workspace)? else {
232 return Ok(HashMap::new());
233 };
234 let facts = store.file_facts_for_snapshot(&snapshot.id)?;
235 Ok(facts
236 .into_iter()
237 .map(|fact| (fact.path.clone(), fact))
238 .collect())
239}
240
241pub fn scan_skill_files(workspace_root: &Path, now_ms: u64) -> Result<Vec<SkillFileOnDisk>> {
243 let skills_dir = workspace_root.join(".cursor/skills");
244 if !skills_dir.is_dir() {
245 return Ok(vec![]);
246 }
247 let mut out = Vec::new();
248 for entry in fs::read_dir(&skills_dir)? {
249 let entry = entry?;
250 if !entry.file_type()?.is_dir() {
251 continue;
252 }
253 let slug = entry.file_name().to_string_lossy().to_string();
254 let skill_md = entry.path().join("SKILL.md");
255 if !skill_md.is_file() {
256 continue;
257 }
258 let meta = fs::metadata(&skill_md)?;
259 let mtime_ms = meta
260 .modified()
261 .ok()
262 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
263 .map(|d| d.as_millis() as u64)
264 .unwrap_or(now_ms);
265 out.push(SkillFileOnDisk {
266 slug,
267 size_bytes: meta.len(),
268 mtime_ms,
269 });
270 }
271 out.sort_by(|a, b| a.slug.cmp(&b.slug));
272 Ok(out)
273}
274
275pub fn scan_rule_files(workspace_root: &Path, now_ms: u64) -> Result<Vec<SkillFileOnDisk>> {
277 let rules_dir = workspace_root.join(".cursor/rules");
278 if !rules_dir.is_dir() {
279 return Ok(vec![]);
280 }
281 let mut out = Vec::new();
282 for entry in fs::read_dir(&rules_dir)? {
283 let entry = entry?;
284 let path = entry.path();
285 if !path.is_file() {
286 continue;
287 }
288 if !path
289 .extension()
290 .and_then(|x| x.to_str())
291 .is_some_and(|e| e.eq_ignore_ascii_case("mdc"))
292 {
293 continue;
294 }
295 let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
296 continue;
297 };
298 let slug = stem.to_string();
299 let meta = fs::metadata(&path)?;
300 let mtime_ms = meta
301 .modified()
302 .ok()
303 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
304 .map(|d| d.as_millis() as u64)
305 .unwrap_or(now_ms);
306 out.push(SkillFileOnDisk {
307 slug,
308 size_bytes: meta.len(),
309 mtime_ms,
310 });
311 }
312 out.sort_by(|a, b| a.slug.cmp(&b.slug));
313 Ok(out)
314}
315
316fn build_aggregates(events: &[(SessionRecord, crate::core::event::Event)]) -> RetroAggregates {
317 let mut agg = RetroAggregates::default();
318 let mut model_once = HashSet::new();
319 for (s, e) in events {
320 agg.unique_session_ids.insert(s.id.clone());
321 if model_once.insert(s.id.clone()) {
322 let mkey = s.model.clone().unwrap_or_else(|| "unknown".into());
323 *agg.model_session_counts.entry(mkey).or_default() += 1;
324 }
325 if let Some(ref t) = e.tool {
326 *agg.tool_event_counts.entry(t.clone()).or_default() += 1;
327 if let Some(c) = e.cost_usd_e6 {
328 *agg.tool_cost_usd_e6.entry(t.clone()).or_default() += c;
329 }
330 }
331 if let Some(c) = e.cost_usd_e6 {
332 agg.total_cost_usd_e6 += c;
333 }
334 }
335 agg
336}
337
338pub fn prior_bet_fingerprints(reports_dir: &Path) -> Result<HashSet<String>> {
340 let mut out = HashSet::new();
341 if !reports_dir.is_dir() {
342 return Ok(out);
343 }
344 for entry in fs::read_dir(reports_dir)? {
345 let entry = entry?;
346 let p = entry.path();
347 if p.extension().and_then(|x| x.to_str()) != Some("md") {
348 continue;
349 }
350 let raw = fs::read_to_string(&p).unwrap_or_default();
351 for line in raw.lines() {
352 let l = line.trim();
353 let Some(rest) = l.strip_prefix("### ") else {
354 continue;
355 };
356 let Some(open) = rest.rfind('(') else {
357 continue;
358 };
359 let Some(close) = rest.rfind(')') else {
360 continue;
361 };
362 if close <= open + 1 {
363 continue;
364 }
365 let id = rest[open + 1..close].trim();
366 if id.starts_with('H') && id.contains(':') {
367 out.insert(id.to_string());
368 }
369 }
370 }
371 Ok(out)
372}