sim-cli 0.7.0

CLI tool for running and comparing Solana simulator backtests
Documentation
//! External-sort spill machinery: both sides are hash-partitioned to disk so
//! the join holds at most one partition pair in memory.

use std::{
    collections::{HashMap, hash_map::Entry},
    fmt,
    fs::File,
    hash::{DefaultHasher, Hash, Hasher},
    io::{BufRead, BufReader, BufWriter, Write},
    path::{Path, PathBuf},
};

use eyre::{Context, Result};
use serde::{
    Deserialize,
    de::{DeserializeSeed, Deserializer, IgnoredAny, MapAccess, SeqAccess, Visitor},
};

use super::model::CompareTransaction;
use crate::output::{
    NdjsonScan, SimulationMetadata, SimulationSummary, SolChange, TokenChange,
    missing_metadata_envelope, scan_ndjson_events,
};

/// Raw input bytes each spill partition targets; bounds join memory.
pub(super) const TARGET_PARTITION_BYTES: u64 = 256 * 1024 * 1024;
/// Upper bound on the partition count chosen by `partition_count`. A sink
/// keeps one spill writer open per partition, so this also caps the spill
/// files a single side holds open. Inputs larger than
/// `MAX_PARTITIONS * TARGET_PARTITION_BYTES` exceed the per-partition target
/// on average.
const MAX_PARTITIONS: usize = 128;
/// Capacity of the `BufWriter` wrapped around each spill file.
const SPILL_WRITE_BUFFER_BYTES: usize = 64 * 1024;

/// Result of spilling one input side to disk (produced by `load_side`).
pub(super) struct LoadedSide {
    /// Spill file paths, indexed by partition number.
    pub(super) partitions: Vec<PathBuf>,
    /// `total_transactions` from the input's summary when one was seen; for
    /// NDJSON input without a summary event, the number of `tx` events
    /// scanned (counted before any deduplication).
    pub(super) total_transactions: usize,
}

/// Hash-partitioned spill writer: one buffered NDJSON file per partition.
struct RecordSink {
    /// Spill file paths; `paths[i]` is the file behind `writers[i]`.
    paths: Vec<PathBuf>,
    /// Buffered writers, one per partition, in the same order as `paths`.
    writers: Vec<BufWriter<File>>,
}

impl RecordSink {
    fn new(partitions: usize, spill: &Path, label: &str) -> Result<Self> {
        let mut paths = Vec::with_capacity(partitions);
        let mut writers = Vec::with_capacity(partitions);
        for i in 0..partitions {
            let path = spill.join(format!("{label}-{i}.ndjson"));
            let file = File::create(&path)
                .with_context(|| format!("failed to create spill file {}", path.display()))?;
            paths.push(path);
            writers.push(BufWriter::with_capacity(SPILL_WRITE_BUFFER_BYTES, file));
        }
        Ok(Self { paths, writers })
    }

    fn push(&mut self, tx: &CompareTransaction) -> Result<()> {
        let mut hasher = DefaultHasher::new();
        tx.signature.hash(&mut hasher);
        let index = (hasher.finish() % self.writers.len() as u64) as usize;
        let writer = &mut self.writers[index];
        serde_json::to_writer(&mut *writer, tx)?;
        writer.write_all(b"\n")?;
        Ok(())
    }

    fn finish(self) -> Result<Vec<PathBuf>> {
        for mut writer in self.writers {
            writer.flush()?;
        }
        Ok(self.paths)
    }
}

/// Picks how many spill partitions to use for a baseline/experiment pair.
///
/// The larger of the two file sizes is divided (rounding up) by
/// `TARGET_PARTITION_BYTES`, and the result is clamped to
/// `1..=MAX_PARTITIONS`, so even two empty inputs get one partition.
///
/// # Errors
/// Fails if either file cannot be stat'ed.
pub(super) fn partition_count(baseline: &Path, experiment: &Path) -> Result<usize> {
    /// Size of `path` in bytes according to the filesystem.
    fn file_len(path: &Path) -> Result<u64> {
        let metadata = std::fs::metadata(path)
            .with_context(|| format!("failed to stat {}", path.display()))?;
        Ok(metadata.len())
    }

    let baseline_len = file_len(baseline)?;
    let experiment_len = file_len(experiment)?;
    let wanted = baseline_len
        .max(experiment_len)
        .div_ceil(TARGET_PARTITION_BYTES);
    // A count that does not fit in `usize` is far above the cap anyway.
    let wanted = usize::try_from(wanted).unwrap_or(MAX_PARTITIONS);
    Ok(wanted.clamp(1, MAX_PARTITIONS))
}

/// Slim mirror of [`crate::output::OutputEvent`] for the compare scan: `tx`
/// counts its logs instead of materializing them, and payloads compare never
/// reads deserialize as [`IgnoredAny`] (well-formedness still checked).
///
/// Wire shape is adjacently tagged: `{"type": "<snake_case variant>",
/// "data": <payload>}`.
#[derive(Deserialize)]
#[serde(tag = "type", content = "data", rename_all = "snake_case")]
enum SlimEvent {
    // The payload stays typed (not `IgnoredAny`) so a malformed `metadata`
    // line fails the scan exactly like the full-fidelity parse would.
    Metadata(#[allow(dead_code)] SimulationMetadata),
    /// Payload is parsed for well-formedness only and then discarded.
    SessionStarted(IgnoredAny),
    /// One transaction record, projected to the fields compare keeps.
    Tx(SlimTransaction),
    /// Payload is parsed for well-formedness only and then discarded.
    Diff(IgnoredAny),
    /// Parsed in full; `load_side` reads `total_transactions` from it.
    Summary(SimulationSummary),
}

/// Per-line projection of [`crate::output::Transaction`]. Field requirements
/// mirror `Transaction` so malformed tx lines fail the scan the same way.
///
/// Converted field-for-field into `CompareTransaction`, except `logs`, which
/// is reduced to its element count.
#[derive(Deserialize)]
struct SlimTransaction {
    slot: u64,
    /// Join key: spill partitioning and per-partition dedup both key on it.
    signature: String,
    success: bool,
    error: Option<String>,
    /// Only the number of array elements is kept; see `LogCount`.
    logs: LogCount,
    sol_changes: Vec<SolChange>,
    token_changes: Vec<TokenChange>,
}

impl From<SlimTransaction> for CompareTransaction {
    fn from(tx: SlimTransaction) -> Self {
        Self {
            slot: tx.slot,
            signature: tx.signature,
            success: tx.success,
            error: tx.error,
            log_count: tx.logs.0,
            sol_changes: tx.sol_changes,
            token_changes: tx.token_changes,
        }
    }
}

/// Counts a JSON array's elements without allocating them.
struct LogCount(usize);

impl<'de> Deserialize<'de> for LogCount {
    fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
        struct CountVisitor;
        impl<'de> Visitor<'de> for CountVisitor {
            type Value = LogCount;

            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
                f.write_str("an array of log lines")
            }

            fn visit_seq<A: SeqAccess<'de>>(self, mut seq: A) -> Result<LogCount, A::Error> {
                let mut count = 0;
                while seq.next_element::<IgnoredAny>()?.is_some() {
                    count += 1;
                }
                Ok(LogCount(count))
            }
        }
        deserializer.deserialize_seq(CountVisitor)
    }
}

/// Scans one input file and spills its transactions into `partitions`
/// hash-partitioned files under `spill`, named with `label`.
///
/// The file is first scanned as an NDJSON event stream. If the scan reports
/// that it is not an envelope stream, the file is re-read as a legacy
/// single-blob `SimulationOutput` instead. For NDJSON input the reported
/// total is the summary's `total_transactions` when a summary event exists,
/// otherwise the number of `tx` events seen.
///
/// # Errors
/// Fails if the spill files cannot be created, the input cannot be opened,
/// the scan or a spill write fails, or an NDJSON stream has no `metadata`
/// event.
pub(super) fn load_side(
    path: &Path,
    partitions: usize,
    spill: &Path,
    label: &str,
) -> Result<LoadedSide> {
    let mut sink = RecordSink::new(partitions, spill, label)?;
    let input = File::open(path).with_context(|| format!("failed to read {}", path.display()))?;

    let mut metadata_seen = false;
    let mut spilled = 0_usize;
    let mut reported_total = None::<usize>;

    let outcome = scan_ndjson_events(BufReader::new(input), path, &mut |event| {
        match event {
            SlimEvent::Tx(slim) => {
                spilled += 1;
                let record = CompareTransaction::from(slim);
                sink.push(&record)?;
            }
            SlimEvent::Summary(summary) => reported_total = Some(summary.total_transactions),
            SlimEvent::Metadata(_) => metadata_seen = true,
            SlimEvent::Diff(_) | SlimEvent::SessionStarted(_) => {}
        }
        Ok(())
    })?;

    match outcome {
        // Not NDJSON: hand the still-open sink to the legacy blob reader.
        NdjsonScan::NotAnEnvelopeStream { .. } => load_legacy_side(path, sink),
        _ if !metadata_seen => Err(missing_metadata_envelope(path)),
        _ => Ok(LoadedSide {
            partitions: sink.finish()?,
            total_transactions: reported_total.unwrap_or(spilled),
        }),
    }
}

/// Stream the legacy single-blob `SimulationOutput` JSON without
/// materializing it: transactions spill one at a time and only the summary
/// total is retained. Field requirements mirror `SimulationOutput` so a
/// malformed blob fails with the same context message as before.
///
/// # Errors
/// Fails if the file cannot be opened, if it is not a well-formed
/// `SimulationOutput` object (including trailing non-whitespace data after
/// the object), or if flushing the spill files fails.
fn load_legacy_side(path: &Path, mut sink: RecordSink) -> Result<LoadedSide> {
    let file = File::open(path).with_context(|| format!("failed to read {}", path.display()))?;
    let mut deserializer = serde_json::Deserializer::from_reader(BufReader::new(file));
    // Shared by both failure points below so they report identically.
    let not_a_blob = || {
        format!(
            "{} is neither an NDJSON event stream nor a single-blob `SimulationOutput` JSON file",
            path.display()
        )
    };
    let total_transactions = deserializer
        .deserialize_map(LegacyBlobVisitor { sink: &mut sink })
        .with_context(not_a_blob)?;
    // Driving the deserializer by hand skips the trailing-data check that
    // `serde_json::from_reader` performs; `end()` restores it so garbage
    // after the closing brace is rejected rather than silently ignored.
    deserializer.end().with_context(not_a_blob)?;
    Ok(LoadedSide {
        partitions: sink.finish()?,
        total_transactions,
    })
}

/// Map visitor for the legacy single-blob format: spills the `transactions`
/// array into `sink` while parsing and yields `summary.total_transactions`.
struct LegacyBlobVisitor<'a> {
    /// Destination for every transaction found in the blob.
    sink: &'a mut RecordSink,
}

impl<'de> Visitor<'de> for LegacyBlobVisitor<'_> {
    /// `summary.total_transactions`.
    type Value = usize;

    fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("a `SimulationOutput` object")
    }

    fn visit_map<A: MapAccess<'de>>(self, mut map: A) -> Result<usize, A::Error> {
        let mut seen_metadata = false;
        let mut seen_transactions = false;
        let mut total: Option<usize> = None;
        while let Some(key) = map.next_key::<String>()? {
            match key.as_str() {
                "metadata" => {
                    if seen_metadata {
                        return Err(serde::de::Error::duplicate_field("metadata"));
                    }
                    seen_metadata = true;
                    map.next_value::<SimulationMetadata>()?;
                }
                "transactions" => {
                    if seen_transactions {
                        return Err(serde::de::Error::duplicate_field("transactions"));
                    }
                    seen_transactions = true;
                    map.next_value_seed(SpillTransactions {
                        sink: &mut *self.sink,
                    })?;
                }
                "summary" => {
                    if total.is_some() {
                        return Err(serde::de::Error::duplicate_field("summary"));
                    }
                    total = Some(map.next_value::<SimulationSummary>()?.total_transactions);
                }
                _ => {
                    map.next_value::<IgnoredAny>()?;
                }
            }
        }
        if !seen_metadata {
            return Err(serde::de::Error::missing_field("metadata"));
        }
        if !seen_transactions {
            return Err(serde::de::Error::missing_field("transactions"));
        }
        total.ok_or_else(|| serde::de::Error::missing_field("summary"))
    }
}

/// Spills each element of the `transactions` array as it deserializes.
///
/// Acts as both the `DeserializeSeed` handed to the map access and the
/// `Visitor` for the array itself.
struct SpillTransactions<'a> {
    /// Destination for every transaction in the array.
    sink: &'a mut RecordSink,
}

/// Seed entry point: lets `MapAccess::next_value_seed` carry the sink into
/// the deserialization of the `transactions` value.
impl<'de> DeserializeSeed<'de> for SpillTransactions<'_> {
    /// Nothing is returned; the transactions go to the sink as a side effect.
    type Value = ();

    fn deserialize<D: Deserializer<'de>>(self, deserializer: D) -> Result<(), D::Error> {
        // `self` doubles as the sequence visitor (see its `Visitor` impl).
        deserializer.deserialize_seq(self)
    }
}

impl<'de> Visitor<'de> for SpillTransactions<'_> {
    type Value = ();

    fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("an array of transactions")
    }

    fn visit_seq<A: SeqAccess<'de>>(self, mut seq: A) -> Result<(), A::Error> {
        while let Some(tx) = seq.next_element::<SlimTransaction>()? {
            self.sink
                .push(&CompareTransaction::from(tx))
                .map_err(serde::de::Error::custom)?;
        }
        Ok(())
    }
}

/// Read one partition into a signature-keyed map, deduping as records arrive:
/// the highest `(slot, signature)` record wins, with later lines winning ties.
///
/// # Errors
/// Fails if the partition cannot be opened or read, or if a line is not a
/// valid `CompareTransaction` record. Errors name the spill file, and parse
/// errors also name the 1-based line number.
pub(super) fn read_partition(path: &Path) -> Result<HashMap<String, CompareTransaction>> {
    let file = File::open(path).with_context(|| format!("failed to read {}", path.display()))?;
    let mut reader = BufReader::new(file);
    // One buffer reused for every line to avoid a fresh allocation per record.
    let mut line = String::new();
    let mut line_number = 0_usize;
    let mut records: HashMap<String, CompareTransaction> = HashMap::new();
    loop {
        line.clear();
        let bytes_read = reader
            .read_line(&mut line)
            .with_context(|| format!("failed to read {}", path.display()))?;
        if bytes_read == 0 {
            return Ok(records);
        }
        line_number += 1;
        let tx: CompareTransaction = serde_json::from_str(&line).with_context(|| {
            format!("malformed spill record at {}:{line_number}", path.display())
        })?;
        match records.entry(tx.signature.clone()) {
            Entry::Vacant(vacant) => {
                vacant.insert(tx);
            }
            Entry::Occupied(mut occupied) => {
                // `>=` so that, at equal slots, the later line replaces the
                // earlier one.
                if tx.slot >= occupied.get().slot {
                    occupied.insert(tx);
                }
            }
        }
    }
}