sim-cli 0.6.0

CLI tool for running and comparing Solana simulator backtests
use std::sync::{
    Arc,
    atomic::{AtomicU64, AtomicUsize, Ordering},
};

use dashmap::DashMap;
use simulator_client::AccountDiffNotification;
use solana_transaction_status::{EncodedConfirmedTransactionWithStatusMeta, EncodedTransaction};
use tokio::sync::mpsc::UnboundedSender;

use crate::{
    fetch::transaction_from_encoded,
    output::{AccountDiff, AccountDiffRow, OutputEvent, Transaction},
};

/// How many slots of tx records to retain in [`RecordsStore`] for account-diff
/// enrichment. Entries older than `max_seen_slot - RECORDS_RETENTION_SLOTS`
/// are dropped on the next slot advance. Set wide enough to absorb the skew
/// between the `transactionSubscribe` and `accountDiffSubscribe` streams in
/// account-diff mode; a late diff that misses the window degrades to empty
/// enrichment (the existing `None` arm in `on_account_diff_notification`).
const RECORDS_RETENTION_SLOTS: u64 = 64;

/// Per-tx records keyed by signature, backing two consumers:
///
/// - Summary totals at end-of-run, served by [`Self::total`] /
///   [`Self::successes`] counters that don't depend on map retention so they
///   stay accurate across the whole run.
/// - Account-diff enrichment, served by lookup against `map`. Only used in
///   `--subscription account-diff` mode to match each diff back to its tx's
///   balance/token data.
///
/// The map is bounded to the last `RECORDS_RETENTION_SLOTS` slots so a long
/// run can't accumulate every tx ever seen.
#[derive(Clone)]
pub(crate) struct RecordsStore {
    map: Arc<DashMap<String, Transaction>>,
    max_slot: Arc<AtomicU64>,
    total: Arc<AtomicUsize>,
    successes: Arc<AtomicUsize>,
}

impl RecordsStore {
    pub(crate) fn new() -> Self {
        Self {
            map: Arc::new(DashMap::new()),
            max_slot: Arc::new(AtomicU64::new(0)),
            total: Arc::new(AtomicUsize::new(0)),
            successes: Arc::new(AtomicUsize::new(0)),
        }
    }

    pub(crate) fn total(&self) -> usize {
        self.total.load(Ordering::Relaxed)
    }

    pub(crate) fn successes(&self) -> usize {
        self.successes.load(Ordering::Relaxed)
    }

    fn get(&self, sig: &str) -> Option<dashmap::mapref::one::Ref<'_, String, Transaction>> {
        self.map.get(sig)
    }

    fn insert(&self, sig: String, tx: Transaction) {
        self.total.fetch_add(1, Ordering::Relaxed);
        if tx.success {
            self.successes.fetch_add(1, Ordering::Relaxed);
        }
        let slot = tx.slot;
        let prev_max = self.max_slot.fetch_max(slot, Ordering::Relaxed);
        self.map.insert(sig, tx);
        if slot > prev_max
            && let Some(cutoff) = slot.checked_sub(RECORDS_RETENTION_SLOTS)
        {
            self.map.retain(|_, v| v.slot >= cutoff);
        }
    }
}

/// Handle a single account-diff notification from the subscription manager.
///
/// Emits a `diff` event on the NDJSON stream. Enrichment (`pre_lamports`,
/// `post_lamports`, `pre_tokens`, `post_tokens`) is read from the shared
/// records store, populated by the `transactionSubscribe` stream running
/// alongside this subscription in account-diff mode. If the matching tx
/// record hasn't arrived yet (small race window between two WS channels) or
/// has already aged out of the slot-window, the row's enrichment fields are
/// emitted as `None`.
pub(crate) fn on_account_diff_notification(
    records: RecordsStore,
    output_tx: UnboundedSender<OutputEvent>,
    notification: AccountDiffNotification,
) {
    let Some(sig) = notification.signature else {
        return;
    };
    let Some(account) = notification.account else {
        return;
    };
    // Use the notification's slot, not the RecordsStore entry's, so slot,
    // tx_index, and block_time stay internally consistent — entry.slot can
    // lag across the two WS channels.
    let slot = notification.context.slot;
    let tx_index = notification.tx_index;
    let block_time = notification.block_time;
    let diff = AccountDiff {
        account,
        pre_state: notification.pre,
        post_state: notification.post,
    };

    let row = match records.get(&sig) {
        Some(entry) => AccountDiffRow::from_diff(
            slot,
            &sig,
            tx_index,
            block_time,
            &diff,
            &entry.sol_changes,
            &entry.token_changes,
        ),
        None => AccountDiffRow::from_diff(slot, &sig, tx_index, block_time, &diff, &[], &[]),
    };

    let _ = output_tx.send(OutputEvent::Diff(row));
}

/// Handle a single `transactionSubscribe` notification.
///
/// The server pushes a fully-assembled, `getTransaction`-shaped payload, so we
/// build the [`Transaction`] record directly from the notification and skip
/// any per-tx fetch. Logs come from the meta in the payload.
pub(crate) fn on_transaction_notification(
    records: RecordsStore,
    output_tx: UnboundedSender<OutputEvent>,
    notification: EncodedConfirmedTransactionWithStatusMeta,
) {
    let slot = notification.slot;
    let Some(signature) = primary_signature(&notification.transaction.transaction) else {
        tracing::warn!(
            slot,
            "transactionSubscribe notification missing primary signature; dropping"
        );
        return;
    };

    let logs = notification
        .transaction
        .meta
        .as_ref()
        .and_then(|meta| match &meta.log_messages {
            solana_transaction_status::option_serializer::OptionSerializer::Some(v) => {
                Some(v.clone())
            }
            _ => None,
        })
        .unwrap_or_default();

    let Some(mut tx) = transaction_from_encoded(notification, &signature, slot) else {
        tracing::warn!(
            slot,
            signature,
            "transactionSubscribe payload could not be decoded into a Transaction; dropping"
        );
        return;
    };
    tx.logs = logs;
    records.insert(signature, tx.clone());
    let _ = output_tx.send(OutputEvent::Tx(tx));
}

/// Pull the first signature off an [`EncodedTransaction`]. Returns `None` for
/// encodings we don't decode (binary forms) or for a transaction with no
/// signatures, which we'd have no way to key the records map by.
fn primary_signature(encoded: &EncodedTransaction) -> Option<String> {
    match encoded {
        EncodedTransaction::Json(ui_tx) => ui_tx.signatures.first().cloned(),
        EncodedTransaction::LegacyBinary(_) | EncodedTransaction::Binary(_, _) => None,
        EncodedTransaction::Accounts(parsed) => parsed.signatures.first().cloned(),
    }
}