Skip to main content

cc_token_usage/data/
loader.rs

1use anyhow::{Context, Result};
2use chrono::{DateTime, Utc};
3use rayon::prelude::*;
4use std::collections::{HashMap, HashSet};
5use std::fs::File;
6use std::io::{BufRead, BufReader};
7use std::path::{Path, PathBuf};
8
9use super::models::{
10    DataQuality, GlobalDataQuality, HookUsage, PluginUsage, SessionData, SessionFile,
11    SessionMetadata, SkillUsage, Subagent,
12};
13use super::parser::parse_session_file;
14use super::scanner::{resolve_agent_parents, scan_claude_home};
15use crate::pricing::calculator::PricingCalculator;
16
17/// Extract the Claude Code version string from the first line of a JSONL file.
18///
19/// Both `user` and `assistant` entries carry a `version` field at the top level.
20fn extract_version(path: &Path) -> Option<String> {
21    let file = File::open(path).ok()?;
22    let reader = BufReader::new(file);
23    let first_line = reader.lines().next()?.ok()?;
24    let val: serde_json::Value = serde_json::from_str(&first_line).ok()?;
25    val.get("version")
26        .and_then(|v| v.as_str())
27        .map(|s| s.to_string())
28}
29
30/// Compute the min and max timestamps from a slice of turns that have timestamps.
31fn time_range<'a, I>(timestamps: I) -> (Option<DateTime<Utc>>, Option<DateTime<Utc>>)
32where
33    I: Iterator<Item = &'a DateTime<Utc>>,
34{
35    let mut min: Option<DateTime<Utc>> = None;
36    let mut max: Option<DateTime<Utc>> = None;
37    for ts in timestamps {
38        min = Some(min.map_or(*ts, |m: DateTime<Utc>| m.min(*ts)));
39        max = Some(max.map_or(*ts, |m: DateTime<Utc>| m.max(*ts)));
40    }
41    (min, max)
42}
43
44/// Build a set of requestIds from the main session turns for cross-file dedup.
45fn request_id_set(turns: &[super::models::ValidatedTurn]) -> HashSet<String> {
46    turns
47        .iter()
48        .filter_map(|t| t.request_id.as_ref())
49        .cloned()
50        .collect()
51}
52
53/// Load all session data from a Claude home directory.
54///
55/// 1. Scans for JSONL files (main sessions + agents)
56/// 2. Resolves legacy agent parent relationships
57/// 3. Parses main sessions in parallel; groups agent files by `agent_id` into
58///    `Subagent` entries on their parent session
59/// 4. Aggregates plugins / skills from main turns and hooks from main session
60///    `stop_hook_summary` entries (Claude Code 2.1.104+)
61/// 5. Computes global time range and quality metrics
62///
63/// The `PricingCalculator` is used to populate per-plugin / per-skill `cost`
64/// fields on the aggregated metadata. Cost / token totals on the underlying
65/// turns are untouched.
66pub fn load_all(
67    claude_home: &Path,
68    calc: &PricingCalculator,
69) -> Result<(Vec<SessionData>, GlobalDataQuality)> {
70    let mut files =
71        scan_claude_home(claude_home).context("failed to scan claude home for session files")?;
72    resolve_agent_parents(&mut files).context("failed to resolve agent parent sessions")?;
73    load_from_files(files, claude_home, calc)
74}
75
/// Parsed result from a single main session file, ready for serial assembly.
///
/// Built inside the parallel parse phase of `load_from_files` and then moved
/// field-by-field into a `SessionData`.
struct ParsedMain {
    /// Session identifier, copied from the scanned `SessionFile`.
    session_id: String,
    /// Project the session file was found under, if the scanner recorded one.
    project: Option<String>,
    /// Turns returned by `parse_session_file` for this file.
    turns: Vec<super::models::ValidatedTurn>,
    /// Claude Code version as reported by `extract_version`, if any.
    version: Option<String>,
    /// Earliest timestamp among `turns` (`None` when there are no turns).
    first_ts: Option<DateTime<Utc>>,
    /// Latest timestamp among `turns` (`None` when there are no turns).
    last_ts: Option<DateTime<Utc>>,
    /// Per-file quality counters returned by `parse_session_file`.
    quality: DataQuality,
    /// Session metadata returned by `parse_session_file`.
    metadata: SessionMetadata,
    /// Hook usage returned by `parse_session_file` for this main session.
    hooks: Vec<HookUsage>,
}
88
/// Parsed result from a single agent file. One `ParsedAgent` becomes one
/// `Subagent` under its parent session.
struct ParsedAgent {
    /// The parent session this subagent belongs to. Falls back to the agent
    /// file's own session id when the scanner recorded no parent.
    target_id: String,
    /// Project context (used only when the parent main session is missing).
    project: Option<String>,
    /// The subagent ID, taken verbatim from the agent JSONL file stem
    /// (e.g. `"agent-abc123"`).
    agent_id: String,
    /// Path to the agent file (used to derive `.meta.json` lookups if needed).
    // Currently stored but never read, hence the lint allowance.
    #[allow(dead_code)]
    path: PathBuf,
    /// Turns returned by `parse_session_file` for this agent file, before
    /// cross-file dedup against the parent session.
    turns: Vec<super::models::ValidatedTurn>,
    /// Per-file quality counters returned by `parse_session_file`.
    quality: DataQuality,
    /// The workflow run id (`wf_<runId>`) this agent file belongs to, if it was
    /// discovered under `subagents/workflows/wf_<runId>/`. `None` for ordinary
    /// (legacy / Task-tool) subagent files.
    workflow_run_id: Option<String>,
}
109
110/// Shared loading logic: parse files in parallel, group agent turns into
111/// `Subagent` entries, aggregate plugins/skills/hooks, compute time ranges.
112fn load_from_files(
113    files: Vec<SessionFile>,
114    claude_home: &Path,
115    calc: &PricingCalculator,
116) -> Result<(Vec<SessionData>, GlobalDataQuality)> {
117    let (main_files, agent_files): (Vec<_>, Vec<_>) = files.into_iter().partition(|f| !f.is_agent);
118
119    let mut global_quality = GlobalDataQuality {
120        total_session_files: main_files.len(),
121        total_agent_files: agent_files.len(),
122        ..Default::default()
123    };
124
125    // ── Phase 1: Parse all main sessions in parallel ──────────────────────
126    let parsed_mains: Vec<Result<ParsedMain>> = main_files
127        .par_iter()
128        .map(|sf| {
129            let (turns, quality, metadata, hooks) = parse_session_file(&sf.path, false)
130                .with_context(|| format!("failed to parse session: {}", sf.path.display()))?;
131            let version = extract_version(&sf.path);
132            let (first_ts, last_ts) = time_range(turns.iter().map(|t| &t.timestamp));
133            Ok(ParsedMain {
134                session_id: sf.session_id.clone(),
135                project: sf.project.clone(),
136                turns,
137                version,
138                first_ts,
139                last_ts,
140                quality,
141                metadata,
142                hooks,
143            })
144        })
145        .collect();
146
147    // Assemble the sessions map serially (cheap — just moving Vecs)
148    let mut sessions: HashMap<String, SessionData> = HashMap::with_capacity(parsed_mains.len());
149    for result in parsed_mains {
150        let pm = result?;
151        global_quality.total_valid_turns += pm.quality.valid_turns;
152        global_quality.total_skipped += pm.quality.skipped_synthetic
153            + pm.quality.skipped_sidechain
154            + pm.quality.skipped_invalid
155            + pm.quality.skipped_parse_error;
156
157        sessions.insert(
158            pm.session_id.clone(),
159            SessionData {
160                session_id: pm.session_id,
161                project: pm.project,
162                turns: pm.turns,
163                subagents: Vec::new(),
164                plugins: Vec::new(),
165                skills: Vec::new(),
166                hooks: pm.hooks,
167                first_timestamp: pm.first_ts,
168                last_timestamp: pm.last_ts,
169                version: pm.version,
170                quality: pm.quality,
171                metadata: pm.metadata,
172                is_orphan: false,
173            },
174        );
175    }
176
177    // ── Phase 2: Parse all agent files in parallel ────────────────────────
178    let parsed_agents: Vec<Result<ParsedAgent>> = agent_files
179        .par_iter()
180        .map(|sf| {
181            let (turns, quality, _meta, _hooks) = parse_session_file(&sf.path, true)
182                .with_context(|| format!("failed to parse agent file: {}", sf.path.display()))?;
183            let target_id = sf
184                .parent_session_id
185                .clone()
186                .unwrap_or_else(|| sf.session_id.clone());
187            Ok(ParsedAgent {
188                target_id,
189                project: sf.project.clone(),
190                agent_id: sf.session_id.clone(),
191                path: sf.path.clone(),
192                turns,
193                quality,
194                workflow_run_id: sf.workflow_run_id.clone(),
195            })
196        })
197        .collect();
198
199    // Group agent results into a per-parent map: target_id -> Vec<ParsedAgent>
200    let mut agents_by_parent: HashMap<String, Vec<ParsedAgent>> = HashMap::new();
201    for result in parsed_agents {
202        let pa = result?;
203        global_quality.total_valid_turns += pa.quality.valid_turns;
204        global_quality.total_skipped += pa.quality.skipped_synthetic
205            + pa.quality.skipped_sidechain
206            + pa.quality.skipped_invalid
207            + pa.quality.skipped_parse_error;
208        agents_by_parent
209            .entry(pa.target_id.clone())
210            .or_default()
211            .push(pa);
212    }
213
214    // Merge each parent's agents into Subagent records.
215    for (target_id, agents) in agents_by_parent {
216        // Ensure parent session exists (create orphan placeholder if missing).
217        if !sessions.contains_key(&target_id) {
218            let project = agents
219                .iter()
220                .find_map(|a| a.project.clone())
221                .or_else(|| Some("(orphan)".to_string()));
222            sessions.insert(
223                target_id.clone(),
224                SessionData {
225                    session_id: target_id.clone(),
226                    project,
227                    turns: Vec::new(),
228                    subagents: Vec::new(),
229                    plugins: Vec::new(),
230                    skills: Vec::new(),
231                    hooks: Vec::new(),
232                    first_timestamp: None,
233                    last_timestamp: None,
234                    version: None,
235                    quality: DataQuality::default(),
236                    metadata: SessionMetadata::default(),
237                    is_orphan: true,
238                },
239            );
240            global_quality.orphan_agents += 1;
241        }
242
243        // Load .meta.json sidecars once per parent. Keys are stripped of the
244        // "agent-" prefix (matching cc-session-jsonl::load_agent_meta).
245        // The first-level loader only scans `subagents/agent-*.meta.json`; merge
246        // in the workflow-agent sidecars under `subagents/workflows/wf_*/` so
247        // workflow agents also surface their agentType. First-level entries win
248        // on key collisions (none expected — agent ids are unique).
249        let mut agent_meta_map = crate::data::scanner::load_agent_meta(&target_id, claude_home);
250        for (k, v) in crate::data::scanner::load_workflow_agent_meta(&target_id, claude_home) {
251            agent_meta_map.entry(k).or_insert(v);
252        }
253
254        let parent = sessions.get_mut(&target_id).unwrap();
255        let existing_rids = request_id_set(&parent.turns);
256
257        // Build subagents in deterministic order: sorted by agent_id.
258        let mut agents = agents;
259        agents.sort_by(|a, b| a.agent_id.cmp(&b.agent_id));
260
261        for pa in agents {
262            // Cross-file dedup: drop turns whose requestId already appears in
263            // the main session (Claude Code writes agent responses to both
264            // the agent file and the main file).
265            let mut kept_count = 0usize;
266            let mut dropped_count = 0usize;
267            let mut kept_turns: Vec<super::models::ValidatedTurn> =
268                Vec::with_capacity(pa.turns.len());
269            for turn in pa.turns {
270                let dominated = turn
271                    .request_id
272                    .as_ref()
273                    .is_some_and(|rid| existing_rids.contains(rid));
274                if dominated {
275                    dropped_count += 1;
276                } else {
277                    kept_count += 1;
278                    kept_turns.push(turn);
279                }
280            }
281
282            // Accumulate quality into parent's quality (same accounting the
283            // legacy merge_agent_turns helper used).
284            parent.quality.total_lines += pa.quality.total_lines;
285            parent.quality.valid_turns += kept_count;
286            parent.quality.skipped_synthetic += pa.quality.skipped_synthetic;
287            parent.quality.skipped_sidechain += pa.quality.skipped_sidechain;
288            parent.quality.skipped_invalid += pa.quality.skipped_invalid;
289            parent.quality.skipped_parse_error += pa.quality.skipped_parse_error;
290            parent.quality.duplicate_turns += pa.quality.duplicate_turns + dropped_count;
291
292            // Compute per-subagent time range and sort turns by timestamp.
293            kept_turns.sort_by_key(|t| t.timestamp);
294            let (first_ts, last_ts) = time_range(kept_turns.iter().map(|t| &t.timestamp));
295
296            // .meta.json key is the agent_id WITHOUT the "agent-" prefix.
297            let meta_key = pa
298                .agent_id
299                .strip_prefix("agent-")
300                .unwrap_or(&pa.agent_id)
301                .to_string();
302            let (agent_type, description) = agent_meta_map
303                .get(&meta_key)
304                .map(|(t, d)| (Some(t.clone()), Some(d.clone())))
305                .unwrap_or((None, None));
306
307            parent.subagents.push(Subagent {
308                agent_id: pa.agent_id,
309                agent_type,
310                description,
311                turns: kept_turns,
312                first_timestamp: first_ts,
313                last_timestamp: last_ts,
314                workflow_run_id: pa.workflow_run_id,
315            });
316        }
317    }
318
319    // ── Phase 3: Aggregate plugins / skills (main session turns only) ─────
320    for session in sessions.values_mut() {
321        session.plugins = aggregate_plugins(&session.turns, calc);
322        session.skills = aggregate_skills(&session.turns, calc);
323    }
324
325    // ── Phase 4: Recompute time ranges (serial, cheap) ────────────────────
326    let mut result: Vec<SessionData> = sessions.into_values().collect();
327    // Sort by time descending (most recent first) for deterministic output
328    result.sort_by_key(|b| std::cmp::Reverse(b.first_timestamp));
329    let mut global_min: Option<DateTime<Utc>> = None;
330    let mut global_max: Option<DateTime<Utc>> = None;
331
332    for session in &mut result {
333        let all_timestamps = session.all_responses();
334        let (first_ts, last_ts) = time_range(all_timestamps.iter().map(|t| &t.timestamp));
335        session.first_timestamp = first_ts;
336        session.last_timestamp = last_ts;
337
338        if let Some(ts) = first_ts {
339            global_min = Some(global_min.map_or(ts, |m: DateTime<Utc>| m.min(ts)));
340        }
341        if let Some(ts) = last_ts {
342            global_max = Some(global_max.map_or(ts, |m: DateTime<Utc>| m.max(ts)));
343        }
344    }
345
346    global_quality.time_range = match (global_min, global_max) {
347        (Some(min), Some(max)) => Some((min, max)),
348        _ => None,
349    };
350
351    Ok((result, global_quality))
352}
353
354/// Aggregate per-plugin usage from a main session's turns.
355///
356/// Groups turns by `attribution_plugin` (skipping `None`). Output Vec is
357/// sorted by plugin name for deterministic JSON output.
358fn aggregate_plugins(
359    turns: &[super::models::ValidatedTurn],
360    calc: &PricingCalculator,
361) -> Vec<PluginUsage> {
362    let mut acc: HashMap<String, PluginUsage> = HashMap::new();
363    for turn in turns {
364        let plugin = match turn.attribution_plugin.as_deref() {
365            Some(p) if !p.is_empty() => p,
366            _ => continue,
367        };
368        let cost = calc.calculate_turn_cost(&turn.model, &turn.usage).total;
369        let input = turn.usage.input_tokens.unwrap_or(0);
370        let output = turn.usage.output_tokens.unwrap_or(0);
371        let entry = acc
372            .entry(plugin.to_string())
373            .or_insert_with(|| PluginUsage {
374                plugin: plugin.to_string(),
375                turns: 0,
376                cost: 0.0,
377                input_tokens: 0,
378                output_tokens: 0,
379            });
380        entry.turns += 1;
381        entry.cost += cost;
382        entry.input_tokens += input;
383        entry.output_tokens += output;
384    }
385    let mut out: Vec<PluginUsage> = acc.into_values().collect();
386    out.sort_by(|a, b| a.plugin.cmp(&b.plugin));
387    out
388}
389
390/// Aggregate per-skill usage from a main session's turns.
391///
392/// Mirror of `aggregate_plugins` but keyed on `attribution_skill`.
393fn aggregate_skills(
394    turns: &[super::models::ValidatedTurn],
395    calc: &PricingCalculator,
396) -> Vec<SkillUsage> {
397    let mut acc: HashMap<String, SkillUsage> = HashMap::new();
398    for turn in turns {
399        let skill = match turn.attribution_skill.as_deref() {
400            Some(s) if !s.is_empty() => s,
401            _ => continue,
402        };
403        let cost = calc.calculate_turn_cost(&turn.model, &turn.usage).total;
404        let input = turn.usage.input_tokens.unwrap_or(0);
405        let output = turn.usage.output_tokens.unwrap_or(0);
406        let entry = acc.entry(skill.to_string()).or_insert_with(|| SkillUsage {
407            skill: skill.to_string(),
408            turns: 0,
409            cost: 0.0,
410            input_tokens: 0,
411            output_tokens: 0,
412        });
413        entry.turns += 1;
414        entry.cost += cost;
415        entry.input_tokens += input;
416        entry.output_tokens += output;
417    }
418    let mut out: Vec<SkillUsage> = acc.into_values().collect();
419    out.sort_by(|a, b| a.skill.cmp(&b.skill));
420    out
421}
422
423// ─── Tests ───────────────────────────────────────────────────────────────────
424
425#[cfg(test)]
426mod tests {
427    use super::*;
428    use crate::data::models::{TokenUsage, ValidatedTurn};
429    use std::fs;
430    use tempfile::TempDir;
431
432    /// Helper to build a minimal `ValidatedTurn` with optional attribution fields.
433    fn turn(
434        ts: &str,
435        cost_input: u64,
436        cost_output: u64,
437        attribution: Option<(&str, &str)>,
438    ) -> ValidatedTurn {
439        ValidatedTurn {
440            uuid: format!("u-{ts}"),
441            request_id: Some(format!("r-{ts}")),
442            timestamp: ts.parse().unwrap(),
443            model: "claude-opus-4-6".into(),
444            usage: TokenUsage {
445                input_tokens: Some(cost_input),
446                output_tokens: Some(cost_output),
447                cache_creation_input_tokens: Some(0),
448                cache_read_input_tokens: Some(0),
449                cache_creation: None,
450                server_tool_use: None,
451                service_tier: None,
452                speed: None,
453                inference_geo: None,
454            },
455            stop_reason: None,
456            content_types: vec![],
457            is_agent: false,
458            agent_id: None,
459            user_text: None,
460            assistant_text: None,
461            tool_names: vec![],
462            service_tier: None,
463            speed: None,
464            inference_geo: None,
465            tool_error_count: 0,
466            git_branch: None,
467            attribution_plugin: attribution.map(|(p, _)| p.to_string()),
468            attribution_skill: attribution.map(|(_, s)| s.to_string()),
469        }
470    }
471
472    #[test]
473    fn pipeline_plugins_skills_aggregation() {
474        // Three turns, two share a plugin, one is unattributed.
475        let turns = vec![
476            turn(
477                "2026-05-01T00:00:00Z",
478                10,
479                20,
480                Some(("superpowers", "superpowers:brainstorming")),
481            ),
482            turn(
483                "2026-05-01T00:00:01Z",
484                30,
485                40,
486                Some(("superpowers", "superpowers:brainstorming")),
487            ),
488            turn("2026-05-01T00:00:02Z", 1, 2, None),
489        ];
490        let calc = PricingCalculator::new();
491        let plugins = aggregate_plugins(&turns, &calc);
492        let skills = aggregate_skills(&turns, &calc);
493
494        assert_eq!(plugins.len(), 1, "two plugin turns should fold to one row");
495        assert_eq!(plugins[0].plugin, "superpowers");
496        assert_eq!(plugins[0].turns, 2);
497        assert_eq!(plugins[0].input_tokens, 40);
498        assert_eq!(plugins[0].output_tokens, 60);
499
500        assert_eq!(skills.len(), 1);
501        assert_eq!(skills[0].skill, "superpowers:brainstorming");
502        assert_eq!(skills[0].turns, 2);
503
504        // Costs equal across plugin/skill rollups because both fields are set on
505        // the same two turns.
506        assert!((plugins[0].cost - skills[0].cost).abs() < 1e-9);
507    }
508
509    #[test]
510    fn pipeline_plugins_empty_when_no_attribution() {
511        let turns = vec![
512            turn("2026-05-01T00:00:00Z", 10, 20, None),
513            turn("2026-05-01T00:00:01Z", 30, 40, None),
514        ];
515        let calc = PricingCalculator::new();
516        assert!(aggregate_plugins(&turns, &calc).is_empty());
517        assert!(aggregate_skills(&turns, &calc).is_empty());
518    }
519
520    /// Lay down a fake `~/.claude/projects/<project>/<uuid>.jsonl` plus two
521    /// agent files under `subagents/`. Verify the pipeline groups them as
522    /// `Subagent` records with correct turn counts, metadata, and aggregation
523    /// invariants.
524    fn write_fixture_session() -> (TempDir, String) {
525        let tmp = TempDir::new().unwrap();
526        let project = tmp.path().join("projects").join("-Users-test-proj");
527        fs::create_dir_all(&project).unwrap();
528
529        let session_uuid = "11111111-2222-3333-4444-555555555555";
530        let main_path = project.join(format!("{}.jsonl", session_uuid));
531
532        // Two valid main turns. requestIds r-main-1, r-main-2.
533        // The second carries attributionPlugin/Skill.
534        let main_turn_1 = r#"{"type":"assistant","uuid":"m1","timestamp":"2026-05-01T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":10,"output_tokens":20,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"11111111-2222-3333-4444-555555555555","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r-main-1"}"#;
535        let main_turn_2 = r#"{"type":"assistant","uuid":"m2","timestamp":"2026-05-01T10:01:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":30,"output_tokens":40,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"bye"}]},"sessionId":"11111111-2222-3333-4444-555555555555","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r-main-2","attributionPlugin":"superpowers","attributionSkill":"superpowers:brainstorming"}"#;
536        // One stop_hook_summary system entry.
537        let main_hook = r#"{"type":"system","subtype":"stop_hook_summary","hookCount":1,"hookInfos":[{"command":"bash hook.sh","durationMs":50}],"hookErrors":[],"preventedContinuation":false,"sessionId":"11111111-2222-3333-4444-555555555555"}"#;
538        fs::write(
539            &main_path,
540            format!("{}\n{}\n{}\n", main_turn_1, main_turn_2, main_hook),
541        )
542        .unwrap();
543
544        // Subagents directory with two agent files and one .meta.json sidecar.
545        let subagents_dir = project.join(session_uuid).join("subagents");
546        fs::create_dir_all(&subagents_dir).unwrap();
547
548        // Agent A: 2 unique turns. r-agentA-1, r-agentA-2.
549        let agent_a_turn_1 = r#"{"type":"assistant","uuid":"a1","timestamp":"2026-05-01T10:02:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":5,"output_tokens":10,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"agent-a-1"}]},"sessionId":"agent-aaa1","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":true,"parentUuid":null,"requestId":"r-agentA-1"}"#;
550        let agent_a_turn_2 = r#"{"type":"assistant","uuid":"a2","timestamp":"2026-05-01T10:03:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":7,"output_tokens":11,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"agent-a-2"}]},"sessionId":"agent-aaa1","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":true,"parentUuid":null,"requestId":"r-agentA-2"}"#;
551        fs::write(
552            subagents_dir.join("agent-aaa1.jsonl"),
553            format!("{}\n{}\n", agent_a_turn_1, agent_a_turn_2),
554        )
555        .unwrap();
556        // Sidecar — note that the .meta.json key strips the "agent-" prefix.
557        fs::write(
558            subagents_dir.join("agent-aaa1.meta.json"),
559            r#"{"agentType":"builder","description":"Implement Phase 2"}"#,
560        )
561        .unwrap();
562
563        // Agent B: 1 turn that *also* appears in the main session by requestId
564        // (cross-file dup) and 1 unique turn.
565        let agent_b_dup = r#"{"type":"assistant","uuid":"b1","timestamp":"2026-05-01T10:04:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":100,"output_tokens":200,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"dup"}]},"sessionId":"agent-bbb2","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":true,"parentUuid":null,"requestId":"r-main-2"}"#; // same rid as main_turn_2
566        let agent_b_unique = r#"{"type":"assistant","uuid":"b2","timestamp":"2026-05-01T10:05:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3,"output_tokens":4,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"unique"}]},"sessionId":"agent-bbb2","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":true,"parentUuid":null,"requestId":"r-agentB-2"}"#;
567        fs::write(
568            subagents_dir.join("agent-bbb2.jsonl"),
569            format!("{}\n{}\n", agent_b_dup, agent_b_unique),
570        )
571        .unwrap();
572        // No meta.json for agent B (verify None fallback).
573
574        (tmp, session_uuid.to_string())
575    }
576
577    #[test]
578    fn pipeline_subagents_grouping_and_meta_injection() {
579        let (tmp, session_uuid) = write_fixture_session();
580        let calc = PricingCalculator::new();
581        let (sessions, _quality) = load_all(tmp.path(), &calc).unwrap();
582
583        assert_eq!(sessions.len(), 1);
584        let s = &sessions[0];
585        assert_eq!(s.session_id, session_uuid);
586
587        // Two subagents, sorted by agent_id (aaa1 < bbb2).
588        assert_eq!(s.subagents.len(), 2, "two agent files -> two subagents");
589        assert_eq!(s.subagents[0].agent_id, "agent-aaa1");
590        assert_eq!(s.subagents[1].agent_id, "agent-bbb2");
591
592        // Agent A: 2 unique turns, .meta.json hydrated.
593        assert_eq!(s.subagents[0].turns.len(), 2);
594        assert_eq!(s.subagents[0].agent_type.as_deref(), Some("builder"));
595        assert_eq!(
596            s.subagents[0].description.as_deref(),
597            Some("Implement Phase 2")
598        );
599        assert!(s.subagents[0].first_timestamp.is_some());
600        assert!(s.subagents[0].last_timestamp.is_some());
601
602        // Agent B: cross-file dedup drops the duplicate (r-main-2) -> 1 unique turn.
603        // No .meta.json -> agent_type/description are None.
604        assert_eq!(
605            s.subagents[1].turns.len(),
606            1,
607            "cross-file dedup should drop the duplicate"
608        );
609        assert!(s.subagents[1].agent_type.is_none());
610        assert!(s.subagents[1].description.is_none());
611
612        // Main session has 2 turns.
613        assert_eq!(s.turns.len(), 2);
614
615        // Plugins / skills aggregated from main turns only (1 turn carries attribution).
616        assert_eq!(s.plugins.len(), 1);
617        assert_eq!(s.plugins[0].plugin, "superpowers");
618        assert_eq!(s.plugins[0].turns, 1);
619        assert_eq!(s.skills.len(), 1);
620        assert_eq!(s.skills[0].skill, "superpowers:brainstorming");
621
622        // Hooks aggregated from main session.
623        assert_eq!(s.hooks.len(), 1);
624        assert_eq!(s.hooks[0].command, "bash hook.sh");
625        assert_eq!(s.hooks[0].invocations, 1);
626        assert_eq!(s.hooks[0].total_duration_ms, 50);
627        assert_eq!(s.hooks[0].error_count, 0);
628        assert_eq!(s.hooks[0].prevented_continuation_count, 0);
629
630        // total_turn_count / agent_turn_count derive from nested subagents.
631        assert_eq!(s.total_turn_count(), 2 + 2 + 1); // main + agent-A + agent-B(deduped)
632        assert_eq!(s.agent_turn_count(), 3);
633    }
634
635    #[test]
636    fn pipeline_aggregation_invariants() {
637        // The 5 spec invariants (section 2.6) bundled into one comprehensive test.
638        let (tmp, _session_uuid) = write_fixture_session();
639        let calc = PricingCalculator::new();
640        let (sessions, _quality) = load_all(tmp.path(), &calc).unwrap();
641        let s = &sessions[0];
642
643        // (1) Reorganization lossless: sum(subagent.turns) equals the number we
644        // accept after cross-file dedup (2 from agent-A + 1 unique from agent-B).
645        let total_sub_turns: usize = s.subagents.iter().map(|sa| sa.turns.len()).sum();
646        assert_eq!(total_sub_turns, s.agent_turn_count());
647        assert_eq!(total_sub_turns, 3);
648
649        // (2) Plugin aggregation no-miss/no-double: sum(plugins.turns) equals
650        // number of main turns with attribution_plugin set.
651        let attributed_turns = s
652            .turns
653            .iter()
654            .filter(|t| t.attribution_plugin.is_some())
655            .count() as u64;
656        let plugin_turn_sum: u64 = s.plugins.iter().map(|p| p.turns).sum();
657        assert_eq!(plugin_turn_sum, attributed_turns);
658
659        // (3) Upper bound: plugin cost <= session main turn cost.
660        let session_turn_cost: f64 = s
661            .turns
662            .iter()
663            .map(|t| calc.calculate_turn_cost(&t.model, &t.usage).total)
664            .sum();
665        let plugin_cost: f64 = s.plugins.iter().map(|p| p.cost).sum();
666        assert!(
667            plugin_cost <= session_turn_cost + 1e-9,
668            "plugin cost {plugin_cost} must be <= session turn cost {session_turn_cost}"
669        );
670
671        // (4) Hook total: every hookInfos[] element in every stop_hook_summary
672        // SystemEntry is counted. Because hooks are grouped by command, the
673        // total invocations sum equals sum(hookInfos[].len()) across all
674        // SystemEntries — which on observed 2.1.104+ data also equals
675        // sum(SystemEntry.hookCount). Asserting a literal count here would
676        // bind the test to a single SystemEntry's fixture; the parser-side
677        // `debug_assert_eq!` (parser.rs) already guards the hookCount ==
678        // hookInfos.len() invariant. Here we only assert the lower bound.
679        let hook_invocations: u64 = s.hooks.iter().map(|h| h.invocations).sum();
680        assert!(
681            hook_invocations >= 1,
682            "expected at least one hook invocation in fixture"
683        );
684
685        // (5) Hypothesis regression: no subagent turn carries attribution.
686        for sa in &s.subagents {
687            for t in &sa.turns {
688                assert!(
689                    t.attribution_plugin.is_none(),
690                    "subagent turn unexpectedly has attributionPlugin"
691                );
692                assert!(
693                    t.attribution_skill.is_none(),
694                    "subagent turn unexpectedly has attributionSkill"
695                );
696            }
697        }
698    }
699
700    #[test]
701    fn pipeline_hooks_aggregation_multi_invocation() {
702        // Build a fixture with the SAME command running 3 times, where one
703        // invocation has errors and another prevents continuation.
704        let tmp = TempDir::new().unwrap();
705        let project = tmp.path().join("projects").join("-Users-test-proj");
706        fs::create_dir_all(&project).unwrap();
707        let uuid = "22222222-3333-4444-5555-666666666666";
708
709        // One assistant turn so the session has some content (otherwise the
710        // session has no first_timestamp).
711        let asst = r#"{"type":"assistant","uuid":"m1","timestamp":"2026-05-01T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":10,"output_tokens":20,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"22222222-3333-4444-5555-666666666666","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r-main-1"}"#;
712        // Three stop_hook_summary entries: same command, varying flags.
713        let h1 = r#"{"type":"system","subtype":"stop_hook_summary","hookCount":1,"hookInfos":[{"command":"bash run.sh","durationMs":100}],"hookErrors":[],"preventedContinuation":false,"sessionId":"22222222-3333-4444-5555-666666666666"}"#;
714        let h2 = r#"{"type":"system","subtype":"stop_hook_summary","hookCount":1,"hookInfos":[{"command":"bash run.sh","durationMs":200}],"hookErrors":[{"msg":"oops"}],"preventedContinuation":false,"sessionId":"22222222-3333-4444-5555-666666666666"}"#;
715        let h3 = r#"{"type":"system","subtype":"stop_hook_summary","hookCount":1,"hookInfos":[{"command":"bash run.sh","durationMs":300}],"hookErrors":[],"preventedContinuation":true,"sessionId":"22222222-3333-4444-5555-666666666666"}"#;
716        fs::write(
717            project.join(format!("{}.jsonl", uuid)),
718            format!("{asst}\n{h1}\n{h2}\n{h3}\n"),
719        )
720        .unwrap();
721
722        let calc = PricingCalculator::new();
723        let (sessions, _q) = load_all(tmp.path(), &calc).unwrap();
724        assert_eq!(sessions.len(), 1);
725        let s = &sessions[0];
726        assert_eq!(s.hooks.len(), 1, "all three invocations share one command");
727        let h = &s.hooks[0];
728        assert_eq!(h.command, "bash run.sh");
729        assert_eq!(h.invocations, 3);
730        assert_eq!(h.total_duration_ms, 600);
731        assert_eq!(h.error_count, 1);
732        assert_eq!(h.prevented_continuation_count, 1);
733    }
734
735    #[test]
736    fn pipeline_old_session_has_empty_capability_arrays() {
737        // A session JSONL with NO attribution fields and NO stop_hook_summary
738        // entries should produce empty plugins/skills/hooks Vecs (not None).
739        let tmp = TempDir::new().unwrap();
740        let project = tmp.path().join("projects").join("-Users-test-proj");
741        fs::create_dir_all(&project).unwrap();
742        let uuid = "33333333-4444-5555-6666-777777777777";
743        let asst = r#"{"type":"assistant","uuid":"m1","timestamp":"2026-05-01T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":10,"output_tokens":20,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"33333333-4444-5555-6666-777777777777","version":"2.1.90","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r-main-1"}"#;
744        fs::write(project.join(format!("{}.jsonl", uuid)), format!("{asst}\n")).unwrap();
745
746        let calc = PricingCalculator::new();
747        let (sessions, _q) = load_all(tmp.path(), &calc).unwrap();
748        assert_eq!(sessions.len(), 1);
749        let s = &sessions[0];
750        assert!(
751            s.plugins.is_empty(),
752            "old session must produce empty plugins Vec"
753        );
754        assert!(
755            s.skills.is_empty(),
756            "old session must produce empty skills Vec"
757        );
758        assert!(
759            s.hooks.is_empty(),
760            "old session must produce empty hooks Vec"
761        );
762        assert!(
763            s.subagents.is_empty(),
764            "session without agent files must produce empty subagents Vec"
765        );
766    }
767
768    /// A subagent jsonl exists at `<proj>/<uuid>/subagents/agent-X.jsonl`
769    /// but the parent main session jsonl `<proj>/<uuid>.jsonl` was deleted.
770    /// The loader still picks up the subagent (data is preserved), but flags
771    /// the synthesized parent SessionData as orphan.
772    #[test]
773    fn loader_marks_orphan_subagent_as_orphan() {
774        let tmp = TempDir::new().unwrap();
775        let project = tmp.path().join("projects").join("-Users-test-proj");
776        let parent_uuid = "99999999-aaaa-bbbb-cccc-dddddddddddd";
777        let subagents_dir = project.join(parent_uuid).join("subagents");
778        fs::create_dir_all(&subagents_dir).unwrap();
779        // Note: NO `<project>/<parent_uuid>.jsonl` — the parent main session
780        // was deleted by the user.
781
782        let agent_turn = r#"{"type":"assistant","uuid":"a1","timestamp":"2026-05-01T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":5,"output_tokens":10,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"orphan-agent"}]},"sessionId":"agent-orphan-1","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":true,"parentUuid":null,"requestId":"r-orphan-1"}"#;
783        fs::write(
784            subagents_dir.join("agent-orphan-1.jsonl"),
785            format!("{}\n", agent_turn),
786        )
787        .unwrap();
788
789        let calc = PricingCalculator::new();
790        let (sessions, quality) = load_all(tmp.path(), &calc).unwrap();
791
792        assert_eq!(
793            sessions.len(),
794            1,
795            "loader should reconstruct an orphan parent session"
796        );
797        let s = &sessions[0];
798        assert_eq!(s.session_id, parent_uuid);
799        assert!(s.is_orphan, "synthesized parent must be flagged as orphan");
800        // The subagent's turn is preserved.
801        assert_eq!(s.subagents.len(), 1);
802        assert_eq!(s.subagents[0].turns.len(), 1);
803        // Quality counter also records the orphan.
804        assert_eq!(quality.orphan_agents, 1);
805    }
806
807    /// A normal session with its main `<uuid>.jsonl` present *and* subagent
808    /// files under `<uuid>/subagents/` must NOT be flagged as orphan.
809    #[test]
810    fn loader_marks_normal_session_as_not_orphan() {
811        let (tmp, session_uuid) = write_fixture_session();
812        let calc = PricingCalculator::new();
813        let (sessions, quality) = load_all(tmp.path(), &calc).unwrap();
814        assert_eq!(sessions.len(), 1);
815        let s = &sessions[0];
816        assert_eq!(s.session_id, session_uuid);
817        assert!(
818            !s.is_orphan,
819            "session with parent main jsonl present must not be orphan"
820        );
821        // No orphans counted at the global level either.
822        assert_eq!(quality.orphan_agents, 0);
823    }
824
825    /// Group three subagents (2x builder, 1x code-reviewer) plus one with
826    /// no agent_type (None) → expect three entries: builder x2, code-reviewer
827    /// x1, and "unknown" x1 (data not dropped).
828    #[test]
829    fn subagent_type_aggregation_groups_by_agent_type() {
830        use crate::data::models::{Subagent, ValidatedTurn};
831
832        let calc = PricingCalculator::new();
833
834        // Helper to build a synthetic Subagent with N turns of given token counts.
835        let make_agent = |agent_id: &str,
836                          agent_type: Option<&str>,
837                          description: Option<&str>,
838                          turns: usize|
839         -> Subagent {
840            let mut tlist = Vec::with_capacity(turns);
841            for i in 0..turns {
842                tlist.push(ValidatedTurn {
843                    uuid: format!("{}-{}", agent_id, i),
844                    request_id: Some(format!("{}-r-{}", agent_id, i)),
845                    timestamp: "2026-05-01T10:00:00Z".parse().unwrap(),
846                    model: "claude-opus-4-6".into(),
847                    usage: crate::data::models::TokenUsage {
848                        input_tokens: Some(100),
849                        output_tokens: Some(200),
850                        cache_creation_input_tokens: Some(0),
851                        cache_read_input_tokens: Some(0),
852                        cache_creation: None,
853                        server_tool_use: None,
854                        service_tier: None,
855                        speed: None,
856                        inference_geo: None,
857                    },
858                    stop_reason: None,
859                    content_types: vec![],
860                    is_agent: true,
861                    agent_id: Some(agent_id.to_string()),
862                    user_text: None,
863                    assistant_text: None,
864                    tool_names: vec![],
865                    service_tier: None,
866                    speed: None,
867                    inference_geo: None,
868                    tool_error_count: 0,
869                    git_branch: None,
870                    attribution_plugin: None,
871                    attribution_skill: None,
872                });
873            }
874            Subagent {
875                agent_id: agent_id.to_string(),
876                agent_type: agent_type.map(|s| s.to_string()),
877                description: description.map(|s| s.to_string()),
878                turns: tlist,
879                first_timestamp: None,
880                last_timestamp: None,
881                workflow_run_id: None,
882            }
883        };
884
885        let session = SessionData {
886            session_id: "s1".into(),
887            project: Some("p".into()),
888            turns: Vec::new(),
889            subagents: vec![
890                make_agent("agent-aaa", Some("builder"), Some("task A"), 2),
891                make_agent("agent-bbb", Some("builder"), Some("task B"), 3),
892                make_agent("agent-ccc", Some("code-reviewer"), Some("review X"), 1),
893            ],
894            plugins: Vec::new(),
895            skills: Vec::new(),
896            hooks: Vec::new(),
897            first_timestamp: None,
898            last_timestamp: None,
899            version: None,
900            quality: DataQuality::default(),
901            metadata: super::SessionMetadata::default(),
902            is_orphan: false,
903        };
904
905        let aggs = session.subagent_type_aggregates(&calc);
906        // Sorted alphabetically: builder, code-reviewer.
907        assert_eq!(aggs.len(), 2);
908        assert_eq!(aggs[0].agent_type, "builder");
909        assert_eq!(aggs[0].count, 2);
910        assert_eq!(aggs[0].total_turns, 5); // 2 + 3
911        assert_eq!(aggs[0].total_input_tokens, 500); // (2+3) * 100
912        assert_eq!(aggs[0].total_output_tokens, 1000); // (2+3) * 200
913        assert!(aggs[0].total_cost > 0.0);
914        assert_eq!(
915            aggs[0].descriptions,
916            vec!["task A".to_string(), "task B".to_string()]
917        );
918
919        assert_eq!(aggs[1].agent_type, "code-reviewer");
920        assert_eq!(aggs[1].count, 1);
921        assert_eq!(aggs[1].total_turns, 1);
922        assert_eq!(aggs[1].descriptions, vec!["review X".to_string()]);
923    }
924
925    /// A subagent with `agent_type = None` must be grouped under the literal
926    /// "unknown" key, never silently dropped.
927    #[test]
928    fn subagent_type_aggregation_handles_missing_type() {
929        use crate::data::models::{Subagent, ValidatedTurn};
930
931        let calc = PricingCalculator::new();
932        let make_turn = |id: &str| ValidatedTurn {
933            uuid: id.to_string(),
934            request_id: Some(format!("r-{}", id)),
935            timestamp: "2026-05-01T10:00:00Z".parse().unwrap(),
936            model: "claude-opus-4-6".into(),
937            usage: crate::data::models::TokenUsage {
938                input_tokens: Some(50),
939                output_tokens: Some(50),
940                cache_creation_input_tokens: Some(0),
941                cache_read_input_tokens: Some(0),
942                cache_creation: None,
943                server_tool_use: None,
944                service_tier: None,
945                speed: None,
946                inference_geo: None,
947            },
948            stop_reason: None,
949            content_types: vec![],
950            is_agent: true,
951            agent_id: Some("agent-no-meta".into()),
952            user_text: None,
953            assistant_text: None,
954            tool_names: vec![],
955            service_tier: None,
956            speed: None,
957            inference_geo: None,
958            tool_error_count: 0,
959            git_branch: None,
960            attribution_plugin: None,
961            attribution_skill: None,
962        };
963
964        let session = SessionData {
965            session_id: "s1".into(),
966            project: Some("p".into()),
967            turns: Vec::new(),
968            subagents: vec![Subagent {
969                agent_id: "agent-no-meta".into(),
970                agent_type: None, // .meta.json missing
971                description: None,
972                turns: vec![make_turn("t1")],
973                first_timestamp: None,
974                last_timestamp: None,
975                workflow_run_id: None,
976            }],
977            plugins: Vec::new(),
978            skills: Vec::new(),
979            hooks: Vec::new(),
980            first_timestamp: None,
981            last_timestamp: None,
982            version: None,
983            quality: DataQuality::default(),
984            metadata: super::SessionMetadata::default(),
985            is_orphan: false,
986        };
987
988        let aggs = session.subagent_type_aggregates(&calc);
989        assert_eq!(
990            aggs.len(),
991            1,
992            "agent_type=None should still produce one aggregate, not drop the data"
993        );
994        assert_eq!(aggs[0].agent_type, "unknown");
995        assert_eq!(aggs[0].count, 1);
996        assert_eq!(aggs[0].total_turns, 1);
997    }
998
999    /// Orphan sessions must contribute to the *global* overview totals
1000    /// (cost / turns / tokens). The orphan flag is for display only.
1001    #[test]
1002    fn global_totals_include_orphan_sessions() {
1003        // Same fixture as the orphan-flag test, but verify overview math.
1004        let tmp = TempDir::new().unwrap();
1005        let project = tmp.path().join("projects").join("-Users-test-proj");
1006        let parent_uuid = "88888888-aaaa-bbbb-cccc-dddddddddddd";
1007        let subagents_dir = project.join(parent_uuid).join("subagents");
1008        fs::create_dir_all(&subagents_dir).unwrap();
1009        // Two turns under the orphan parent.
1010        let t1 = r#"{"type":"assistant","uuid":"a1","timestamp":"2026-05-01T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":1000,"output_tokens":2000,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"x"}]},"sessionId":"agent-orphan-z","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":true,"parentUuid":null,"requestId":"r-orph-1"}"#;
1011        let t2 = r#"{"type":"assistant","uuid":"a2","timestamp":"2026-05-01T10:01:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3000,"output_tokens":4000,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"y"}]},"sessionId":"agent-orphan-z","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":true,"parentUuid":null,"requestId":"r-orph-2"}"#;
1012        fs::write(
1013            subagents_dir.join("agent-orphan-z.jsonl"),
1014            format!("{}\n{}\n", t1, t2),
1015        )
1016        .unwrap();
1017
1018        let calc = PricingCalculator::new();
1019        let (sessions, quality) = load_all(tmp.path(), &calc).unwrap();
1020        assert_eq!(sessions.len(), 1);
1021        assert!(sessions[0].is_orphan);
1022
1023        // Now drive the overview analysis and ensure totals reflect the
1024        // orphan session's data (cost > 0, agent turns counted).
1025        let overview = crate::analysis::overview::analyze_overview(&sessions, quality, &calc, None);
1026        assert_eq!(overview.total_sessions, 1);
1027        assert_eq!(overview.total_turns, 2);
1028        assert_eq!(overview.total_agent_turns, 2);
1029        assert!(
1030            overview.total_cost > 0.0,
1031            "orphan session's cost must flow into total_cost"
1032        );
1033        // Output tokens accumulated from the two orphan turns.
1034        assert_eq!(overview.total_output_tokens, 6000);
1035    }
1036
1037    /// Task 0: a workflow agent transcript under
1038    /// `<proj>/<uuid>/subagents/workflows/wf_<runId>/agent-*.jsonl` must be
1039    /// discovered by the scanner (Type 4), parsed as an agent, grouped under the
1040    /// correct parent session, tagged with its `workflow_run_id`, and have its
1041    /// tokens/cost flow into the parent's `all_responses()` total. The workflow
1042    /// turns carry `isSidechain=true` (like all agent files) and must survive
1043    /// the sidechain filter (is_agent=true) and cross-file dedup (their
1044    /// requestIds do not appear in the main jsonl).
1045    #[test]
1046    fn workflow_agent_tokens_enter_parent_total_cost() {
1047        let tmp = TempDir::new().unwrap();
1048        let project = tmp.path().join("projects").join("-Users-test-proj");
1049        fs::create_dir_all(&project).unwrap();
1050
1051        let session_uuid = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee";
1052        let main_path = project.join(format!("{}.jsonl", session_uuid));
1053
1054        // One ordinary main turn (requestId r-main-1).
1055        let main_turn = r#"{"type":"assistant","uuid":"m1","timestamp":"2026-05-01T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":10,"output_tokens":20,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee","version":"2.1.159","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r-main-1"}"#;
1056        fs::write(&main_path, format!("{}\n", main_turn)).unwrap();
1057
1058        // Workflow run directory: <uuid>/subagents/workflows/wf_run123/
1059        let wf_dir = project
1060            .join(session_uuid)
1061            .join("subagents")
1062            .join("workflows")
1063            .join("wf_run123");
1064        fs::create_dir_all(&wf_dir).unwrap();
1065
1066        // Two workflow agent transcripts, each with one sidechain assistant turn
1067        // carrying real usage. Unique requestIds (not present in the main file).
1068        let wf_agent_a = r#"{"type":"assistant","uuid":"wa1","timestamp":"2026-05-01T10:05:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":1000,"output_tokens":2000,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"wf-a"}]},"sessionId":"agent-wfa","version":"2.1.159","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":true,"parentUuid":null,"requestId":"r-wf-a-1"}"#;
1069        let wf_agent_b = r#"{"type":"assistant","uuid":"wb1","timestamp":"2026-05-01T10:06:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3000,"output_tokens":4000,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"wf-b"}]},"sessionId":"agent-wfb","version":"2.1.159","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":true,"parentUuid":null,"requestId":"r-wf-b-1"}"#;
1070        fs::write(wf_dir.join("agent-wfa.jsonl"), format!("{}\n", wf_agent_a)).unwrap();
1071        fs::write(wf_dir.join("agent-wfb.jsonl"), format!("{}\n", wf_agent_b)).unwrap();
1072        // Meta sidecar for agent A only (verify workflow meta hydration).
1073        fs::write(
1074            wf_dir.join("agent-wfa.meta.json"),
1075            r#"{"agentType":"researcher","description":"gather facts"}"#,
1076        )
1077        .unwrap();
1078
1079        let calc = PricingCalculator::new();
1080
1081        // Baseline cost WITHOUT the workflow agents (main turn only): compute
1082        // directly so we can assert the delta the workflow turns contribute.
1083        let main_only_cost = {
1084            let usage = TokenUsage {
1085                input_tokens: Some(10),
1086                output_tokens: Some(20),
1087                cache_creation_input_tokens: Some(0),
1088                cache_read_input_tokens: Some(0),
1089                cache_creation: None,
1090                server_tool_use: None,
1091                service_tier: None,
1092                speed: None,
1093                inference_geo: None,
1094            };
1095            calc.calculate_turn_cost("claude-opus-4-6", &usage).total
1096        };
1097
1098        let (sessions, _quality) = load_all(tmp.path(), &calc).unwrap();
1099        assert_eq!(sessions.len(), 1, "one parent session");
1100        let s = &sessions[0];
1101        assert_eq!(s.session_id, session_uuid);
1102
1103        // The two workflow agents were grouped under the parent session.
1104        assert_eq!(
1105            s.subagents.len(),
1106            2,
1107            "two workflow agent files -> two subagents"
1108        );
1109        for sa in &s.subagents {
1110            assert_eq!(
1111                sa.workflow_run_id.as_deref(),
1112                Some("wf_run123"),
1113                "workflow subagent must carry its run id"
1114            );
1115        }
1116        // Workflow meta sidecar hydrated agent A's type.
1117        let agent_a = s
1118            .subagents
1119            .iter()
1120            .find(|sa| sa.agent_id == "agent-wfa")
1121            .expect("agent-wfa present");
1122        assert_eq!(agent_a.agent_type.as_deref(), Some("researcher"));
1123
1124        // The workflow turns are present (not dropped by sidechain/dedup).
1125        assert_eq!(s.agent_turn_count(), 2, "both workflow turns kept");
1126        assert_eq!(s.total_turn_count(), 3, "1 main + 2 workflow");
1127
1128        // all_responses() includes main + workflow turns.
1129        let all = s.all_responses();
1130        assert_eq!(all.len(), 3);
1131
1132        // Total cost over all_responses() includes the workflow turns: it must
1133        // exceed the main-only cost by exactly the two workflow turns' cost.
1134        let total_cost: f64 = all
1135            .iter()
1136            .map(|t| calc.calculate_turn_cost(&t.model, &t.usage).total)
1137            .sum();
1138        let wf_a_cost = {
1139            let usage = TokenUsage {
1140                input_tokens: Some(1000),
1141                output_tokens: Some(2000),
1142                cache_creation_input_tokens: Some(0),
1143                cache_read_input_tokens: Some(0),
1144                cache_creation: None,
1145                server_tool_use: None,
1146                service_tier: None,
1147                speed: None,
1148                inference_geo: None,
1149            };
1150            calc.calculate_turn_cost("claude-opus-4-6", &usage).total
1151        };
1152        let wf_b_cost = {
1153            let usage = TokenUsage {
1154                input_tokens: Some(3000),
1155                output_tokens: Some(4000),
1156                cache_creation_input_tokens: Some(0),
1157                cache_read_input_tokens: Some(0),
1158                cache_creation: None,
1159                server_tool_use: None,
1160                service_tier: None,
1161                speed: None,
1162                inference_geo: None,
1163            };
1164            calc.calculate_turn_cost("claude-opus-4-6", &usage).total
1165        };
1166        assert!(
1167            (total_cost - (main_only_cost + wf_a_cost + wf_b_cost)).abs() < 1e-9,
1168            "total {total_cost} must equal main {main_only_cost} + wf_a {wf_a_cost} + wf_b {wf_b_cost}"
1169        );
1170        assert!(
1171            total_cost > main_only_cost,
1172            "workflow tokens must increase total cost above main-only baseline"
1173        );
1174
1175        // Workflow output tokens (2000 + 4000) are in the total.
1176        let total_output: u64 = all.iter().map(|t| t.usage.output_tokens.unwrap_or(0)).sum();
1177        assert_eq!(total_output, 20 + 2000 + 4000);
1178    }
1179
1180    #[test]
1181    fn pipeline_subagents_many() {
1182        // Construct a fixture with N=10 distinct subagent files to verify the
1183        // grouping scales correctly (spec mentions 69-subagent sessions).
1184        let tmp = TempDir::new().unwrap();
1185        let project = tmp.path().join("projects").join("-Users-test-proj");
1186        fs::create_dir_all(&project).unwrap();
1187        let uuid = "44444444-5555-6666-7777-888888888888";
1188
1189        // Main session with one turn.
1190        let main_turn = r#"{"type":"assistant","uuid":"m1","timestamp":"2026-05-01T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":10,"output_tokens":20,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"44444444-5555-6666-7777-888888888888","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r-main-1"}"#;
1191        fs::write(
1192            project.join(format!("{}.jsonl", uuid)),
1193            format!("{}\n", main_turn),
1194        )
1195        .unwrap();
1196
1197        let subagents_dir = project.join(uuid).join("subagents");
1198        fs::create_dir_all(&subagents_dir).unwrap();
1199
1200        for i in 0..10 {
1201            // Each agent file has 2 turns, unique request_ids.
1202            let line1 = format!(
1203                r#"{{"type":"assistant","uuid":"a{i}-1","timestamp":"2026-05-01T10:0{i}:00Z","message":{{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{{"input_tokens":1,"output_tokens":2,"cache_creation_input_tokens":0,"cache_read_input_tokens":0}},"content":[{{"type":"text","text":"a"}}]}},"sessionId":"agent-id{i:03}","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":true,"parentUuid":null,"requestId":"r-{i}-1"}}"#
1204            );
1205            let line2 = format!(
1206                r#"{{"type":"assistant","uuid":"a{i}-2","timestamp":"2026-05-01T10:0{i}:01Z","message":{{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{{"input_tokens":1,"output_tokens":2,"cache_creation_input_tokens":0,"cache_read_input_tokens":0}},"content":[{{"type":"text","text":"a"}}]}},"sessionId":"agent-id{i:03}","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":true,"parentUuid":null,"requestId":"r-{i}-2"}}"#
1207            );
1208            fs::write(
1209                subagents_dir.join(format!("agent-id{i:03}.jsonl")),
1210                format!("{line1}\n{line2}\n"),
1211            )
1212            .unwrap();
1213        }
1214
1215        let calc = PricingCalculator::new();
1216        let (sessions, _q) = load_all(tmp.path(), &calc).unwrap();
1217        assert_eq!(sessions.len(), 1);
1218        let s = &sessions[0];
1219        assert_eq!(s.subagents.len(), 10, "all 10 agent files become subagents");
1220        for sa in &s.subagents {
1221            assert_eq!(sa.turns.len(), 2);
1222        }
1223        // Subagent ordering: ascending by agent_id (deterministic).
1224        let ids: Vec<&str> = s.subagents.iter().map(|sa| sa.agent_id.as_str()).collect();
1225        let mut sorted = ids.clone();
1226        sorted.sort();
1227        assert_eq!(ids, sorted);
1228
1229        // Total turn count: 1 main + 20 subagent.
1230        assert_eq!(s.total_turn_count(), 21);
1231        assert_eq!(s.agent_turn_count(), 20);
1232    }
1233}