mod diff;
mod model;
mod render;
mod spill;
mod symbols;
use std::path::{Path, PathBuf};
use clap::{Parser, Subcommand};
use diff::CompareAccumulator;
use eyre::{Context, Result};
use model::CompareReport;
use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
use render::{print_json, print_text};
use spill::{load_side, partition_count, read_partition};
use symbols::resolve_token_symbols;
const MAX_PARALLEL_PARTITION_JOINS: usize = 8;
#[derive(Subcommand, Debug)]
pub enum CompareSection {
Regressions,
Improvements,
Logs,
Balances,
}
#[derive(Parser, Debug)]
pub struct CompareArgs {
pub baseline: PathBuf,
pub experiment: PathBuf,
#[command(subcommand)]
pub section: Option<CompareSection>,
#[arg(long)]
pub json: bool,
}
pub async fn compare(args: CompareArgs) -> Result<()> {
let partitions = partition_count(&args.baseline, &args.experiment)?;
let report = {
let (baseline, experiment) = (args.baseline.clone(), args.experiment.clone());
tokio::task::spawn_blocking(move || compute(&baseline, &experiment, partitions))
.await
.expect("compute task panicked")?
};
let symbols = resolve_token_symbols(&report.pnl_diffs).await;
if args.json {
print_json(&args, report, &symbols)?;
} else {
print_text(&args, &report, &symbols);
}
Ok(())
}
fn compute(baseline: &Path, experiment: &Path, partitions: usize) -> Result<CompareReport> {
let spill_dir = tempfile::tempdir().context("failed to create spill directory")?;
let (base_side, exp_side) = std::thread::scope(|s| {
let base = s.spawn(|| load_side(baseline, partitions, spill_dir.path(), "baseline"));
let exp = load_side(experiment, partitions, spill_dir.path(), "experiment");
let base = base.join().expect("baseline load thread panicked");
Ok::<_, eyre::Report>((base?, exp?))
})?;
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(MAX_PARALLEL_PARTITION_JOINS.min(base_side.partitions.len()))
.build()
.context("failed to build partition join pool")?;
let partition_accs: Vec<Result<CompareAccumulator>> = pool.install(|| {
base_side
.partitions
.par_iter()
.zip(exp_side.partitions.par_iter())
.map(|(base_part, exp_part)| {
let mut acc = CompareAccumulator::default();
acc.join_partition(read_partition(base_part)?, read_partition(exp_part)?);
Ok(acc)
})
.collect()
});
let mut acc = CompareAccumulator::default();
for partition_acc in partition_accs {
acc.merge(partition_acc?);
}
let CompareAccumulator {
mut regressions,
mut improvements,
mut log_diffs,
mut missing,
mut pnl_diffs,
common,
} = acc;
regressions.sort_by(|a, b| (a.slot, &a.signature).cmp(&(b.slot, &b.signature)));
improvements.sort_by(|a, b| (a.slot, &a.signature).cmp(&(b.slot, &b.signature)));
log_diffs.sort_by(|a, b| (a.slot, &a.signature).cmp(&(b.slot, &b.signature)));
missing.sort();
pnl_diffs.sort_by(|a, b| {
b.score
.total_cmp(&a.score)
.then_with(|| (a.slot, &a.signature).cmp(&(b.slot, &b.signature)))
});
Ok(CompareReport {
baseline_transactions: base_side.total_transactions,
experiment_transactions: exp_side.total_transactions,
common,
regressions,
improvements,
log_diffs,
missing,
pnl_diffs,
})
}
#[cfg(test)]
mod tests {
use super::{render::format_pct_change, spill::TARGET_PARTITION_BYTES, *};
use crate::output::{
OutputEvent, SimulationMetadata, SimulationSummary, SolChange, TokenChange, Transaction,
};
fn tx(slot: u64, signature: &str, success: bool, logs: usize, sol_delta: i64) -> Transaction {
Transaction {
slot,
timestamp: None,
signature: signature.to_string(),
success,
error: (!success).then(|| format!("err-{signature}")),
logs: (0..logs).map(|i| format!("log {i}")).collect(),
sol_changes: vec![SolChange {
pubkey: format!("payer-{signature}"),
pre_lamports: 1_000_000,
post_lamports: (1_000_000_i64 + sol_delta) as u64,
}],
token_changes: Vec::new(),
account_diffs: Vec::new(),
}
}
fn write_ndjson(dir: &Path, name: &str, txs: &[Transaction]) -> PathBuf {
let path = dir.join(name);
let mut lines = vec![
serde_json::to_string(&OutputEvent::Metadata(SimulationMetadata {
start_slot: 1,
end_slot: 100,
program_ids: Vec::new(),
program_so: Vec::new(),
ran_at_unix_secs: 0,
session_ids: Vec::new(),
}))
.unwrap(),
];
for tx in txs {
lines.push(serde_json::to_string(&OutputEvent::Tx(tx.clone())).unwrap());
}
lines.push(
serde_json::to_string(&OutputEvent::Summary(SimulationSummary {
total_transactions: txs.len(),
successes: txs.iter().filter(|t| t.success).count(),
failures: txs.iter().filter(|t| !t.success).count(),
session_ids: Vec::new(),
}))
.unwrap(),
);
std::fs::write(&path, lines.join("\n")).unwrap();
path
}
fn fixture() -> (Vec<Transaction>, Vec<Transaction>) {
let baseline = vec![
tx(1, "sig-regressed", true, 5, 10),
tx(2, "sig-improved", false, 5, 10),
tx(3, "sig-logs", true, 5, 10),
tx(4, "sig-missing", true, 5, 10),
tx(5, "sig-pnl-small", true, 5, 10),
tx(6, "sig-pnl-big", true, 5, 10),
tx(7, "sig-same", true, 5, 10),
];
let experiment = vec![
tx(1, "sig-regressed", false, 5, 10),
tx(2, "sig-improved", true, 5, 10),
tx(3, "sig-logs", true, 9, 10),
tx(5, "sig-pnl-small", true, 5, 11),
tx(6, "sig-pnl-big", true, 5, 5_000),
tx(7, "sig-same", true, 5, 10),
tx(8, "sig-extra-in-experiment", true, 5, 10),
];
(baseline, experiment)
}
fn report_value(report: &CompareReport) -> serde_json::Value {
serde_json::to_value(report).unwrap()
}
#[test]
fn compare_sections_match_expectations() {
let dir = tempfile::tempdir().unwrap();
let (baseline, experiment) = fixture();
let base_path = write_ndjson(dir.path(), "base.ndjson", &baseline);
let exp_path = write_ndjson(dir.path(), "exp.ndjson", &experiment);
let report = compute(&base_path, &exp_path, 1).unwrap();
assert_eq!(report.baseline_transactions, 7);
assert_eq!(report.experiment_transactions, 7);
assert_eq!(report.common, 6);
assert_eq!(
report
.regressions
.iter()
.map(|r| r.signature.as_str())
.collect::<Vec<_>>(),
["sig-regressed"]
);
assert_eq!(
report.regressions[0].error.as_deref(),
Some("err-sig-regressed")
);
assert_eq!(
report
.improvements
.iter()
.map(|i| i.signature.as_str())
.collect::<Vec<_>>(),
["sig-improved"]
);
assert_eq!(
report
.log_diffs
.iter()
.map(|l| (
l.signature.as_str(),
l.baseline_log_count,
l.experiment_log_count
))
.collect::<Vec<_>>(),
[("sig-logs", 5, 9)]
);
assert_eq!(report.missing, ["sig-missing"]);
assert_eq!(
report
.pnl_diffs
.iter()
.map(|p| p.signature.as_str())
.collect::<Vec<_>>(),
["sig-pnl-big", "sig-pnl-small"]
);
}
#[test]
fn report_is_invariant_under_partition_count() {
let dir = tempfile::tempdir().unwrap();
let (mut baseline, mut experiment) = fixture();
for i in 0..200 {
baseline.push(tx(10 + i, &format!("sig-bulk-{i}"), true, 3, 10));
let delta = if i % 3 == 0 { 999 } else { 10 };
experiment.push(tx(10 + i, &format!("sig-bulk-{i}"), true, 3, delta));
}
let base_path = write_ndjson(dir.path(), "base.ndjson", &baseline);
let exp_path = write_ndjson(dir.path(), "exp.ndjson", &experiment);
let in_memory = compute(&base_path, &exp_path, 1).unwrap();
let partitioned = compute(&base_path, &exp_path, 7).unwrap();
assert_eq!(report_value(&in_memory), report_value(&partitioned));
}
#[test]
fn legacy_blob_matches_ndjson() {
let dir = tempfile::tempdir().unwrap();
let (baseline, experiment) = fixture();
let base_ndjson = write_ndjson(dir.path(), "base.ndjson", &baseline);
let exp_path = write_ndjson(dir.path(), "exp.ndjson", &experiment);
let blob = crate::output::SimulationOutput {
metadata: SimulationMetadata {
start_slot: 1,
end_slot: 100,
program_ids: Vec::new(),
program_so: Vec::new(),
ran_at_unix_secs: 0,
session_ids: Vec::new(),
},
transactions: baseline,
summary: SimulationSummary {
total_transactions: 7,
successes: 6,
failures: 1,
session_ids: Vec::new(),
},
};
let base_blob = dir.path().join("base.json");
std::fs::write(&base_blob, serde_json::to_string(&blob).unwrap()).unwrap();
let from_ndjson = compute(&base_ndjson, &exp_path, 1).unwrap();
let from_blob = compute(&base_blob, &exp_path, 1).unwrap();
let from_blob_partitioned = compute(&base_blob, &exp_path, 5).unwrap();
assert_eq!(report_value(&from_ndjson), report_value(&from_blob));
assert_eq!(
report_value(&from_ndjson),
report_value(&from_blob_partitioned)
);
}
#[test]
fn missing_metadata_envelope_is_rejected() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("no-meta.ndjson");
let line = serde_json::to_string(&OutputEvent::Tx(tx(1, "sig", true, 1, 10))).unwrap();
std::fs::write(&path, line).unwrap();
let other = write_ndjson(dir.path(), "ok.ndjson", &fixture().0);
let err = compute(&path, &other, 1).map(|_| ()).unwrap_err();
assert!(
err.to_string()
.contains("missing the leading `metadata` envelope")
);
}
#[test]
fn empty_file_is_rejected() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("empty.ndjson");
std::fs::write(&path, "").unwrap();
let other = write_ndjson(dir.path(), "ok.ndjson", &fixture().0);
let err = compute(&path, &other, 1).map(|_| ()).unwrap_err();
assert!(
err.to_string()
.contains("missing the leading `metadata` envelope")
);
}
#[test]
fn metadata_only_experiment_reports_all_baseline_as_missing() {
let dir = tempfile::tempdir().unwrap();
let (baseline, _) = fixture();
let base_path = write_ndjson(dir.path(), "base.ndjson", &baseline);
let exp_path = write_ndjson(dir.path(), "exp.ndjson", &[]);
let report = compute(&base_path, &exp_path, 1).unwrap();
assert_eq!(report.common, 0);
assert_eq!(report.missing.len(), baseline.len());
assert!(report.regressions.is_empty());
assert!(report.pnl_diffs.is_empty());
}
#[test]
fn duplicate_signatures_keep_the_last_record() {
let dir = tempfile::tempdir().unwrap();
let baseline = vec![
tx(2, "sig-dup", true, 5, 10),
tx(1, "sig-dup", false, 5, 10),
];
let experiment = vec![tx(2, "sig-dup", false, 5, 10)];
let base_path = write_ndjson(dir.path(), "base.ndjson", &baseline);
let exp_path = write_ndjson(dir.path(), "exp.ndjson", &experiment);
let report = compute(&base_path, &exp_path, 1).unwrap();
assert_eq!(report.common, 1);
assert_eq!(
report
.regressions
.iter()
.map(|r| (r.slot, r.signature.as_str()))
.collect::<Vec<_>>(),
[(2, "sig-dup")]
);
}
#[test]
fn token_changes_produce_ranked_spl_diffs() {
let dir = tempfile::tempdir().unwrap();
let with_tokens = |amount: u64| {
let mut t = tx(1, "sig-tokens", true, 5, 10);
t.token_changes = vec![TokenChange {
pubkey: "token-account".to_string(),
mint: "mint-a".to_string(),
owner: "owner".to_string(),
pre_amount: 100,
post_amount: amount,
decimals: 6,
}];
t
};
let base_path = write_ndjson(dir.path(), "base.ndjson", &[with_tokens(150)]);
let exp_path = write_ndjson(dir.path(), "exp.ndjson", &[with_tokens(300)]);
let report = compute(&base_path, &exp_path, 1).unwrap();
assert_eq!(report.pnl_diffs.len(), 1);
let diff = &report.pnl_diffs[0];
assert!(diff.sol.is_empty());
assert_eq!(diff.tokens.len(), 1);
let spl = &diff.tokens[0];
assert_eq!(spl.mint, "mint-a");
assert_eq!(spl.decimals, 6);
assert_eq!(spl.baseline_delta, 50);
assert_eq!(spl.experiment_delta, 200);
}
#[test]
fn pct_change_handles_zero_baseline() {
assert_eq!(format_pct_change(0, 500), "new");
assert_eq!(format_pct_change(200, 300), "+50.0000%");
assert_eq!(format_pct_change(-200, -300), "-50.0000%");
}
#[test]
fn spill_threshold_starts_just_past_one_partition() {
let dir = tempfile::tempdir().unwrap();
let small = dir.path().join("small");
let big = dir.path().join("big");
std::fs::write(&small, vec![0_u8; 1024]).unwrap();
std::fs::write(&big, vec![0_u8; (TARGET_PARTITION_BYTES + 1) as usize]).unwrap();
assert_eq!(partition_count(&small, &small).unwrap(), 1);
assert_eq!(partition_count(&small, &big).unwrap(), 2);
}
}