1use anyhow::{Context, Result};
2use chrono::{DateTime, Utc};
3use rayon::prelude::*;
4use std::collections::{HashMap, HashSet};
5use std::fs::File;
6use std::io::{BufRead, BufReader};
7use std::path::{Path, PathBuf};
8
9use super::models::{
10 DataQuality, GlobalDataQuality, HookUsage, PluginUsage, SessionData, SessionFile,
11 SessionMetadata, SkillUsage, Subagent,
12};
13use super::parser::parse_session_file;
14use super::scanner::{resolve_agent_parents, scan_claude_home};
15use crate::pricing::calculator::PricingCalculator;
16
17fn extract_version(path: &Path) -> Option<String> {
21 let file = File::open(path).ok()?;
22 let reader = BufReader::new(file);
23 let first_line = reader.lines().next()?.ok()?;
24 let val: serde_json::Value = serde_json::from_str(&first_line).ok()?;
25 val.get("version")
26 .and_then(|v| v.as_str())
27 .map(|s| s.to_string())
28}
29
30fn time_range<'a, I>(timestamps: I) -> (Option<DateTime<Utc>>, Option<DateTime<Utc>>)
32where
33 I: Iterator<Item = &'a DateTime<Utc>>,
34{
35 let mut min: Option<DateTime<Utc>> = None;
36 let mut max: Option<DateTime<Utc>> = None;
37 for ts in timestamps {
38 min = Some(min.map_or(*ts, |m: DateTime<Utc>| m.min(*ts)));
39 max = Some(max.map_or(*ts, |m: DateTime<Utc>| m.max(*ts)));
40 }
41 (min, max)
42}
43
44fn request_id_set(turns: &[super::models::ValidatedTurn]) -> HashSet<String> {
46 turns
47 .iter()
48 .filter_map(|t| t.request_id.as_ref())
49 .cloned()
50 .collect()
51}
52
53pub fn load_all(
67 claude_home: &Path,
68 calc: &PricingCalculator,
69) -> Result<(Vec<SessionData>, GlobalDataQuality)> {
70 let mut files =
71 scan_claude_home(claude_home).context("failed to scan claude home for session files")?;
72 resolve_agent_parents(&mut files).context("failed to resolve agent parent sessions")?;
73 load_from_files(files, claude_home, calc)
74}
75
/// Intermediate result of parsing one main (non-agent) session file, produced in
/// `load_from_files` before it is turned into a `SessionData`.
struct ParsedMain {
    /// Session id copied from the originating `SessionFile`.
    session_id: String,
    /// Project copied from the originating `SessionFile`, if any.
    project: Option<String>,
    /// Turns returned by `parse_session_file` for this file.
    turns: Vec<super::models::ValidatedTurn>,
    /// Value of `extract_version` for the file (read from its first line).
    version: Option<String>,
    /// Earliest timestamp among `turns`; `None` when there are no turns.
    first_ts: Option<DateTime<Utc>>,
    /// Latest timestamp among `turns`; `None` when there are no turns.
    last_ts: Option<DateTime<Utc>>,
    /// Per-file quality counters returned by `parse_session_file`.
    quality: DataQuality,
    /// Session metadata returned by `parse_session_file`.
    metadata: SessionMetadata,
    /// Hook usage rows returned by `parse_session_file`.
    hooks: Vec<HookUsage>,
}
88
/// Intermediate result of parsing one agent file, produced in `load_from_files`
/// before it is attached to its parent session as a `Subagent`.
struct ParsedAgent {
    /// Id of the session this agent is attached to: the file's
    /// `parent_session_id` when present, otherwise the agent's own session id.
    target_id: String,
    /// Project copied from the originating `SessionFile`, if any.
    project: Option<String>,
    /// The agent file's own session id; becomes `Subagent::agent_id`.
    agent_id: String,
    // Populated from the `SessionFile` path but never read in this file, hence
    // the lint suppression. NOTE(review): presumably kept for debugging — confirm.
    #[allow(dead_code)]
    path: PathBuf,
    /// Turns returned by `parse_session_file` for this agent file.
    turns: Vec<super::models::ValidatedTurn>,
    /// Per-file quality counters returned by `parse_session_file`.
    quality: DataQuality,
}
105
106fn load_from_files(
109 files: Vec<SessionFile>,
110 claude_home: &Path,
111 calc: &PricingCalculator,
112) -> Result<(Vec<SessionData>, GlobalDataQuality)> {
113 let (main_files, agent_files): (Vec<_>, Vec<_>) = files.into_iter().partition(|f| !f.is_agent);
114
115 let mut global_quality = GlobalDataQuality {
116 total_session_files: main_files.len(),
117 total_agent_files: agent_files.len(),
118 ..Default::default()
119 };
120
121 let parsed_mains: Vec<Result<ParsedMain>> = main_files
123 .par_iter()
124 .map(|sf| {
125 let (turns, quality, metadata, hooks) = parse_session_file(&sf.path, false)
126 .with_context(|| format!("failed to parse session: {}", sf.path.display()))?;
127 let version = extract_version(&sf.path);
128 let (first_ts, last_ts) = time_range(turns.iter().map(|t| &t.timestamp));
129 Ok(ParsedMain {
130 session_id: sf.session_id.clone(),
131 project: sf.project.clone(),
132 turns,
133 version,
134 first_ts,
135 last_ts,
136 quality,
137 metadata,
138 hooks,
139 })
140 })
141 .collect();
142
143 let mut sessions: HashMap<String, SessionData> = HashMap::with_capacity(parsed_mains.len());
145 for result in parsed_mains {
146 let pm = result?;
147 global_quality.total_valid_turns += pm.quality.valid_turns;
148 global_quality.total_skipped += pm.quality.skipped_synthetic
149 + pm.quality.skipped_sidechain
150 + pm.quality.skipped_invalid
151 + pm.quality.skipped_parse_error;
152
153 sessions.insert(
154 pm.session_id.clone(),
155 SessionData {
156 session_id: pm.session_id,
157 project: pm.project,
158 turns: pm.turns,
159 subagents: Vec::new(),
160 plugins: Vec::new(),
161 skills: Vec::new(),
162 hooks: pm.hooks,
163 first_timestamp: pm.first_ts,
164 last_timestamp: pm.last_ts,
165 version: pm.version,
166 quality: pm.quality,
167 metadata: pm.metadata,
168 is_orphan: false,
169 },
170 );
171 }
172
173 let parsed_agents: Vec<Result<ParsedAgent>> = agent_files
175 .par_iter()
176 .map(|sf| {
177 let (turns, quality, _meta, _hooks) = parse_session_file(&sf.path, true)
178 .with_context(|| format!("failed to parse agent file: {}", sf.path.display()))?;
179 let target_id = sf
180 .parent_session_id
181 .clone()
182 .unwrap_or_else(|| sf.session_id.clone());
183 Ok(ParsedAgent {
184 target_id,
185 project: sf.project.clone(),
186 agent_id: sf.session_id.clone(),
187 path: sf.path.clone(),
188 turns,
189 quality,
190 })
191 })
192 .collect();
193
194 let mut agents_by_parent: HashMap<String, Vec<ParsedAgent>> = HashMap::new();
196 for result in parsed_agents {
197 let pa = result?;
198 global_quality.total_valid_turns += pa.quality.valid_turns;
199 global_quality.total_skipped += pa.quality.skipped_synthetic
200 + pa.quality.skipped_sidechain
201 + pa.quality.skipped_invalid
202 + pa.quality.skipped_parse_error;
203 agents_by_parent
204 .entry(pa.target_id.clone())
205 .or_default()
206 .push(pa);
207 }
208
209 for (target_id, agents) in agents_by_parent {
211 if !sessions.contains_key(&target_id) {
213 let project = agents
214 .iter()
215 .find_map(|a| a.project.clone())
216 .or_else(|| Some("(orphan)".to_string()));
217 sessions.insert(
218 target_id.clone(),
219 SessionData {
220 session_id: target_id.clone(),
221 project,
222 turns: Vec::new(),
223 subagents: Vec::new(),
224 plugins: Vec::new(),
225 skills: Vec::new(),
226 hooks: Vec::new(),
227 first_timestamp: None,
228 last_timestamp: None,
229 version: None,
230 quality: DataQuality::default(),
231 metadata: SessionMetadata::default(),
232 is_orphan: true,
233 },
234 );
235 global_quality.orphan_agents += 1;
236 }
237
238 let agent_meta_map = crate::data::scanner::load_agent_meta(&target_id, claude_home);
241
242 let parent = sessions.get_mut(&target_id).unwrap();
243 let existing_rids = request_id_set(&parent.turns);
244
245 let mut agents = agents;
247 agents.sort_by(|a, b| a.agent_id.cmp(&b.agent_id));
248
249 for pa in agents {
250 let mut kept_count = 0usize;
254 let mut dropped_count = 0usize;
255 let mut kept_turns: Vec<super::models::ValidatedTurn> =
256 Vec::with_capacity(pa.turns.len());
257 for turn in pa.turns {
258 let dominated = turn
259 .request_id
260 .as_ref()
261 .is_some_and(|rid| existing_rids.contains(rid));
262 if dominated {
263 dropped_count += 1;
264 } else {
265 kept_count += 1;
266 kept_turns.push(turn);
267 }
268 }
269
270 parent.quality.total_lines += pa.quality.total_lines;
273 parent.quality.valid_turns += kept_count;
274 parent.quality.skipped_synthetic += pa.quality.skipped_synthetic;
275 parent.quality.skipped_sidechain += pa.quality.skipped_sidechain;
276 parent.quality.skipped_invalid += pa.quality.skipped_invalid;
277 parent.quality.skipped_parse_error += pa.quality.skipped_parse_error;
278 parent.quality.duplicate_turns += pa.quality.duplicate_turns + dropped_count;
279
280 kept_turns.sort_by_key(|t| t.timestamp);
282 let (first_ts, last_ts) = time_range(kept_turns.iter().map(|t| &t.timestamp));
283
284 let meta_key = pa
286 .agent_id
287 .strip_prefix("agent-")
288 .unwrap_or(&pa.agent_id)
289 .to_string();
290 let (agent_type, description) = agent_meta_map
291 .get(&meta_key)
292 .map(|(t, d)| (Some(t.clone()), Some(d.clone())))
293 .unwrap_or((None, None));
294
295 parent.subagents.push(Subagent {
296 agent_id: pa.agent_id,
297 agent_type,
298 description,
299 turns: kept_turns,
300 first_timestamp: first_ts,
301 last_timestamp: last_ts,
302 });
303 }
304 }
305
306 for session in sessions.values_mut() {
308 session.plugins = aggregate_plugins(&session.turns, calc);
309 session.skills = aggregate_skills(&session.turns, calc);
310 }
311
312 let mut result: Vec<SessionData> = sessions.into_values().collect();
314 result.sort_by_key(|b| std::cmp::Reverse(b.first_timestamp));
316 let mut global_min: Option<DateTime<Utc>> = None;
317 let mut global_max: Option<DateTime<Utc>> = None;
318
319 for session in &mut result {
320 let all_timestamps = session.all_responses();
321 let (first_ts, last_ts) = time_range(all_timestamps.iter().map(|t| &t.timestamp));
322 session.first_timestamp = first_ts;
323 session.last_timestamp = last_ts;
324
325 if let Some(ts) = first_ts {
326 global_min = Some(global_min.map_or(ts, |m: DateTime<Utc>| m.min(ts)));
327 }
328 if let Some(ts) = last_ts {
329 global_max = Some(global_max.map_or(ts, |m: DateTime<Utc>| m.max(ts)));
330 }
331 }
332
333 global_quality.time_range = match (global_min, global_max) {
334 (Some(min), Some(max)) => Some((min, max)),
335 _ => None,
336 };
337
338 Ok((result, global_quality))
339}
340
341fn aggregate_plugins(
346 turns: &[super::models::ValidatedTurn],
347 calc: &PricingCalculator,
348) -> Vec<PluginUsage> {
349 let mut acc: HashMap<String, PluginUsage> = HashMap::new();
350 for turn in turns {
351 let plugin = match turn.attribution_plugin.as_deref() {
352 Some(p) if !p.is_empty() => p,
353 _ => continue,
354 };
355 let cost = calc.calculate_turn_cost(&turn.model, &turn.usage).total;
356 let input = turn.usage.input_tokens.unwrap_or(0);
357 let output = turn.usage.output_tokens.unwrap_or(0);
358 let entry = acc
359 .entry(plugin.to_string())
360 .or_insert_with(|| PluginUsage {
361 plugin: plugin.to_string(),
362 turns: 0,
363 cost: 0.0,
364 input_tokens: 0,
365 output_tokens: 0,
366 });
367 entry.turns += 1;
368 entry.cost += cost;
369 entry.input_tokens += input;
370 entry.output_tokens += output;
371 }
372 let mut out: Vec<PluginUsage> = acc.into_values().collect();
373 out.sort_by(|a, b| a.plugin.cmp(&b.plugin));
374 out
375}
376
377fn aggregate_skills(
381 turns: &[super::models::ValidatedTurn],
382 calc: &PricingCalculator,
383) -> Vec<SkillUsage> {
384 let mut acc: HashMap<String, SkillUsage> = HashMap::new();
385 for turn in turns {
386 let skill = match turn.attribution_skill.as_deref() {
387 Some(s) if !s.is_empty() => s,
388 _ => continue,
389 };
390 let cost = calc.calculate_turn_cost(&turn.model, &turn.usage).total;
391 let input = turn.usage.input_tokens.unwrap_or(0);
392 let output = turn.usage.output_tokens.unwrap_or(0);
393 let entry = acc.entry(skill.to_string()).or_insert_with(|| SkillUsage {
394 skill: skill.to_string(),
395 turns: 0,
396 cost: 0.0,
397 input_tokens: 0,
398 output_tokens: 0,
399 });
400 entry.turns += 1;
401 entry.cost += cost;
402 entry.input_tokens += input;
403 entry.output_tokens += output;
404 }
405 let mut out: Vec<SkillUsage> = acc.into_values().collect();
406 out.sort_by(|a, b| a.skill.cmp(&b.skill));
407 out
408}
409
410#[cfg(test)]
413mod tests {
414 use super::*;
415 use crate::data::models::{TokenUsage, ValidatedTurn};
416 use std::fs;
417 use tempfile::TempDir;
418
419 fn turn(
421 ts: &str,
422 cost_input: u64,
423 cost_output: u64,
424 attribution: Option<(&str, &str)>,
425 ) -> ValidatedTurn {
426 ValidatedTurn {
427 uuid: format!("u-{ts}"),
428 request_id: Some(format!("r-{ts}")),
429 timestamp: ts.parse().unwrap(),
430 model: "claude-opus-4-6".into(),
431 usage: TokenUsage {
432 input_tokens: Some(cost_input),
433 output_tokens: Some(cost_output),
434 cache_creation_input_tokens: Some(0),
435 cache_read_input_tokens: Some(0),
436 cache_creation: None,
437 server_tool_use: None,
438 service_tier: None,
439 speed: None,
440 inference_geo: None,
441 },
442 stop_reason: None,
443 content_types: vec![],
444 is_agent: false,
445 agent_id: None,
446 user_text: None,
447 assistant_text: None,
448 tool_names: vec![],
449 service_tier: None,
450 speed: None,
451 inference_geo: None,
452 tool_error_count: 0,
453 git_branch: None,
454 attribution_plugin: attribution.map(|(p, _)| p.to_string()),
455 attribution_skill: attribution.map(|(_, s)| s.to_string()),
456 }
457 }
458
459 #[test]
460 fn pipeline_plugins_skills_aggregation() {
461 let turns = vec![
463 turn(
464 "2026-05-01T00:00:00Z",
465 10,
466 20,
467 Some(("superpowers", "superpowers:brainstorming")),
468 ),
469 turn(
470 "2026-05-01T00:00:01Z",
471 30,
472 40,
473 Some(("superpowers", "superpowers:brainstorming")),
474 ),
475 turn("2026-05-01T00:00:02Z", 1, 2, None),
476 ];
477 let calc = PricingCalculator::new();
478 let plugins = aggregate_plugins(&turns, &calc);
479 let skills = aggregate_skills(&turns, &calc);
480
481 assert_eq!(plugins.len(), 1, "two plugin turns should fold to one row");
482 assert_eq!(plugins[0].plugin, "superpowers");
483 assert_eq!(plugins[0].turns, 2);
484 assert_eq!(plugins[0].input_tokens, 40);
485 assert_eq!(plugins[0].output_tokens, 60);
486
487 assert_eq!(skills.len(), 1);
488 assert_eq!(skills[0].skill, "superpowers:brainstorming");
489 assert_eq!(skills[0].turns, 2);
490
491 assert!((plugins[0].cost - skills[0].cost).abs() < 1e-9);
494 }
495
496 #[test]
497 fn pipeline_plugins_empty_when_no_attribution() {
498 let turns = vec![
499 turn("2026-05-01T00:00:00Z", 10, 20, None),
500 turn("2026-05-01T00:00:01Z", 30, 40, None),
501 ];
502 let calc = PricingCalculator::new();
503 assert!(aggregate_plugins(&turns, &calc).is_empty());
504 assert!(aggregate_skills(&turns, &calc).is_empty());
505 }
506
    /// Writes a fixture Claude home into a fresh temp dir and returns it together
    /// with the main session's uuid.
    ///
    /// Files created under `projects/-Users-test-proj/`:
    /// - `<uuid>.jsonl`: two assistant turns (request ids `r-main-1`, `r-main-2`;
    ///   the second carries `attributionPlugin` / `attributionSkill`) followed by
    ///   one `stop_hook_summary` line.
    /// - `<uuid>/subagents/agent-aaa1.jsonl`: two agent turns, accompanied by
    ///   `agent-aaa1.meta.json` (`agentType` "builder").
    /// - `<uuid>/subagents/agent-bbb2.jsonl`: one turn reusing request id
    ///   `r-main-2` and one turn with its own request id; no meta file.
    ///
    /// The caller must hold on to the returned `TempDir` while the files are in use.
    fn write_fixture_session() -> (TempDir, String) {
        let tmp = TempDir::new().unwrap();
        let project = tmp.path().join("projects").join("-Users-test-proj");
        fs::create_dir_all(&project).unwrap();

        let session_uuid = "11111111-2222-3333-4444-555555555555";
        let main_path = project.join(format!("{}.jsonl", session_uuid));

        // Main session: plain turn, attributed turn, then a hook summary.
        let main_turn_1 = r#"{"type":"assistant","uuid":"m1","timestamp":"2026-05-01T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":10,"output_tokens":20,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"11111111-2222-3333-4444-555555555555","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r-main-1"}"#;
        let main_turn_2 = r#"{"type":"assistant","uuid":"m2","timestamp":"2026-05-01T10:01:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":30,"output_tokens":40,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"bye"}]},"sessionId":"11111111-2222-3333-4444-555555555555","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r-main-2","attributionPlugin":"superpowers","attributionSkill":"superpowers:brainstorming"}"#;
        let main_hook = r#"{"type":"system","subtype":"stop_hook_summary","hookCount":1,"hookInfos":[{"command":"bash hook.sh","durationMs":50}],"hookErrors":[],"preventedContinuation":false,"sessionId":"11111111-2222-3333-4444-555555555555"}"#;
        fs::write(
            &main_path,
            format!("{}\n{}\n{}\n", main_turn_1, main_turn_2, main_hook),
        )
        .unwrap();

        // Agent files live in `<project>/<session uuid>/subagents/`.
        let subagents_dir = project.join(session_uuid).join("subagents");
        fs::create_dir_all(&subagents_dir).unwrap();

        // Agent A: two unique turns plus a meta file.
        let agent_a_turn_1 = r#"{"type":"assistant","uuid":"a1","timestamp":"2026-05-01T10:02:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":5,"output_tokens":10,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"agent-a-1"}]},"sessionId":"agent-aaa1","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":true,"parentUuid":null,"requestId":"r-agentA-1"}"#;
        let agent_a_turn_2 = r#"{"type":"assistant","uuid":"a2","timestamp":"2026-05-01T10:03:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":7,"output_tokens":11,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"agent-a-2"}]},"sessionId":"agent-aaa1","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":true,"parentUuid":null,"requestId":"r-agentA-2"}"#;
        fs::write(
            subagents_dir.join("agent-aaa1.jsonl"),
            format!("{}\n{}\n", agent_a_turn_1, agent_a_turn_2),
        )
        .unwrap();
        fs::write(
            subagents_dir.join("agent-aaa1.meta.json"),
            r#"{"agentType":"builder","description":"Implement Phase 2"}"#,
        )
        .unwrap();

        // Agent B: first turn shares request id "r-main-2" with the main session,
        // second turn has a request id of its own. No meta file is written.
        let agent_b_dup = r#"{"type":"assistant","uuid":"b1","timestamp":"2026-05-01T10:04:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":100,"output_tokens":200,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"dup"}]},"sessionId":"agent-bbb2","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":true,"parentUuid":null,"requestId":"r-main-2"}"#;
        let agent_b_unique = r#"{"type":"assistant","uuid":"b2","timestamp":"2026-05-01T10:05:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3,"output_tokens":4,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"unique"}]},"sessionId":"agent-bbb2","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":true,"parentUuid":null,"requestId":"r-agentB-2"}"#;
        fs::write(
            subagents_dir.join("agent-bbb2.jsonl"),
            format!("{}\n{}\n", agent_b_dup, agent_b_unique),
        )
        .unwrap();
        (tmp, session_uuid.to_string())
    }
563
564 #[test]
565 fn pipeline_subagents_grouping_and_meta_injection() {
566 let (tmp, session_uuid) = write_fixture_session();
567 let calc = PricingCalculator::new();
568 let (sessions, _quality) = load_all(tmp.path(), &calc).unwrap();
569
570 assert_eq!(sessions.len(), 1);
571 let s = &sessions[0];
572 assert_eq!(s.session_id, session_uuid);
573
574 assert_eq!(s.subagents.len(), 2, "two agent files -> two subagents");
576 assert_eq!(s.subagents[0].agent_id, "agent-aaa1");
577 assert_eq!(s.subagents[1].agent_id, "agent-bbb2");
578
579 assert_eq!(s.subagents[0].turns.len(), 2);
581 assert_eq!(s.subagents[0].agent_type.as_deref(), Some("builder"));
582 assert_eq!(
583 s.subagents[0].description.as_deref(),
584 Some("Implement Phase 2")
585 );
586 assert!(s.subagents[0].first_timestamp.is_some());
587 assert!(s.subagents[0].last_timestamp.is_some());
588
589 assert_eq!(
592 s.subagents[1].turns.len(),
593 1,
594 "cross-file dedup should drop the duplicate"
595 );
596 assert!(s.subagents[1].agent_type.is_none());
597 assert!(s.subagents[1].description.is_none());
598
599 assert_eq!(s.turns.len(), 2);
601
602 assert_eq!(s.plugins.len(), 1);
604 assert_eq!(s.plugins[0].plugin, "superpowers");
605 assert_eq!(s.plugins[0].turns, 1);
606 assert_eq!(s.skills.len(), 1);
607 assert_eq!(s.skills[0].skill, "superpowers:brainstorming");
608
609 assert_eq!(s.hooks.len(), 1);
611 assert_eq!(s.hooks[0].command, "bash hook.sh");
612 assert_eq!(s.hooks[0].invocations, 1);
613 assert_eq!(s.hooks[0].total_duration_ms, 50);
614 assert_eq!(s.hooks[0].error_count, 0);
615 assert_eq!(s.hooks[0].prevented_continuation_count, 0);
616
617 assert_eq!(s.total_turn_count(), 2 + 2 + 1); assert_eq!(s.agent_turn_count(), 3);
620 }
621
622 #[test]
623 fn pipeline_aggregation_invariants() {
624 let (tmp, _session_uuid) = write_fixture_session();
626 let calc = PricingCalculator::new();
627 let (sessions, _quality) = load_all(tmp.path(), &calc).unwrap();
628 let s = &sessions[0];
629
630 let total_sub_turns: usize = s.subagents.iter().map(|sa| sa.turns.len()).sum();
633 assert_eq!(total_sub_turns, s.agent_turn_count());
634 assert_eq!(total_sub_turns, 3);
635
636 let attributed_turns = s
639 .turns
640 .iter()
641 .filter(|t| t.attribution_plugin.is_some())
642 .count() as u64;
643 let plugin_turn_sum: u64 = s.plugins.iter().map(|p| p.turns).sum();
644 assert_eq!(plugin_turn_sum, attributed_turns);
645
646 let session_turn_cost: f64 = s
648 .turns
649 .iter()
650 .map(|t| calc.calculate_turn_cost(&t.model, &t.usage).total)
651 .sum();
652 let plugin_cost: f64 = s.plugins.iter().map(|p| p.cost).sum();
653 assert!(
654 plugin_cost <= session_turn_cost + 1e-9,
655 "plugin cost {plugin_cost} must be <= session turn cost {session_turn_cost}"
656 );
657
658 let hook_invocations: u64 = s.hooks.iter().map(|h| h.invocations).sum();
667 assert!(
668 hook_invocations >= 1,
669 "expected at least one hook invocation in fixture"
670 );
671
672 for sa in &s.subagents {
674 for t in &sa.turns {
675 assert!(
676 t.attribution_plugin.is_none(),
677 "subagent turn unexpectedly has attributionPlugin"
678 );
679 assert!(
680 t.attribution_skill.is_none(),
681 "subagent turn unexpectedly has attributionSkill"
682 );
683 }
684 }
685 }
686
    /// Three `stop_hook_summary` lines for the same command must collapse into a
    /// single hook row whose counters are summed.
    ///
    /// The fixture's hook lines have durations 100, 200 and 300 ms; the second
    /// carries one `hookErrors` entry and the third sets `preventedContinuation`.
    #[test]
    fn pipeline_hooks_aggregation_multi_invocation() {
        let tmp = TempDir::new().unwrap();
        let project = tmp.path().join("projects").join("-Users-test-proj");
        fs::create_dir_all(&project).unwrap();
        let uuid = "22222222-3333-4444-5555-666666666666";

        // One assistant turn, then the three hook summaries (h1: clean,
        // h2: with an error, h3: prevented continuation).
        let asst = r#"{"type":"assistant","uuid":"m1","timestamp":"2026-05-01T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":10,"output_tokens":20,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"22222222-3333-4444-5555-666666666666","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r-main-1"}"#;
        let h1 = r#"{"type":"system","subtype":"stop_hook_summary","hookCount":1,"hookInfos":[{"command":"bash run.sh","durationMs":100}],"hookErrors":[],"preventedContinuation":false,"sessionId":"22222222-3333-4444-5555-666666666666"}"#;
        let h2 = r#"{"type":"system","subtype":"stop_hook_summary","hookCount":1,"hookInfos":[{"command":"bash run.sh","durationMs":200}],"hookErrors":[{"msg":"oops"}],"preventedContinuation":false,"sessionId":"22222222-3333-4444-5555-666666666666"}"#;
        let h3 = r#"{"type":"system","subtype":"stop_hook_summary","hookCount":1,"hookInfos":[{"command":"bash run.sh","durationMs":300}],"hookErrors":[],"preventedContinuation":true,"sessionId":"22222222-3333-4444-5555-666666666666"}"#;
        fs::write(
            project.join(format!("{}.jsonl", uuid)),
            format!("{asst}\n{h1}\n{h2}\n{h3}\n"),
        )
        .unwrap();

        let calc = PricingCalculator::new();
        let (sessions, _q) = load_all(tmp.path(), &calc).unwrap();
        assert_eq!(sessions.len(), 1);
        let s = &sessions[0];
        assert_eq!(s.hooks.len(), 1, "all three invocations share one command");
        let h = &s.hooks[0];
        assert_eq!(h.command, "bash run.sh");
        assert_eq!(h.invocations, 3);
        // 100 + 200 + 300 ms.
        assert_eq!(h.total_duration_ms, 600);
        assert_eq!(h.error_count, 1);
        assert_eq!(h.prevented_continuation_count, 1);
    }
721
    /// A session consisting of a single assistant line (version "2.1.90") with no
    /// attribution fields, no hook summaries and no agent files must load with
    /// empty `plugins`, `skills`, `hooks` and `subagents`.
    #[test]
    fn pipeline_old_session_has_empty_capability_arrays() {
        let tmp = TempDir::new().unwrap();
        let project = tmp.path().join("projects").join("-Users-test-proj");
        fs::create_dir_all(&project).unwrap();
        let uuid = "33333333-4444-5555-6666-777777777777";
        let asst = r#"{"type":"assistant","uuid":"m1","timestamp":"2026-05-01T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":10,"output_tokens":20,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"33333333-4444-5555-6666-777777777777","version":"2.1.90","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r-main-1"}"#;
        fs::write(project.join(format!("{}.jsonl", uuid)), format!("{asst}\n")).unwrap();

        let calc = PricingCalculator::new();
        let (sessions, _q) = load_all(tmp.path(), &calc).unwrap();
        assert_eq!(sessions.len(), 1);
        let s = &sessions[0];
        assert!(
            s.plugins.is_empty(),
            "old session must produce empty plugins Vec"
        );
        assert!(
            s.skills.is_empty(),
            "old session must produce empty skills Vec"
        );
        assert!(
            s.hooks.is_empty(),
            "old session must produce empty hooks Vec"
        );
        assert!(
            s.subagents.is_empty(),
            "session without agent files must produce empty subagents Vec"
        );
    }
754
    /// An agent file whose parent directory has no matching `<uuid>.jsonl` must
    /// produce a synthesized parent session flagged `is_orphan`, holding the agent
    /// as its only subagent, and bump `orphan_agents` to 1.
    #[test]
    fn loader_marks_orphan_subagent_as_orphan() {
        let tmp = TempDir::new().unwrap();
        let project = tmp.path().join("projects").join("-Users-test-proj");
        let parent_uuid = "99999999-aaaa-bbbb-cccc-dddddddddddd";
        // Only the subagents directory is created; no main session file is written.
        let subagents_dir = project.join(parent_uuid).join("subagents");
        fs::create_dir_all(&subagents_dir).unwrap();
        let agent_turn = r#"{"type":"assistant","uuid":"a1","timestamp":"2026-05-01T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":5,"output_tokens":10,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"orphan-agent"}]},"sessionId":"agent-orphan-1","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":true,"parentUuid":null,"requestId":"r-orphan-1"}"#;
        fs::write(
            subagents_dir.join("agent-orphan-1.jsonl"),
            format!("{}\n", agent_turn),
        )
        .unwrap();

        let calc = PricingCalculator::new();
        let (sessions, quality) = load_all(tmp.path(), &calc).unwrap();

        assert_eq!(
            sessions.len(),
            1,
            "loader should reconstruct an orphan parent session"
        );
        let s = &sessions[0];
        // The synthesized session takes the id of the missing parent.
        assert_eq!(s.session_id, parent_uuid);
        assert!(s.is_orphan, "synthesized parent must be flagged as orphan");
        assert_eq!(s.subagents.len(), 1);
        assert_eq!(s.subagents[0].turns.len(), 1);
        assert_eq!(quality.orphan_agents, 1);
    }
793
794 #[test]
797 fn loader_marks_normal_session_as_not_orphan() {
798 let (tmp, session_uuid) = write_fixture_session();
799 let calc = PricingCalculator::new();
800 let (sessions, quality) = load_all(tmp.path(), &calc).unwrap();
801 assert_eq!(sessions.len(), 1);
802 let s = &sessions[0];
803 assert_eq!(s.session_id, session_uuid);
804 assert!(
805 !s.is_orphan,
806 "session with parent main jsonl present must not be orphan"
807 );
808 assert_eq!(quality.orphan_agents, 0);
810 }
811
812 #[test]
816 fn subagent_type_aggregation_groups_by_agent_type() {
817 use crate::data::models::{Subagent, ValidatedTurn};
818
819 let calc = PricingCalculator::new();
820
821 let make_agent = |agent_id: &str,
823 agent_type: Option<&str>,
824 description: Option<&str>,
825 turns: usize|
826 -> Subagent {
827 let mut tlist = Vec::with_capacity(turns);
828 for i in 0..turns {
829 tlist.push(ValidatedTurn {
830 uuid: format!("{}-{}", agent_id, i),
831 request_id: Some(format!("{}-r-{}", agent_id, i)),
832 timestamp: "2026-05-01T10:00:00Z".parse().unwrap(),
833 model: "claude-opus-4-6".into(),
834 usage: crate::data::models::TokenUsage {
835 input_tokens: Some(100),
836 output_tokens: Some(200),
837 cache_creation_input_tokens: Some(0),
838 cache_read_input_tokens: Some(0),
839 cache_creation: None,
840 server_tool_use: None,
841 service_tier: None,
842 speed: None,
843 inference_geo: None,
844 },
845 stop_reason: None,
846 content_types: vec![],
847 is_agent: true,
848 agent_id: Some(agent_id.to_string()),
849 user_text: None,
850 assistant_text: None,
851 tool_names: vec![],
852 service_tier: None,
853 speed: None,
854 inference_geo: None,
855 tool_error_count: 0,
856 git_branch: None,
857 attribution_plugin: None,
858 attribution_skill: None,
859 });
860 }
861 Subagent {
862 agent_id: agent_id.to_string(),
863 agent_type: agent_type.map(|s| s.to_string()),
864 description: description.map(|s| s.to_string()),
865 turns: tlist,
866 first_timestamp: None,
867 last_timestamp: None,
868 }
869 };
870
871 let session = SessionData {
872 session_id: "s1".into(),
873 project: Some("p".into()),
874 turns: Vec::new(),
875 subagents: vec![
876 make_agent("agent-aaa", Some("builder"), Some("task A"), 2),
877 make_agent("agent-bbb", Some("builder"), Some("task B"), 3),
878 make_agent("agent-ccc", Some("code-reviewer"), Some("review X"), 1),
879 ],
880 plugins: Vec::new(),
881 skills: Vec::new(),
882 hooks: Vec::new(),
883 first_timestamp: None,
884 last_timestamp: None,
885 version: None,
886 quality: DataQuality::default(),
887 metadata: super::SessionMetadata::default(),
888 is_orphan: false,
889 };
890
891 let aggs = session.subagent_type_aggregates(&calc);
892 assert_eq!(aggs.len(), 2);
894 assert_eq!(aggs[0].agent_type, "builder");
895 assert_eq!(aggs[0].count, 2);
896 assert_eq!(aggs[0].total_turns, 5); assert_eq!(aggs[0].total_input_tokens, 500); assert_eq!(aggs[0].total_output_tokens, 1000); assert!(aggs[0].total_cost > 0.0);
900 assert_eq!(
901 aggs[0].descriptions,
902 vec!["task A".to_string(), "task B".to_string()]
903 );
904
905 assert_eq!(aggs[1].agent_type, "code-reviewer");
906 assert_eq!(aggs[1].count, 1);
907 assert_eq!(aggs[1].total_turns, 1);
908 assert_eq!(aggs[1].descriptions, vec!["review X".to_string()]);
909 }
910
911 #[test]
914 fn subagent_type_aggregation_handles_missing_type() {
915 use crate::data::models::{Subagent, ValidatedTurn};
916
917 let calc = PricingCalculator::new();
918 let make_turn = |id: &str| ValidatedTurn {
919 uuid: id.to_string(),
920 request_id: Some(format!("r-{}", id)),
921 timestamp: "2026-05-01T10:00:00Z".parse().unwrap(),
922 model: "claude-opus-4-6".into(),
923 usage: crate::data::models::TokenUsage {
924 input_tokens: Some(50),
925 output_tokens: Some(50),
926 cache_creation_input_tokens: Some(0),
927 cache_read_input_tokens: Some(0),
928 cache_creation: None,
929 server_tool_use: None,
930 service_tier: None,
931 speed: None,
932 inference_geo: None,
933 },
934 stop_reason: None,
935 content_types: vec![],
936 is_agent: true,
937 agent_id: Some("agent-no-meta".into()),
938 user_text: None,
939 assistant_text: None,
940 tool_names: vec![],
941 service_tier: None,
942 speed: None,
943 inference_geo: None,
944 tool_error_count: 0,
945 git_branch: None,
946 attribution_plugin: None,
947 attribution_skill: None,
948 };
949
950 let session = SessionData {
951 session_id: "s1".into(),
952 project: Some("p".into()),
953 turns: Vec::new(),
954 subagents: vec![Subagent {
955 agent_id: "agent-no-meta".into(),
956 agent_type: None, description: None,
958 turns: vec![make_turn("t1")],
959 first_timestamp: None,
960 last_timestamp: None,
961 }],
962 plugins: Vec::new(),
963 skills: Vec::new(),
964 hooks: Vec::new(),
965 first_timestamp: None,
966 last_timestamp: None,
967 version: None,
968 quality: DataQuality::default(),
969 metadata: super::SessionMetadata::default(),
970 is_orphan: false,
971 };
972
973 let aggs = session.subagent_type_aggregates(&calc);
974 assert_eq!(
975 aggs.len(),
976 1,
977 "agent_type=None should still produce one aggregate, not drop the data"
978 );
979 assert_eq!(aggs[0].agent_type, "unknown");
980 assert_eq!(aggs[0].count, 1);
981 assert_eq!(aggs[0].total_turns, 1);
982 }
983
    /// An orphan session (agent file only, no main session file) must still be
    /// counted by `analyze_overview`: one session, two turns (both agent turns),
    /// non-zero cost and the summed output tokens of both turns.
    #[test]
    fn global_totals_include_orphan_sessions() {
        let tmp = TempDir::new().unwrap();
        let project = tmp.path().join("projects").join("-Users-test-proj");
        let parent_uuid = "88888888-aaaa-bbbb-cccc-dddddddddddd";
        // Only the subagents directory exists; the parent's main file is absent.
        let subagents_dir = project.join(parent_uuid).join("subagents");
        fs::create_dir_all(&subagents_dir).unwrap();
        let t1 = r#"{"type":"assistant","uuid":"a1","timestamp":"2026-05-01T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":1000,"output_tokens":2000,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"x"}]},"sessionId":"agent-orphan-z","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":true,"parentUuid":null,"requestId":"r-orph-1"}"#;
        let t2 = r#"{"type":"assistant","uuid":"a2","timestamp":"2026-05-01T10:01:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3000,"output_tokens":4000,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"y"}]},"sessionId":"agent-orphan-z","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":true,"parentUuid":null,"requestId":"r-orph-2"}"#;
        fs::write(
            subagents_dir.join("agent-orphan-z.jsonl"),
            format!("{}\n{}\n", t1, t2),
        )
        .unwrap();

        let calc = PricingCalculator::new();
        let (sessions, quality) = load_all(tmp.path(), &calc).unwrap();
        assert_eq!(sessions.len(), 1);
        assert!(sessions[0].is_orphan);

        let overview = crate::analysis::overview::analyze_overview(&sessions, quality, &calc, None);
        assert_eq!(overview.total_sessions, 1);
        assert_eq!(overview.total_turns, 2);
        assert_eq!(overview.total_agent_turns, 2);
        assert!(
            overview.total_cost > 0.0,
            "orphan session's cost must flow into total_cost"
        );
        // 2000 + 4000 output tokens from the two agent turns.
        assert_eq!(overview.total_output_tokens, 6000);
    }
1021
    /// One main session with ten agent files (`agent-id000` .. `agent-id009`, two
    /// turns each) must load as a single session with ten subagents, ordered by
    /// agent id, and 21 turns in total (1 main + 20 agent).
    #[test]
    fn pipeline_subagents_many() {
        let tmp = TempDir::new().unwrap();
        let project = tmp.path().join("projects").join("-Users-test-proj");
        fs::create_dir_all(&project).unwrap();
        let uuid = "44444444-5555-6666-7777-888888888888";

        let main_turn = r#"{"type":"assistant","uuid":"m1","timestamp":"2026-05-01T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":10,"output_tokens":20,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"44444444-5555-6666-7777-888888888888","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r-main-1"}"#;
        fs::write(
            project.join(format!("{}.jsonl", uuid)),
            format!("{}\n", main_turn),
        )
        .unwrap();

        let subagents_dir = project.join(uuid).join("subagents");
        fs::create_dir_all(&subagents_dir).unwrap();

        // Each agent gets two turns with request ids `r-{i}-1` / `r-{i}-2` and
        // timestamps in minute `i` of the hour. The JSON braces are doubled
        // because the raw strings go through `format!`.
        for i in 0..10 {
            let line1 = format!(
                r#"{{"type":"assistant","uuid":"a{i}-1","timestamp":"2026-05-01T10:0{i}:00Z","message":{{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{{"input_tokens":1,"output_tokens":2,"cache_creation_input_tokens":0,"cache_read_input_tokens":0}},"content":[{{"type":"text","text":"a"}}]}},"sessionId":"agent-id{i:03}","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":true,"parentUuid":null,"requestId":"r-{i}-1"}}"#
            );
            let line2 = format!(
                r#"{{"type":"assistant","uuid":"a{i}-2","timestamp":"2026-05-01T10:0{i}:01Z","message":{{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{{"input_tokens":1,"output_tokens":2,"cache_creation_input_tokens":0,"cache_read_input_tokens":0}},"content":[{{"type":"text","text":"a"}}]}},"sessionId":"agent-id{i:03}","version":"2.1.140","cwd":"/tmp","gitBranch":"main","userType":"external","isSidechain":true,"parentUuid":null,"requestId":"r-{i}-2"}}"#
            );
            fs::write(
                subagents_dir.join(format!("agent-id{i:03}.jsonl")),
                format!("{line1}\n{line2}\n"),
            )
            .unwrap();
        }

        let calc = PricingCalculator::new();
        let (sessions, _q) = load_all(tmp.path(), &calc).unwrap();
        assert_eq!(sessions.len(), 1);
        let s = &sessions[0];
        assert_eq!(s.subagents.len(), 10, "all 10 agent files become subagents");
        for sa in &s.subagents {
            assert_eq!(sa.turns.len(), 2);
        }
        // Subagents must already be in ascending agent-id order.
        let ids: Vec<&str> = s.subagents.iter().map(|sa| sa.agent_id.as_str()).collect();
        let mut sorted = ids.clone();
        sorted.sort();
        assert_eq!(ids, sorted);

        // 1 main turn + 10 agents * 2 turns.
        assert_eq!(s.total_turn_count(), 21);
        assert_eq!(s.agent_turn_count(), 20);
    }
1075}