use std::{sync::Arc, time::Duration};
use dashmap::DashMap;
use eyre::Result;
use simulator_client::{
BacktestSession, SubscriptionHandle, subscribe_program_diffs, subscribe_program_logs,
};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_commitment_config::CommitmentConfig;
use tracing::warn;
use crate::{
fetch::fetch_transaction,
output::{AccountDiff, AccountDiffRow, Transaction},
};
/// Upper bound of the connect loop in `set_up_log_sub` and `set_up_account_diff_sub`.
///
/// The loop runs `0..SUB_CONNECT_RETRIES`, so this is the total number of connection
/// attempts (the initial try included), not the number of retries after it.
const SUB_CONNECT_RETRIES: u32 = 3;
/// Base back-off in milliseconds between connection attempts.
///
/// Before attempt `n` (for `n >= 1`) the subscription helpers sleep
/// `SUB_CONNECT_RETRY_BASE_MS * 2^(n - 1)` milliseconds, i.e. the delay doubles each time.
const SUB_CONNECT_RETRY_BASE_MS: u64 = 1_000;
pub(crate) async fn set_up_log_sub(
session: &BacktestSession,
program_id: &str,
records: Arc<DashMap<String, Transaction>>,
) -> Result<SubscriptionHandle> {
let rpc_url = session
.rpc_endpoint()
.ok_or_else(|| eyre::eyre!("session has no RPC endpoint"))?
.to_string();
let rpc = Arc::new(RpcClient::new_with_commitment(
rpc_url.clone(),
CommitmentConfig::confirmed(),
));
let mut last_err = None;
for attempt in 0..SUB_CONNECT_RETRIES {
if attempt > 0 {
let delay = Duration::from_millis(SUB_CONNECT_RETRY_BASE_MS * (1 << (attempt - 1)));
warn!(attempt, ?delay, "retrying pubsub log subscription connect");
tokio::time::sleep(delay).await;
}
let records = records.clone();
let rpc = rpc.clone();
match subscribe_program_logs(
&rpc_url,
program_id,
CommitmentConfig::confirmed(),
move |notification| {
let records = records.clone();
let rpc = rpc.clone();
async move {
let slot = notification.context.slot;
let signature = notification.value.signature.clone();
let logs = notification.value.logs.clone();
let mut tx = match fetch_transaction(&rpc, &signature, slot).await {
Some(t) => t,
None => {
let err_str = notification.value.err.as_ref().map(|e| e.to_string());
Transaction {
slot,
timestamp: None,
signature: signature.clone(),
success: err_str.is_none(),
error: err_str,
logs: Vec::new(),
sol_changes: Vec::new(),
token_changes: Vec::new(),
account_diffs: Vec::new(),
}
}
};
tx.logs = logs;
records.insert(signature, tx);
}
},
)
.await
{
Ok(handle) => return Ok(handle.into()),
Err(e) => {
warn!(attempt, error = %e, "pubsub log subscription connect failed");
last_err = Some(e);
}
}
}
Err(eyre::eyre!(
"failed to subscribe to program logs after {SUB_CONNECT_RETRIES} attempts: {}",
last_err.unwrap()
))
}
pub(crate) async fn set_up_account_diff_sub(
session: &mut BacktestSession,
program_id: &str,
records: Arc<DashMap<String, Transaction>>,
stream_tx: Option<tokio::sync::mpsc::UnboundedSender<AccountDiffRow>>,
) -> Result<Vec<SubscriptionHandle>> {
let rpc_url = session
.rpc_endpoint()
.ok_or_else(|| eyre::eyre!("session has no RPC endpoint"))?
.to_string();
let rpc = Arc::new(RpcClient::new(rpc_url.clone()));
let mut last_err = None;
for attempt in 0..SUB_CONNECT_RETRIES {
if attempt > 0 {
let delay = Duration::from_millis(SUB_CONNECT_RETRY_BASE_MS * (1 << (attempt - 1)));
warn!(attempt, ?delay, "retrying pubsub diff subscription connect");
tokio::time::sleep(delay).await;
}
let rpc_url = rpc_url.clone();
let rpc = rpc.clone();
let records = records.clone();
let stream_tx = stream_tx.clone();
let result = subscribe_program_diffs(&rpc_url, program_id, move |event| {
let records = records.clone();
let rpc = rpc.clone();
let stream_tx = stream_tx.clone();
async move {
let Some(sig) = event.signature else { return };
let Some(account) = event.account else { return };
let slot = event.context.slot;
let diff = AccountDiff {
account,
pre_state: event.pre,
post_state: event.post,
};
if let Some(mut entry) = records.get_mut(&sig) {
if let Some(ref tx) = stream_tx {
let row = AccountDiffRow::from_diff(
entry.slot,
&sig,
&diff,
&entry.sol_changes,
&entry.token_changes,
);
let _ = tx.send(row);
} else {
entry.account_diffs.push(diff);
}
return;
}
let fetched = fetch_transaction(&rpc, &sig, slot).await;
if fetched.is_none() {
tracing::warn!(sig, slot, "failed to fetch transaction metadata");
}
let streaming = if let Some(ref tx) = stream_tx {
let (sol, tok) = match &fetched {
Some(f) => (f.sol_changes.as_slice(), f.token_changes.as_slice()),
None => (&[][..], &[][..]),
};
let _ = tx.send(AccountDiffRow::from_diff(slot, &sig, &diff, sol, tok));
true
} else {
false
};
let sig_fallback = sig.clone();
let mut entry = records.entry(sig).or_insert_with(|| match fetched {
Some(tx) => Transaction {
slot: tx.slot,
timestamp: tx.timestamp,
signature: tx.signature,
success: tx.success,
error: tx.error,
logs: Vec::new(),
sol_changes: tx.sol_changes,
token_changes: tx.token_changes,
account_diffs: Vec::new(),
},
None => Transaction {
slot,
timestamp: None,
signature: sig_fallback,
success: false,
error: None,
logs: Vec::new(),
sol_changes: Vec::new(),
token_changes: Vec::new(),
account_diffs: Vec::new(),
},
});
if !streaming {
entry.account_diffs.push(diff);
}
}
})
.await;
match result {
Ok(handle) => return Ok(vec![handle.into()]),
Err(e) => {
warn!(attempt, error = %e, "pubsub diff subscription connect failed");
last_err = Some(e);
}
}
}
Err(eyre::eyre!(
"failed to subscribe to account diffs after {SUB_CONNECT_RETRIES} attempts: {}",
last_err.unwrap()
))
}