1use crate::core::data_source::DataSource;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::retro::types::{Inputs, RetroAggregates, SkillFileOnDisk};
7use crate::store::Store;
8use crate::sync::outbound::OutboundEvent;
9use anyhow::Result;
10use std::collections::{HashMap, HashSet};
11use std::fs;
12use std::path::Path;
13
/// Length, in days, of the lookback used when asking the store which skills and rules
/// were used recently. It is multiplied by 86_400_000 (milliseconds per day) wherever a
/// duration in milliseconds is needed.
const USAGE_LOOKBACK_MIN_DAYS: u64 = 30;
15
16pub fn load_inputs(
18 store: &Store,
19 workspace_root: &Path,
20 workspace_key: &str,
21 window_start_ms: u64,
22 window_end_ms: u64,
23) -> Result<Inputs> {
24 let events = store.retro_events_in_window(workspace_key, window_start_ms, window_end_ms)?;
25 let files_touched =
26 store.files_touched_in_window(workspace_key, window_start_ms, window_end_ms)?;
27 let skills_used = store.skills_used_in_window(workspace_key, window_start_ms, window_end_ms)?;
28 let tool_spans = store.tool_spans_in_window(workspace_key, window_start_ms, window_end_ms)?;
29
30 let lookback_start = window_end_ms.saturating_sub(USAGE_LOOKBACK_MIN_DAYS * 86_400_000);
31 let recent_slugs_list = store.skills_used_since(workspace_key, lookback_start)?;
32 let skills_used_recent_slugs: HashSet<String> = recent_slugs_list.into_iter().collect();
33
34 let rules_recent_list = store.rules_used_since(workspace_key, lookback_start)?;
35 let rules_used_recent_slugs: HashSet<String> = rules_recent_list.into_iter().collect();
36
37 let skill_files_on_disk = scan_skill_files(workspace_root, window_end_ms)?;
38 let rule_files_on_disk = scan_rule_files(workspace_root, window_end_ms)?;
39 let file_facts = latest_file_facts(store, workspace_key)?;
40
41 let aggregates = build_aggregates(&events);
42 let eval_scores = store
43 .list_evals_in_window(window_start_ms, window_end_ms)
44 .unwrap_or_default()
45 .into_iter()
46 .map(|r| (r.session_id, r.score))
47 .collect();
48 let prompt_fingerprints = store
49 .sessions_with_prompt_fingerprint(workspace_key, window_start_ms, window_end_ms)
50 .unwrap_or_default();
51
52 let feedback = store
53 .list_feedback_in_window(window_start_ms, window_end_ms)
54 .unwrap_or_default();
55 Ok(Inputs {
56 window_start_ms,
57 window_end_ms,
58 events,
59 files_touched,
60 skills_used,
61 tool_spans,
62 skills_used_recent_slugs,
63 usage_lookback_ms: USAGE_LOOKBACK_MIN_DAYS * 86_400_000,
64 skill_files_on_disk,
65 rule_files_on_disk,
66 rules_used_recent_slugs,
67 file_facts,
68 aggregates,
69 eval_scores,
70 prompt_fingerprints,
71 feedback,
72 })
73}
74
75#[allow(clippy::too_many_arguments)]
77pub fn load_inputs_for_data_source(
78 store: &Store,
79 workspace_root: &Path,
80 workspace_key: &str,
81 start_ms: u64,
82 end_ms: u64,
83 source: DataSource,
84 team_id: Option<&str>,
85 workspace_hash: Option<&str>,
86) -> Result<Inputs> {
87 match source {
88 DataSource::Local => load_inputs(store, workspace_root, workspace_key, start_ms, end_ms),
89 DataSource::Provider => {
90 if let (Some(t), Some(wh)) = (team_id, workspace_hash) {
91 load_inputs_from_remote_cache(
92 store,
93 workspace_root,
94 workspace_key,
95 start_ms,
96 end_ms,
97 t,
98 wh,
99 )
100 } else {
101 load_inputs(store, workspace_root, workspace_key, start_ms, end_ms)
102 }
103 }
104 DataSource::Mixed => {
105 let mut i = load_inputs(store, workspace_root, workspace_key, start_ms, end_ms)?;
106 if let (Some(t), Some(wh)) = (team_id, workspace_hash) {
107 for raw in store.list_remote_event_jsons(t, wh)? {
108 let o: OutboundEvent = serde_json::from_str(&raw)?;
109 if o.ts_ms < start_ms || o.ts_ms > end_ms {
110 continue;
111 }
112 i.events
113 .push(session_event_from_outbound(&o, workspace_key));
114 }
115 i.events.sort_by(|(a, ea), (b, eb)| {
116 ea.ts_ms
117 .cmp(&eb.ts_ms)
118 .then_with(|| a.id.cmp(&b.id))
119 .then_with(|| ea.seq.cmp(&eb.seq))
120 });
121 i.aggregates = build_aggregates(&i.events);
122 }
123 Ok(i)
124 }
125 }
126}
127
128fn event_kind_from_outbound(s: &str) -> EventKind {
129 match s {
130 "tool_call" => EventKind::ToolCall,
131 "tool_result" => EventKind::ToolResult,
132 "message" => EventKind::Message,
133 "error" => EventKind::Error,
134 "cost" => EventKind::Cost,
135 "hook" => EventKind::Hook,
136 _ => EventKind::Message,
137 }
138}
139
140fn event_source_from_outbound(s: &str) -> EventSource {
141 match s {
142 "tail" => EventSource::Tail,
143 "proxy" => EventSource::Proxy,
144 "hook" => EventSource::Hook,
145 _ => EventSource::Hook,
146 }
147}
148
149fn session_event_from_outbound(o: &OutboundEvent, workspace_key: &str) -> (SessionRecord, Event) {
150 let sid = format!("remote:{}", o.session_id_hash);
151 let session = SessionRecord {
152 id: sid.clone(),
153 agent: o.agent.clone(),
154 model: Some(o.model.clone()),
155 workspace: workspace_key.to_string(),
156 started_at_ms: o.ts_ms,
157 ended_at_ms: None,
158 status: SessionStatus::Done,
159 trace_path: String::new(),
160 start_commit: None,
161 end_commit: None,
162 branch: None,
163 dirty_start: None,
164 dirty_end: None,
165 repo_binding_source: None,
166 prompt_fingerprint: None,
167 };
168 let event = Event {
169 session_id: sid,
170 seq: o.event_seq,
171 ts_ms: o.ts_ms,
172 ts_exact: true,
173 kind: event_kind_from_outbound(&o.kind),
174 source: event_source_from_outbound(&o.source),
175 tool: o.tool.clone(),
176 tool_call_id: o.tool_call_id.clone(),
177 tokens_in: o.tokens_in,
178 tokens_out: o.tokens_out,
179 reasoning_tokens: o.reasoning_tokens,
180 cost_usd_e6: o.cost_usd_e6,
181 payload: o.payload.clone(),
182 };
183 (session, event)
184}
185
186fn load_inputs_from_remote_cache(
187 store: &Store,
188 workspace_root: &Path,
189 workspace_key: &str,
190 start_ms: u64,
191 end_ms: u64,
192 team_id: &str,
193 workspace_hash: &str,
194) -> Result<Inputs> {
195 let mut events = Vec::new();
196 for raw in store.list_remote_event_jsons(team_id, workspace_hash)? {
197 let o: OutboundEvent = serde_json::from_str(&raw)?;
198 if o.ts_ms < start_ms || o.ts_ms > end_ms {
199 continue;
200 }
201 events.push(session_event_from_outbound(&o, workspace_key));
202 }
203 events.sort_by(|(a, ea), (b, eb)| {
204 ea.ts_ms
205 .cmp(&eb.ts_ms)
206 .then_with(|| a.id.cmp(&b.id))
207 .then_with(|| ea.seq.cmp(&eb.seq))
208 });
209 let skill_files_on_disk = scan_skill_files(workspace_root, end_ms)?;
210 let rule_files_on_disk = scan_rule_files(workspace_root, end_ms)?;
211 let lookback_start = end_ms.saturating_sub(USAGE_LOOKBACK_MIN_DAYS * 86_400_000);
212 let recent_slugs_list = store.skills_used_since(workspace_key, lookback_start)?;
213 let skills_used_recent_slugs: HashSet<String> = recent_slugs_list.into_iter().collect();
214 let rules_recent_list = store.rules_used_since(workspace_key, lookback_start)?;
215 let rules_used_recent_slugs: HashSet<String> = rules_recent_list.into_iter().collect();
216 let file_facts = latest_file_facts(store, workspace_key)?;
217 let aggregates = build_aggregates(&events);
218 Ok(Inputs {
219 window_start_ms: start_ms,
220 window_end_ms: end_ms,
221 events,
222 files_touched: vec![],
223 skills_used: vec![],
224 tool_spans: vec![],
225 skills_used_recent_slugs,
226 usage_lookback_ms: USAGE_LOOKBACK_MIN_DAYS * 86_400_000,
227 skill_files_on_disk,
228 rule_files_on_disk,
229 rules_used_recent_slugs,
230 file_facts,
231 aggregates,
232 eval_scores: vec![],
233 prompt_fingerprints: vec![],
234 feedback: vec![],
235 })
236}
237
238fn latest_file_facts(
239 store: &Store,
240 workspace: &str,
241) -> Result<HashMap<String, crate::metrics::types::FileFact>> {
242 let Some(snapshot) = store.latest_repo_snapshot(workspace)? else {
243 return Ok(HashMap::new());
244 };
245 let facts = store.file_facts_for_snapshot(&snapshot.id)?;
246 Ok(facts
247 .into_iter()
248 .map(|fact| (fact.path.clone(), fact))
249 .collect())
250}
251
252pub fn scan_skill_files(workspace_root: &Path, now_ms: u64) -> Result<Vec<SkillFileOnDisk>> {
254 let skills_dir = workspace_root.join(".cursor/skills");
255 if !skills_dir.is_dir() {
256 return Ok(vec![]);
257 }
258 let mut out = Vec::new();
259 for entry in fs::read_dir(&skills_dir)? {
260 let entry = entry?;
261 if !entry.file_type()?.is_dir() {
262 continue;
263 }
264 let slug = entry.file_name().to_string_lossy().to_string();
265 let skill_md = entry.path().join("SKILL.md");
266 if !skill_md.is_file() {
267 continue;
268 }
269 let meta = fs::metadata(&skill_md)?;
270 let mtime_ms = meta
271 .modified()
272 .ok()
273 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
274 .map(|d| d.as_millis() as u64)
275 .unwrap_or(now_ms);
276 out.push(SkillFileOnDisk {
277 slug,
278 size_bytes: meta.len(),
279 mtime_ms,
280 });
281 }
282 out.sort_by(|a, b| a.slug.cmp(&b.slug));
283 Ok(out)
284}
285
286pub fn scan_rule_files(workspace_root: &Path, now_ms: u64) -> Result<Vec<SkillFileOnDisk>> {
288 let rules_dir = workspace_root.join(".cursor/rules");
289 if !rules_dir.is_dir() {
290 return Ok(vec![]);
291 }
292 let mut out = Vec::new();
293 for entry in fs::read_dir(&rules_dir)? {
294 let entry = entry?;
295 let path = entry.path();
296 if !path.is_file() {
297 continue;
298 }
299 if !path
300 .extension()
301 .and_then(|x| x.to_str())
302 .is_some_and(|e| e.eq_ignore_ascii_case("mdc"))
303 {
304 continue;
305 }
306 let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
307 continue;
308 };
309 let slug = stem.to_string();
310 let meta = fs::metadata(&path)?;
311 let mtime_ms = meta
312 .modified()
313 .ok()
314 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
315 .map(|d| d.as_millis() as u64)
316 .unwrap_or(now_ms);
317 out.push(SkillFileOnDisk {
318 slug,
319 size_bytes: meta.len(),
320 mtime_ms,
321 });
322 }
323 out.sort_by(|a, b| a.slug.cmp(&b.slug));
324 Ok(out)
325}
326
327fn build_aggregates(events: &[(SessionRecord, crate::core::event::Event)]) -> RetroAggregates {
328 let mut agg = RetroAggregates::default();
329 let mut model_once = HashSet::new();
330 for (s, e) in events {
331 agg.unique_session_ids.insert(s.id.clone());
332 if model_once.insert(s.id.clone()) {
333 let mkey = s.model.clone().unwrap_or_else(|| "unknown".into());
334 *agg.model_session_counts.entry(mkey).or_default() += 1;
335 }
336 if let Some(ref t) = e.tool {
337 *agg.tool_event_counts.entry(t.clone()).or_default() += 1;
338 if let Some(c) = e.cost_usd_e6 {
339 *agg.tool_cost_usd_e6.entry(t.clone()).or_default() += c;
340 }
341 }
342 if let Some(c) = e.cost_usd_e6 {
343 agg.total_cost_usd_e6 += c;
344 }
345 }
346 agg
347}
348
349pub fn prior_bet_fingerprints(reports_dir: &Path) -> Result<HashSet<String>> {
351 let mut out = HashSet::new();
352 if !reports_dir.is_dir() {
353 return Ok(out);
354 }
355 for entry in fs::read_dir(reports_dir)? {
356 let entry = entry?;
357 let p = entry.path();
358 if p.extension().and_then(|x| x.to_str()) != Some("md") {
359 continue;
360 }
361 let raw = fs::read_to_string(&p).unwrap_or_default();
362 for line in raw.lines() {
363 let l = line.trim();
364 let Some(rest) = l.strip_prefix("### ") else {
365 continue;
366 };
367 let Some(open) = rest.rfind('(') else {
368 continue;
369 };
370 let Some(close) = rest.rfind(')') else {
371 continue;
372 };
373 if close <= open + 1 {
374 continue;
375 }
376 let id = rest[open + 1..close].trim();
377 if id.starts_with('H') && id.contains(':') {
378 out.insert(id.to_string());
379 }
380 }
381 }
382 Ok(out)
383}