sim-cli 0.2.0

CLI tool for running and comparing Solana simulator backtests
use std::{sync::Arc, time::Duration};

use dashmap::DashMap;
use eyre::Result;
use simulator_client::{
    BacktestSession, SubscriptionHandle, subscribe_program_diffs, subscribe_program_logs,
};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_commitment_config::CommitmentConfig;
use tracing::warn;

use crate::{
    fetch::fetch_transaction,
    output::{AccountDiff, AccountDiffRow, Transaction},
};

/// Total number of connection attempts made per pubsub subscription before giving up
/// (the first attempt counts, so this is not "retries after the first failure").
const SUB_CONNECT_RETRIES: u32 = 3;
/// Delay in milliseconds before the first retry; the delay doubles on each further retry.
const SUB_CONNECT_RETRY_BASE_MS: u64 = 1_000;

pub(crate) async fn set_up_log_sub(
    session: &BacktestSession,
    program_id: &str,
    records: Arc<DashMap<String, Transaction>>,
) -> Result<SubscriptionHandle> {
    let rpc_url = session
        .rpc_endpoint()
        .ok_or_else(|| eyre::eyre!("session has no RPC endpoint"))?
        .to_string();

    let rpc = Arc::new(RpcClient::new_with_commitment(
        rpc_url.clone(),
        CommitmentConfig::confirmed(),
    ));

    let mut last_err = None;
    for attempt in 0..SUB_CONNECT_RETRIES {
        if attempt > 0 {
            let delay = Duration::from_millis(SUB_CONNECT_RETRY_BASE_MS * (1 << (attempt - 1)));
            warn!(attempt, ?delay, "retrying pubsub log subscription connect");
            tokio::time::sleep(delay).await;
        }
        let records = records.clone();
        let rpc = rpc.clone();
        match subscribe_program_logs(
            &rpc_url,
            program_id,
            CommitmentConfig::confirmed(),
            move |notification| {
                let records = records.clone();
                let rpc = rpc.clone();
                async move {
                    let slot = notification.context.slot;
                    let signature = notification.value.signature.clone();
                    let logs = notification.value.logs.clone();
                    let mut tx = match fetch_transaction(&rpc, &signature, slot).await {
                        Some(t) => t,
                        None => {
                            let err_str = notification.value.err.as_ref().map(|e| e.to_string());
                            Transaction {
                                slot,
                                timestamp: None,
                                signature: signature.clone(),
                                success: err_str.is_none(),
                                error: err_str,
                                logs: Vec::new(),
                                sol_changes: Vec::new(),
                                token_changes: Vec::new(),
                                account_diffs: Vec::new(),
                            }
                        }
                    };
                    tx.logs = logs;
                    records.insert(signature, tx);
                }
            },
        )
        .await
        {
            Ok(handle) => return Ok(handle.into()),
            Err(e) => {
                warn!(attempt, error = %e, "pubsub log subscription connect failed");
                last_err = Some(e);
            }
        }
    }

    Err(eyre::eyre!(
        "failed to subscribe to program logs after {SUB_CONNECT_RETRIES} attempts: {}",
        last_err.unwrap()
    ))
}

pub(crate) async fn set_up_account_diff_sub(
    session: &mut BacktestSession,
    program_id: &str,
    records: Arc<DashMap<String, Transaction>>,
    stream_tx: Option<tokio::sync::mpsc::UnboundedSender<AccountDiffRow>>,
) -> Result<Vec<SubscriptionHandle>> {
    let rpc_url = session
        .rpc_endpoint()
        .ok_or_else(|| eyre::eyre!("session has no RPC endpoint"))?
        .to_string();
    let rpc = Arc::new(RpcClient::new(rpc_url.clone()));

    let mut last_err = None;
    for attempt in 0..SUB_CONNECT_RETRIES {
        if attempt > 0 {
            let delay = Duration::from_millis(SUB_CONNECT_RETRY_BASE_MS * (1 << (attempt - 1)));
            warn!(attempt, ?delay, "retrying pubsub diff subscription connect");
            tokio::time::sleep(delay).await;
        }
        let rpc_url = rpc_url.clone();
        let rpc = rpc.clone();
        let records = records.clone();
        let stream_tx = stream_tx.clone();
        let result = subscribe_program_diffs(&rpc_url, program_id, move |event| {
            let records = records.clone();
            let rpc = rpc.clone();
            let stream_tx = stream_tx.clone();
            async move {
                let Some(sig) = event.signature else { return };
                let Some(account) = event.account else { return };
                let slot = event.context.slot;
                let diff = AccountDiff {
                    account,
                    pre_state: event.pre,
                    post_state: event.post,
                };

                // Fast path: transaction already in map (another diff for the same sig arrived first).
                if let Some(mut entry) = records.get_mut(&sig) {
                    if let Some(ref tx) = stream_tx {
                        let row = AccountDiffRow::from_diff(
                            entry.slot,
                            &sig,
                            &diff,
                            &entry.sol_changes,
                            &entry.token_changes,
                        );
                        let _ = tx.send(row);
                    } else {
                        entry.account_diffs.push(diff);
                    }
                    return;
                }

                // Slow path: fetch from RPC, then insert.
                // or_insert_with handles the race where two callbacks both miss get_mut.
                // If the fetch fails we still insert the diff — we just won't have sol/token
                // metadata.
                let fetched = fetch_transaction(&rpc, &sig, slot).await;
                if fetched.is_none() {
                    tracing::warn!(sig, slot, "failed to fetch transaction metadata");
                }

                let streaming = if let Some(ref tx) = stream_tx {
                    let (sol, tok) = match &fetched {
                        Some(f) => (f.sol_changes.as_slice(), f.token_changes.as_slice()),
                        None => (&[][..], &[][..]),
                    };
                    let _ = tx.send(AccountDiffRow::from_diff(slot, &sig, &diff, sol, tok));
                    true
                } else {
                    false
                };

                let sig_fallback = sig.clone();
                let mut entry = records.entry(sig).or_insert_with(|| match fetched {
                    Some(tx) => Transaction {
                        slot: tx.slot,
                        timestamp: tx.timestamp,
                        signature: tx.signature,
                        success: tx.success,
                        error: tx.error,
                        logs: Vec::new(),
                        sol_changes: tx.sol_changes,
                        token_changes: tx.token_changes,
                        account_diffs: Vec::new(),
                    },
                    None => Transaction {
                        slot,
                        timestamp: None,
                        signature: sig_fallback,
                        success: false,
                        error: None,
                        logs: Vec::new(),
                        sol_changes: Vec::new(),
                        token_changes: Vec::new(),
                        account_diffs: Vec::new(),
                    },
                });
                if !streaming {
                    entry.account_diffs.push(diff);
                }
            }
        })
        .await;
        match result {
            Ok(handle) => return Ok(vec![handle.into()]),
            Err(e) => {
                warn!(attempt, error = %e, "pubsub diff subscription connect failed");
                last_err = Some(e);
            }
        }
    }

    Err(eyre::eyre!(
        "failed to subscribe to account diffs after {SUB_CONNECT_RETRIES} attempts: {}",
        last_err.unwrap()
    ))
}