use solana_geyser_plugin_interface::geyser_plugin_interface::ReplicaAccountInfo;
use {
crate::{
config::PluginConfig,
events::AccountUpdateEvent,
executors::{tx::TxExecutor, webhook::WebhookExecutor, Executors},
observers::{
thread::ThreadObserver,
webhook::{HttpRequest, WebhookObserver},
Observers,
},
tpu_client::TpuClient,
utils::read_or_new_keypair,
},
clockwork_client::Client as ClockworkClient,
log::info,
solana_client::rpc_client::RpcClient,
solana_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, ReplicaAccountInfoVersions, Result as PluginResult, SlotStatus,
},
solana_program::pubkey::Pubkey,
solana_sdk::commitment_config::CommitmentConfig,
std::{fmt::Debug, sync::Arc},
tokio::runtime::{Builder, Runtime},
};
static LOCAL_RPC_URL: &str = "http://127.0.0.1:8899";
static LOCAL_WEBSOCKET_URL: &str = "ws://127.0.0.1:8900";
#[derive(Debug)]
pub struct ClockworkPlugin {
pub config: PluginConfig,
pub executors: Option<Arc<Executors>>,
pub observers: Arc<Observers>,
pub runtime: Arc<Runtime>,
}
impl GeyserPlugin for ClockworkPlugin {
fn name(&self) -> &'static str {
"clockwork-plugin"
}
fn on_load(&mut self, config_file: &str) -> PluginResult<()> {
solana_logger::setup_with_default("info");
info!(
"clockwork-plugin crate-info - spec: {}, geyser_interface_version: {}",
env!("SPEC"),
env!("GEYSER_INTERFACE_VERSION")
);
info!("Loading snapshot...");
let config = PluginConfig::read_from(config_file)?;
let _guard = sentry::init((
config.clone().sentry_url,
sentry::ClientOptions {
release: sentry::release_name!(),
..Default::default()
},
));
*self = ClockworkPlugin::new_from_config(config);
Ok(())
}
fn on_unload(&mut self) {}
fn update_account(
&mut self,
account: ReplicaAccountInfoVersions,
slot: u64,
_is_startup: bool,
) -> PluginResult<()> {
let account_info = match account {
ReplicaAccountInfoVersions::V0_0_1(account_info) => account_info.clone(),
ReplicaAccountInfoVersions::V0_0_2(account_info) => ReplicaAccountInfo {
pubkey: account_info.pubkey,
lamports: account_info.lamports,
owner: account_info.owner,
executable: account_info.executable,
rent_epoch: account_info.rent_epoch,
data: account_info.data,
write_version: account_info.write_version,
},
};
let account_pubkey = Pubkey::new(account_info.clone().pubkey);
self.observers.thread.clone().observe_account(
account_pubkey,
account_info.clone(),
slot,
)?;
match AccountUpdateEvent::try_from(account_info) {
Ok(event) => match event {
AccountUpdateEvent::Clock { clock } => {
self.observers.thread.clone().observe_clock(clock)
}
AccountUpdateEvent::HttpRequest { request } => {
self.observers.webhook.clone().observe_request(HttpRequest {
pubkey: account_pubkey,
request,
})
}
AccountUpdateEvent::Thread { thread } => self
.observers
.thread
.clone()
.observe_thread(thread, account_pubkey, slot),
},
Err(_err) => Ok(()),
}
}
fn notify_end_of_startup(&mut self) -> PluginResult<()> {
info!("Snapshot loaded");
Ok(())
}
fn update_slot_status(
&mut self,
slot: u64,
_parent: Option<u64>,
status: solana_geyser_plugin_interface::geyser_plugin_interface::SlotStatus,
) -> PluginResult<()> {
if self.executors.is_none() {
self.try_build_executors()
}
match status {
SlotStatus::Processed => match &self.executors {
Some(executors) => {
info!(
"slot: {} executable_threads: {} cron_threads: {}",
slot,
self.observers.thread.executable_threads.len(),
self.observers.thread.cron_threads.len()
);
self.observers.thread.clone().observe_processed_slot(slot)?;
executors.clone().execute_work(slot)?;
}
None => (),
},
_ => (),
}
Ok(())
}
fn notify_transaction(
&mut self,
_transaction: solana_geyser_plugin_interface::geyser_plugin_interface::ReplicaTransactionInfoVersions,
_slot: u64,
) -> PluginResult<()> {
Ok(())
}
fn notify_block_metadata(
&mut self,
_blockinfo: solana_geyser_plugin_interface::geyser_plugin_interface::ReplicaBlockInfoVersions,
) -> PluginResult<()> {
Ok(())
}
fn account_data_notifications_enabled(&self) -> bool {
true
}
fn transaction_notifications_enabled(&self) -> bool {
false
}
}
impl ClockworkPlugin {
fn new_from_config(config: PluginConfig) -> Self {
let runtime = build_runtime(config.clone());
let thread_observer = Arc::new(ThreadObserver::new(config.clone(), runtime.clone()));
let webhook_observer = Arc::new(WebhookObserver::new(runtime.clone()));
Self {
config,
executors: None,
observers: Arc::new(Observers {
thread: thread_observer,
webhook: webhook_observer,
}),
runtime,
}
}
fn try_build_executors(&mut self) {
if RpcClient::new_with_commitment::<String>(
LOCAL_RPC_URL.into(),
CommitmentConfig::confirmed(),
)
.get_health()
.is_err()
{
return;
}
let clockwork_client = Arc::new(ClockworkClient::new(
read_or_new_keypair(self.config.clone().keypath),
LOCAL_RPC_URL.into(),
));
let tpu_client = Arc::new(
TpuClient::new(
read_or_new_keypair(self.config.clone().keypath),
LOCAL_RPC_URL.into(),
LOCAL_WEBSOCKET_URL.into(),
)
.unwrap(),
);
let webhook_executor = Arc::new(WebhookExecutor::new(
self.config.clone(),
self.observers.clone(),
self.runtime.clone(),
clockwork_client.payer_pubkey(),
));
let tx_executor = Arc::new(TxExecutor::new(
self.config.clone(),
clockwork_client.clone(),
self.observers.clone(),
self.runtime.clone(),
tpu_client.clone(),
));
self.executors = Some(Arc::new(Executors {
tx: tx_executor,
runtime: self.runtime.clone(),
webhook: webhook_executor,
}))
}
}
impl Default for ClockworkPlugin {
fn default() -> Self {
Self::new_from_config(PluginConfig::default())
}
}
fn build_runtime(config: PluginConfig) -> Arc<Runtime> {
Arc::new(
Builder::new_multi_thread()
.enable_all()
.thread_name("clockwork-plugin")
.worker_threads(config.thread_count)
.max_blocking_threads(config.thread_count)
.build()
.unwrap(),
)
}