sim-cli 0.7.0

CLI tool for running and comparing Solana simulator backtests
Documentation
use std::{collections::HashMap, io::BufRead};

use eyre::Context;
use serde::{Deserialize, Serialize};

/// A complete simulation run: run-level metadata, the recorded transactions,
/// and aggregate totals. This is the value produced by
/// [`load_simulation_output`], whether the file on disk was a single JSON
/// blob of this shape or an NDJSON envelope stream.
#[derive(Debug, Serialize, Deserialize)]
pub struct SimulationOutput {
    /// Run-level metadata (slot range, programs, session IDs).
    pub metadata: SimulationMetadata,
    /// Transactions recorded by the run. The loader does not sort these.
    pub transactions: Vec<Transaction>,
    /// Aggregate transaction counts for the run.
    pub summary: SimulationSummary,
}

/// Run-level metadata for a simulation. In the NDJSON stream format this is
/// the payload of the leading `metadata` envelope.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SimulationMetadata {
    /// Start of the slot range covered by the run.
    pub start_slot: u64,
    /// End of the slot range covered by the run.
    pub end_slot: u64,
    /// Program identifiers associated with the run. Omitted from the
    /// serialized form when empty; defaults to empty when absent.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub program_ids: Vec<String>,
    /// NOTE(review): presumably the program `.so` artifacts used for the run —
    /// confirm against the writer. Omitted from the serialized form when
    /// empty; defaults to empty when absent.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub program_so: Vec<String>,
    /// When the run happened, in Unix seconds (per the field name).
    pub ran_at_unix_secs: u64,
    /// Empty when carried by the leading NDJSON `metadata` envelope (IDs
    /// aren't known until sessions have been created). When an NDJSON stream
    /// is loaded, `parse_ndjson_stream` backfills this from session IDs
    /// reported later in the stream (e.g. the trailing `summary` envelope) if
    /// it is still empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub session_ids: Vec<String>,
}

/// One line in the NDJSON output stream. Envelope-tagged so a single file can
/// carry mixed event types: each line serializes as
/// `{"type": "<snake_case variant>", "data": { ... }}`.
///
/// Order along the stream is: one `metadata`, then zero or more
/// `session_started`/`tx`/`diff` events as they arrive (no inter-session sort
/// guarantee), then a trailing `summary` with totals + the realized session
/// IDs.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data", rename_all = "snake_case")]
pub enum OutputEvent {
    /// `"metadata"`: run-level metadata.
    Metadata(SimulationMetadata),
    /// `"session_started"`: a backtest session has started.
    SessionStarted(SessionStartedRecord),
    /// `"tx"`: one transaction result.
    Tx(Transaction),
    /// `"diff"`: one account diff row belonging to a transaction.
    Diff(AccountDiffRow),
    /// `"summary"`: totals for the run.
    Summary(SimulationSummary),
}

/// Result of one transaction, including its balance changes and (optionally)
/// the account state diffs attached to it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Transaction {
    /// Slot the transaction belongs to.
    pub slot: u64,
    /// Optional timestamp; omitted from the serialized form when `None`.
    /// NOTE(review): unit is not visible here (presumably Unix seconds).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timestamp: Option<u64>,
    /// Transaction signature. Together with `slot` it is the key the NDJSON
    /// loader uses to attach `diff` events to their transaction.
    pub signature: String,
    /// Whether the transaction succeeded; drives the success/failure counts
    /// of a synthesized [`SimulationSummary`].
    pub success: bool,
    /// Error text, if any; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Log lines recorded for the transaction.
    pub logs: Vec<String>,
    /// Lamport balance changes, one entry per affected account.
    pub sol_changes: Vec<SolChange>,
    /// Token balance changes, one entry per affected token account.
    pub token_changes: Vec<TokenChange>,
    /// Account state diffs. Omitted when empty and defaults to empty when
    /// absent; when loading NDJSON these are rebuilt from `diff` events.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub account_diffs: Vec<AccountDiff>,
}

/// State of a single account before and after a transaction, carried as
/// arbitrary JSON values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AccountDiff {
    /// Account key this diff refers to; matched against
    /// `SolChange::pubkey` / `TokenChange::pubkey` by
    /// [`AccountDiffRow::from_diff`].
    pub account: String,
    /// State before the transaction, if recorded; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pre_state: Option<serde_json::Value>,
    /// State after the transaction, if recorded; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub post_state: Option<serde_json::Value>,
}

/// Lamport balance of one account before and after a transaction.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SolChange {
    /// Account key whose balance changed.
    pub pubkey: String,
    /// Balance in lamports before the transaction.
    pub pre_lamports: u64,
    /// Balance in lamports after the transaction.
    pub post_lamports: u64,
}

impl SolChange {
    /// Signed lamport change (`post_lamports - pre_lamports`).
    ///
    /// The difference of two `u64` values does not always fit in an `i64`.
    /// It is computed exactly in `i128` and then saturated to
    /// `i64::MIN..=i64::MAX`, so an out-of-range change keeps its correct
    /// sign instead of wrapping (or overflowing the subtraction, which
    /// panics in debug builds).
    pub fn delta(&self) -> i64 {
        let exact = i128::from(self.post_lamports) - i128::from(self.pre_lamports);
        // Lossless cast: the value has just been clamped into i64's range.
        exact.clamp(i128::from(i64::MIN), i128::from(i64::MAX)) as i64
    }
}

/// Token balance of one token account before and after a transaction.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TokenChange {
    /// Key of the token account whose balance changed.
    pub pubkey: String,
    /// Mint of the token held by the account.
    pub mint: String,
    /// Owner recorded for the token account.
    pub owner: String,
    /// Raw token amount before the transaction.
    pub pre_amount: u64,
    /// Raw token amount after the transaction.
    pub post_amount: u64,
    /// Decimal places of the mint; the amounts above are not scaled by it
    /// anywhere in this file.
    pub decimals: u8,
}

impl TokenChange {
    /// Signed change in the raw token amount (`post_amount - pre_amount`).
    ///
    /// Token amounts use the full `u64` range, so the difference does not
    /// always fit in an `i64`. It is computed exactly in `i128` and then
    /// saturated to `i64::MIN..=i64::MAX`, so an out-of-range change keeps
    /// its correct sign instead of wrapping (or overflowing the subtraction,
    /// which panics in debug builds).
    pub fn delta(&self) -> i64 {
        let exact = i128::from(self.post_amount) - i128::from(self.pre_amount);
        // Lossless cast: the value has just been clamped into i64's range.
        exact.clamp(i128::from(i64::MIN), i128::from(i64::MAX)) as i64
    }
}

/// Aggregate totals for a run. In the NDJSON stream format this is the payload
/// of the trailing `summary` envelope; when that envelope is absent the loader
/// synthesizes the counts from the transactions it read.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct SimulationSummary {
    /// Number of transactions in the run.
    pub total_transactions: usize,
    /// Number of transactions with `success == true`.
    pub successes: usize,
    /// Number of transactions with `success == false`.
    pub failures: usize,
    /// Session IDs of the run. Omitted from the serialized form when empty;
    /// defaults to empty when absent.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub session_ids: Vec<String>,
}

/// Token balance snapshot of an account at one point in time. Used for the
/// `pre_tokens` / `post_tokens` columns of [`AccountDiffRow`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TokenInfo {
    /// Mint of the token.
    pub mint: String,
    /// Owner recorded for the token account.
    pub owner: String,
    /// Raw token amount at the time of the snapshot.
    pub amount: u64,
    /// Decimal places of the mint.
    pub decimals: u8,
}

/// A single account diff row for NDJSON streaming, mapping 1:1 to a DB row.
///
/// Every optional column is omitted from the serialized form when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AccountDiffRow {
    /// Slot of the parent transaction.
    pub slot: u64,
    /// Signature of the parent transaction. Together with `slot` it is how
    /// the loader finds the transaction this row belongs to.
    pub tx_hash: String,
    /// Account key the row describes.
    pub account: String,
    /// Position of the transaction within its slot, if known.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tx_index: Option<u32>,
    /// Unix-seconds block time of the slot, if known.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub block_time: Option<i64>,
    /// Lamport balance before the transaction, when a SOL change was
    /// recorded for this account.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pre_lamports: Option<u64>,
    /// Lamport balance after the transaction, when a SOL change was
    /// recorded for this account.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub post_lamports: Option<u64>,
    /// Account state before the transaction, if recorded.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pre_state: Option<serde_json::Value>,
    /// Account state after the transaction, if recorded.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub post_state: Option<serde_json::Value>,
    /// Token balance before the transaction, when a token change was
    /// recorded for this account.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pre_tokens: Option<TokenInfo>,
    /// Token balance after the transaction, when a token change was
    /// recorded for this account.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub post_tokens: Option<TokenInfo>,
}

/// Emitted into the NDJSON stream when a backtest session starts so downstream
/// consumers (e.g. Prefect flows) can link to the session's CloudWatch logs.
/// Serialized as the `data` payload of a `session_started` envelope.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionStartedRecord {
    /// Identifier of the session that started.
    pub session_id: String,
    /// Optional task identifier; omitted from the serialized form when
    /// `None`. NOTE(review): which system issues this ID is not visible here.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub task_id: Option<String>,
    /// Start of the slot range handled by this session.
    pub start_slot: u64,
    /// End of the slot range handled by this session.
    pub end_slot: u64,
}

impl AccountDiffRow {
    /// Flatten one [`AccountDiff`] of a transaction into a streamable row.
    ///
    /// `slot`, `tx_hash`, `tx_index` and `block_time` are copied through
    /// unchanged. The lamport and token columns come from the first entry of
    /// `sol_changes` / `token_changes` whose `pubkey` equals `diff.account`;
    /// when there is no such entry the corresponding pre/post pair is `None`.
    pub fn from_diff(
        slot: u64,
        tx_hash: &str,
        tx_index: Option<u32>,
        block_time: Option<i64>,
        diff: &AccountDiff,
        sol_changes: &[SolChange],
        token_changes: &[TokenChange],
    ) -> Self {
        let account = diff.account.as_str();

        // Lamport columns: both present or both absent.
        let (pre_lamports, post_lamports) =
            match sol_changes.iter().find(|change| change.pubkey == account) {
                Some(change) => (Some(change.pre_lamports), Some(change.post_lamports)),
                None => (None, None),
            };

        // Token columns: the two snapshots differ only in `amount`.
        let (pre_tokens, post_tokens) =
            match token_changes.iter().find(|change| change.pubkey == account) {
                Some(change) => {
                    let snapshot = |amount: u64| TokenInfo {
                        mint: change.mint.clone(),
                        owner: change.owner.clone(),
                        amount,
                        decimals: change.decimals,
                    };
                    (
                        Some(snapshot(change.pre_amount)),
                        Some(snapshot(change.post_amount)),
                    )
                }
                None => (None, None),
            };

        Self {
            slot,
            tx_hash: tx_hash.to_owned(),
            account: account.to_owned(),
            tx_index,
            block_time,
            pre_lamports,
            post_lamports,
            pre_state: diff.pre_state.clone(),
            post_state: diff.post_state.clone(),
            pre_tokens,
            post_tokens,
        }
    }
}

/// Build the report used when an NDJSON stream never supplied a `metadata`
/// envelope. `path` appears in the message only; it is not opened or checked.
pub fn missing_metadata_envelope(path: &std::path::Path) -> eyre::Report {
    let shown = path.display();
    eyre::eyre!(
        "NDJSON stream {} is missing the leading `metadata` envelope",
        shown
    )
}

/// Outcome of [`scan_ndjson_events`].
///
/// Derives `Debug` so that results holding it can be used with `{:?}`,
/// `Result::unwrap_err` and similar diagnostics.
#[derive(Debug)]
pub enum NdjsonScan {
    /// The reader was consumed to the end and every non-blank line parsed as
    /// an envelope (this includes an empty or all-blank input).
    Complete,
    /// First non-blank line is not an envelope; no further lines were consumed.
    NotAnEnvelopeStream {
        /// 1-based number of the line that failed to parse.
        line_number: usize,
        /// The deserialization error for that line.
        error: serde_json::Error,
    },
}

/// Stream envelope events line by line without buffering the file. `E` is the
/// envelope to deserialize per line — [`OutputEvent`] for full fidelity, or a
/// caller-local slim mirror that skips payloads it doesn't need.
/// A malformed line after the first envelope fails with a line-numbered error.
pub fn scan_ndjson_events<R: BufRead, E: serde::de::DeserializeOwned>(
    mut reader: R,
    path: &std::path::Path,
    on_event: &mut dyn FnMut(E) -> eyre::Result<()>,
) -> eyre::Result<NdjsonScan> {
    let mut line = String::new();
    let mut line_number = 0_usize;
    let mut ndjson = false;
    loop {
        line.clear();
        if reader
            .read_line(&mut line)
            .with_context(|| format!("failed to read {}", path.display()))?
            == 0
        {
            return Ok(NdjsonScan::Complete);
        }
        line_number += 1;
        if line.trim().is_empty() {
            continue;
        }
        match serde_json::from_str::<E>(&line) {
            Ok(event) => {
                ndjson = true;
                on_event(event)?;
            }
            Err(error) if ndjson => {
                return Err(error).with_context(|| {
                    format!(
                        "failed to parse NDJSON line {line_number} in {}",
                        path.display()
                    )
                });
            }
            Err(error) => return Ok(NdjsonScan::NotAnEnvelopeStream { line_number, error }),
        }
    }
}

/// Read a previously written run back from `path`.
///
/// Two on-disk formats are accepted: a single-blob `SimulationOutput` JSON
/// document, or an NDJSON envelope stream. The whole file is read into memory
/// and the blob format is tried first; if that does not parse, the same text
/// is parsed as an NDJSON stream and that outcome (success or error) is
/// returned.
///
/// The returned `transactions` are NOT sorted; callers that need a stable
/// order should sort by `(slot, signature)` themselves.
///
/// # Errors
///
/// Fails when the file cannot be read, or when the contents are neither a
/// valid blob nor a valid NDJSON stream (only the NDJSON error is reported).
pub fn load_simulation_output(path: &std::path::Path) -> eyre::Result<SimulationOutput> {
    let contents = std::fs::read_to_string(path)
        .with_context(|| format!("failed to read {}", path.display()))?;

    match serde_json::from_str::<SimulationOutput>(&contents) {
        Ok(blob) => Ok(blob),
        // Not a single-blob document: reinterpret the same text as NDJSON.
        Err(_) => parse_ndjson_stream(&contents, path),
    }
}

fn parse_ndjson_stream(raw: &str, path: &std::path::Path) -> eyre::Result<SimulationOutput> {
    let mut metadata: Option<SimulationMetadata> = None;
    let mut summary: Option<SimulationSummary> = None;
    let mut transactions: Vec<Transaction> = Vec::new();
    let mut diffs: Vec<AccountDiffRow> = Vec::new();

    let scan = scan_ndjson_events(raw.as_bytes(), path, &mut |event| {
        match event {
            OutputEvent::Metadata(m) => metadata = Some(m),
            OutputEvent::Tx(tx) => transactions.push(tx),
            OutputEvent::Summary(s) => summary = Some(s),
            OutputEvent::Diff(d) => diffs.push(d),
            OutputEvent::SessionStarted(_) => {}
        }
        Ok(())
    })?;
    if let NdjsonScan::NotAnEnvelopeStream { line_number, error } = scan {
        return Err(error).with_context(|| {
            format!(
                "failed to parse NDJSON line {line_number} in {}",
                path.display()
            )
        });
    }

    // Group diffs onto their parent transaction. Diffs and txs can arrive
    // in any order on the stream, so index transactions first.
    let tx_index: HashMap<(u64, String), usize> = transactions
        .iter()
        .enumerate()
        .map(|(idx, tx)| ((tx.slot, tx.signature.clone()), idx))
        .collect();
    for diff in diffs {
        if let Some(&idx) = tx_index.get(&(diff.slot, diff.tx_hash)) {
            transactions[idx].account_diffs.push(AccountDiff {
                account: diff.account,
                pre_state: diff.pre_state,
                post_state: diff.post_state,
            });
        }
    }

    let mut metadata = metadata.ok_or_else(|| missing_metadata_envelope(path))?;
    let summary = summary.unwrap_or_else(|| SimulationSummary {
        total_transactions: transactions.len(),
        successes: transactions.iter().filter(|t| t.success).count(),
        failures: transactions.iter().filter(|t| !t.success).count(),
        session_ids: Vec::new(),
    });
    // Lift session_ids onto metadata so callers see one consistent shape.
    if metadata.session_ids.is_empty() && !summary.session_ids.is_empty() {
        metadata.session_ids = summary.session_ids.clone();
    }

    Ok(SimulationOutput {
        metadata,
        transactions,
        summary,
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Serialize a `session_started` envelope with the given `task_id` to
    /// its JSON text and parse that text back as a generic JSON value, so
    /// the assertions below look at the actual wire representation.
    fn session_started_wire(task_id: Option<&str>) -> serde_json::Value {
        let event = OutputEvent::SessionStarted(SessionStartedRecord {
            session_id: "backtest_abc".to_string(),
            task_id: task_id.map(str::to_string),
            start_slot: 1,
            end_slot: 2,
        });
        let text = serde_json::to_string(&event).unwrap();
        serde_json::from_str(&text).unwrap()
    }

    /// The envelope is adjacently tagged: the variant name goes under
    /// `type` in snake_case and the record's fields under `data`.
    #[test]
    fn output_event_session_started_wire_format() {
        let wire = session_started_wire(Some("task_xyz"));
        let data = &wire["data"];
        assert_eq!(wire["type"], "session_started");
        assert_eq!(data["session_id"], "backtest_abc");
        assert_eq!(data["task_id"], "task_xyz");
        assert_eq!(data["start_slot"], 1);
        assert_eq!(data["end_slot"], 2);
    }

    /// A missing `task_id` is left out of the payload entirely rather than
    /// being written as `null`.
    #[test]
    fn output_event_session_started_omits_null_task_id() {
        let wire = session_started_wire(None);
        assert_eq!(wire["type"], "session_started");
        assert!(wire["data"].get("task_id").is_none());
    }
}