sim-cli 0.7.0

CLI tool for running and comparing Solana simulator backtests
Documentation
use std::sync::{
    Arc,
    atomic::{AtomicU64, AtomicUsize, Ordering},
};

use dashmap::{DashMap, DashSet};
use simulator_client::AccountDiffNotification;
use solana_transaction_status::{EncodedConfirmedTransactionWithStatusMeta, EncodedTransaction};
use tokio::sync::mpsc::UnboundedSender;

use crate::{
    fetch::transaction_from_encoded,
    output::{AccountDiff, AccountDiffRow, OutputEvent, SolChange, TokenChange, Transaction},
};

/// How many slots of tx records to retain in [`RecordsStore`] for account-diff
/// enrichment. Entries older than `max_seen_slot - RECORDS_RETENTION_SLOTS`
/// are dropped on the next slot advance. Set wide enough to absorb the skew
/// between the `transactionSubscribe` and `accountDiffSubscribe` streams in
/// account-diff mode; a late diff that misses the window degrades to empty
/// enrichment (the existing `None` arm in `on_account_diff_notification`).
const RECORDS_RETENTION_SLOTS: u64 = 64;

/// Per-tx records keyed by signature, backing two consumers:
///
/// - Summary totals at end-of-run, served by [`Self::total`] /
///   [`Self::successes`] counters that don't depend on map retention so they
///   stay accurate across the whole run.
/// - Account-diff enrichment, served by lookup against `transactions`. Only
///   used in `--subscription account-diff` mode to match each diff back to its
///   tx's balance/token data.
///
/// The store is bounded to the last `RECORDS_RETENTION_SLOTS` slots so a long
/// run can't accumulate every tx ever seen.
#[derive(Clone)]
pub(crate) struct RecordsStore {
    transactions: Arc<DashMap<String, BalanceChanges>>,
    /// Emitted account-diff identities, used to drop diffs a reconnect replay
    /// re-delivers for the resumed boundary slot. Bounded to the same slot
    /// window as `transactions`.
    account_diffs: Arc<DashSet<DiffKey>>,
    /// Independent retention high-water marks for `transactions` and
    /// `account_diffs`: the tx and diff streams arrive on separate channels, so
    /// a shared mark would let whichever advances first starve the other of
    /// pruning.
    tx_max_slot: Arc<AtomicU64>,
    diff_max_slot: Arc<AtomicU64>,
    total: Arc<AtomicUsize>,
    successes: Arc<AtomicUsize>,
}

/// Identity of an emitted account-diff: `(slot, transaction signature, account)`.
type DiffKey = (u64, String, String);

/// Slim per-tx record: just what diff enrichment and retention need. Storing
/// the full [`Transaction`] would clone every log line on the hot path for
/// data nothing ever reads back.
struct BalanceChanges {
    slot: u64,
    sol_changes: Vec<SolChange>,
    token_changes: Vec<TokenChange>,
}

impl RecordsStore {
    pub(crate) fn new() -> Self {
        Self {
            transactions: Arc::new(DashMap::new()),
            account_diffs: Arc::new(DashSet::new()),
            tx_max_slot: Arc::new(AtomicU64::new(0)),
            diff_max_slot: Arc::new(AtomicU64::new(0)),
            total: Arc::new(AtomicUsize::new(0)),
            successes: Arc::new(AtomicUsize::new(0)),
        }
    }

    pub(crate) fn total(&self) -> usize {
        self.total.load(Ordering::Relaxed)
    }

    pub(crate) fn successes(&self) -> usize {
        self.successes.load(Ordering::Relaxed)
    }

    fn get(&self, sig: &str) -> Option<dashmap::mapref::one::Ref<'_, String, BalanceChanges>> {
        self.transactions.get(sig)
    }

    /// Record a tx, returning `true` only if this signature was not already
    /// present. Counters and retention advance solely for genuinely new
    /// records, so a reconnect that re-delivers an already-seen slot via the
    /// `replayFromSlot` resume cursor is idempotent — neither double-counted nor
    /// re-emitted.
    fn note_transaction(&self, sig: String, tx: &Transaction) -> bool {
        let stored = BalanceChanges {
            slot: tx.slot,
            sol_changes: tx.sol_changes.clone(),
            token_changes: tx.token_changes.clone(),
        };
        if self.transactions.insert(sig, stored).is_some() {
            return false;
        }
        self.total.fetch_add(1, Ordering::Relaxed);
        if tx.success {
            self.successes.fetch_add(1, Ordering::Relaxed);
        }
        advance_window(&self.tx_max_slot, tx.slot, |cutoff| {
            self.transactions.retain(|_, v| v.slot >= cutoff)
        });
        true
    }

    /// Record that an account-diff identity has been emitted, returning `true`
    /// the first time. Keeps the diff output duplicate-free when a resume replay
    /// re-delivers a boundary slot, bounded to the retention window so it can't
    /// grow unbounded.
    fn note_diff(&self, slot: u64, sig: &str, account: &str) -> bool {
        let is_new = self
            .account_diffs
            .insert((slot, sig.to_string(), account.to_string()));
        advance_window(&self.diff_max_slot, slot, |cutoff| {
            self.account_diffs.retain(|(s, _, _)| *s >= cutoff)
        });
        is_new
    }
}

/// Advance a retention high-water mark to `slot`; when it crosses into a new
/// max, invoke `prune` with the `slot - RECORDS_RETENTION_SLOTS` cutoff so the
/// caller evicts entries older than the window.
fn advance_window(max_slot: &AtomicU64, slot: u64, prune: impl FnOnce(u64)) {
    let prev_max = max_slot.fetch_max(slot, Ordering::Relaxed);
    if slot > prev_max
        && let Some(cutoff) = slot.checked_sub(RECORDS_RETENTION_SLOTS)
    {
        prune(cutoff);
    }
}

/// Handle a single account-diff notification from the subscription manager.
///
/// Emits a `diff` event on the NDJSON stream. Enrichment (`pre_lamports`,
/// `post_lamports`, `pre_tokens`, `post_tokens`) is read from the shared
/// records store, populated by the `transactionSubscribe` stream running
/// alongside this subscription in account-diff mode. If the matching tx
/// record hasn't arrived yet (small race window between two WS channels) or
/// has already aged out of the slot-window, the row's enrichment fields are
/// emitted as `None`.
pub(crate) fn on_account_diff_notification(
    records: RecordsStore,
    output_tx: UnboundedSender<OutputEvent>,
    notification: AccountDiffNotification,
) {
    let Some(sig) = notification.signature else {
        return;
    };
    let Some(account) = notification.account else {
        return;
    };
    // Use the notification's slot, not the RecordsStore entry's, so slot,
    // tx_index, and block_time stay internally consistent — entry.slot can
    // lag across the two WS channels.
    let slot = notification.context.slot;
    // Skip a diff a reconnect already delivered (the NDJSON stream isn't deduped).
    if !records.note_diff(slot, &sig, &account) {
        return;
    }
    let tx_index = notification.tx_index;
    let block_time = notification.block_time;
    let diff = AccountDiff {
        account,
        pre_state: notification.pre,
        post_state: notification.post,
    };

    let row = match records.get(&sig) {
        Some(entry) => AccountDiffRow::from_diff(
            slot,
            &sig,
            tx_index,
            block_time,
            &diff,
            &entry.sol_changes,
            &entry.token_changes,
        ),
        None => AccountDiffRow::from_diff(slot, &sig, tx_index, block_time, &diff, &[], &[]),
    };

    let _ = output_tx.send(OutputEvent::Diff(row));
}

/// Handle a single `transactionSubscribe` notification.
///
/// The server pushes a fully-assembled, `getTransaction`-shaped payload, so we
/// build the [`Transaction`] record directly from the notification and skip
/// any per-tx fetch. Logs come from the meta in the payload.
pub(crate) fn on_transaction_notification(
    records: RecordsStore,
    output_tx: UnboundedSender<OutputEvent>,
    notification: EncodedConfirmedTransactionWithStatusMeta,
) {
    let slot = notification.slot;
    let Some(signature) = primary_signature(&notification.transaction.transaction) else {
        tracing::warn!(
            slot,
            "transactionSubscribe notification missing primary signature; dropping"
        );
        return;
    };

    let logs = notification
        .transaction
        .meta
        .as_ref()
        .and_then(|meta| match &meta.log_messages {
            solana_transaction_status::option_serializer::OptionSerializer::Some(v) => {
                Some(v.clone())
            }
            _ => None,
        })
        .unwrap_or_default();

    let Some(mut tx) = transaction_from_encoded(notification, &signature, slot) else {
        tracing::warn!(
            slot,
            signature,
            "transactionSubscribe payload could not be decoded into a Transaction; dropping"
        );
        return;
    };
    tx.logs = logs;
    // Emit only on first sight; a reconnect can re-deliver a slot (output isn't deduped).
    if records.note_transaction(signature, &tx) {
        let _ = output_tx.send(OutputEvent::Tx(tx));
    }
}

/// Pull the first signature off an [`EncodedTransaction`]. Returns `None` for
/// encodings we don't decode (binary forms) or for a transaction with no
/// signatures, which we'd have no way to key the records map by.
fn primary_signature(encoded: &EncodedTransaction) -> Option<String> {
    match encoded {
        EncodedTransaction::Json(ui_tx) => ui_tx.signatures.first().cloned(),
        EncodedTransaction::LegacyBinary(_) | EncodedTransaction::Binary(_, _) => None,
        EncodedTransaction::Accounts(parsed) => parsed.signatures.first().cloned(),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn tx(slot: u64, sig: &str, success: bool) -> Transaction {
        Transaction {
            slot,
            timestamp: None,
            signature: sig.to_string(),
            success,
            error: None,
            logs: Vec::new(),
            sol_changes: Vec::new(),
            token_changes: Vec::new(),
            account_diffs: Vec::new(),
        }
    }

    #[test]
    fn note_transaction_dedups_repeated_signatures() {
        let store = RecordsStore::new();
        assert!(store.note_transaction("sig".into(), &tx(10, "sig", true)));
        assert!(!store.note_transaction("sig".into(), &tx(10, "sig", true)));
        assert_eq!(store.total(), 1);
        assert_eq!(store.successes(), 1);
    }

    #[test]
    fn note_diff_dedups_repeated_identities() {
        let store = RecordsStore::new();
        assert!(store.note_diff(5, "sig", "acct"));
        assert!(!store.note_diff(5, "sig", "acct"));
        assert!(store.note_diff(5, "sig", "other"));
        assert!(store.note_diff(6, "sig", "acct"));
    }
}