Skip to main content

agx_core/
corpus.rs

1//! Corpus-level analytics for `agx corpus <dir>`. Walks a directory tree,
2//! loads every session file it finds in parallel, and aggregates
3//! cross-session stats (tokens, cost, per-model / per-tool / per-format
4//! breakdowns).
5//!
6//! Design notes:
7//!
8//! - **Silent skip on non-session files.** The directory scan has no
9//!   file-extension heuristic; we try every file and silently drop
10//!   anything `format::detect` rejects. That lets users point agx at a
11//!   dump of assorted files without getting noisy errors from `.DS_Store`
12//!   / `README.md` / binaries. A file that LOOKS like a session but
13//!   fails to parse still counts as an error — real format drift, not
14//!   "this isn't a session file".
15//!
16//! - **Parallel parse via rayon.** Session files are embarrassingly
17//!   parallel; on a typical corpus of a few hundred sessions the
18//!   load phase fits under a second on a modern laptop. The walk
19//!   itself stays serial (directory traversal is IO-bound enough that
20//!   parallelism doesn't help, and a single `read_dir` iterator is
21//!   simpler than managing a thread pool for the walk).
22//!
23//! - **Filters are AND-combined.** `--filter model=X --filter tool=Y`
24//!   keeps only sessions that used both. Filter predicates run after
25//!   per-session parse so we can filter on observed content.
26
27use crate::format::{self, Format};
28use crate::loader::load_session;
29use crate::timeline::{SessionTotals, Step, compute_session_totals, compute_tool_stats};
30use anyhow::{Result, anyhow};
31use rayon::prelude::*;
32use serde::Serialize;
33use std::collections::HashMap;
34use std::path::{Path, PathBuf};
35
/// One filter predicate from the `--filter` CLI flag. Multiple filters
/// are AND-combined by the caller.
///
/// Values are produced from raw flag text by `Filter::parse` and
/// evaluated against an already-parsed session.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Filter {
    /// `--filter model=X` — keep sessions whose unique-models list includes X.
    /// The model name is compared as an exact (case-sensitive) string.
    Model(String),
    /// `--filter tool=X` — keep sessions that invoked tool X at least once.
    /// The tool name is compared ASCII-case-insensitively.
    Tool(String),
    /// `--filter errored` — keep sessions where at least one tool_result
    /// matched `is_error_result`.
    Errored,
    /// `--filter annotated` — keep sessions with at least one user note
    /// stored under `~/.agx/notes/`.
    Annotated,
}
51
52impl Filter {
53    /// Parse one `--filter` value. Accepts `model=X`, `tool=X`, or one
54    /// of the bare keywords `errored` / `annotated`.
55    pub fn parse(s: &str) -> Result<Self> {
56        let s = s.trim();
57        if s.eq_ignore_ascii_case("errored") {
58            return Ok(Filter::Errored);
59        }
60        if s.eq_ignore_ascii_case("annotated") {
61            return Ok(Filter::Annotated);
62        }
63        let (key, value) = s.split_once('=').ok_or_else(|| {
64            anyhow!("--filter expects `key=value`, `errored`, or `annotated`, got `{s}`")
65        })?;
66        match key.trim() {
67            "model" => Ok(Filter::Model(value.trim().to_string())),
68            "tool" => Ok(Filter::Tool(value.trim().to_string())),
69            other => Err(anyhow!(
70                "unknown --filter key `{other}` (expected `model`, `tool`, `errored`, or `annotated`)"
71            )),
72        }
73    }
74
75    fn matches(&self, parsed: &ParsedSession) -> bool {
76        match self {
77            Filter::Model(m) => parsed.totals.unique_models.iter().any(|s| s == m),
78            Filter::Tool(t) => parsed
79                .tool_stats
80                .iter()
81                .any(|s| s.name.eq_ignore_ascii_case(t)),
82            Filter::Errored => parsed.tool_stats.iter().any(|s| s.error_count > 0),
83            Filter::Annotated => parsed.annotation_count > 0,
84        }
85    }
86}
87
/// Result of successfully parsing a single session file: the file's
/// identity plus the aggregates derived from its steps. Files that were
/// detected as a session but failed to parse are reported separately as
/// `ParseError` values.
///
/// `Serialize` so downstream bindings (`agx-py`, `agx-wasm`) can
/// round-trip the full struct through `serde_json::to_value` without
/// hand-rolling field extraction — keeps the Python / JS surfaces
/// honest against any new field added here.
#[derive(Debug, Serialize)]
pub struct ParsedSession {
    /// Path of the session file this summary was built from.
    pub path: PathBuf,
    /// Session format reported by format detection for this file.
    pub format: Format,
    /// Session-wide totals computed from the parsed steps.
    pub totals: SessionTotals,
    /// Per-tool statistics computed from the parsed steps.
    pub tool_stats: Vec<crate::timeline::ToolStats>,
    /// Number of steps the loader produced for this session.
    pub step_count: usize,
    /// Unix timestamp in seconds of the session file's mtime, used by the
    /// corpus TUI to sort by recency. `None` when we couldn't stat the
    /// file (permission error, file replaced mid-walk, etc).
    pub mtime_secs: Option<u64>,
    /// Number of annotations stored for this session at the time of the
    /// scan (read from `~/.agx/notes/`). Used by `Filter::Annotated`
    /// and surfaced in `--jsonl` output for downstream tooling.
    pub annotation_count: usize,
    /// Number of fork-root steps in this session. Non-zero only for
    /// Claude Code sessions with edit/resume branches (Phase 5.1).
    /// Feeds `--trajectory-stats` branch-rate and is surfaced in
    /// `--jsonl` output.
    pub fork_root_count: usize,
}
117
/// A file that was recognized as a session format but then failed to
/// load. Collected by `load_parallel` and reported in the corpus
/// summary's "errored" count.
#[derive(Debug)]
pub struct ParseError {
    /// Path of the file that failed to load.
    pub path: PathBuf,
    /// The error returned by the session loader for this file.
    pub error: anyhow::Error,
}
123
/// Recursive directory walk. Stdlib-only (no `walkdir` dep). Depth-limited
/// to avoid runaway recursion on symlink loops. Errors on individual
/// `read_dir` calls are silently skipped so permission-denied
/// subdirectories don't abort the whole scan.
///
/// `max_depth` counts directory levels including `root` itself: `0`
/// yields nothing, `1` yields only the files directly inside `root`.
///
/// The returned paths are sorted. `read_dir` yields entries in a
/// platform- and filesystem-dependent order, and downstream consumers
/// (per-session JSONL output, tie-breaking between sessions with equal
/// mtime under `--sample`) inherit the discovery order, so sorting here
/// keeps repeated runs over the same tree reproducible.
pub fn discover_files(root: &Path, max_depth: usize) -> Vec<PathBuf> {
    let mut out = Vec::new();
    walk(root, max_depth, &mut out);
    out.sort_unstable();
    out
}

/// Append every regular file reachable from `root` within `max_depth`
/// directory levels to `out`.
///
/// `Path::is_dir` / `Path::is_file` follow symlinks, so symlinked
/// directories are descended into; the depth budget is what bounds a
/// symlink cycle. Entries that are neither a file nor a directory, and
/// directories that cannot be read, are ignored.
fn walk(root: &Path, max_depth: usize, out: &mut Vec<PathBuf>) {
    if max_depth == 0 {
        return;
    }
    let Ok(entries) = std::fs::read_dir(root) else {
        return;
    };
    for entry in entries.flatten() {
        let path = entry.path();
        if path.is_dir() {
            walk(&path, max_depth - 1, out);
        } else if path.is_file() {
            out.push(path);
        }
    }
}
150
151/// Load every path in parallel. Paths that fail format detection are
152/// dropped silently; paths that detect successfully but fail to parse
153/// are returned as `ParseError`s so they show up in the "errored" count.
154///
155/// Test hook: when `AGX_CORPUS_SERIAL=1` is set we skip rayon entirely.
156/// Useful in tests where thread-pool init noise would confuse `cargo test`.
157/// Three-way outcome for a single-file corpus load. Dedicated enum
158/// (rather than smuggling "skip" through an `anyhow` error with a
159/// magic substring) so misclassification can't be introduced by a
160/// future refactor that reshuffles error messages.
161enum LoadOutcome {
162    Ok(Format, Vec<Step>),
163    /// File wasn't recognized as any session format, or was a binary
164    /// blob we shouldn't try to parse (e.g. images in an OtelProto-
165    /// detection fallback when the feature is off). Dropped silently.
166    Skip,
167    Err(anyhow::Error),
168}
169
170/// Per-path load result — kept as a type alias so clippy's
171/// `type_complexity` lint doesn't fire on the collect site below.
172type RawLoad = (PathBuf, LoadOutcome);
173
174pub fn load_parallel(paths: &[PathBuf]) -> (Vec<ParsedSession>, Vec<ParseError>) {
175    let raw: Vec<RawLoad> = if std::env::var_os("AGX_CORPUS_SERIAL").is_some() {
176        paths.iter().map(|p| (p.clone(), load_one(p))).collect()
177    } else {
178        paths.par_iter().map(|p| (p.clone(), load_one(p))).collect()
179    };
180
181    let mut parsed = Vec::new();
182    let mut errors = Vec::new();
183    for (path, result) in raw {
184        match result {
185            LoadOutcome::Ok(fmt, steps) => {
186                let totals = compute_session_totals(&steps);
187                let tool_stats = compute_tool_stats(&steps);
188                let mtime_secs = std::fs::metadata(&path)
189                    .and_then(|m| m.modified())
190                    .ok()
191                    .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
192                    .map(|d| d.as_secs());
193                // Load annotation count eagerly during the parallel
194                // parse phase — it's one small disk read per session
195                // (the notes JSON under ~/.agx/notes/), cheap enough
196                // to do unconditionally so we can filter / display
197                // without a second pass.
198                let annotation_count = crate::annotations::Annotations::load_for(&path).notes.len();
199                // Compute fork-root count before `steps` is moved so
200                // we don't have to re-walk. Non-Claude-Code formats
201                // always yield 0, so this is essentially free for
202                // everything except Claude Code.
203                let fork_root_count = crate::timeline::fork_root_count(&steps);
204                parsed.push(ParsedSession {
205                    path,
206                    format: fmt,
207                    totals,
208                    tool_stats,
209                    step_count: steps.len(),
210                    mtime_secs,
211                    annotation_count,
212                    fork_root_count,
213                });
214            }
215            LoadOutcome::Skip => {}
216            LoadOutcome::Err(error) => errors.push(ParseError { path, error }),
217        }
218    }
219    (parsed, errors)
220}
221
222fn load_one(path: &Path) -> LoadOutcome {
223    // Detection failure → silent skip. Detection succeeds → attempt
224    // parse and surface any failure as a real error.
225    let fmt = match format::detect(path) {
226        Ok(f) => f,
227        Err(_) => return LoadOutcome::Skip,
228    };
229    match load_session(path) {
230        Ok(steps) => LoadOutcome::Ok(fmt, steps),
231        Err(e) => {
232            // Non-UTF-8 files route to OtelProto at detection time. When
233            // the `otel-proto` feature is off (the default build), those
234            // files are almost always unrelated binaries (images, PDFs,
235            // archives) rather than real OTLP protobuf exports the user
236            // forgot to compile support for. Skip them silently rather
237            // than spamming the "rebuild with --features" message across
238            // every image in the tree. The compile-time `cfg!` is
239            // strictly stronger than matching on the stub error message:
240            // it triggers on any load failure, not just the exact string.
241            if fmt == Format::OtelProto && !cfg!(feature = "otel-proto") {
242                return LoadOutcome::Skip;
243            }
244            LoadOutcome::Err(e)
245        }
246    }
247}
248
/// Aggregate stats across the surviving parsed sessions.
///
/// Built by `aggregate`; serialized as-is for the `--json` output.
#[derive(Debug, Default, Serialize)]
pub struct CorpusStats {
    /// Number of files the caller reports having scanned.
    pub file_count: usize,
    /// Number of sessions in the slice that was aggregated.
    pub parse_success_count: usize,
    /// Number of files that were detected as sessions but failed to load.
    pub parse_error_count: usize,
    /// Number of sessions the caller reports having removed via filters.
    pub filtered_out_count: usize,
    /// Sum of per-session step counts.
    pub total_steps: usize,
    /// Sum of per-session input tokens.
    pub total_tokens_in: u64,
    /// Sum of per-session output tokens.
    pub total_tokens_out: u64,
    /// Sum of per-session cache-read tokens.
    pub total_cache_read: u64,
    /// Sum of per-session cache-create tokens.
    pub total_cache_create: u64,
    /// Sum of the per-session costs that were available; `None` when no
    /// aggregated session carried a cost.
    pub total_cost_usd: Option<f64>,
    /// One bucket per model, ordered by session count descending, then
    /// by model name.
    pub per_model: Vec<ModelBucket>,
    /// One bucket per tool, ordered by use count descending, then by
    /// tool name.
    pub per_tool: Vec<ToolBucket>,
    /// One bucket per format, ordered by session count descending, then
    /// by format name.
    pub per_format: Vec<FormatBucket>,
}
266
/// Per-model slice of the corpus totals.
///
/// Token and cost figures are whole-session totals added for every
/// session that lists the model, so a session that used several models
/// contributes its full totals to each of their buckets.
#[derive(Debug, Default, Serialize)]
pub struct ModelBucket {
    /// Model name as it appears in a session's unique-models list.
    pub model: String,
    /// Number of sessions that list this model.
    pub session_count: usize,
    /// Summed input tokens of those sessions.
    pub tokens_in: u64,
    /// Summed output tokens of those sessions.
    pub tokens_out: u64,
    /// Summed cost of those sessions; `None` when none of them carried
    /// a cost.
    pub cost_usd: Option<f64>,
}
275
/// Per-tool slice of the corpus totals, accumulated from each session's
/// per-tool statistics.
#[derive(Debug, Default, Serialize)]
pub struct ToolBucket {
    /// Tool name as recorded in the per-session tool statistics.
    pub tool: String,
    /// Summed use count across sessions.
    pub use_count: usize,
    /// Summed error count across sessions.
    pub error_count: usize,
    /// Number of per-session tool-stat entries folded into this bucket.
    // NOTE(review): equals the number of sessions only if each session
    // reports a tool at most once in its tool stats — confirm against
    // `compute_tool_stats`.
    pub session_count: usize,
}
283
/// Number of aggregated sessions per session format.
#[derive(Debug, Default, Serialize)]
pub struct FormatBucket {
    /// Format name, taken from the format's string rendering.
    pub format: String,
    /// Number of sessions in this format.
    pub session_count: usize,
}
289
290/// Compute corpus-level stats from the parallel-load outputs.
291pub fn aggregate(
292    parsed: &[ParsedSession],
293    errors: &[ParseError],
294    file_count: usize,
295    filtered_out: usize,
296) -> CorpusStats {
297    let mut stats = CorpusStats {
298        file_count,
299        parse_success_count: parsed.len(),
300        parse_error_count: errors.len(),
301        filtered_out_count: filtered_out,
302        ..CorpusStats::default()
303    };
304
305    let mut model_map: HashMap<String, ModelBucket> = HashMap::new();
306    let mut tool_map: HashMap<String, ToolBucket> = HashMap::new();
307    let mut format_map: HashMap<String, usize> = HashMap::new();
308    let mut any_cost: Option<f64> = None;
309
310    for session in parsed {
311        stats.total_steps += session.step_count;
312        stats.total_tokens_in += session.totals.tokens_in;
313        stats.total_tokens_out += session.totals.tokens_out;
314        stats.total_cache_read += session.totals.cache_read;
315        stats.total_cache_create += session.totals.cache_create;
316        if let Some(c) = session.totals.cost_usd {
317            any_cost = Some(any_cost.unwrap_or(0.0) + c);
318        }
319
320        *format_map.entry(session.format.to_string()).or_insert(0) += 1;
321
322        // Per-model: session_count counts unique sessions that used the
323        // model (not per-step). tokens/cost sum across all sessions that
324        // used the model — over-attributes for multi-model sessions, but
325        // multi-model sessions are rare and this is the simplest correct
326        // behavior.
327        for model in &session.totals.unique_models {
328            let bucket = model_map
329                .entry(model.clone())
330                .or_insert_with(|| ModelBucket {
331                    model: model.clone(),
332                    ..ModelBucket::default()
333                });
334            bucket.session_count += 1;
335            bucket.tokens_in += session.totals.tokens_in;
336            bucket.tokens_out += session.totals.tokens_out;
337            if let Some(c) = session.totals.cost_usd {
338                bucket.cost_usd = Some(bucket.cost_usd.unwrap_or(0.0) + c);
339            }
340        }
341
342        for tool in &session.tool_stats {
343            let bucket = tool_map
344                .entry(tool.name.clone())
345                .or_insert_with(|| ToolBucket {
346                    tool: tool.name.clone(),
347                    ..ToolBucket::default()
348                });
349            bucket.use_count += tool.use_count;
350            bucket.error_count += tool.error_count;
351            bucket.session_count += 1;
352        }
353    }
354
355    stats.total_cost_usd = any_cost;
356
357    let mut models: Vec<ModelBucket> = model_map.into_values().collect();
358    models.sort_by(|a, b| {
359        b.session_count
360            .cmp(&a.session_count)
361            .then_with(|| a.model.cmp(&b.model))
362    });
363    stats.per_model = models;
364
365    let mut tools: Vec<ToolBucket> = tool_map.into_values().collect();
366    tools.sort_by(|a, b| {
367        b.use_count
368            .cmp(&a.use_count)
369            .then_with(|| a.tool.cmp(&b.tool))
370    });
371    stats.per_tool = tools;
372
373    let mut formats: Vec<FormatBucket> = format_map
374        .into_iter()
375        .map(|(format, session_count)| FormatBucket {
376            format,
377            session_count,
378        })
379        .collect();
380    formats.sort_by(|a, b| {
381        b.session_count
382            .cmp(&a.session_count)
383            .then_with(|| a.format.cmp(&b.format))
384    });
385    stats.per_format = formats;
386
387    stats
388}
389
/// One metric's distribution across the corpus — percentiles plus the
/// sum. Kept alongside the aggregate `CorpusStats` struct because the
/// shape is different (cross-session min/p50/p90/p99/max) and the
/// rendering is only used by `--trajectory-stats`.
///
/// The `Default` value (all zeros) doubles as the distribution of an
/// empty input.
#[derive(Debug, Default, Clone, Serialize)]
pub struct Distribution {
    /// Smallest per-session value.
    pub min: u64,
    /// Median (50th percentile).
    pub p50: u64,
    /// 90th percentile.
    pub p90: u64,
    /// 99th percentile.
    pub p99: u64,
    /// Largest per-session value.
    pub max: u64,
    /// Arithmetic mean of the per-session values.
    pub mean: f64,
    /// Sum of the per-session values.
    pub total: u64,
}
404
405impl Distribution {
406    /// Build a distribution from an unsorted slice of per-session
407    /// values. Zero-length input yields all-zero values so callers
408    /// never have to branch. `mean` is `0.0` in that case rather than
409    /// `NaN`.
410    fn from_values(values: &[u64]) -> Self {
411        if values.is_empty() {
412            return Self::default();
413        }
414        let mut v = values.to_vec();
415        v.sort_unstable();
416        let n = v.len();
417        let pick = |p: f64| -> u64 {
418            // Nearest-rank percentile. Matches numpy's "lower"
419            // interpolation and is the simplest correct choice for
420            // integer-valued distributions. Clamp to valid indices.
421            let idx = ((n as f64) * p).ceil() as usize;
422            let idx = idx.saturating_sub(1).min(n - 1);
423            v[idx]
424        };
425        let total: u64 = v.iter().sum();
426        #[allow(clippy::cast_precision_loss)]
427        let mean = total as f64 / n as f64;
428        Self {
429            min: v[0],
430            p50: pick(0.50),
431            p90: pick(0.90),
432            p99: pick(0.99),
433            max: v[n - 1],
434            mean,
435            total,
436        }
437    }
438}
439
/// Dataset-level distribution stats for `agx corpus --trajectory-stats`.
/// Serialized directly when `--json` is combined; rendered as a terse
/// text report otherwise.
///
/// The `Default` value (zero sessions, all-zero distributions and
/// rates) is what an empty session list produces.
#[derive(Debug, Default, Clone, Serialize)]
pub struct TrajectoryStats {
    /// Number of sessions the statistics were computed over.
    pub session_count: usize,
    /// Distribution of the per-session step count.
    pub steps_per_session: Distribution,
    /// Distribution of the per-session tool-call count (use counts
    /// summed over all of a session's tools).
    pub tool_calls_per_session: Distribution,
    /// Distribution of per-session input tokens.
    pub tokens_in_per_session: Distribution,
    /// Distribution of per-session output tokens.
    pub tokens_out_per_session: Distribution,
    /// Fraction of sessions (0.0–1.0) that contain at least one fork
    /// root. Non-zero only when the corpus includes Claude Code
    /// edit/resume sessions.
    pub branched_rate: f64,
    /// Fraction of sessions with ≥1 stored annotation.
    pub annotated_rate: f64,
    /// Fraction of sessions where any tool_result matched the
    /// `is_error_result` heuristic.
    pub errored_rate: f64,
}
460
461/// Build a `TrajectoryStats` from the surviving `ParsedSession` slice.
462/// Pure function — the render step takes the output and prints
463/// separately. Extracted so tests can assert on the stats struct
464/// without spinning up a full run.
465pub fn compute_trajectory_stats(parsed: &[ParsedSession]) -> TrajectoryStats {
466    let session_count = parsed.len();
467    if session_count == 0 {
468        return TrajectoryStats::default();
469    }
470    let steps: Vec<u64> = parsed.iter().map(|p| p.step_count as u64).collect();
471    let tool_calls: Vec<u64> = parsed
472        .iter()
473        .map(|p| p.tool_stats.iter().map(|t| t.use_count as u64).sum())
474        .collect();
475    let tokens_in: Vec<u64> = parsed.iter().map(|p| p.totals.tokens_in).collect();
476    let tokens_out: Vec<u64> = parsed.iter().map(|p| p.totals.tokens_out).collect();
477    #[allow(clippy::cast_precision_loss)]
478    let branched =
479        parsed.iter().filter(|p| p.fork_root_count > 0).count() as f64 / session_count as f64;
480    #[allow(clippy::cast_precision_loss)]
481    let annotated =
482        parsed.iter().filter(|p| p.annotation_count > 0).count() as f64 / session_count as f64;
483    #[allow(clippy::cast_precision_loss)]
484    let errored = parsed
485        .iter()
486        .filter(|p| p.tool_stats.iter().any(|t| t.error_count > 0))
487        .count() as f64
488        / session_count as f64;
489    TrajectoryStats {
490        session_count,
491        steps_per_session: Distribution::from_values(&steps),
492        tool_calls_per_session: Distribution::from_values(&tool_calls),
493        tokens_in_per_session: Distribution::from_values(&tokens_in),
494        tokens_out_per_session: Distribution::from_values(&tokens_out),
495        branched_rate: branched,
496        annotated_rate: annotated,
497        errored_rate: errored,
498    }
499}
500
501fn print_trajectory_stats_text(stats: &TrajectoryStats) {
502    println!("Trajectory stats — {} sessions", stats.session_count);
503    if stats.session_count == 0 {
504        println!("  (no sessions after filter / sample)");
505        return;
506    }
507    let row = |label: &str, d: &Distribution| {
508        println!(
509            "  {label:<22}  min={:>8}  p50={:>8}  p90={:>8}  p99={:>8}  max={:>8}  mean={:>10.1}  total={:>12}",
510            d.min, d.p50, d.p90, d.p99, d.max, d.mean, d.total
511        );
512    };
513    row("steps/session", &stats.steps_per_session);
514    row("tool_calls/session", &stats.tool_calls_per_session);
515    row("tokens_in/session", &stats.tokens_in_per_session);
516    row("tokens_out/session", &stats.tokens_out_per_session);
517    println!();
518    println!(
519        "  branched:  {:>5.1}%    annotated: {:>5.1}%    errored: {:>5.1}%",
520        stats.branched_rate * 100.0,
521        stats.annotated_rate * 100.0,
522        stats.errored_rate * 100.0
523    );
524}
525
/// Arguments for the `agx corpus` subcommand. Wired up in `main.rs`.
#[derive(Debug)]
pub struct CorpusArgs {
    /// Root directory to scan for session files.
    pub dir: PathBuf,
    /// Filter predicates; a session must satisfy all of them to be kept.
    pub filters: Vec<Filter>,
    /// When true, emit machine-readable JSON (the aggregate stats, or
    /// the trajectory stats when `trajectory_stats` is also set)
    /// instead of the text summary.
    pub json: bool,
    /// When true, suppress cost figures in the text summary; also
    /// forwarded to the TUI launcher.
    pub no_cost: bool,
    /// Maximum directory depth for the scan, counted from `dir`.
    pub max_depth: usize,
    /// When true, emit walk / load / aggregate timings to stderr after
    /// the main output. Wired from the hidden `--bench` CLI flag.
    pub bench: bool,
    /// When true, launch the interactive corpus TUI (session list +
    /// selected-session summary, Enter drills into the per-session TUI).
    /// Mutually exclusive with `--json` (the TUI owns the terminal; JSON
    /// needs stdout clean).
    pub tui: bool,
    /// When true, emit one JSON object per session to stdout instead of
    /// the default text summary or the aggregate `--json` blob. Parse
    /// errors go to stderr so stdout stays pipeable into `jq` / `xargs`.
    /// Intended for CI / eval pipelines.
    pub jsonl: bool,
    /// When true, `run` returns an error (after rendering) if any parse
    /// failure occurred OR any tool_result across the corpus matched
    /// the is_error_result heuristic. Orthogonal to the rendering mode.
    // NOTE(review): this comment used to promise exit code 2, but `run`
    // reports the failure through `anyhow::bail!`; the actual exit code
    // is whatever the binary maps a returned error to — confirm in
    // `main.rs`.
    pub fail_on_errored: bool,
    /// When true, replace the default aggregate rendering with a
    /// distributional breakdown across the corpus (percentiles for
    /// steps / tool-calls / tokens, plus branch / annotation / error
    /// rates). Combines with `--json` for machine-readable output.
    /// Phase 6.2 — the numbers a researcher needs before publishing a
    /// trajectory dataset.
    pub trajectory_stats: bool,
    /// When set, keep only the first N sessions after sorting by
    /// mtime descending (most recent first). Applied *after* filters
    /// so `--filter model=X --sample 20` gives you the 20 most
    /// recent sessions of that model. Phase 6.2 spot-check workflow.
    pub sample: Option<usize>,
}
564
/// Callback the CLI layer supplies when it wants to drop into the
/// interactive corpus TUI on `--tui`. agx-core doesn't carry the TUI
/// deps (ratatui / crossterm / arboard), so the launcher lives in the
/// top-level `agx` crate and gets passed through here. When `args.tui`
/// is `false`, the launcher is never invoked — callers that don't
/// ship a TUI can pass the [`no_tui`] helper.
///
/// The launcher receives, in order: the surviving sessions (by value),
/// the aggregate stats computed over them, and the `no_cost` flag from
/// the CLI arguments. Any error it returns is propagated by `run`.
pub type TuiLauncher<'a> = dyn Fn(Vec<ParsedSession>, &CorpusStats, bool) -> Result<()> + 'a;
572
573/// Default launcher for consumers without a TUI layer. Returns an
574/// error if `--tui` is ever set; callers that never set `args.tui`
575/// can pass this unconditionally.
576pub fn no_tui(_parsed: Vec<ParsedSession>, _stats: &CorpusStats, _no_cost: bool) -> Result<()> {
577    anyhow::bail!("this agx-core caller does not provide a corpus TUI implementation");
578}
579
580/// Entry point called from `main.rs::main`. Walks the directory, loads
581/// every session in parallel, applies filters, aggregates, and prints.
582/// When `args.tui` is set, delegates to `tui_launcher` — the top-level
583/// agx bin crate passes its `corpus_tui::run`; library consumers that
584/// don't ship a TUI can pass [`no_tui`].
585pub fn run(args: &CorpusArgs, tui_launcher: &TuiLauncher<'_>) -> Result<()> {
586    use std::time::Instant;
587    let t_walk = Instant::now();
588    let files = discover_files(&args.dir, args.max_depth);
589    let file_count = files.len();
590    let walk_ms = t_walk.elapsed().as_secs_f64() * 1000.0;
591
592    let t_load = Instant::now();
593    let (mut parsed, errors) = load_parallel(&files);
594    let load_ms = t_load.elapsed().as_secs_f64() * 1000.0;
595
596    let t_agg = Instant::now();
597    let before_filter = parsed.len();
598    if !args.filters.is_empty() {
599        parsed.retain(|p| args.filters.iter().all(|f| f.matches(p)));
600    }
601    let filtered_out = before_filter - parsed.len();
602    // `--sample N` keeps the N most-recent-by-mtime sessions. Applied
603    // after filters so `--filter model=X --sample 20` gives the 20
604    // most recent X-model sessions. Deterministic (not random) to
605    // avoid adding a PRNG dep and to keep runs reproducible — users
606    // who want true random can `ls -u | shuf | head` and pass the
607    // file list via another mechanism (future `--sample-random`
608    // follow-up if demand surfaces).
609    if let Some(n) = args.sample
610        && parsed.len() > n
611    {
612        parsed.sort_by_key(|p| std::cmp::Reverse(p.mtime_secs));
613        parsed.truncate(n);
614    }
615    let stats = aggregate(&parsed, &errors, file_count, filtered_out);
616    let agg_ms = t_agg.elapsed().as_secs_f64() * 1000.0;
617
618    // `--fail-on-errored` turns parse errors or tool-level error_results
619    // into a nonzero exit. Evaluated before the rendering branch so we
620    // don't have to clone `parsed` — the TUI path takes it by value.
621    // The rendering side effects still run; the fail is reported at the
622    // end via `anyhow::bail` (exit code 1 via anyhow's normal error
623    // path — simpler than reserving a distinct code 2 and bypassing
624    // anyhow's reporting for this one case).
625    let parse_error_count = errors.len();
626    let tool_error_count: usize = parsed
627        .iter()
628        .flat_map(|p| p.tool_stats.iter())
629        .map(|t| t.error_count)
630        .sum();
631    let fail_on_errored = args.fail_on_errored && (parse_error_count > 0 || tool_error_count > 0);
632
633    if args.trajectory_stats {
634        // `--trajectory-stats` replaces the default aggregate rendering
635        // with a distributional breakdown. Still compatible with `--json`
636        // (emit the TrajectoryStats struct as JSON) and `--jsonl` (emit
637        // stats to stderr so stdout stays per-session JSONL).
638        let tstats = compute_trajectory_stats(&parsed);
639        if args.jsonl {
640            // Keep stdout per-session JSONL; dump trajectory stats to
641            // stderr so both streams stay usable in a pipeline.
642            print_jsonl(&parsed, &errors);
643            eprintln!("{}", serde_json::to_string_pretty(&tstats)?);
644        } else if args.json {
645            println!("{}", serde_json::to_string_pretty(&tstats)?);
646        } else {
647            print_trajectory_stats_text(&tstats);
648        }
649    } else if args.tui {
650        // Drop into the interactive corpus TUI via the launcher the
651        // CLI layer supplied. Keeping the launcher injectable means
652        // agx-core stays pure (no ratatui / crossterm / arboard
653        // deps) while the top-level `agx` bin provides the actual
654        // implementation.
655        tui_launcher(parsed, &stats, args.no_cost)?;
656    } else if args.jsonl {
657        print_jsonl(&parsed, &errors);
658    } else if args.json {
659        println!("{}", serde_json::to_string_pretty(&stats)?);
660    } else {
661        print_text_summary(&stats, &args.dir, args.no_cost, &errors);
662    }
663
664    if fail_on_errored {
665        anyhow::bail!(
666            "--fail-on-errored: {parse_error_count} parse error(s), \
667             {tool_error_count} tool-error result(s) detected",
668        );
669    }
670
671    if args.bench {
672        eprintln!(
673            "[bench] walk: {:.2}ms ({} files)  load: {:.2}ms ({} parsed, {} errored)  aggregate: {:.2}ms  total: {:.2}ms",
674            walk_ms,
675            file_count,
676            load_ms,
677            stats.parse_success_count,
678            stats.parse_error_count,
679            agg_ms,
680            walk_ms + load_ms + agg_ms,
681        );
682    }
683    Ok(())
684}
685
686fn print_text_summary(stats: &CorpusStats, dir: &Path, no_cost: bool, errors: &[ParseError]) {
687    println!("agx corpus {}", dir.display());
688    println!(
689        "  {} files scanned; {} parsed; {} errored; {} filtered out",
690        stats.file_count,
691        stats.parse_success_count,
692        stats.parse_error_count,
693        stats.filtered_out_count,
694    );
695    if stats.parse_success_count == 0 {
696        println!("  (no sessions to aggregate)");
697        return;
698    }
699    println!(
700        "  Total: {} steps, {} input tokens, {} output, {} cache_read, {} cache_create",
701        stats.total_steps,
702        stats.total_tokens_in,
703        stats.total_tokens_out,
704        stats.total_cache_read,
705        stats.total_cache_create,
706    );
707    if !no_cost {
708        match stats.total_cost_usd {
709            Some(c) => println!("  Estimated cost: ${c:.4} USD"),
710            None if stats.total_tokens_in > 0 || stats.total_tokens_out > 0 => {
711                println!("  Estimated cost: (no priced models detected)");
712            }
713            None => {}
714        }
715    }
716
717    if !stats.per_format.is_empty() {
718        println!("\nBy format:");
719        for f in &stats.per_format {
720            println!("  {:<32} {}", f.format, f.session_count);
721        }
722    }
723
724    if !stats.per_model.is_empty() {
725        println!("\nTop models:");
726        for m in stats.per_model.iter().take(10) {
727            let cost = match m.cost_usd {
728                Some(c) if !no_cost => format!(" ${c:.4}"),
729                _ => String::new(),
730            };
731            println!(
732                "  {:<28} {:>4} sess  {:>10} in  {:>10} out{}",
733                m.model, m.session_count, m.tokens_in, m.tokens_out, cost,
734            );
735        }
736    }
737
738    if !stats.per_tool.is_empty() {
739        println!("\nTop tools:");
740        for t in stats.per_tool.iter().take(10) {
741            let err_pct = if t.use_count > 0 {
742                #[allow(clippy::cast_precision_loss)]
743                let r = t.error_count as f64 / t.use_count as f64;
744                format!("({:.1}% err)", r * 100.0)
745            } else {
746                String::new()
747            };
748            println!(
749                "  {:<28} {:>5} uses  {:>4} errors {}",
750                t.tool, t.use_count, t.error_count, err_pct,
751            );
752        }
753    }
754
755    if !errors.is_empty() {
756        println!("\nParse errors (first {}):", errors.len().min(5));
757        for err in errors.iter().take(5) {
758            println!("  {}: {}", err.path.display(), err.error);
759        }
760        if errors.len() > 5 {
761            println!("  ... ({} more)", errors.len() - 5);
762        }
763    }
764}
765
766/// JSON-Lines output: one session per line on stdout, parse errors to
767/// stderr. Schema intentionally flat and stable — downstream eval
768/// pipelines can rely on these field names.
769#[derive(serde::Serialize)]
770struct SessionLine {
771    path: String,
772    format: String,
773    step_count: usize,
774    tokens_in: u64,
775    tokens_out: u64,
776    cache_read: u64,
777    cache_create: u64,
778    cost_usd: Option<f64>,
779    models: Vec<String>,
780    tool_counts: Vec<ToolLine>,
781    error_count: usize,
782    annotation_count: usize,
783    fork_root_count: usize,
784    mtime_secs: Option<u64>,
785}
786
787#[derive(serde::Serialize)]
788struct ToolLine {
789    name: String,
790    use_count: usize,
791    error_count: usize,
792}
793
794fn session_to_line(s: &ParsedSession) -> SessionLine {
795    SessionLine {
796        path: s.path.display().to_string(),
797        format: s.format.to_string(),
798        step_count: s.step_count,
799        tokens_in: s.totals.tokens_in,
800        tokens_out: s.totals.tokens_out,
801        cache_read: s.totals.cache_read,
802        cache_create: s.totals.cache_create,
803        cost_usd: s.totals.cost_usd,
804        models: s.totals.unique_models.clone(),
805        tool_counts: s
806            .tool_stats
807            .iter()
808            .map(|t| ToolLine {
809                name: t.name.clone(),
810                use_count: t.use_count,
811                error_count: t.error_count,
812            })
813            .collect(),
814        error_count: s.tool_stats.iter().map(|t| t.error_count).sum(),
815        annotation_count: s.annotation_count,
816        fork_root_count: s.fork_root_count,
817        mtime_secs: s.mtime_secs,
818    }
819}
820
821fn print_jsonl(parsed: &[ParsedSession], errors: &[ParseError]) {
822    // Sessions on stdout, one line each — compact (not pretty) so
823    // downstream `jq -c` / `xargs` consumers see line-delimited JSON.
824    for session in parsed {
825        let line = session_to_line(session);
826        match serde_json::to_string(&line) {
827            Ok(s) => println!("{s}"),
828            Err(e) => eprintln!("agx: failed to serialize session line: {e}"),
829        }
830    }
831    // Parse errors on stderr so they don't corrupt the stdout stream
832    // that consumers are piping into jq / xargs / a file.
833    for err in errors {
834        eprintln!("agx: parse error: {}: {}", err.path.display(), err.error);
835    }
836}
837
838#[cfg(test)]
839mod tests {
840    use super::*;
841    use crate::timeline::{
842        ToolStats, assistant_text_step, tool_result_step, tool_use_step, user_text_step,
843    };
844
845    fn mk_session(path: &str, fmt: Format, steps: Vec<Step>) -> ParsedSession {
846        let totals = compute_session_totals(&steps);
847        let tool_stats = compute_tool_stats(&steps);
848        ParsedSession {
849            path: PathBuf::from(path),
850            format: fmt,
851            step_count: steps.len(),
852            totals,
853            tool_stats,
854            mtime_secs: None,
855            annotation_count: 0,
856            fork_root_count: 0,
857        }
858    }
859
860    fn priced_session(model: &str) -> Vec<Step> {
861        let mut a = assistant_text_step("hi");
862        a.model = Some(model.into());
863        a.tokens_in = Some(100);
864        a.tokens_out = Some(50);
865        vec![user_text_step("q"), a]
866    }
867
868    #[test]
869    fn filter_parse_accepts_all_forms() {
870        assert_eq!(
871            Filter::parse("model=claude-opus-4-6").unwrap(),
872            Filter::Model("claude-opus-4-6".into())
873        );
874        assert_eq!(
875            Filter::parse("tool=Bash").unwrap(),
876            Filter::Tool("Bash".into())
877        );
878        assert_eq!(Filter::parse("errored").unwrap(), Filter::Errored);
879        assert_eq!(Filter::parse("  errored  ").unwrap(), Filter::Errored);
880        assert_eq!(
881            Filter::parse("  model = gpt-5  ").unwrap(),
882            Filter::Model("gpt-5".into())
883        );
884    }
885
886    #[test]
887    fn filter_parse_rejects_unknown_key() {
888        assert!(Filter::parse("foo=bar").is_err());
889    }
890
891    #[test]
892    fn filter_parse_rejects_bare_word() {
893        assert!(Filter::parse("not-a-thing").is_err());
894    }
895
896    #[test]
897    fn filter_model_matches_session_with_that_model() {
898        let s = mk_session("a", Format::ClaudeCode, priced_session("claude-opus-4-6"));
899        assert!(Filter::Model("claude-opus-4-6".into()).matches(&s));
900        assert!(!Filter::Model("gpt-5".into()).matches(&s));
901    }
902
903    #[test]
904    fn filter_tool_matches_case_insensitive() {
905        let steps = vec![
906            user_text_step("q"),
907            tool_use_step("t1", "Bash", "{}"),
908            tool_result_step("t1", "ok", Some("Bash"), Some("{}")),
909        ];
910        let s = mk_session("a", Format::ClaudeCode, steps);
911        assert!(Filter::Tool("Bash".into()).matches(&s));
912        assert!(Filter::Tool("bash".into()).matches(&s));
913        assert!(!Filter::Tool("Write".into()).matches(&s));
914    }
915
916    #[test]
917    fn filter_errored_matches_session_with_error_result() {
918        let steps = vec![
919            tool_use_step("t1", "Bash", "{}"),
920            tool_result_step("t1", "error: command failed", Some("Bash"), Some("{}")),
921        ];
922        let s = mk_session("a", Format::ClaudeCode, steps);
923        assert!(Filter::Errored.matches(&s));
924    }
925
926    #[test]
927    fn filter_errored_does_not_match_clean_session() {
928        let steps = vec![
929            tool_use_step("t1", "Bash", "{}"),
930            tool_result_step("t1", "success", Some("Bash"), Some("{}")),
931        ];
932        let s = mk_session("a", Format::ClaudeCode, steps);
933        assert!(!Filter::Errored.matches(&s));
934    }
935
936    #[test]
937    fn aggregate_sums_tokens_across_sessions() {
938        let sessions = vec![
939            mk_session("a", Format::ClaudeCode, priced_session("claude-opus-4-6")),
940            mk_session("b", Format::Codex, priced_session("gpt-5")),
941        ];
942        let stats = aggregate(&sessions, &[], 2, 0);
943        assert_eq!(stats.parse_success_count, 2);
944        assert_eq!(stats.total_tokens_in, 200);
945        assert_eq!(stats.total_tokens_out, 100);
946        assert!(stats.total_cost_usd.is_some());
947        // Two formats, two models, no tools.
948        assert_eq!(stats.per_format.len(), 2);
949        assert_eq!(stats.per_model.len(), 2);
950        assert!(stats.per_tool.is_empty());
951    }
952
953    #[test]
954    fn aggregate_per_model_sorts_by_session_count_desc() {
955        let sessions = vec![
956            mk_session("a", Format::ClaudeCode, priced_session("gpt-5")),
957            mk_session("b", Format::ClaudeCode, priced_session("gpt-5")),
958            mk_session("c", Format::ClaudeCode, priced_session("claude-opus-4-6")),
959        ];
960        let stats = aggregate(&sessions, &[], 3, 0);
961        assert_eq!(stats.per_model[0].model, "gpt-5");
962        assert_eq!(stats.per_model[0].session_count, 2);
963        assert_eq!(stats.per_model[1].model, "claude-opus-4-6");
964    }
965
966    #[test]
967    fn aggregate_per_tool_sums_use_and_error_counts() {
968        let s1 = mk_session(
969            "a",
970            Format::ClaudeCode,
971            vec![
972                tool_use_step("t1", "Bash", "{}"),
973                tool_result_step("t1", "ok", Some("Bash"), Some("{}")),
974            ],
975        );
976        let s2 = mk_session(
977            "b",
978            Format::ClaudeCode,
979            vec![
980                tool_use_step("t2", "Bash", "{}"),
981                tool_result_step("t2", "error: failed", Some("Bash"), Some("{}")),
982            ],
983        );
984        let stats = aggregate(&[s1, s2], &[], 2, 0);
985        assert_eq!(stats.per_tool.len(), 1);
986        assert_eq!(stats.per_tool[0].tool, "Bash");
987        assert_eq!(stats.per_tool[0].use_count, 2);
988        assert_eq!(stats.per_tool[0].error_count, 1);
989    }
990
991    #[test]
992    fn aggregate_empty_input_returns_zeros() {
993        let stats = aggregate(&[], &[], 0, 0);
994        assert_eq!(stats.parse_success_count, 0);
995        assert_eq!(stats.total_tokens_in, 0);
996        assert_eq!(stats.total_cost_usd, None);
997        assert!(stats.per_model.is_empty());
998        assert!(stats.per_tool.is_empty());
999    }
1000
1001    #[test]
1002    fn aggregate_counts_filtered_and_errored() {
1003        let sessions = vec![mk_session("a", Format::ClaudeCode, priced_session("gpt-5"))];
1004        let errors = vec![ParseError {
1005            path: PathBuf::from("bad.jsonl"),
1006            error: anyhow!("format drift"),
1007        }];
1008        let stats = aggregate(&sessions, &errors, 5, 3);
1009        assert_eq!(stats.file_count, 5);
1010        assert_eq!(stats.parse_success_count, 1);
1011        assert_eq!(stats.parse_error_count, 1);
1012        assert_eq!(stats.filtered_out_count, 3);
1013    }
1014
1015    #[test]
1016    fn tool_bucket_ordering_is_stable_on_ties() {
1017        // Equal use_count → alphabetic tie-break.
1018        let sessions = vec![
1019            mk_session(
1020                "a",
1021                Format::ClaudeCode,
1022                vec![tool_use_step("t1", "Zebra", "{}")],
1023            ),
1024            mk_session(
1025                "b",
1026                Format::ClaudeCode,
1027                vec![tool_use_step("t2", "Apple", "{}")],
1028            ),
1029        ];
1030        let stats = aggregate(&sessions, &[], 2, 0);
1031        assert_eq!(stats.per_tool[0].tool, "Apple");
1032        assert_eq!(stats.per_tool[1].tool, "Zebra");
1033    }
1034
1035    #[test]
1036    fn unused_tool_stats_type_reference() {
1037        // Sanity: the ToolStats type is in scope so future tests can
1038        // construct one directly if needed. This test just compiles.
1039        let _ = ToolStats {
1040            name: "x".into(),
1041            use_count: 0,
1042            result_count: 0,
1043            error_count: 0,
1044        };
1045    }
1046
1047    // -------- Phase 6.2 trajectory stats --------
1048
1049    #[test]
1050    fn distribution_empty_slice_is_all_zero() {
1051        let d = Distribution::from_values(&[]);
1052        assert_eq!(d.min, 0);
1053        assert_eq!(d.max, 0);
1054        assert_eq!(d.mean, 0.0);
1055        assert_eq!(d.total, 0);
1056    }
1057
1058    #[test]
1059    fn distribution_single_value() {
1060        let d = Distribution::from_values(&[42]);
1061        assert_eq!(d.min, 42);
1062        assert_eq!(d.p50, 42);
1063        assert_eq!(d.p90, 42);
1064        assert_eq!(d.p99, 42);
1065        assert_eq!(d.max, 42);
1066        assert!((d.mean - 42.0).abs() < 1e-6);
1067        assert_eq!(d.total, 42);
1068    }
1069
1070    #[test]
1071    fn distribution_percentiles_on_ordered_integers() {
1072        // 1..=100 — p50 should be 50, p90 ≈ 90, p99 ≈ 99, max 100.
1073        let values: Vec<u64> = (1..=100).collect();
1074        let d = Distribution::from_values(&values);
1075        assert_eq!(d.min, 1);
1076        assert_eq!(d.max, 100);
1077        assert_eq!(d.p50, 50);
1078        assert_eq!(d.p90, 90);
1079        assert_eq!(d.p99, 99);
1080        assert_eq!(d.total, 5050);
1081        assert!((d.mean - 50.5).abs() < 1e-6);
1082    }
1083
1084    #[test]
1085    fn distribution_handles_unsorted_input() {
1086        // Input order doesn't matter — the constructor sorts internally.
1087        let a = Distribution::from_values(&[5, 1, 3, 2, 4]);
1088        let b = Distribution::from_values(&[1, 2, 3, 4, 5]);
1089        assert_eq!(a.min, b.min);
1090        assert_eq!(a.max, b.max);
1091        assert_eq!(a.p50, b.p50);
1092        assert_eq!(a.total, b.total);
1093    }
1094
1095    #[test]
1096    fn trajectory_stats_empty_corpus() {
1097        let stats = compute_trajectory_stats(&[]);
1098        assert_eq!(stats.session_count, 0);
1099        assert_eq!(stats.branched_rate, 0.0);
1100        assert_eq!(stats.annotated_rate, 0.0);
1101        assert_eq!(stats.errored_rate, 0.0);
1102    }
1103
1104    #[test]
1105    fn trajectory_stats_branched_rate_counts_fork_roots() {
1106        let a = {
1107            let mut s = mk_session("a.jsonl", Format::ClaudeCode, Vec::new());
1108            s.fork_root_count = 2;
1109            s
1110        };
1111        let b = mk_session("b.jsonl", Format::ClaudeCode, Vec::new());
1112        let c = {
1113            let mut s = mk_session("c.jsonl", Format::Gemini, Vec::new());
1114            s.fork_root_count = 1;
1115            s
1116        };
1117        let stats = compute_trajectory_stats(&[a, b, c]);
1118        assert_eq!(stats.session_count, 3);
1119        // 2 of 3 sessions have forks.
1120        assert!((stats.branched_rate - 2.0 / 3.0).abs() < 1e-6);
1121    }
1122
1123    #[test]
1124    fn trajectory_stats_annotated_rate_counts_annotation_count() {
1125        let mut a = mk_session("a", Format::ClaudeCode, Vec::new());
1126        a.annotation_count = 3;
1127        let b = mk_session("b", Format::ClaudeCode, Vec::new());
1128        let stats = compute_trajectory_stats(&[a, b]);
1129        assert!((stats.annotated_rate - 0.5).abs() < 1e-6);
1130    }
1131
1132    #[test]
1133    fn trajectory_stats_errored_rate_counts_sessions_not_errors() {
1134        // A session with 5 errors still counts as 1 errored session;
1135        // the rate is session-level, not error-level.
1136        let a = mk_session(
1137            "a",
1138            Format::ClaudeCode,
1139            vec![crate::timeline::tool_result_step(
1140                "t1",
1141                "error: bad",
1142                Some("X"),
1143                None,
1144            )],
1145        );
1146        // A clean session.
1147        let b = mk_session("b", Format::ClaudeCode, Vec::new());
1148        let stats = compute_trajectory_stats(&[a, b]);
1149        assert!((stats.errored_rate - 0.5).abs() < 1e-6);
1150    }
1151
1152    #[test]
1153    fn trajectory_stats_steps_distribution_reflects_step_counts() {
1154        let a = mk_session(
1155            "a",
1156            Format::ClaudeCode,
1157            vec![
1158                crate::timeline::user_text_step("one"),
1159                crate::timeline::assistant_text_step("two"),
1160            ],
1161        );
1162        let b = mk_session(
1163            "b",
1164            Format::ClaudeCode,
1165            vec![crate::timeline::user_text_step("solo")],
1166        );
1167        let stats = compute_trajectory_stats(&[a, b]);
1168        assert_eq!(stats.steps_per_session.min, 1);
1169        assert_eq!(stats.steps_per_session.max, 2);
1170        assert_eq!(stats.steps_per_session.total, 3);
1171    }
1172}