1use anyhow::{Context, Result};
2use chrono::{DateTime, Utc};
3use rayon::prelude::*;
4use std::collections::{HashMap, HashSet};
5use std::fs::File;
6use std::io::{BufRead, BufReader};
7use std::path::Path;
8
9use super::models::{DataQuality, GlobalDataQuality, SessionData, SessionFile, SessionMetadata};
10use super::parser::parse_session_file;
11use super::scanner::{resolve_agent_parents, scan_claude_home};
12
13fn extract_version(path: &Path) -> Option<String> {
17 let file = File::open(path).ok()?;
18 let reader = BufReader::new(file);
19 let first_line = reader.lines().next()?.ok()?;
20 let val: serde_json::Value = serde_json::from_str(&first_line).ok()?;
21 val.get("version")
22 .and_then(|v| v.as_str())
23 .map(|s| s.to_string())
24}
25
26fn time_range<'a, I>(timestamps: I) -> (Option<DateTime<Utc>>, Option<DateTime<Utc>>)
28where
29 I: Iterator<Item = &'a DateTime<Utc>>,
30{
31 let mut min: Option<DateTime<Utc>> = None;
32 let mut max: Option<DateTime<Utc>> = None;
33 for ts in timestamps {
34 min = Some(min.map_or(*ts, |m: DateTime<Utc>| m.min(*ts)));
35 max = Some(max.map_or(*ts, |m: DateTime<Utc>| m.max(*ts)));
36 }
37 (min, max)
38}
39
40fn request_id_set(turns: &[super::models::ValidatedTurn]) -> HashSet<String> {
42 turns
43 .iter()
44 .filter_map(|t| t.request_id.as_ref())
45 .cloned()
46 .collect()
47}
48
49fn merge_agent_turns(
54 parent: &mut SessionData,
55 agent_turns: Vec<super::models::ValidatedTurn>,
56 quality: &DataQuality,
57) {
58 let existing_rids = request_id_set(&parent.turns);
59 let before = parent.agent_turns.len();
60
61 for turn in agent_turns {
62 let dominated = turn
63 .request_id
64 .as_ref()
65 .is_some_and(|rid| existing_rids.contains(rid));
66 if !dominated {
67 parent.agent_turns.push(turn);
68 }
69 }
70
71 let added = parent.agent_turns.len() - before;
72 let deduped = quality.valid_turns.saturating_sub(added);
73
74 parent.quality.total_lines += quality.total_lines;
76 parent.quality.valid_turns += added;
77 parent.quality.skipped_synthetic += quality.skipped_synthetic;
78 parent.quality.skipped_sidechain += quality.skipped_sidechain;
79 parent.quality.skipped_invalid += quality.skipped_invalid;
80 parent.quality.skipped_parse_error += quality.skipped_parse_error;
81 parent.quality.duplicate_turns += quality.duplicate_turns + deduped;
82}
83
84pub fn load_all(claude_home: &Path) -> Result<(Vec<SessionData>, GlobalDataQuality)> {
91 let mut files =
92 scan_claude_home(claude_home).context("failed to scan claude home for session files")?;
93 resolve_agent_parents(&mut files).context("failed to resolve agent parent sessions")?;
94 load_from_files(files)
95}
96
/// Intermediate result of parsing one main (non-agent) session file.
///
/// Produced by the parallel parsing step in `load_from_files` and then turned
/// into a `SessionData` entry.
struct ParsedMain {
    /// Session id copied from the originating `SessionFile`.
    session_id: String,
    /// Project copied from the originating `SessionFile`, if it has one.
    project: Option<String>,
    /// Turns returned by `parse_session_file` for this file.
    turns: Vec<super::models::ValidatedTurn>,
    /// `version` string read from the file's first line by `extract_version`.
    version: Option<String>,
    /// Earliest timestamp among `turns`; `None` when there are no turns.
    first_ts: Option<DateTime<Utc>>,
    /// Latest timestamp among `turns`; `None` when there are no turns.
    last_ts: Option<DateTime<Utc>>,
    /// Parse statistics returned by `parse_session_file` for this file.
    quality: DataQuality,
    /// Session metadata returned by `parse_session_file` for this file.
    metadata: SessionMetadata,
}
108
/// Intermediate result of parsing one agent file.
///
/// Produced by the parallel parsing step in `load_from_files` and then merged
/// into the session identified by `target_id`.
struct ParsedAgent {
    /// Id of the session these turns are merged into: the file's
    /// `parent_session_id` when set, otherwise the file's own `session_id`.
    target_id: String,
    /// Project copied from the originating `SessionFile`, if it has one.
    project: Option<String>,
    /// Turns returned by `parse_session_file` for this file.
    turns: Vec<super::models::ValidatedTurn>,
    /// Parse statistics returned by `parse_session_file` for this file.
    quality: DataQuality,
}
116
117fn load_from_files(files: Vec<SessionFile>) -> Result<(Vec<SessionData>, GlobalDataQuality)> {
119 let (main_files, agent_files): (Vec<_>, Vec<_>) = files.into_iter().partition(|f| !f.is_agent);
120
121 let mut global_quality = GlobalDataQuality {
122 total_session_files: main_files.len(),
123 total_agent_files: agent_files.len(),
124 ..Default::default()
125 };
126
127 let parsed_mains: Vec<Result<ParsedMain>> = main_files
129 .par_iter()
130 .map(|sf| {
131 let (turns, quality, metadata) = parse_session_file(&sf.path, false)
132 .with_context(|| format!("failed to parse session: {}", sf.path.display()))?;
133 let version = extract_version(&sf.path);
134 let (first_ts, last_ts) = time_range(turns.iter().map(|t| &t.timestamp));
135 Ok(ParsedMain {
136 session_id: sf.session_id.clone(),
137 project: sf.project.clone(),
138 turns,
139 version,
140 first_ts,
141 last_ts,
142 quality,
143 metadata,
144 })
145 })
146 .collect();
147
148 let mut sessions: HashMap<String, SessionData> = HashMap::with_capacity(parsed_mains.len());
150 for result in parsed_mains {
151 let pm = result?;
152 global_quality.total_valid_turns += pm.quality.valid_turns;
153 global_quality.total_skipped += pm.quality.skipped_synthetic
154 + pm.quality.skipped_sidechain
155 + pm.quality.skipped_invalid
156 + pm.quality.skipped_parse_error;
157
158 sessions.insert(
159 pm.session_id.clone(),
160 SessionData {
161 session_id: pm.session_id,
162 project: pm.project,
163 turns: pm.turns,
164 agent_turns: Vec::new(),
165 first_timestamp: pm.first_ts,
166 last_timestamp: pm.last_ts,
167 version: pm.version,
168 quality: pm.quality,
169 metadata: pm.metadata,
170 },
171 );
172 }
173
174 let parsed_agents: Vec<Result<ParsedAgent>> = agent_files
176 .par_iter()
177 .map(|sf| {
178 let (turns, quality, _agent_meta) = parse_session_file(&sf.path, true)
179 .with_context(|| format!("failed to parse agent file: {}", sf.path.display()))?;
180 let target_id = sf
181 .parent_session_id
182 .clone()
183 .unwrap_or_else(|| sf.session_id.clone());
184 Ok(ParsedAgent {
185 target_id,
186 project: sf.project.clone(),
187 turns,
188 quality,
189 })
190 })
191 .collect();
192
193 for result in parsed_agents {
195 let pa = result?;
196
197 global_quality.total_valid_turns += pa.quality.valid_turns;
198 global_quality.total_skipped += pa.quality.skipped_synthetic
199 + pa.quality.skipped_sidechain
200 + pa.quality.skipped_invalid
201 + pa.quality.skipped_parse_error;
202
203 if !sessions.contains_key(&pa.target_id) {
204 let project = pa.project.or_else(|| Some("(orphan)".to_string()));
205 sessions.insert(
206 pa.target_id.clone(),
207 SessionData {
208 session_id: pa.target_id.clone(),
209 project,
210 turns: Vec::new(),
211 agent_turns: Vec::new(),
212 first_timestamp: None,
213 last_timestamp: None,
214 version: None,
215 quality: DataQuality::default(),
216 metadata: SessionMetadata::default(),
217 },
218 );
219 global_quality.orphan_agents += 1;
220 }
221
222 let parent = sessions.get_mut(&pa.target_id).unwrap();
223 merge_agent_turns(parent, pa.turns, &pa.quality);
224 }
225
226 let mut result: Vec<SessionData> = sessions.into_values().collect();
228 result.sort_by(|a, b| b.first_timestamp.cmp(&a.first_timestamp));
230 let mut global_min: Option<DateTime<Utc>> = None;
231 let mut global_max: Option<DateTime<Utc>> = None;
232
233 for session in &mut result {
234 let all_timestamps = session.all_responses();
235 let (first_ts, last_ts) = time_range(all_timestamps.iter().map(|t| &t.timestamp));
236 session.first_timestamp = first_ts;
237 session.last_timestamp = last_ts;
238
239 if let Some(ts) = first_ts {
240 global_min = Some(global_min.map_or(ts, |m: DateTime<Utc>| m.min(ts)));
241 }
242 if let Some(ts) = last_ts {
243 global_max = Some(global_max.map_or(ts, |m: DateTime<Utc>| m.max(ts)));
244 }
245 }
246
247 global_quality.time_range = match (global_min, global_max) {
248 (Some(min), Some(max)) => Some((min, max)),
249 _ => None,
250 };
251
252 Ok((result, global_quality))
253}