//! `cc_token_usage/data/loader.rs` — loads Claude Code session data from JSONL
//! files: scans for session/agent files, parses them, merges agent turns into
//! their parent sessions, and computes global quality metrics.
1use anyhow::{Context, Result};
2use chrono::{DateTime, Utc};
3use std::collections::{HashMap, HashSet};
4use std::fs::File;
5use std::io::{BufRead, BufReader};
6use std::path::Path;
7
8use super::models::{DataQuality, GlobalDataQuality, SessionData};
9use super::parser::parse_session_file;
10use super::scanner::{resolve_agent_parents, scan_claude_home, scan_projects_dir};
11
12/// Extract the Claude Code version string from the first line of a JSONL file.
13///
14/// Both `user` and `assistant` entries carry a `version` field at the top level.
15fn extract_version(path: &Path) -> Option<String> {
16    let file = File::open(path).ok()?;
17    let reader = BufReader::new(file);
18    let first_line = reader.lines().next()?.ok()?;
19    let val: serde_json::Value = serde_json::from_str(&first_line).ok()?;
20    val.get("version")
21        .and_then(|v| v.as_str())
22        .map(|s| s.to_string())
23}
24
/// Compute the min and max values from an iterator of ordered, copyable items
/// (in practice, turn timestamps).
///
/// Generalized over `T: Ord + Copy` so it works for any timestamp-like type;
/// existing call sites infer `T = DateTime<Utc>` unchanged. Returns
/// `(None, None)` for an empty iterator, and `(Some(x), Some(x))` for a
/// single element.
fn time_range<'a, I, T>(timestamps: I) -> (Option<T>, Option<T>)
where
    I: Iterator<Item = &'a T>,
    T: Ord + Copy + 'a,
{
    timestamps.fold((None, None), |(lo, hi), &ts| {
        (
            Some(lo.map_or(ts, |m: T| m.min(ts))),
            Some(hi.map_or(ts, |m: T| m.max(ts))),
        )
    })
}
38
39/// Build a set of requestIds from the main session turns for cross-file dedup.
40fn request_id_set(turns: &[super::models::ValidatedTurn]) -> HashSet<String> {
41    turns
42        .iter()
43        .filter_map(|t| t.request_id.as_ref())
44        .cloned()
45        .collect()
46}
47
48/// Merge agent turns into a parent session, deduplicating by requestId.
49///
50/// Claude Code writes agent responses to both the main session file and the
51/// agent file. We keep the main session's copy and skip duplicates from agents.
52fn merge_agent_turns(parent: &mut SessionData, agent_turns: Vec<super::models::ValidatedTurn>, quality: &DataQuality) {
53    let existing_rids = request_id_set(&parent.turns);
54    let before = parent.agent_turns.len();
55
56    for turn in agent_turns {
57        let dominated = turn
58            .request_id
59            .as_ref()
60            .is_some_and(|rid| existing_rids.contains(rid));
61        if !dominated {
62            parent.agent_turns.push(turn);
63        }
64    }
65
66    let added = parent.agent_turns.len() - before;
67    let deduped = quality.valid_turns.saturating_sub(added);
68
69    // Accumulate agent quality into parent's quality
70    parent.quality.total_lines += quality.total_lines;
71    parent.quality.valid_turns += added;
72    parent.quality.skipped_synthetic += quality.skipped_synthetic;
73    parent.quality.skipped_sidechain += quality.skipped_sidechain;
74    parent.quality.skipped_invalid += quality.skipped_invalid;
75    parent.quality.skipped_parse_error += quality.skipped_parse_error;
76    parent.quality.duplicate_turns += quality.duplicate_turns + deduped;
77}
78
79/// Load all session data from a Claude home directory.
80///
81/// 1. Scans for JSONL files (main sessions + agents)
82/// 2. Resolves legacy agent parent relationships
83/// 3. Parses main sessions first, then merges agent turns into their parents
84/// 4. Computes global time range and quality metrics
85pub fn load_all(claude_home: &Path) -> Result<(Vec<SessionData>, GlobalDataQuality)> {
86    // Step 1: Scan for all session files
87    let mut files = scan_claude_home(claude_home)
88        .context("failed to scan claude home for session files")?;
89
90    // Step 2: Resolve legacy agent parent relationships
91    resolve_agent_parents(&mut files)
92        .context("failed to resolve agent parent sessions")?;
93
94    // Partition into main sessions and agent files
95    let (main_files, agent_files): (Vec<_>, Vec<_>) =
96        files.into_iter().partition(|f| !f.is_agent);
97
98    let mut global_quality = GlobalDataQuality {
99        total_session_files: main_files.len(),
100        total_agent_files: agent_files.len(),
101        ..Default::default()
102    };
103
104    // Step 3a: Process all main sessions
105    let mut sessions: HashMap<String, SessionData> = HashMap::new();
106
107    for sf in &main_files {
108        let (turns, quality) = parse_session_file(&sf.file_path, false)
109            .with_context(|| format!("failed to parse session: {}", sf.file_path.display()))?;
110
111        let version = extract_version(&sf.file_path);
112
113        let (first_ts, last_ts) = time_range(turns.iter().map(|t| &t.timestamp));
114
115        global_quality.total_valid_turns += quality.valid_turns;
116        global_quality.total_skipped +=
117            quality.skipped_synthetic + quality.skipped_sidechain + quality.skipped_invalid + quality.skipped_parse_error;
118
119        let session = SessionData {
120            session_id: sf.session_id.clone(),
121            project: sf.project.clone(),
122            turns,
123            agent_turns: Vec::new(),
124            first_timestamp: first_ts,
125            last_timestamp: last_ts,
126            version,
127            quality,
128        };
129
130        sessions.insert(sf.session_id.clone(), session);
131    }
132
133    // Step 3b: Process agent files and merge into parent sessions
134    for sf in &agent_files {
135        let (agent_turns, quality) = parse_session_file(&sf.file_path, true)
136            .with_context(|| format!("failed to parse agent file: {}", sf.file_path.display()))?;
137
138        global_quality.total_valid_turns += quality.valid_turns;
139        global_quality.total_skipped +=
140            quality.skipped_synthetic + quality.skipped_sidechain + quality.skipped_invalid + quality.skipped_parse_error;
141
142        let target_id = match &sf.parent_session_id {
143            Some(parent_id) => {
144                if !sessions.contains_key(parent_id) {
145                    let project = sf.project.clone().or_else(|| Some("(orphan)".to_string()));
146                    sessions.insert(parent_id.clone(), SessionData {
147                        session_id: parent_id.clone(),
148                        project,
149                        turns: Vec::new(),
150                        agent_turns: Vec::new(),
151                        first_timestamp: None,
152                        last_timestamp: None,
153                        version: None,
154                        quality: DataQuality::default(),
155                    });
156                    global_quality.orphan_agents += 1;
157                }
158                parent_id.clone()
159            }
160            None => {
161                let virtual_id = sf.session_id.clone();
162                if !sessions.contains_key(&virtual_id) {
163                    let project = sf.project.clone().or_else(|| Some("(orphan)".to_string()));
164                    sessions.insert(virtual_id.clone(), SessionData {
165                        session_id: virtual_id.clone(),
166                        project,
167                        turns: Vec::new(),
168                        agent_turns: Vec::new(),
169                        first_timestamp: None,
170                        last_timestamp: None,
171                        version: None,
172                        quality: DataQuality::default(),
173                    });
174                    global_quality.orphan_agents += 1;
175                }
176                virtual_id
177            }
178        };
179
180        let parent = sessions.get_mut(&target_id).unwrap();
181        merge_agent_turns(parent, agent_turns, &quality);
182    }
183
184    // Step 4: Recompute time ranges to include agent turns, and collect results
185    let mut result: Vec<SessionData> = sessions.into_values().collect();
186
187    // Compute global time range
188    let mut global_min: Option<DateTime<Utc>> = None;
189    let mut global_max: Option<DateTime<Utc>> = None;
190
191    for session in &mut result {
192        // Recompute session time range including agent turns
193        let all_timestamps = session
194            .turns
195            .iter()
196            .chain(session.agent_turns.iter())
197            .map(|t| &t.timestamp);
198        let (first_ts, last_ts) = time_range(all_timestamps);
199        session.first_timestamp = first_ts;
200        session.last_timestamp = last_ts;
201
202        // Update global range
203        if let Some(ts) = first_ts {
204            global_min = Some(global_min.map_or(ts, |m: DateTime<Utc>| m.min(ts)));
205        }
206        if let Some(ts) = last_ts {
207            global_max = Some(global_max.map_or(ts, |m: DateTime<Utc>| m.max(ts)));
208        }
209    }
210
211    global_quality.time_range = match (global_min, global_max) {
212        (Some(min), Some(max)) => Some((min, max)),
213        _ => None,
214    };
215
216    Ok((result, global_quality))
217}
218
219/// Load all session data from a projects directory directly.
220///
221/// Unlike `load_all` which expects a Claude home directory (and appends `projects/`),
222/// this function takes the projects directory itself. Useful for loading data from
223/// archive directories like `~/.config/superpowers/conversation-archive/`.
224pub fn load_from_projects_dir(projects_dir: &Path) -> Result<(Vec<SessionData>, GlobalDataQuality)> {
225    // Step 1: Scan for all session files directly from projects_dir
226    let mut files = scan_projects_dir(projects_dir)
227        .context("failed to scan projects dir for session files")?;
228
229    // Step 2: Resolve legacy agent parent relationships
230    resolve_agent_parents(&mut files)
231        .context("failed to resolve agent parent sessions")?;
232
233    // Partition into main sessions and agent files
234    let (main_files, agent_files): (Vec<_>, Vec<_>) =
235        files.into_iter().partition(|f| !f.is_agent);
236
237    let mut global_quality = GlobalDataQuality {
238        total_session_files: main_files.len(),
239        total_agent_files: agent_files.len(),
240        ..Default::default()
241    };
242
243    // Step 3a: Process all main sessions
244    let mut sessions: HashMap<String, SessionData> = HashMap::new();
245
246    for sf in &main_files {
247        let (turns, quality) = parse_session_file(&sf.file_path, false)
248            .with_context(|| format!("failed to parse session: {}", sf.file_path.display()))?;
249
250        let version = extract_version(&sf.file_path);
251        let (first_ts, last_ts) = time_range(turns.iter().map(|t| &t.timestamp));
252
253        global_quality.total_valid_turns += quality.valid_turns;
254        global_quality.total_skipped +=
255            quality.skipped_synthetic + quality.skipped_sidechain + quality.skipped_invalid + quality.skipped_parse_error;
256
257        let session = SessionData {
258            session_id: sf.session_id.clone(),
259            project: sf.project.clone(),
260            turns,
261            agent_turns: Vec::new(),
262            first_timestamp: first_ts,
263            last_timestamp: last_ts,
264            version,
265            quality,
266        };
267
268        sessions.insert(sf.session_id.clone(), session);
269    }
270
271    // Step 3b: Process agent files and merge into parent sessions
272    for sf in &agent_files {
273        let (agent_turns, quality) = parse_session_file(&sf.file_path, true)
274            .with_context(|| format!("failed to parse agent file: {}", sf.file_path.display()))?;
275
276        global_quality.total_valid_turns += quality.valid_turns;
277        global_quality.total_skipped +=
278            quality.skipped_synthetic + quality.skipped_sidechain + quality.skipped_invalid + quality.skipped_parse_error;
279
280        let target_id = match &sf.parent_session_id {
281            Some(parent_id) => {
282                if !sessions.contains_key(parent_id) {
283                    let project = sf.project.clone().or_else(|| Some("(orphan)".to_string()));
284                    sessions.insert(parent_id.clone(), SessionData {
285                        session_id: parent_id.clone(),
286                        project,
287                        turns: Vec::new(),
288                        agent_turns: Vec::new(),
289                        first_timestamp: None,
290                        last_timestamp: None,
291                        version: None,
292                        quality: DataQuality::default(),
293                    });
294                    global_quality.orphan_agents += 1;
295                }
296                parent_id.clone()
297            }
298            None => {
299                let virtual_id = sf.session_id.clone();
300                if !sessions.contains_key(&virtual_id) {
301                    let project = sf.project.clone().or_else(|| Some("(orphan)".to_string()));
302                    sessions.insert(virtual_id.clone(), SessionData {
303                        session_id: virtual_id.clone(),
304                        project,
305                        turns: Vec::new(),
306                        agent_turns: Vec::new(),
307                        first_timestamp: None,
308                        last_timestamp: None,
309                        version: None,
310                        quality: DataQuality::default(),
311                    });
312                    global_quality.orphan_agents += 1;
313                }
314                virtual_id
315            }
316        };
317
318        let parent = sessions.get_mut(&target_id).unwrap();
319        merge_agent_turns(parent, agent_turns, &quality);
320    }
321
322    // Step 4: Recompute time ranges to include agent turns, and collect results
323    let mut result: Vec<SessionData> = sessions.into_values().collect();
324
325    let mut global_min: Option<DateTime<Utc>> = None;
326    let mut global_max: Option<DateTime<Utc>> = None;
327
328    for session in &mut result {
329        let all_timestamps = session
330            .turns
331            .iter()
332            .chain(session.agent_turns.iter())
333            .map(|t| &t.timestamp);
334        let (first_ts, last_ts) = time_range(all_timestamps);
335        session.first_timestamp = first_ts;
336        session.last_timestamp = last_ts;
337
338        if let Some(ts) = first_ts {
339            global_min = Some(global_min.map_or(ts, |m: DateTime<Utc>| m.min(ts)));
340        }
341        if let Some(ts) = last_ts {
342            global_max = Some(global_max.map_or(ts, |m: DateTime<Utc>| m.max(ts)));
343        }
344    }
345
346    global_quality.time_range = match (global_min, global_max) {
347        (Some(min), Some(max)) => Some((min, max)),
348        _ => None,
349    };
350
351    Ok((result, global_quality))
352}