use std::{
fmt::Debug,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use clockwork_client::{
network::state::{Pool, Registry, Snapshot, SnapshotFrame, Worker},
thread::state::Thread,
Client as ClockworkClient,
};
use dashmap::DashMap;
use log::info;
use rayon::prelude::*;
use solana_client::rpc_config::RpcSimulateTransactionConfig;
use solana_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPluginError, Result as PluginResult,
};
use solana_program::{hash::Hash, message::Message, pubkey::Pubkey};
use solana_sdk::{commitment_config::CommitmentConfig, transaction::Transaction};
use tokio::runtime::Runtime;
use crate::{
config::PluginConfig, observers::Observers, pool_position::PoolPosition, tpu_client::TpuClient,
};
static MESSAGE_DEDUPE_PERIOD: u64 = 10;
static THREAD_TIMEOUT_WINDOW: u64 = 10;
static MAX_THREAD_SIMULATION_FAILURES: u32 = 5;
pub struct TxExecutor {
pub config: PluginConfig,
pub client: Arc<ClockworkClient>, pub message_history: DashMap<Hash, u64>, pub observers: Arc<Observers>,
pub runtime: Arc<Runtime>,
pub tpu_client: Arc<TpuClient>,
pub simulation_failures: DashMap<Pubkey, u32>,
pub is_locked: AtomicBool,
}
impl TxExecutor {
pub fn new(
config: PluginConfig,
client: Arc<ClockworkClient>,
observers: Arc<Observers>,
runtime: Arc<Runtime>,
tpu_client: Arc<TpuClient>,
) -> Self {
Self {
config: config.clone(),
client,
message_history: DashMap::new(),
observers,
runtime,
tpu_client,
simulation_failures: DashMap::new(),
is_locked: AtomicBool::new(false),
}
}
pub fn execute_txs(self: Arc<Self>, slot: u64) -> PluginResult<()> {
self.spawn(|this| async move {
if this
.clone()
.is_locked
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_err()
{
return Ok(());
}
this.clone()
.simulation_failures
.retain(|thread_pubkey, failures| {
if *failures >= MAX_THREAD_SIMULATION_FAILURES {
this.observers
.thread
.executable_threads
.remove(thread_pubkey);
false
} else {
true
}
});
this.clone()
.message_history
.retain(|_msg_hash, msg_slot| *msg_slot >= slot - MESSAGE_DEDUPE_PERIOD);
let worker_pubkey = Worker::pubkey(this.config.worker_id);
if let Ok(pool_position) = this.client.get::<Pool>(&Pool::pubkey(0)).map(|pool| {
let workers = &mut pool.workers.clone();
PoolPosition {
current_position: pool
.workers
.iter()
.position(|k| k.eq(&worker_pubkey))
.map(|i| i as u64),
workers: workers.make_contiguous().to_vec().clone(),
}
}) {
this.clone()
.execute_pool_rotate_txs(slot, pool_position.clone())
.await
.ok();
this.clone()
.execute_thread_exec_txs(slot, pool_position)
.await
.ok();
}
this.clone()
.is_locked
.store(false, std::sync::atomic::Ordering::Relaxed);
Ok(())
})
}
async fn execute_pool_rotate_txs(
self: Arc<Self>,
slot: u64,
pool_position: PoolPosition,
) -> PluginResult<()> {
let registry = self.client.get::<Registry>(&Registry::pubkey()).unwrap();
let snapshot_pubkey = Snapshot::pubkey(registry.current_epoch);
let snapshot_frame_pubkey = SnapshotFrame::pubkey(snapshot_pubkey, self.config.worker_id);
if let Ok(snapshot) = self.client.get::<Snapshot>(&snapshot_pubkey) {
if let Ok(snapshot_frame) = self.client.get::<SnapshotFrame>(&snapshot_frame_pubkey) {
match crate::builders::build_pool_rotation_tx(
self.client.clone(),
pool_position,
registry,
snapshot,
snapshot_frame,
self.config.worker_id,
) {
None => {}
Some(tx) => {
self.clone().execute_tx(slot, &tx).map_err(|err| err).ok();
}
};
}
}
Ok(())
}
async fn execute_thread_exec_txs(
self: Arc<Self>,
slot: u64,
pool_position: PoolPosition,
) -> PluginResult<()> {
if pool_position.current_position.is_none() && !pool_position.workers.is_empty() {
self.observers
.thread
.executable_threads
.par_iter()
.filter(|entry| slot > entry.value() + THREAD_TIMEOUT_WINDOW)
.filter_map(|entry| {
self.clone()
.try_build_thread_exec_tx(*entry.key())
.map(|tx| (tx, *entry.key()))
})
.for_each(|(tx, thread_pubkey)| {
self.clone().simulation_failures.remove(&thread_pubkey);
self.clone().execute_tx(slot, &tx).map_err(|err| err).ok();
});
return Ok(());
}
self.observers
.thread
.executable_threads
.par_iter()
.filter(|entry| {
let failure_count = self
.clone()
.simulation_failures
.get(entry.key())
.map(|e| *e.value())
.unwrap_or(0);
let backoff = ((failure_count * 3) as u64) + entry.value();
slot >= backoff
})
.filter_map(|entry| {
self.clone()
.try_build_thread_exec_tx(*entry.key())
.map(|tx| (tx, *entry.key()))
})
.for_each(|(tx, thread_pubkey)| {
self.clone().simulation_failures.remove(&thread_pubkey);
self.clone().execute_tx(slot, &tx).map_err(|err| err).ok();
});
Ok(())
}
pub fn try_build_thread_exec_tx(self: Arc<Self>, thread_pubkey: Pubkey) -> Option<Transaction> {
let thread = match self.client.clone().get::<Thread>(&thread_pubkey) {
Err(_err) => return None,
Ok(thread) => thread,
};
crate::builders::build_thread_exec_tx(
self.client.clone(),
thread.clone(),
thread_pubkey,
self.config.worker_id,
)
.or_else(|| {
self.clone()
.simulation_failures
.entry(thread_pubkey)
.and_modify(|v| *v += 1)
.or_insert(1);
None
})
}
fn execute_tx(self: Arc<Self>, slot: u64, tx: &Transaction) -> PluginResult<()> {
if let Some(entry) = self
.message_history
.get(&tx.message().blockhash_agnostic_hash())
{
let msg_slot = entry.value();
if slot < msg_slot + MESSAGE_DEDUPE_PERIOD {
return Ok(());
}
}
self.clone()
.simulate_tx(tx)
.and_then(|tx| self.clone().submit_tx(&tx))
.and_then(|tx| self.log_tx(slot, tx))
}
fn simulate_tx(self: Arc<Self>, tx: &Transaction) -> PluginResult<Transaction> {
self.tpu_client
.rpc_client()
.simulate_transaction_with_config(
tx,
RpcSimulateTransactionConfig {
replace_recent_blockhash: true,
commitment: Some(CommitmentConfig::processed()),
..RpcSimulateTransactionConfig::default()
},
)
.map_err(|err| {
GeyserPluginError::Custom(format!("Tx failed simulation: {}", err).into())
})
.map(|response| match response.value.err {
None => Ok(tx.clone()),
Some(err) => Err(GeyserPluginError::Custom(
format!(
"Tx failed simulation: {} Logs: {:#?}",
err, response.value.logs
)
.into(),
)),
})?
}
fn submit_tx(self: Arc<Self>, tx: &Transaction) -> PluginResult<Transaction> {
if !self.tpu_client.send_transaction(tx) {
return Err(GeyserPluginError::Custom(
"Failed to send transaction".into(),
));
}
Ok(tx.clone())
}
fn log_tx(self: Arc<Self>, slot: u64, tx: Transaction) -> PluginResult<()> {
self.message_history
.insert(tx.message().blockhash_agnostic_hash(), slot);
let sig = tx.signatures[0];
info!("slot: {} sig: {}", slot, sig);
Ok(())
}
fn spawn<F: std::future::Future<Output = PluginResult<()>> + Send + 'static>(
self: &Arc<Self>,
f: impl FnOnce(Arc<Self>) -> F,
) -> PluginResult<()> {
self.runtime.spawn(f(self.clone()));
Ok(())
}
}
impl Debug for TxExecutor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "tx-executor")
}
}
trait BlockhashAgnosticHash {
fn blockhash_agnostic_hash(&self) -> Hash;
}
impl BlockhashAgnosticHash for Message {
fn blockhash_agnostic_hash(&self) -> Hash {
Message {
header: self.header.clone(),
account_keys: self.account_keys.clone(),
recent_blockhash: Hash::default(),
instructions: self.instructions.clone(),
}
.hash()
}
}