use std::sync::Arc;
use crossbeam_channel::{Receiver, Sender};
use jsonrpc_core::futures::future::join_all;
use litesvm::types::{FailedTransactionMetadata, SimulatedTransactionInfo, TransactionResult};
use solana_account::Account;
use solana_account_decoder::parse_bpf_loader::{
parse_bpf_upgradeable_loader, BpfUpgradeableLoaderAccountType, UiProgram,
};
use solana_address_lookup_table_interface::state::AddressLookupTable;
use solana_client::rpc_response::RpcKeyedAccount;
use solana_clock::Slot;
use solana_commitment_config::CommitmentConfig;
use solana_epoch_info::EpochInfo;
use solana_hash::Hash;
use solana_message::{
v0::{LoadedAddresses, MessageAddressTableLookup},
VersionedMessage,
};
use solana_pubkey::Pubkey;
use solana_sdk::{
bpf_loader_upgradeable::{get_program_data_address, UpgradeableLoaderState},
transaction::VersionedTransaction,
};
use solana_signature::Signature;
use solana_transaction_error::TransactionError;
use solana_transaction_status::{EncodedConfirmedTransactionWithStatusMeta, UiTransactionEncoding};
use surfpool_types::{
ComputeUnitsEstimationResult, ProfileResult, SimnetEvent, TransactionConfirmationStatus,
TransactionStatusEvent,
};
use tokio::sync::RwLock;
use super::{
remote::{SomeRemoteCtx, SurfnetRemoteClient},
AccountFactory, GetAccountResult, GetTransactionResult, GeyserEvent, SignatureSubscriptionType,
SurfnetSvm,
};
use crate::{
error::{SurfpoolError, SurfpoolResult},
rpc::utils::convert_transaction_metadata_from_canonical,
};
/// A value paired with the chain context (slot, epoch info, blockhash) that
/// was captured together with it.
pub struct SvmAccessContext<T> {
/// Slot stored with the value.
pub slot: Slot,
/// Epoch info stored with the value.
pub latest_epoch_info: EpochInfo,
/// Blockhash stored with the value.
pub latest_blockhash: Hash,
/// The wrapped payload.
pub inner: T,
}
impl<T> SvmAccessContext<T> {
pub fn new(slot: Slot, latest_epoch_info: EpochInfo, latest_blockhash: Hash, inner: T) -> Self {
Self {
slot,
latest_blockhash,
latest_epoch_info,
inner,
}
}
pub fn inner(&self) -> &T {
&self.inner
}
pub fn with_new_value<N>(&self, inner: N) -> SvmAccessContext<N> {
SvmAccessContext {
slot: self.slot,
latest_blockhash: self.latest_blockhash,
latest_epoch_info: self.latest_epoch_info.clone(),
inner,
}
}
}
/// Shorthand for a `SurfpoolResult` whose success value is wrapped in an
/// [`SvmAccessContext`].
pub type SurfpoolContextualizedResult<T> = SurfpoolResult<SvmAccessContext<T>>;
/// Shared, cloneable handle to a [`SurfnetSvm`] guarded by an async
/// read/write lock.
///
/// Cloning copies the inner `Arc`, so every clone refers to the same SVM.
#[derive(Clone)]
pub struct SurfnetSvmLocker(pub Arc<RwLock<SurfnetSvm>>);
impl SurfnetSvmLocker {
    /// Runs `reader` with shared (read) access to the SVM and returns its result.
    ///
    /// The lock is acquired with `blocking_read` inside
    /// `tokio::task::block_in_place`, so this is a synchronous call.
    /// `reader` is invoked exactly once, which is why any `FnOnce` closure is
    /// accepted; closures may move their captures instead of cloning them.
    ///
    /// NOTE(review): tokio documents that `block_in_place` panics on a
    /// `current_thread` runtime; confirm the runtime flavor used by callers.
    pub fn with_svm_reader<T, F>(&self, reader: F) -> T
    where
        F: FnOnce(&SurfnetSvm) -> T + Send + Sync,
        T: Send + 'static,
    {
        tokio::task::block_in_place(move || {
            let read_guard = self.0.blocking_read();
            reader(&read_guard)
        })
    }

    /// Like [`Self::with_svm_reader`], but wraps the result in an
    /// [`SvmAccessContext`] whose slot, epoch info and blockhash are read
    /// under the same read guard, after `reader` has run.
    fn with_contextualized_svm_reader<T, F>(&self, reader: F) -> SvmAccessContext<T>
    where
        F: FnOnce(&SurfnetSvm) -> T + Send + Sync,
        T: Send + 'static,
    {
        tokio::task::block_in_place(move || {
            let read_guard = self.0.blocking_read();
            let res = reader(&read_guard);
            SvmAccessContext::new(
                read_guard.get_latest_absolute_slot(),
                read_guard.latest_epoch_info(),
                read_guard.latest_blockhash(),
                res,
            )
        })
    }

    /// Runs `writer` with exclusive (write) access to the SVM and returns its
    /// result.
    ///
    /// The lock is acquired with `blocking_write` inside
    /// `tokio::task::block_in_place`. `writer` is invoked exactly once.
    pub fn with_svm_writer<T, F>(&self, writer: F) -> T
    where
        F: FnOnce(&mut SurfnetSvm) -> T + Send + Sync,
        T: Send + 'static,
    {
        tokio::task::block_in_place(move || {
            let mut write_guard = self.0.blocking_write();
            writer(&mut write_guard)
        })
    }
}
impl SurfnetSvmLocker {
    /// Wraps `svm` in a fresh shared lock.
    pub fn new(svm: SurfnetSvm) -> Self {
        Self(Arc::new(RwLock::new(svm)))
    }

    /// Initializes the SVM with an epoch info and returns that epoch info.
    ///
    /// With a remote client the epoch info is fetched from it; without one,
    /// an all-zero epoch info (no transaction count) is used.
    ///
    /// # Errors
    /// Propagates the failure of the remote `get_epoch_info` call.
    pub async fn initialize(
        &self,
        remote_ctx: &Option<SurfnetRemoteClient>,
    ) -> SurfpoolResult<EpochInfo> {
        let epoch_info = match remote_ctx {
            Some(remote_client) => remote_client.get_epoch_info().await?,
            None => EpochInfo {
                epoch: 0,
                slot_index: 0,
                slots_in_epoch: 0,
                absolute_slot: 0,
                block_height: 0,
                transaction_count: None,
            },
        };

        self.with_svm_writer(|svm| {
            svm.initialize(epoch_info.clone(), remote_ctx);
        });

        Ok(epoch_info)
    }
}
impl SurfnetSvmLocker {
/// Looks `pubkey` up in the local SVM only.
///
/// Yields `GetAccountResult::FoundAccount` when the account exists locally
/// and `GetAccountResult::None` otherwise, wrapped in the current context.
pub fn get_account_local(&self, pubkey: &Pubkey) -> SvmAccessContext<GetAccountResult> {
    self.with_contextualized_svm_reader(|svm| {
        svm.inner.get_account(pubkey).map_or_else(
            || GetAccountResult::None(*pubkey),
            // NOTE(review): the trailing `false` is presumably the
            // "needs to be written back" flag; confirm on `GetAccountResult`.
            |account| GetAccountResult::FoundAccount(*pubkey, account, false),
        )
    })
}
/// Looks `pubkey` up locally and falls back to the remote client only when
/// the local lookup finds nothing.
///
/// The returned context is always the one captured by the local lookup.
///
/// # Errors
/// Propagates the failure of the remote `get_account` call.
pub async fn get_account_local_then_remote(
    &self,
    client: &SurfnetRemoteClient,
    pubkey: &Pubkey,
    commitment_config: CommitmentConfig,
) -> SurfpoolContextualizedResult<GetAccountResult> {
    let local = self.get_account_local(pubkey);
    if !local.inner.is_none() {
        return Ok(local);
    }

    let remote_account = client.get_account(pubkey, commitment_config).await?;
    Ok(local.with_new_value(remote_account))
}
/// Fetches an account, consulting the remote only when `remote_ctx` is set.
///
/// If the lookup ends in `GetAccountResult::None` and a `factory` is given,
/// the factory is called with a clone of this locker and its output replaces
/// the missing result.
///
/// # Errors
/// Propagates the failure of the remote lookup.
pub async fn get_account(
    &self,
    remote_ctx: &Option<(SurfnetRemoteClient, CommitmentConfig)>,
    pubkey: &Pubkey,
    factory: Option<AccountFactory>,
) -> SurfpoolContextualizedResult<GetAccountResult> {
    let result = match remote_ctx {
        Some((remote_client, commitment_config)) => {
            self.get_account_local_then_remote(remote_client, pubkey, *commitment_config)
                .await?
        }
        None => self.get_account_local(pubkey),
    };

    if let (GetAccountResult::None(_), Some(make_default)) = (&result.inner, factory) {
        let default = make_default(self.clone());
        return Ok(result.with_new_value(default));
    }

    Ok(result)
}
/// Looks every key in `pubkeys` up in the local SVM only.
///
/// The output has one entry per requested key, in request order:
/// `FoundAccount` for keys present locally, `None` for the rest.
pub fn get_multiple_accounts_local(
    &self,
    pubkeys: &[Pubkey],
) -> SvmAccessContext<Vec<GetAccountResult>> {
    self.with_contextualized_svm_reader(|svm| {
        pubkeys
            .iter()
            .map(|pubkey| match svm.inner.get_account(pubkey) {
                Some(account) => GetAccountResult::FoundAccount(*pubkey, account, false),
                None => GetAccountResult::None(*pubkey),
            })
            .collect::<Vec<_>>()
    })
}
/// Looks every key in `pubkeys` up locally, then asks the remote client for
/// the keys that were not found.
///
/// The output keeps one entry per requested key, in request order: each local
/// `None` placeholder is replaced by the matching remote result rather than
/// having the remote results appended after it.
///
/// The returned context is the one captured by the local lookup.
///
/// # Errors
/// Propagates the failure of the remote `get_multiple_accounts` call.
pub async fn get_multiple_accounts_local_then_remote(
    &self,
    client: &SurfnetRemoteClient,
    pubkeys: &[Pubkey],
    commitment_config: CommitmentConfig,
) -> SurfpoolContextualizedResult<Vec<GetAccountResult>> {
    let results = self.get_multiple_accounts_local(pubkeys);

    let missing_accounts: Vec<Pubkey> = results
        .inner
        .iter()
        .filter_map(|result| match result {
            GetAccountResult::None(pubkey) => Some(*pubkey),
            _ => None,
        })
        .collect();

    if missing_accounts.is_empty() {
        return Ok(results);
    }

    // NOTE(review): assumes the remote client returns exactly one result per
    // requested key, in request order; confirm against
    // `SurfnetRemoteClient::get_multiple_accounts`.
    let mut remote_results = client
        .get_multiple_accounts(&missing_accounts, commitment_config)
        .await?
        .into_iter();

    let mut combined_results: Vec<GetAccountResult> = results
        .inner
        .iter()
        .map(|local| match local {
            // Fill the gap with the next remote answer; if the remote returned
            // fewer entries than requested, keep the local `None`.
            GetAccountResult::None(_) => remote_results.next().unwrap_or_else(|| local.clone()),
            found => found.clone(),
        })
        .collect();

    // Defensive: never drop data if the remote returned more than was asked for.
    combined_results.extend(remote_results);

    Ok(results.with_new_value(combined_results))
}
/// Fetches several accounts, consulting the remote only when `remote_ctx` is
/// set.
///
/// When a `factory` is given, every `GetAccountResult::None` entry is
/// replaced by the factory's output (called with a clone of this locker).
///
/// # Errors
/// Propagates the failure of the remote lookup.
pub async fn get_multiple_accounts(
    &self,
    remote_ctx: &Option<(SurfnetRemoteClient, CommitmentConfig)>,
    pubkeys: &[Pubkey],
    factory: Option<AccountFactory>,
) -> SurfpoolContextualizedResult<Vec<GetAccountResult>> {
    let results = match remote_ctx {
        Some((remote_client, commitment_config)) => {
            self.get_multiple_accounts_local_then_remote(remote_client, pubkeys, *commitment_config)
                .await?
        }
        None => self.get_multiple_accounts_local(pubkeys),
    };

    let combined: Vec<GetAccountResult> = results
        .inner
        .iter()
        .cloned()
        .map(|result| match &factory {
            Some(make_default) if matches!(result, GetAccountResult::None(_)) => {
                make_default(self.clone())
            }
            _ => result,
        })
        .collect();

    Ok(results.with_new_value(combined))
}
}
impl SurfnetSvmLocker {
    /// Fetches a transaction by signature, consulting the remote only when
    /// `remote_ctx` is set.
    pub async fn get_transaction(
        &self,
        remote_ctx: &Option<(SurfnetRemoteClient, Option<UiTransactionEncoding>)>,
        signature: &Signature,
    ) -> SvmAccessContext<GetTransactionResult> {
        match remote_ctx {
            Some((remote_client, encoding)) => {
                self.get_transaction_local_then_remote(remote_client, signature, *encoding)
                    .await
            }
            None => self.get_transaction_local(signature),
        }
    }

    /// Looks a transaction up in the local SVM's transaction store.
    ///
    /// Yields `GetTransactionResult::None` when the signature is unknown
    /// locally.
    pub fn get_transaction_local(
        &self,
        signature: &Signature,
    ) -> SvmAccessContext<GetTransactionResult> {
        self.with_contextualized_svm_reader(|svm| {
            let latest_absolute_slot = svm.get_latest_absolute_slot();

            let Some(entry) = svm.transactions.get(signature) else {
                return GetTransactionResult::None(*signature);
            };

            // NOTE(review): `expect_processed` presumably panics when the stored
            // entry is not in the processed state; confirm on the entry type.
            let tx: EncodedConfirmedTransactionWithStatusMeta =
                entry.expect_processed().clone().into();
            GetTransactionResult::found_transaction(*signature, tx, latest_absolute_slot)
        })
    }

    /// Looks a transaction up locally and asks the remote client only when
    /// the local lookup finds nothing.
    ///
    /// The returned context is always the one captured by the local lookup;
    /// its slot is also passed to the remote `get_transaction` call.
    pub async fn get_transaction_local_then_remote(
        &self,
        client: &SurfnetRemoteClient,
        signature: &Signature,
        encoding: Option<UiTransactionEncoding>,
    ) -> SvmAccessContext<GetTransactionResult> {
        let local_result = self.get_transaction_local(signature);
        if !local_result.inner.is_none() {
            return local_result;
        }

        let remote_result = client
            .get_transaction(*signature, encoding, local_result.slot)
            .await;
        local_result.with_new_value(remote_result)
    }
}
impl SurfnetSvmLocker {
/// Simulates `transaction` against the SVM under a read lock and returns the
/// SVM's simulation outcome unchanged.
pub fn simulate_transaction(
    &self,
    transaction: VersionedTransaction,
) -> Result<SimulatedTransactionInfo, FailedTransactionMetadata> {
    self.with_svm_reader(|svm| {
        // Hand the SVM its own copy of the transaction.
        let tx = transaction.clone();
        svm.simulate_transaction(tx)
    })
}
pub async fn process_transaction(
&self,
remote_ctx: &Option<SurfnetRemoteClient>,
transaction: VersionedTransaction,
status_tx: Sender<TransactionStatusEvent>,
skip_preflight: bool,
) -> SurfpoolContextualizedResult<()> {
let remote_ctx = &remote_ctx.get_remote_ctx(CommitmentConfig::confirmed());
let (latest_absolute_slot, latest_epoch_info, latest_blockhash) =
self.with_svm_writer(|svm_writer| {
let latest_absolute_slot = svm_writer.get_latest_absolute_slot();
svm_writer.notify_signature_subscribers(
SignatureSubscriptionType::received(),
&transaction.signatures[0],
latest_absolute_slot,
None,
);
(
latest_absolute_slot,
svm_writer.latest_epoch_info(),
svm_writer.latest_blockhash(),
)
});
{
if transaction
.verify_with_results()
.iter()
.any(|valid| !*valid)
{
return Ok(self.with_contextualized_svm_reader(|svm_reader| {
svm_reader
.notify_invalid_transaction(transaction.signatures[0], status_tx.clone());
}));
}
}
let signature = transaction.signatures[0];
let accounts = self
.get_pubkeys_from_message(remote_ctx, &transaction.message)
.await?;
let account_updates = self
.get_multiple_accounts(remote_ctx, &accounts, None)
.await?
.inner;
self.with_svm_writer(|svm_writer| {
for update in &account_updates {
svm_writer.write_account_update(update.clone());
}
if !skip_preflight {
match svm_writer.simulate_transaction(transaction.clone()) {
Ok(_) => {}
Err(res) => {
let _ = svm_writer
.simnet_events_tx
.try_send(SimnetEvent::error(format!(
"Transaction simulation failed: {}",
res.err
)));
let meta = convert_transaction_metadata_from_canonical(&res.meta);
let _ = status_tx.try_send(TransactionStatusEvent::SimulationFailure((
res.err.clone(),
meta,
)));
svm_writer.notify_signature_subscribers(
SignatureSubscriptionType::processed(),
&signature,
latest_absolute_slot,
Some(res.err),
);
return;
}
}
}
let err = match svm_writer
.send_transaction(transaction.clone(), false )
{
Ok(res) => {
let transaction_meta = convert_transaction_metadata_from_canonical(&res);
let _ = svm_writer
.geyser_events_tx
.send(GeyserEvent::NewTransaction(
transaction.clone(),
transaction_meta.clone(),
latest_absolute_slot,
));
let _ = status_tx.try_send(TransactionStatusEvent::Success(
TransactionConfirmationStatus::Processed,
));
svm_writer
.transactions_queued_for_confirmation
.push_back((transaction.clone(), status_tx.clone()));
None
}
Err(res) => {
let transaction_meta = convert_transaction_metadata_from_canonical(&res.meta);
let _ = svm_writer
.simnet_events_tx
.try_send(SimnetEvent::error(format!(
"Transaction execution failed: {}",
res.err
)));
let _ = status_tx.try_send(TransactionStatusEvent::ExecutionFailure((
res.err.clone(),
transaction_meta,
)));
Some(res.err)
}
};
svm_writer.notify_signature_subscribers(
SignatureSubscriptionType::processed(),
&signature,
latest_absolute_slot,
err,
);
});
Ok(SvmAccessContext::new(
latest_absolute_slot,
latest_epoch_info,
latest_blockhash,
(),
))
}
}
impl SurfnetSvmLocker {
    /// Writes `account_update` into the SVM, but only when the update reports
    /// that it requires one. The write lock is not taken otherwise.
    pub fn write_account_update(&self, account_update: GetAccountResult) {
        if account_update.requires_update() {
            self.with_svm_writer(move |svm| svm.write_account_update(account_update.clone()))
        }
    }

    /// Writes every entry of `account_updates` into the SVM under a single
    /// write lock.
    ///
    /// The lock is skipped entirely when no entry requires an update; once at
    /// least one does, all entries are handed to the SVM.
    pub fn write_multiple_account_updates(&self, account_updates: &[GetAccountResult]) {
        let anything_to_write = account_updates
            .iter()
            .any(|update| update.requires_update());
        if !anything_to_write {
            return;
        }

        self.with_svm_writer(move |svm| {
            for pending in account_updates {
                svm.write_account_update(pending.clone());
            }
        });
    }
}
impl SurfnetSvmLocker {
pub async fn get_all_token_accounts(
&self,
remote_ctx: &Option<SurfnetRemoteClient>,
owner: Pubkey,
token_program: Pubkey,
) -> SurfpoolContextualizedResult<(Vec<RpcKeyedAccount>, Vec<Pubkey>)> {
let keyed_accounts = if let Some(remote_client) = remote_ctx {
remote_client
.get_token_accounts_by_owner(owner, token_program)
.await?
} else {
vec![]
};
let token_account_pubkeys = keyed_accounts
.iter()
.map(|a| Pubkey::from_str_const(&a.pubkey))
.collect::<Vec<_>>();
let local_accounts = self.get_multiple_accounts_local(&token_account_pubkeys);
let missing_pubkeys = local_accounts
.inner
.iter()
.filter_map(|some_account_result| match &some_account_result {
GetAccountResult::None(pubkey) => Some(*pubkey),
_ => None,
})
.collect::<Vec<_>>();
Ok(local_accounts.with_new_value((keyed_accounts, missing_pubkeys)))
}
}
impl SurfnetSvmLocker {
/// Collects every pubkey a message refers to.
///
/// Legacy messages yield their static account keys. V0 messages yield, in
/// this order: the static account keys, the address of each lookup table,
/// and then the addresses loaded from each table (writable first, then
/// readonly, table by table).
///
/// # Errors
/// Fails if any lookup table cannot be resolved.
pub async fn get_pubkeys_from_message(
    &self,
    remote_ctx: &Option<(SurfnetRemoteClient, CommitmentConfig)>,
    message: &VersionedMessage,
) -> SurfpoolResult<Vec<Pubkey>> {
    let v0_message = match message {
        VersionedMessage::Legacy(legacy) => return Ok(legacy.account_keys.clone()),
        VersionedMessage::V0(v0) => v0,
    };
    let lookups = &v0_message.address_table_lookups;

    // Resolve all tables concurrently; the first failure aborts the call.
    let resolved_tables = join_all(lookups.iter().map(|lookup| async move {
        let loaded = self
            .get_lookup_table_addresses(remote_ctx, lookup)
            .await?
            .inner;
        let addresses = loaded
            .writable
            .into_iter()
            .chain(loaded.readonly)
            .collect::<Vec<Pubkey>>();
        Ok::<_, SurfpoolError>(addresses)
    }))
    .await
    .into_iter()
    .collect::<Result<Vec<Vec<Pubkey>>, SurfpoolError>>()?;

    let mut pubkeys = v0_message.account_keys.clone();
    pubkeys.extend(lookups.iter().map(|lookup| lookup.account_key));
    pubkeys.extend(resolved_tables.into_iter().flatten());
    Ok(pubkeys)
}
/// Resolves the writable and readonly addresses that `address_table_lookup`
/// selects from its lookup table account.
///
/// The table account is fetched via [`Self::get_account`], must be owned by
/// the address-lookup-table program, and is deserialized and indexed using
/// the SVM's current slot and `SlotHashes` sysvar. The returned context is
/// the one from the account fetch.
///
/// # Errors
/// - the account fetch fails or yields no account,
/// - the account is owned by a different program,
/// - the account data is not a valid lookup table,
/// - a writable or readonly index cannot be resolved.
pub async fn get_lookup_table_addresses(
    &self,
    remote_ctx: &Option<(SurfnetRemoteClient, CommitmentConfig)>,
    address_table_lookup: &MessageAddressTableLookup,
) -> SurfpoolContextualizedResult<LoadedAddresses> {
    let result = self
        .get_account(remote_ctx, &address_table_lookup.account_key, None)
        .await?;
    let table_account = result.inner.clone().map_account()?;

    if table_account.owner != solana_sdk_ids::address_lookup_table::id() {
        return Err(SurfpoolError::invalid_account_owner(
            table_account.owner,
            Some("Attempted to lookup addresses from an account owned by the wrong program"),
        ));
    }

    let SvmAccessContext {
        slot: current_slot,
        inner: slot_hashes,
        ..
    } = self.with_contextualized_svm_reader(|svm_reader| {
        svm_reader
            .inner
            .get_sysvar::<solana_sdk::sysvar::slot_hashes::SlotHashes>()
    });

    // Deserialize straight from the account's buffer; the data is copied only
    // if it has to be attached to the error.
    let lookup_table =
        AddressLookupTable::deserialize(&table_account.data).map_err(|_ix_err| {
            SurfpoolError::invalid_account_data(
                address_table_lookup.account_key,
                table_account.data.clone(),
                Some("Attempted to lookup addresses from an invalid account"),
            )
        })?;

    let writable = lookup_table
        .lookup(
            current_slot,
            &address_table_lookup.writable_indexes,
            &slot_hashes,
        )
        .map_err(|_ix_err| {
            SurfpoolError::invalid_lookup_index(address_table_lookup.account_key)
        })?;
    let readonly = lookup_table
        .lookup(
            current_slot,
            &address_table_lookup.readonly_indexes,
            &slot_hashes,
        )
        .map_err(|_ix_err| {
            SurfpoolError::invalid_lookup_index(address_table_lookup.account_key)
        })?;

    Ok(result.with_new_value(LoadedAddresses { writable, readonly }))
}
}
impl SurfnetSvmLocker {
    /// Asks the SVM to estimate the compute units of `transaction`, returning
    /// the estimate together with the current context.
    pub fn estimate_compute_units(
        &self,
        transaction: &VersionedTransaction,
    ) -> SvmAccessContext<ComputeUnitsEstimationResult> {
        self.with_contextualized_svm_reader(|svm| svm.estimate_compute_units(transaction))
    }

    /// Stores `profile_result` under `tag` and emits a tagged-profile simnet
    /// event. A failure to enqueue the event is ignored.
    pub fn write_profiling_results(&self, tag: String, profile_result: ProfileResult) {
        self.with_svm_writer(|svm| {
            let tagged_results = svm
                .tagged_profiling_results
                .entry(tag.clone())
                .or_default();
            tagged_results.push(profile_result.clone());

            let event = SimnetEvent::tagged_profile(profile_result.clone(), tag.clone());
            let _ = svm.simnet_events_tx.try_send(event);
        });
    }
}
impl SurfnetSvmLocker {
    /// Copies an upgradeable program from `source_program_id` to
    /// `destination_program_id`.
    ///
    /// The source program account and its program-data account are fetched,
    /// the program account is checked to point at the expected program-data
    /// address, and then two accounts are written: the program data at the
    /// destination's derived program-data address, and a program account at
    /// `destination_program_id` that points to it.
    ///
    /// # Errors
    /// - either source account cannot be fetched or is missing,
    /// - the source is not a parseable upgradeable-loader program account,
    /// - its program-data address differs from the derived one,
    /// - serializing the new program state or writing an account fails.
    pub async fn clone_program_account(
        &self,
        remote_ctx: &Option<(SurfnetRemoteClient, CommitmentConfig)>,
        source_program_id: &Pubkey,
        destination_program_id: &Pubkey,
    ) -> SurfpoolContextualizedResult<()> {
        let expected_source_program_data_address = get_program_data_address(source_program_id);
        let requested = [*source_program_id, expected_source_program_data_address];
        let result = self
            .get_multiple_accounts(remote_ctx, &requested, None)
            .await?;

        let mut accounts = result
            .inner
            .iter()
            .cloned()
            .map(|found| found.map_account())
            .collect::<SurfpoolResult<Vec<Account>>>()?;

        // Positions mirror `requested`: index 0 is the program, index 1 its
        // data. Take the later index first so the earlier one stays valid.
        let source_program_data_account = accounts.remove(1);
        let source_program_account = accounts.remove(0);

        let parsed = parse_bpf_upgradeable_loader(&source_program_account.data).map_err(|e| {
            SurfpoolError::invalid_program_account(source_program_id, e.to_string())
        })?;
        let BpfUpgradeableLoaderAccountType::Program(UiProgram {
            program_data: source_program_data_address,
        }) = parsed
        else {
            return Err(SurfpoolError::expected_program_account(source_program_id));
        };

        if source_program_data_address != expected_source_program_data_address.to_string() {
            return Err(SurfpoolError::invalid_program_account(
                source_program_id,
                format!(
                    "Program data address mismatch: expected {}, found {}",
                    expected_source_program_data_address, source_program_data_address
                ),
            ));
        }

        let destination_program_data_address = get_program_data_address(destination_program_id);
        let destination_state = bincode::serialize(&UpgradeableLoaderState::Program {
            programdata_address: destination_program_data_address,
        })
        .map_err(|e| SurfpoolError::internal(format!("Failed to serialize program data: {}", e)))?;

        // Reuse the source program account, swapping only its data.
        let mut new_program_account = source_program_account;
        new_program_account.data = destination_state;

        self.with_svm_writer(|svm| -> Result<(), SurfpoolError> {
            svm.set_account(
                &destination_program_data_address,
                source_program_data_account.clone(),
            )?;
            svm.set_account(destination_program_id, new_program_account.clone())?;
            Ok(())
        })?;

        Ok(result.with_new_value(()))
    }
}
impl SurfnetSvmLocker {
    /// Returns a clone of the SVM's simnet event sender.
    pub fn simnet_events_tx(&self) -> Sender<SimnetEvent> {
        self.with_svm_reader(|svm| svm.simnet_events_tx.clone())
    }

    /// Returns a copy of the SVM's latest epoch info.
    pub fn get_epoch_info(&self) -> EpochInfo {
        self.with_svm_reader(|svm| svm.latest_epoch_info.clone())
    }

    /// Returns the SVM's latest absolute slot.
    pub fn get_latest_absolute_slot(&self) -> Slot {
        self.with_svm_reader(|svm| svm.get_latest_absolute_slot())
    }

    /// Airdrops `lamports` to `pubkey` under a write lock and returns the
    /// SVM's transaction result.
    pub fn airdrop(&self, pubkey: &Pubkey, lamports: u64) -> TransactionResult {
        self.with_svm_writer(|svm| svm.airdrop(pubkey, lamports))
    }

    /// Forwards `lamports` and `addresses` to the SVM's `airdrop_pubkeys`
    /// under a single write lock.
    pub fn airdrop_pubkeys(&self, lamports: u64, addresses: &[Pubkey]) {
        self.with_svm_writer(|svm| svm.airdrop_pubkeys(lamports, addresses))
    }

    /// Asks the SVM to confirm its current block.
    ///
    /// # Errors
    /// Propagates whatever the SVM's `confirm_current_block` reports.
    pub fn confirm_current_block(&self) -> SurfpoolResult<()> {
        self.with_svm_writer(|svm| svm.confirm_current_block())
    }

    /// Registers a subscription for updates on `signature` and returns the
    /// receiving end, which yields `(slot, optional error)` pairs.
    pub fn subscribe_for_signature_updates(
        &self,
        signature: &Signature,
        subscription_type: SignatureSubscriptionType,
    ) -> Receiver<(Slot, Option<TransactionError>)> {
        self.with_svm_writer(|svm| {
            let kind = subscription_type.clone();
            svm.subscribe_for_signature_updates(signature, kind)
        })
    }
}