1use anyhow::{Context, Result};
2use std::collections::{HashMap, HashSet};
3use std::fmt;
4use std::fs::File;
5use std::io::{BufRead, BufReader};
6use std::path::Path;
7
8use crate::data::models::{GlobalDataQuality, SessionData, SessionFile};
9use crate::data::scanner;
10use crate::pricing::calculator::PricingCalculator;
11
/// Complete output of `validate_all`: per-session checks, file-structure checks, and
/// pass/fail tallies over all of them.
#[derive(Debug)]
pub struct ValidationReport {
    /// One entry per session handed to `validate_all`, in the order they were given.
    pub session_results: Vec<SessionValidation>,
    /// Checks about the scanned files as a whole (file counts, orphaned agent files,
    /// duplicate main-session ids, request ids shared between agent and parent files).
    pub structure_checks: Vec<Check>,
    /// Totals over `structure_checks` and every session's checks.
    pub summary: ValidationSummary,
}
20
/// Validation results for a single session.
#[derive(Debug)]
pub struct SessionValidation {
    /// The session's id, copied from the pipeline's session data.
    pub session_id: String,
    /// The session's project, or `"(unknown)"` when the session has none.
    pub project: String,
    /// Main-file token comparisons (when a main file exists), overall turn/output/cost
    /// consistency checks, and the project-association check.
    pub token_checks: Vec<Check>,
    /// Checks on the session's agent files and agent turns.
    pub agent_checks: Vec<Check>,
}
28
/// One validation assertion, with both sides stored as their `Display` rendering.
#[derive(Debug)]
pub struct Check {
    /// Human-readable label for what was checked.
    pub name: String,
    /// Reference value, rendered as text.
    pub expected: String,
    /// Observed value, rendered as text.
    pub actual: String,
    /// Whether the check succeeded.
    pub passed: bool,
}

impl Check {
    /// Informational check that always passes; `value` is shown on both sides.
    fn pass(name: impl Into<String>, value: impl fmt::Display) -> Self {
        let rendered = value.to_string();
        Check {
            name: name.into(),
            expected: rendered.clone(),
            actual: rendered,
            passed: true,
        }
    }

    /// Passes when `expected` and `actual` render to exactly the same text.
    fn compare(name: impl Into<String>, expected: impl fmt::Display, actual: impl fmt::Display) -> Self {
        let (expected, actual) = (expected.to_string(), actual.to_string());
        Check {
            name: name.into(),
            passed: expected == actual,
            expected,
            actual,
        }
    }

    /// Passes when `|expected - actual| < tolerance`; both sides are shown with two decimals.
    // Not called anywhere in this file yet; kept as part of the helper set, hence the allowance.
    #[allow(dead_code)]
    fn compare_f64(name: impl Into<String>, expected: f64, actual: f64, tolerance: f64) -> Self {
        let within_tolerance = (expected - actual).abs() < tolerance;
        Check {
            name: name.into(),
            expected: format!("{:.2}", expected),
            actual: format!("{:.2}", actual),
            passed: within_tolerance,
        }
    }
}
61
/// Pass/fail tallies over every check in a `ValidationReport`.
#[derive(Debug, Default)]
pub struct ValidationSummary {
    /// Number of checks evaluated: structure checks plus every session's token and agent checks.
    pub total_checks: usize,
    /// Number of checks that passed.
    pub passed: usize,
    /// Number of checks that failed.
    pub failed: usize,
    /// Number of sessions that were validated.
    pub sessions_validated: usize,
    /// Sessions whose token and agent checks all passed (structure checks are not considered).
    pub sessions_passed: usize,
}
70
/// Token totals recounted directly from one JSONL file by `count_raw_tokens`.
#[derive(Debug, Default)]
struct RawTokenCount {
    /// Sum of `message.usage.input_tokens` over counted turns.
    input_tokens: u64,
    /// Sum of `message.usage.output_tokens` over counted turns.
    output_tokens: u64,
    /// Sum of `message.usage.cache_creation_input_tokens` over counted turns.
    cache_creation_tokens: u64,
    /// Sum of `message.usage.cache_read_input_tokens` over counted turns.
    cache_read_tokens: u64,
    /// Number of counted turns (one per distinct non-empty `requestId`, plus one per
    /// record without a usable `requestId`).
    turn_count: usize,
}
83
84fn is_valid_assistant(val: &serde_json::Value, skip_sidechain: bool, now: &chrono::DateTime<chrono::Utc>) -> bool {
91 if val.get("type").and_then(|t| t.as_str()) != Some("assistant") {
92 return false;
93 }
94 if skip_sidechain && val.get("isSidechain").and_then(|v| v.as_bool()) == Some(true) {
95 return false;
96 }
97 let model = val.pointer("/message/model").and_then(|m| m.as_str());
98 if model == Some("<synthetic>") || model.is_none() {
99 return false;
100 }
101 if val.pointer("/message/usage").is_none() {
103 return false;
104 }
105 let input = val.pointer("/message/usage/input_tokens").and_then(|v| v.as_u64()).unwrap_or(0);
106 let output = val.pointer("/message/usage/output_tokens").and_then(|v| v.as_u64()).unwrap_or(0);
107 let cache_creation = val.pointer("/message/usage/cache_creation_input_tokens").and_then(|v| v.as_u64()).unwrap_or(0);
108 let cache_read = val.pointer("/message/usage/cache_read_input_tokens").and_then(|v| v.as_u64()).unwrap_or(0);
109 if input + output + cache_creation + cache_read == 0 {
110 return false;
111 }
112 if let Some(ts_str) = val.get("timestamp").and_then(|t| t.as_str()) {
114 if let Ok(ts) = ts_str.parse::<chrono::DateTime<chrono::Utc>>() {
115 if ts > *now {
116 return false;
117 }
118 } else {
119 return false;
120 }
121 } else {
122 return false;
123 }
124 true
125}
126
127fn count_raw_tokens(path: &Path, skip_sidechain: bool) -> Result<RawTokenCount> {
130 let file = File::open(path)
131 .with_context(|| format!("raw counter: failed to open {}", path.display()))?;
132 let reader = BufReader::new(file);
133 let now = chrono::Utc::now();
134
135 let mut by_request: HashMap<String, (u64, u64, u64, u64)> = HashMap::new();
137 let mut no_request_id_count = RawTokenCount::default();
138
139 for line in reader.lines() {
140 let line = line?;
141 let val: serde_json::Value = match serde_json::from_str(&line) {
142 Ok(v) => v,
143 Err(_) => continue,
144 };
145
146 if !is_valid_assistant(&val, skip_sidechain, &now) {
147 continue;
148 }
149
150 let input = val.pointer("/message/usage/input_tokens").and_then(|v| v.as_u64()).unwrap_or(0);
151 let output = val.pointer("/message/usage/output_tokens").and_then(|v| v.as_u64()).unwrap_or(0);
152 let cache_creation = val.pointer("/message/usage/cache_creation_input_tokens").and_then(|v| v.as_u64()).unwrap_or(0);
153 let cache_read = val.pointer("/message/usage/cache_read_input_tokens").and_then(|v| v.as_u64()).unwrap_or(0);
154
155 let request_id = val.get("requestId").and_then(|r| r.as_str());
156
157 match request_id {
158 Some(rid) if !rid.is_empty() => {
159 by_request.insert(rid.to_string(), (input, output, cache_creation, cache_read));
160 }
161 _ => {
162 no_request_id_count.input_tokens += input;
163 no_request_id_count.output_tokens += output;
164 no_request_id_count.cache_creation_tokens += cache_creation;
165 no_request_id_count.cache_read_tokens += cache_read;
166 no_request_id_count.turn_count += 1;
167 }
168 }
169 }
170
171 let mut result = no_request_id_count;
172 for (input, output, cc, cr) in by_request.values() {
173 result.input_tokens += input;
174 result.output_tokens += output;
175 result.cache_creation_tokens += cc;
176 result.cache_read_tokens += cr;
177 result.turn_count += 1;
178 }
179
180 Ok(result)
181}
182
183fn count_tokens_by_request_id(path: &Path, skip_sidechain: bool) -> Result<(HashMap<String, u64>, u64)> {
187 let file = File::open(path)?;
188 let reader = BufReader::new(file);
189 let now = chrono::Utc::now();
190 let mut by_rid: HashMap<String, u64> = HashMap::new();
191 let mut no_rid_output: u64 = 0;
192
193 for line in reader.lines() {
194 let line = line?;
195 let val: serde_json::Value = match serde_json::from_str(&line) {
196 Ok(v) => v,
197 Err(_) => continue,
198 };
199 if !is_valid_assistant(&val, skip_sidechain, &now) {
200 continue;
201 }
202 let output = val.pointer("/message/usage/output_tokens")
203 .and_then(|v| v.as_u64()).unwrap_or(0);
204 match val.get("requestId").and_then(|r| r.as_str()) {
205 Some(rid) if !rid.is_empty() => {
206 by_rid.insert(rid.to_string(), output);
207 }
208 _ => {
209 no_rid_output += output;
210 }
211 }
212 }
213 Ok((by_rid, no_rid_output))
214}
215
216fn collect_valid_request_ids(path: &Path, skip_sidechain: bool) -> Result<HashSet<String>> {
219 let file = File::open(path)?;
220 let reader = BufReader::new(file);
221 let now = chrono::Utc::now();
222 let mut ids = HashSet::new();
223
224 for line in reader.lines() {
225 let line = line?;
226 let val: serde_json::Value = match serde_json::from_str(&line) {
227 Ok(v) => v,
228 Err(_) => continue,
229 };
230 if !is_valid_assistant(&val, skip_sidechain, &now) {
231 continue;
232 }
233 if let Some(rid) = val.get("requestId").and_then(|r| r.as_str()) {
234 if !rid.is_empty() {
235 ids.insert(rid.to_string());
236 }
237 }
238 }
239 Ok(ids)
240}
241
242pub fn validate_all(
246 sessions: &[&SessionData],
247 quality: &GlobalDataQuality,
248 claude_home: &Path,
249 calc: &PricingCalculator,
250) -> Result<ValidationReport> {
251 let mut files = scanner::scan_claude_home(claude_home)?;
253 scanner::resolve_agent_parents(&mut files)?;
254
255 let (main_files, agent_files): (Vec<&SessionFile>, Vec<&SessionFile>) =
256 files.iter().partition(|f| !f.is_agent);
257
258 let mut structure_checks = Vec::new();
259 let mut session_results = Vec::new();
260
261 structure_checks.push(Check::compare(
265 "session_count == main_file_count",
266 main_files.len(),
267 quality.total_session_files,
268 ));
269
270 structure_checks.push(Check::compare(
272 "agent_file_count",
273 agent_files.len(),
274 quality.total_agent_files,
275 ));
276
277 let main_session_ids: HashSet<&str> = main_files.iter()
279 .map(|f| f.session_id.as_str())
280 .collect();
281 let orphan_count = agent_files.iter()
282 .filter(|f| {
283 let parent = f.parent_session_id.as_deref()
284 .unwrap_or(&f.session_id);
285 !main_session_ids.contains(parent)
286 })
287 .count();
288 structure_checks.push(Check::pass(
289 format!("orphan_agents (no main session file): {}", orphan_count),
290 orphan_count,
291 ));
292
293 let unique_main_ids: HashSet<&str> = main_files.iter()
295 .map(|f| f.session_id.as_str())
296 .collect();
297 let dup_count = main_files.len() - unique_main_ids.len();
298 structure_checks.push(Check::pass(
299 format!("main_session_files: {} files, {} unique IDs ({} duplicates)", main_files.len(), unique_main_ids.len(), dup_count),
300 main_files.len(),
301 ));
302
303 let mut cross_file_overlap = 0usize;
305 for agent in &agent_files {
306 let parent_id = agent.parent_session_id.as_deref()
307 .unwrap_or(&agent.session_id);
308 let parent_file = main_files.iter()
309 .find(|f| f.session_id == parent_id);
310 if let Some(pf) = parent_file {
311 let parent_rids = collect_valid_request_ids(&pf.file_path, true).unwrap_or_default();
312 let agent_rids = collect_valid_request_ids(&agent.file_path, false).unwrap_or_default();
313 cross_file_overlap += parent_rids.intersection(&agent_rids).count();
314 }
315 }
316 structure_checks.push(Check::pass(
317 format!("cross_file_overlapping_request_ids (deduped: {})", cross_file_overlap),
318 cross_file_overlap,
319 ));
320
321 let mut agents_by_parent: HashMap<&str, Vec<&SessionFile>> = HashMap::new();
325 for af in &agent_files {
326 let parent_id = af.parent_session_id.as_deref()
327 .unwrap_or(&af.session_id);
328 agents_by_parent.entry(parent_id).or_default().push(af);
329 }
330
331 let main_file_map: HashMap<&str, &SessionFile> = main_files.iter()
333 .map(|f| (f.session_id.as_str(), *f))
334 .collect();
335
336 for session in sessions {
337 let mut token_checks = Vec::new();
338 let mut agent_checks = Vec::new();
339
340 if let Some(mf) = main_file_map.get(session.session_id.as_str()) {
342 let raw_main = count_raw_tokens(&mf.file_path, true)
343 .unwrap_or_default();
344
345 let pipeline_main_input: u64 = session.turns.iter()
347 .map(|t| t.usage.input_tokens.unwrap_or(0)).sum();
348 let pipeline_main_output: u64 = session.turns.iter()
349 .map(|t| t.usage.output_tokens.unwrap_or(0)).sum();
350 let pipeline_main_cache_creation: u64 = session.turns.iter()
351 .map(|t| t.usage.cache_creation_input_tokens.unwrap_or(0)).sum();
352 let pipeline_main_cache_read: u64 = session.turns.iter()
353 .map(|t| t.usage.cache_read_input_tokens.unwrap_or(0)).sum();
354 let pipeline_main_turns = session.turns.len();
355
356 token_checks.push(Check::compare(
357 "main_turn_count",
358 raw_main.turn_count,
359 pipeline_main_turns,
360 ));
361 token_checks.push(Check::compare(
362 "main_input_tokens",
363 raw_main.input_tokens,
364 pipeline_main_input,
365 ));
366 token_checks.push(Check::compare(
367 "main_output_tokens",
368 raw_main.output_tokens,
369 pipeline_main_output,
370 ));
371 token_checks.push(Check::compare(
372 "main_cache_creation_tokens",
373 raw_main.cache_creation_tokens,
374 pipeline_main_cache_creation,
375 ));
376 token_checks.push(Check::compare(
377 "main_cache_read_tokens",
378 raw_main.cache_read_tokens,
379 pipeline_main_cache_read,
380 ));
381 }
382
383 let agent_session_files = agents_by_parent.get(session.session_id.as_str());
385 let expected_agent_files = agent_session_files.map_or(0, |v| v.len());
386 let actual_agent_file_count = if expected_agent_files > 0 { expected_agent_files } else { 0 };
387
388 agent_checks.push(Check::compare(
389 "agent_file_count (from scanner)",
390 actual_agent_file_count,
391 expected_agent_files,
392 ));
393
394 if expected_agent_files > 0 {
396 if let Some(afs) = agent_session_files {
397 let main_file = main_file_map.get(session.session_id.as_str());
399 let main_rids = main_file
400 .map(|mf| collect_valid_request_ids(&mf.file_path, true).unwrap_or_default())
401 .unwrap_or_default();
402
403 let mut expected_unique_agent_turns = 0usize;
405 let mut raw_agent_output: u64 = 0;
406
407 for af in afs {
408 let raw = count_raw_tokens(&af.file_path, false)
409 .unwrap_or_default();
410 let file_rids = collect_valid_request_ids(&af.file_path, false)
411 .unwrap_or_default();
412 let file_overlap = file_rids.intersection(&main_rids).count();
413 let unique_turns = raw.turn_count.saturating_sub(file_overlap);
414 expected_unique_agent_turns += unique_turns;
415
416 let (per_rid, no_rid_output) = count_tokens_by_request_id(&af.file_path, false)
418 .unwrap_or_default();
419 for (rid, output) in &per_rid {
420 if !main_rids.contains(rid) {
421 raw_agent_output += output;
422 }
423 }
424 raw_agent_output += no_rid_output;
425 }
426
427 agent_checks.push(Check::compare(
429 "agent_turn_count (after cross-file dedup)",
430 expected_unique_agent_turns,
431 session.agent_turns.len(),
432 ));
433
434 if expected_unique_agent_turns > 0 {
436 agent_checks.push(Check::compare(
437 "has_agent_turns (non-overlapping exist)",
438 "true",
439 (!session.agent_turns.is_empty()).to_string(),
440 ));
441 }
442
443 let pipeline_agent_output: u64 = session.agent_turns.iter()
445 .map(|t| t.usage.output_tokens.unwrap_or(0)).sum();
446
447 let agent_output_match = {
448 if raw_agent_output == 0 && pipeline_agent_output == 0 { true }
449 else {
450 let max_val = raw_agent_output.max(pipeline_agent_output) as f64;
451 if max_val == 0.0 { true }
452 else { (raw_agent_output as f64 - pipeline_agent_output as f64).abs() / max_val < 0.05 }
453 }
454 };
455
456 agent_checks.push(Check {
457 name: "agent_output_tokens (±5%)".into(),
458 expected: raw_agent_output.to_string(),
459 actual: pipeline_agent_output.to_string(),
460 passed: agent_output_match,
461 });
462
463 let all_marked_agent = session.agent_turns.iter().all(|t| t.is_agent);
465 agent_checks.push(Check::compare(
466 "all agent_turns have is_agent=true",
467 "true",
468 all_marked_agent.to_string(),
469 ));
470 }
471 }
472
473 let pipeline_total_output: u64 = session.turns.iter()
475 .chain(session.agent_turns.iter())
476 .map(|t| t.usage.output_tokens.unwrap_or(0)).sum();
477 let pipeline_total_turns = session.turns.len() + session.agent_turns.len();
478
479 token_checks.push(Check::compare(
481 "total_turn_count == turns + agent_turns",
482 pipeline_total_turns,
483 session.all_responses().len(),
484 ));
485
486 if pipeline_total_turns > 0 {
488 token_checks.push(Check::compare(
489 "total_output_tokens > 0",
490 "true",
491 (pipeline_total_output > 0).to_string(),
492 ));
493 }
494
495 let pipeline_cost: f64 = session.turns.iter()
497 .chain(session.agent_turns.iter())
498 .map(|t| calc.calculate_turn_cost(&t.model, &t.usage).total)
499 .sum();
500
501 let has_tokens = session.turns.iter().chain(session.agent_turns.iter())
503 .any(|t| {
504 t.usage.input_tokens.unwrap_or(0) > 0
505 || t.usage.output_tokens.unwrap_or(0) > 0
506 });
507 if has_tokens {
508 token_checks.push(Check::compare(
509 "cost > 0 when tokens exist",
510 "true",
511 (pipeline_cost > 0.0).to_string(),
512 ));
513 }
514
515 if let Some(mf) = main_file_map.get(session.session_id.as_str()) {
517 token_checks.push(Check::compare(
518 "project_association",
519 mf.project.as_deref().unwrap_or("(none)"),
520 session.project.as_deref().unwrap_or("(none)"),
521 ));
522 }
523
524 let project_name = session.project.as_deref().unwrap_or("(unknown)").to_string();
525
526 session_results.push(SessionValidation {
527 session_id: session.session_id.clone(),
528 project: project_name,
529 token_checks,
530 agent_checks,
531 });
532 }
533
534 let mut summary = ValidationSummary::default();
537
538 for check in &structure_checks {
539 summary.total_checks += 1;
540 if check.passed { summary.passed += 1; } else { summary.failed += 1; }
541 }
542
543 for sv in &session_results {
544 summary.sessions_validated += 1;
545 let mut session_pass = true;
546 for check in sv.token_checks.iter().chain(sv.agent_checks.iter()) {
547 summary.total_checks += 1;
548 if check.passed {
549 summary.passed += 1;
550 } else {
551 summary.failed += 1;
552 session_pass = false;
553 }
554 }
555 if session_pass {
556 summary.sessions_passed += 1;
557 }
558 }
559
560 Ok(ValidationReport {
561 session_results,
562 structure_checks,
563 summary,
564 })
565}
566
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Fixed timestamp well in the past. `is_valid_assistant` rejects future timestamps, so
    /// fixtures must not depend on the wall clock at the time the tests run.
    const TS: &str = "2024-01-01T00:00:00Z";

    /// Builds one assistant JSONL record, controlling every field the filters inspect.
    fn assistant_record(
        request_id: &str,
        model: &str,
        sidechain: bool,
        input: u64,
        output: u64,
        timestamp: &str,
    ) -> String {
        format!(
            r#"{{"type":"assistant","uuid":"u-{rid}","timestamp":"{ts}","message":{{"model":"{model}","role":"assistant","stop_reason":"end_turn","usage":{{"input_tokens":{input},"output_tokens":{output},"cache_creation_input_tokens":0,"cache_read_input_tokens":0}},"content":[{{"type":"text","text":"hi"}}]}},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":{sidechain},"parentUuid":null,"requestId":"{rid}"}}"#,
            rid = request_id,
            ts = timestamp,
            model = model,
            input = input,
            output = output,
            sidechain = sidechain,
        )
    }

    /// A regular (non-sidechain, non-synthetic) assistant record dated `TS`.
    fn make_assistant_line(request_id: &str, input: u64, output: u64) -> String {
        assistant_record(request_id, "claude-opus-4-6", false, input, output, TS)
    }

    /// Writes `lines` to a fresh temp file, one per line; the file is removed on drop.
    fn jsonl_file(lines: &[String]) -> NamedTempFile {
        let mut f = NamedTempFile::new().unwrap();
        for line in lines {
            writeln!(f, "{}", line).unwrap();
        }
        f.flush().unwrap();
        f
    }

    #[test]
    fn raw_counter_basic() {
        let f = jsonl_file(&[
            make_assistant_line("r1", 100, 50),
            make_assistant_line("r2", 200, 75),
        ]);

        let result = count_raw_tokens(f.path(), true).unwrap();
        assert_eq!(result.turn_count, 2);
        assert_eq!(result.input_tokens, 300);
        assert_eq!(result.output_tokens, 125);
    }

    #[test]
    fn raw_counter_deduplicates_streaming() {
        // Two records for the same request: the last one wins.
        let f = jsonl_file(&[
            make_assistant_line("r1", 100, 50),
            make_assistant_line("r1", 200, 75),
        ]);

        let result = count_raw_tokens(f.path(), true).unwrap();
        assert_eq!(result.turn_count, 1);
        assert_eq!(result.input_tokens, 200);
        assert_eq!(result.output_tokens, 75);
    }

    #[test]
    fn raw_counter_skips_synthetic() {
        let f = jsonl_file(&[
            assistant_record("r1", "<synthetic>", false, 100, 50, TS),
            make_assistant_line("r2", 200, 75),
        ]);

        let result = count_raw_tokens(f.path(), true).unwrap();
        assert_eq!(result.turn_count, 1);
        assert_eq!(result.input_tokens, 200);
    }

    #[test]
    fn raw_counter_respects_sidechain_flag() {
        let f = jsonl_file(&[assistant_record("r1", "claude-opus-4-6", true, 100, 50, TS)]);

        // Sidechain records are excluded when skipping...
        let result = count_raw_tokens(f.path(), true).unwrap();
        assert_eq!(result.turn_count, 0);

        // ...and counted otherwise.
        let result = count_raw_tokens(f.path(), false).unwrap();
        assert_eq!(result.turn_count, 1);
        assert_eq!(result.input_tokens, 100);
    }

    #[test]
    fn raw_counter_skips_non_assistant() {
        let f = jsonl_file(&[
            r#"{"type":"user","uuid":"u1","message":{"role":"user","content":"hi"},"timestamp":"2024-01-01T00:00:00Z","sessionId":"s1"}"#.to_string(),
            r#"{"type":"progress","data":{"type":"hook"},"uuid":"u2","timestamp":"2024-01-01T00:00:00Z","sessionId":"s1"}"#.to_string(),
            make_assistant_line("r1", 100, 50),
        ]);

        let result = count_raw_tokens(f.path(), true).unwrap();
        assert_eq!(result.turn_count, 1);
    }

    #[test]
    fn raw_counter_skips_invalid_json() {
        let f = jsonl_file(&["not json at all".to_string(), make_assistant_line("r1", 10, 5)]);

        let result = count_raw_tokens(f.path(), true).unwrap();
        assert_eq!(result.turn_count, 1);
        assert_eq!(result.input_tokens, 10);
    }

    #[test]
    fn raw_counter_skips_future_and_unparseable_timestamps() {
        let f = jsonl_file(&[
            assistant_record("r1", "claude-opus-4-6", false, 100, 50, "2999-01-01T00:00:00Z"),
            assistant_record("r2", "claude-opus-4-6", false, 100, 50, "not-a-date"),
            make_assistant_line("r3", 7, 3),
        ]);

        let result = count_raw_tokens(f.path(), true).unwrap();
        assert_eq!(result.turn_count, 1);
        assert_eq!(result.input_tokens, 7);
        assert_eq!(result.output_tokens, 3);
    }

    #[test]
    fn raw_counter_skips_zero_usage() {
        let f = jsonl_file(&[make_assistant_line("r1", 0, 0)]);

        let result = count_raw_tokens(f.path(), true).unwrap();
        assert_eq!(result.turn_count, 0);
    }

    #[test]
    fn raw_counter_counts_each_record_without_request_id() {
        // An empty requestId is treated like a missing one: no dedup, one turn per record.
        let f = jsonl_file(&[make_assistant_line("", 10, 1), make_assistant_line("", 20, 2)]);

        let result = count_raw_tokens(f.path(), true).unwrap();
        assert_eq!(result.turn_count, 2);
        assert_eq!(result.input_tokens, 30);
        assert_eq!(result.output_tokens, 3);
    }

    #[test]
    fn raw_counter_errors_on_missing_file() {
        let missing = std::path::Path::new("/definitely/not/a/real/dir/session.jsonl");
        assert!(count_raw_tokens(missing, true).is_err());
    }

    #[test]
    fn request_id_counter_keeps_last_output_per_request() {
        let f = jsonl_file(&[
            make_assistant_line("r1", 100, 50),
            make_assistant_line("r1", 200, 75),
            make_assistant_line("", 10, 5),
        ]);

        let (by_rid, no_rid_output) = count_tokens_by_request_id(f.path(), false).unwrap();
        assert_eq!(by_rid.len(), 1);
        assert_eq!(by_rid.get("r1"), Some(&75));
        assert_eq!(no_rid_output, 5);
    }

    #[test]
    fn collect_request_ids_skips_empty_and_rejected_records() {
        let f = jsonl_file(&[
            make_assistant_line("r1", 100, 50),
            make_assistant_line("r2", 0, 0), // zero usage: rejected
            make_assistant_line("", 10, 5),  // empty request id: not collected
            make_assistant_line("r3", 1, 1),
        ]);

        let mut ids: Vec<String> = collect_valid_request_ids(f.path(), true)
            .unwrap()
            .into_iter()
            .collect();
        ids.sort();
        assert_eq!(ids, vec!["r1".to_string(), "r3".to_string()]);
    }
}