//! cc_token_usage/data/loader.rs — loads Claude Code session data from JSONL files.

1use anyhow::{Context, Result};
2use chrono::{DateTime, Utc};
3use rayon::prelude::*;
4use std::collections::{HashMap, HashSet};
5use std::fs::File;
6use std::io::{BufRead, BufReader};
7use std::path::Path;
8
9use super::models::{DataQuality, GlobalDataQuality, SessionData, SessionFile, SessionMetadata};
10use super::parser::parse_session_file;
11use super::scanner::{resolve_agent_parents, scan_claude_home};
12
13/// Extract the Claude Code version string from the first line of a JSONL file.
14///
15/// Both `user` and `assistant` entries carry a `version` field at the top level.
16fn extract_version(path: &Path) -> Option<String> {
17    let file = File::open(path).ok()?;
18    let reader = BufReader::new(file);
19    let first_line = reader.lines().next()?.ok()?;
20    let val: serde_json::Value = serde_json::from_str(&first_line).ok()?;
21    val.get("version")
22        .and_then(|v| v.as_str())
23        .map(|s| s.to_string())
24}
25
26/// Compute the min and max timestamps from a slice of turns that have timestamps.
27fn time_range<'a, I>(timestamps: I) -> (Option<DateTime<Utc>>, Option<DateTime<Utc>>)
28where
29    I: Iterator<Item = &'a DateTime<Utc>>,
30{
31    let mut min: Option<DateTime<Utc>> = None;
32    let mut max: Option<DateTime<Utc>> = None;
33    for ts in timestamps {
34        min = Some(min.map_or(*ts, |m: DateTime<Utc>| m.min(*ts)));
35        max = Some(max.map_or(*ts, |m: DateTime<Utc>| m.max(*ts)));
36    }
37    (min, max)
38}
39
40/// Build a set of requestIds from the main session turns for cross-file dedup.
41fn request_id_set(turns: &[super::models::ValidatedTurn]) -> HashSet<String> {
42    turns
43        .iter()
44        .filter_map(|t| t.request_id.as_ref())
45        .cloned()
46        .collect()
47}
48
49/// Merge agent turns into a parent session, deduplicating by requestId.
50///
51/// Claude Code writes agent responses to both the main session file and the
52/// agent file. We keep the main session's copy and skip duplicates from agents.
53fn merge_agent_turns(
54    parent: &mut SessionData,
55    agent_turns: Vec<super::models::ValidatedTurn>,
56    quality: &DataQuality,
57) {
58    let existing_rids = request_id_set(&parent.turns);
59    let before = parent.agent_turns.len();
60
61    for turn in agent_turns {
62        let dominated = turn
63            .request_id
64            .as_ref()
65            .is_some_and(|rid| existing_rids.contains(rid));
66        if !dominated {
67            parent.agent_turns.push(turn);
68        }
69    }
70
71    let added = parent.agent_turns.len() - before;
72    let deduped = quality.valid_turns.saturating_sub(added);
73
74    // Accumulate agent quality into parent's quality
75    parent.quality.total_lines += quality.total_lines;
76    parent.quality.valid_turns += added;
77    parent.quality.skipped_synthetic += quality.skipped_synthetic;
78    parent.quality.skipped_sidechain += quality.skipped_sidechain;
79    parent.quality.skipped_invalid += quality.skipped_invalid;
80    parent.quality.skipped_parse_error += quality.skipped_parse_error;
81    parent.quality.duplicate_turns += quality.duplicate_turns + deduped;
82}
83
84/// Load all session data from a Claude home directory.
85///
86/// 1. Scans for JSONL files (main sessions + agents)
87/// 2. Resolves legacy agent parent relationships
88/// 3. Parses main sessions first, then merges agent turns into their parents
89/// 4. Computes global time range and quality metrics
90pub fn load_all(claude_home: &Path) -> Result<(Vec<SessionData>, GlobalDataQuality)> {
91    let mut files =
92        scan_claude_home(claude_home).context("failed to scan claude home for session files")?;
93    resolve_agent_parents(&mut files).context("failed to resolve agent parent sessions")?;
94    load_from_files(files)
95}
96
/// Parsed result from a single main session file, ready for serial assembly.
///
/// Produced in parallel during Phase 1 of `load_from_files`; fields are then
/// moved into a `SessionData` on the serial assembly pass.
struct ParsedMain {
    /// Session identifier, copied from the scanner's `SessionFile`.
    session_id: String,
    /// Project the session belongs to, if the scanner resolved one.
    project: Option<String>,
    /// Validated turns parsed from the JSONL file.
    turns: Vec<super::models::ValidatedTurn>,
    /// Claude Code version string read from the file's first line.
    version: Option<String>,
    /// Earliest turn timestamp, if any turn was parsed.
    first_ts: Option<DateTime<Utc>>,
    /// Latest turn timestamp, if any turn was parsed.
    last_ts: Option<DateTime<Utc>>,
    /// Per-file parse quality counters.
    quality: DataQuality,
    /// Session-level metadata extracted by the parser.
    metadata: SessionMetadata,
}
108
/// Parsed result from a single agent file, ready for serial merge.
///
/// Produced in parallel during Phase 2 of `load_from_files`; turns are then
/// merged into the parent session identified by `target_id`.
struct ParsedAgent {
    /// Session to merge into: the resolved parent's id, or the agent's own
    /// session id when no parent was resolved (orphan case).
    target_id: String,
    /// Project of the agent file, used when creating an orphan placeholder.
    project: Option<String>,
    /// Validated turns parsed from the agent JSONL file.
    turns: Vec<super::models::ValidatedTurn>,
    /// Per-file parse quality counters.
    quality: DataQuality,
}
116
117/// Shared loading logic: partition files, parse sessions in parallel, merge agents, compute time ranges.
118fn load_from_files(files: Vec<SessionFile>) -> Result<(Vec<SessionData>, GlobalDataQuality)> {
119    let (main_files, agent_files): (Vec<_>, Vec<_>) = files.into_iter().partition(|f| !f.is_agent);
120
121    let mut global_quality = GlobalDataQuality {
122        total_session_files: main_files.len(),
123        total_agent_files: agent_files.len(),
124        ..Default::default()
125    };
126
127    // ── Phase 1: Parse all main sessions in parallel ──────────────────────
128    let parsed_mains: Vec<Result<ParsedMain>> = main_files
129        .par_iter()
130        .map(|sf| {
131            let (turns, quality, metadata) = parse_session_file(&sf.path, false)
132                .with_context(|| format!("failed to parse session: {}", sf.path.display()))?;
133            let version = extract_version(&sf.path);
134            let (first_ts, last_ts) = time_range(turns.iter().map(|t| &t.timestamp));
135            Ok(ParsedMain {
136                session_id: sf.session_id.clone(),
137                project: sf.project.clone(),
138                turns,
139                version,
140                first_ts,
141                last_ts,
142                quality,
143                metadata,
144            })
145        })
146        .collect();
147
148    // Assemble the sessions map serially (cheap — just moving Vecs)
149    let mut sessions: HashMap<String, SessionData> = HashMap::with_capacity(parsed_mains.len());
150    for result in parsed_mains {
151        let pm = result?;
152        global_quality.total_valid_turns += pm.quality.valid_turns;
153        global_quality.total_skipped += pm.quality.skipped_synthetic
154            + pm.quality.skipped_sidechain
155            + pm.quality.skipped_invalid
156            + pm.quality.skipped_parse_error;
157
158        sessions.insert(
159            pm.session_id.clone(),
160            SessionData {
161                session_id: pm.session_id,
162                project: pm.project,
163                turns: pm.turns,
164                agent_turns: Vec::new(),
165                first_timestamp: pm.first_ts,
166                last_timestamp: pm.last_ts,
167                version: pm.version,
168                quality: pm.quality,
169                metadata: pm.metadata,
170            },
171        );
172    }
173
174    // ── Phase 2: Parse all agent files in parallel ────────────────────────
175    let parsed_agents: Vec<Result<ParsedAgent>> = agent_files
176        .par_iter()
177        .map(|sf| {
178            let (turns, quality, _agent_meta) = parse_session_file(&sf.path, true)
179                .with_context(|| format!("failed to parse agent file: {}", sf.path.display()))?;
180            let target_id = sf
181                .parent_session_id
182                .clone()
183                .unwrap_or_else(|| sf.session_id.clone());
184            Ok(ParsedAgent {
185                target_id,
186                project: sf.project.clone(),
187                turns,
188                quality,
189            })
190        })
191        .collect();
192
193    // Merge agent results into parent sessions serially (needs mutable HashMap)
194    for result in parsed_agents {
195        let pa = result?;
196
197        global_quality.total_valid_turns += pa.quality.valid_turns;
198        global_quality.total_skipped += pa.quality.skipped_synthetic
199            + pa.quality.skipped_sidechain
200            + pa.quality.skipped_invalid
201            + pa.quality.skipped_parse_error;
202
203        if !sessions.contains_key(&pa.target_id) {
204            let project = pa.project.or_else(|| Some("(orphan)".to_string()));
205            sessions.insert(
206                pa.target_id.clone(),
207                SessionData {
208                    session_id: pa.target_id.clone(),
209                    project,
210                    turns: Vec::new(),
211                    agent_turns: Vec::new(),
212                    first_timestamp: None,
213                    last_timestamp: None,
214                    version: None,
215                    quality: DataQuality::default(),
216                    metadata: SessionMetadata::default(),
217                },
218            );
219            global_quality.orphan_agents += 1;
220        }
221
222        let parent = sessions.get_mut(&pa.target_id).unwrap();
223        merge_agent_turns(parent, pa.turns, &pa.quality);
224    }
225
226    // ── Phase 3: Recompute time ranges (serial, cheap) ────────────────────
227    let mut result: Vec<SessionData> = sessions.into_values().collect();
228    // Sort by time descending (most recent first) for deterministic output
229    result.sort_by(|a, b| b.first_timestamp.cmp(&a.first_timestamp));
230    let mut global_min: Option<DateTime<Utc>> = None;
231    let mut global_max: Option<DateTime<Utc>> = None;
232
233    for session in &mut result {
234        let all_timestamps = session.all_responses();
235        let (first_ts, last_ts) = time_range(all_timestamps.iter().map(|t| &t.timestamp));
236        session.first_timestamp = first_ts;
237        session.last_timestamp = last_ts;
238
239        if let Some(ts) = first_ts {
240            global_min = Some(global_min.map_or(ts, |m: DateTime<Utc>| m.min(ts)));
241        }
242        if let Some(ts) = last_ts {
243            global_max = Some(global_max.map_or(ts, |m: DateTime<Utc>| m.max(ts)));
244        }
245    }
246
247    global_quality.time_range = match (global_min, global_max) {
248        (Some(min), Some(max)) => Some((min, max)),
249        _ => None,
250    };
251
252    Ok((result, global_quality))
253}