1use crate::core::data_source::DataSource;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::retro::types::{Inputs, RetroAggregates, SkillFileOnDisk};
7use crate::store::Store;
8use crate::sync::outbound::OutboundEvent;
9use anyhow::Result;
10use std::collections::{HashMap, HashSet};
11use std::fs;
12use std::path::Path;
13
/// Length, in days, of the usage lookback. This file multiplies it by 86_400_000
/// (milliseconds per day) to derive the lookback start passed to the store's
/// `skills_used_since` / `rules_used_since` queries and to fill `usage_lookback_ms`.
const USAGE_LOOKBACK_MIN_DAYS: u64 = 30;
15
16pub fn load_inputs(
18 store: &Store,
19 workspace_root: &Path,
20 workspace_key: &str,
21 window_start_ms: u64,
22 window_end_ms: u64,
23) -> Result<Inputs> {
24 let events = store.retro_events_in_window(workspace_key, window_start_ms, window_end_ms)?;
25 let files_touched =
26 store.files_touched_in_window(workspace_key, window_start_ms, window_end_ms)?;
27 let skills_used = store.skills_used_in_window(workspace_key, window_start_ms, window_end_ms)?;
28 let tool_spans = store.tool_spans_in_window(workspace_key, window_start_ms, window_end_ms)?;
29
30 let lookback_start = window_end_ms.saturating_sub(USAGE_LOOKBACK_MIN_DAYS * 86_400_000);
31 let recent_slugs_list = store.skills_used_since(workspace_key, lookback_start)?;
32 let skills_used_recent_slugs: HashSet<String> = recent_slugs_list.into_iter().collect();
33
34 let rules_recent_list = store.rules_used_since(workspace_key, lookback_start)?;
35 let rules_used_recent_slugs: HashSet<String> = rules_recent_list.into_iter().collect();
36
37 let skill_files_on_disk = scan_skill_files(workspace_root, window_end_ms)?;
38 let rule_files_on_disk = scan_rule_files(workspace_root, window_end_ms)?;
39 let file_facts = latest_file_facts(store, workspace_key)?;
40
41 let aggregates = build_aggregates(&events);
42
43 Ok(Inputs {
44 window_start_ms,
45 window_end_ms,
46 events,
47 files_touched,
48 skills_used,
49 tool_spans,
50 skills_used_recent_slugs,
51 usage_lookback_ms: USAGE_LOOKBACK_MIN_DAYS * 86_400_000,
52 skill_files_on_disk,
53 rule_files_on_disk,
54 rules_used_recent_slugs,
55 file_facts,
56 aggregates,
57 })
58}
59
60#[allow(clippy::too_many_arguments)]
62pub fn load_inputs_for_data_source(
63 store: &Store,
64 workspace_root: &Path,
65 workspace_key: &str,
66 start_ms: u64,
67 end_ms: u64,
68 source: DataSource,
69 team_id: Option<&str>,
70 workspace_hash: Option<&str>,
71) -> Result<Inputs> {
72 match source {
73 DataSource::Local => load_inputs(store, workspace_root, workspace_key, start_ms, end_ms),
74 DataSource::Provider => {
75 if let (Some(t), Some(wh)) = (team_id, workspace_hash) {
76 load_inputs_from_remote_cache(
77 store,
78 workspace_root,
79 workspace_key,
80 start_ms,
81 end_ms,
82 t,
83 wh,
84 )
85 } else {
86 load_inputs(store, workspace_root, workspace_key, start_ms, end_ms)
87 }
88 }
89 DataSource::Mixed => {
90 let mut i = load_inputs(store, workspace_root, workspace_key, start_ms, end_ms)?;
91 if let (Some(t), Some(wh)) = (team_id, workspace_hash) {
92 for raw in store.list_remote_event_jsons(t, wh)? {
93 let o: OutboundEvent = serde_json::from_str(&raw)?;
94 if o.ts_ms < start_ms || o.ts_ms > end_ms {
95 continue;
96 }
97 i.events
98 .push(session_event_from_outbound(&o, workspace_key));
99 }
100 i.events.sort_by(|(a, ea), (b, eb)| {
101 ea.ts_ms
102 .cmp(&eb.ts_ms)
103 .then_with(|| a.id.cmp(&b.id))
104 .then_with(|| ea.seq.cmp(&eb.seq))
105 });
106 i.aggregates = build_aggregates(&i.events);
107 }
108 Ok(i)
109 }
110 }
111}
112
113fn event_kind_from_outbound(s: &str) -> EventKind {
114 match s {
115 "tool_call" => EventKind::ToolCall,
116 "tool_result" => EventKind::ToolResult,
117 "message" => EventKind::Message,
118 "error" => EventKind::Error,
119 "cost" => EventKind::Cost,
120 "hook" => EventKind::Hook,
121 _ => EventKind::Message,
122 }
123}
124
125fn event_source_from_outbound(s: &str) -> EventSource {
126 match s {
127 "tail" => EventSource::Tail,
128 "proxy" => EventSource::Proxy,
129 "hook" => EventSource::Hook,
130 _ => EventSource::Hook,
131 }
132}
133
134fn session_event_from_outbound(o: &OutboundEvent, workspace_key: &str) -> (SessionRecord, Event) {
135 let sid = format!("remote:{}", o.session_id_hash);
136 let session = SessionRecord {
137 id: sid.clone(),
138 agent: o.agent.clone(),
139 model: Some(o.model.clone()),
140 workspace: workspace_key.to_string(),
141 started_at_ms: o.ts_ms,
142 ended_at_ms: None,
143 status: SessionStatus::Done,
144 trace_path: String::new(),
145 start_commit: None,
146 end_commit: None,
147 branch: None,
148 dirty_start: None,
149 dirty_end: None,
150 repo_binding_source: None,
151 };
152 let event = Event {
153 session_id: sid,
154 seq: o.event_seq,
155 ts_ms: o.ts_ms,
156 ts_exact: true,
157 kind: event_kind_from_outbound(&o.kind),
158 source: event_source_from_outbound(&o.source),
159 tool: o.tool.clone(),
160 tool_call_id: o.tool_call_id.clone(),
161 tokens_in: o.tokens_in,
162 tokens_out: o.tokens_out,
163 reasoning_tokens: o.reasoning_tokens,
164 cost_usd_e6: o.cost_usd_e6,
165 payload: o.payload.clone(),
166 };
167 (session, event)
168}
169
170fn load_inputs_from_remote_cache(
171 store: &Store,
172 workspace_root: &Path,
173 workspace_key: &str,
174 start_ms: u64,
175 end_ms: u64,
176 team_id: &str,
177 workspace_hash: &str,
178) -> Result<Inputs> {
179 let mut events = Vec::new();
180 for raw in store.list_remote_event_jsons(team_id, workspace_hash)? {
181 let o: OutboundEvent = serde_json::from_str(&raw)?;
182 if o.ts_ms < start_ms || o.ts_ms > end_ms {
183 continue;
184 }
185 events.push(session_event_from_outbound(&o, workspace_key));
186 }
187 events.sort_by(|(a, ea), (b, eb)| {
188 ea.ts_ms
189 .cmp(&eb.ts_ms)
190 .then_with(|| a.id.cmp(&b.id))
191 .then_with(|| ea.seq.cmp(&eb.seq))
192 });
193 let skill_files_on_disk = scan_skill_files(workspace_root, end_ms)?;
194 let rule_files_on_disk = scan_rule_files(workspace_root, end_ms)?;
195 let lookback_start = end_ms.saturating_sub(USAGE_LOOKBACK_MIN_DAYS * 86_400_000);
196 let recent_slugs_list = store.skills_used_since(workspace_key, lookback_start)?;
197 let skills_used_recent_slugs: HashSet<String> = recent_slugs_list.into_iter().collect();
198 let rules_recent_list = store.rules_used_since(workspace_key, lookback_start)?;
199 let rules_used_recent_slugs: HashSet<String> = rules_recent_list.into_iter().collect();
200 let file_facts = latest_file_facts(store, workspace_key)?;
201 let aggregates = build_aggregates(&events);
202 Ok(Inputs {
203 window_start_ms: start_ms,
204 window_end_ms: end_ms,
205 events,
206 files_touched: vec![],
207 skills_used: vec![],
208 tool_spans: vec![],
209 skills_used_recent_slugs,
210 usage_lookback_ms: USAGE_LOOKBACK_MIN_DAYS * 86_400_000,
211 skill_files_on_disk,
212 rule_files_on_disk,
213 rules_used_recent_slugs,
214 file_facts,
215 aggregates,
216 })
217}
218
219fn latest_file_facts(
220 store: &Store,
221 workspace: &str,
222) -> Result<HashMap<String, crate::metrics::types::FileFact>> {
223 let Some(snapshot) = store.latest_repo_snapshot(workspace)? else {
224 return Ok(HashMap::new());
225 };
226 let facts = store.file_facts_for_snapshot(&snapshot.id)?;
227 Ok(facts
228 .into_iter()
229 .map(|fact| (fact.path.clone(), fact))
230 .collect())
231}
232
233pub fn scan_skill_files(workspace_root: &Path, now_ms: u64) -> Result<Vec<SkillFileOnDisk>> {
235 let skills_dir = workspace_root.join(".cursor/skills");
236 if !skills_dir.is_dir() {
237 return Ok(vec![]);
238 }
239 let mut out = Vec::new();
240 for entry in fs::read_dir(&skills_dir)? {
241 let entry = entry?;
242 if !entry.file_type()?.is_dir() {
243 continue;
244 }
245 let slug = entry.file_name().to_string_lossy().to_string();
246 let skill_md = entry.path().join("SKILL.md");
247 if !skill_md.is_file() {
248 continue;
249 }
250 let meta = fs::metadata(&skill_md)?;
251 let mtime_ms = meta
252 .modified()
253 .ok()
254 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
255 .map(|d| d.as_millis() as u64)
256 .unwrap_or(now_ms);
257 out.push(SkillFileOnDisk {
258 slug,
259 size_bytes: meta.len(),
260 mtime_ms,
261 });
262 }
263 out.sort_by(|a, b| a.slug.cmp(&b.slug));
264 Ok(out)
265}
266
267pub fn scan_rule_files(workspace_root: &Path, now_ms: u64) -> Result<Vec<SkillFileOnDisk>> {
269 let rules_dir = workspace_root.join(".cursor/rules");
270 if !rules_dir.is_dir() {
271 return Ok(vec![]);
272 }
273 let mut out = Vec::new();
274 for entry in fs::read_dir(&rules_dir)? {
275 let entry = entry?;
276 let path = entry.path();
277 if !path.is_file() {
278 continue;
279 }
280 if !path
281 .extension()
282 .and_then(|x| x.to_str())
283 .is_some_and(|e| e.eq_ignore_ascii_case("mdc"))
284 {
285 continue;
286 }
287 let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
288 continue;
289 };
290 let slug = stem.to_string();
291 let meta = fs::metadata(&path)?;
292 let mtime_ms = meta
293 .modified()
294 .ok()
295 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
296 .map(|d| d.as_millis() as u64)
297 .unwrap_or(now_ms);
298 out.push(SkillFileOnDisk {
299 slug,
300 size_bytes: meta.len(),
301 mtime_ms,
302 });
303 }
304 out.sort_by(|a, b| a.slug.cmp(&b.slug));
305 Ok(out)
306}
307
308fn build_aggregates(events: &[(SessionRecord, crate::core::event::Event)]) -> RetroAggregates {
309 let mut agg = RetroAggregates::default();
310 let mut model_once = HashSet::new();
311 for (s, e) in events {
312 agg.unique_session_ids.insert(s.id.clone());
313 if model_once.insert(s.id.clone()) {
314 let mkey = s.model.clone().unwrap_or_else(|| "unknown".into());
315 *agg.model_session_counts.entry(mkey).or_default() += 1;
316 }
317 if let Some(ref t) = e.tool {
318 *agg.tool_event_counts.entry(t.clone()).or_default() += 1;
319 if let Some(c) = e.cost_usd_e6 {
320 *agg.tool_cost_usd_e6.entry(t.clone()).or_default() += c;
321 }
322 }
323 if let Some(c) = e.cost_usd_e6 {
324 agg.total_cost_usd_e6 += c;
325 }
326 }
327 agg
328}
329
330pub fn prior_bet_fingerprints(reports_dir: &Path) -> Result<HashSet<String>> {
332 let mut out = HashSet::new();
333 if !reports_dir.is_dir() {
334 return Ok(out);
335 }
336 for entry in fs::read_dir(reports_dir)? {
337 let entry = entry?;
338 let p = entry.path();
339 if p.extension().and_then(|x| x.to_str()) != Some("md") {
340 continue;
341 }
342 let raw = fs::read_to_string(&p).unwrap_or_default();
343 for line in raw.lines() {
344 let l = line.trim();
345 let Some(rest) = l.strip_prefix("### ") else {
346 continue;
347 };
348 let Some(open) = rest.rfind('(') else {
349 continue;
350 };
351 let Some(close) = rest.rfind(')') else {
352 continue;
353 };
354 if close <= open + 1 {
355 continue;
356 }
357 let id = rest[open + 1..close].trim();
358 if id.starts_with('H') && id.contains(':') {
359 out.insert(id.to_string());
360 }
361 }
362 }
363 Ok(out)
364}