#![allow(clippy::missing_docs_in_private_items)]
use super::dispatch::{
ClassifiedAccountTouchDispatch, ClassifiedTransactionBatchDispatch,
ClassifiedTransactionDispatch, ClassifiedTransactionViewBatchDispatch, PluginDispatchEvent,
PluginDispatcher, SelectedAccountTouchDispatch, SelectedAccountUpdateDispatch,
SelectedBlockMetaDispatch, SelectedTransactionLogDispatch, SelectedTransactionStatusDispatch,
TransactionDispatchPriority, TransactionDispatchQueueMetrics, TransactionPluginDispatcher,
};
use super::state::{ObservedRecentBlockhashState, ObservedTpuLeaderState};
use super::*;
use crate::framework::PluginContext;
use crate::framework::events::AccountTouchEventRef;
use crate::framework::events::TransactionEventRef;
use crate::framework::pubkey_bytes;
use crate::framework::{
AccountTouchEvent, AccountUpdateEvent, BlockMetaEvent, TransactionStatusEvent,
};
use agave_transaction_view::{
transaction_data::TransactionData, transaction_view::SanitizedTransactionView,
};
use std::time::Instant;
/// Restricts a transaction classification pass to a subset of plugins, keyed on
/// whether each plugin asked for inline dispatch.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum TransactionDispatchScope {
    /// Every plugin is in scope, whatever its inline preference.
    All,
    /// Only plugins that requested inline dispatch are in scope.
    InlineOnly,
    /// Only plugins that did not request inline dispatch are in scope.
    DeferredOnly,
}

impl TransactionDispatchScope {
    /// Returns `true` when a plugin with the given inline preference belongs to this scope.
    const fn includes(self, inline_requested: bool) -> bool {
        match (self, inline_requested) {
            (Self::All, _) | (Self::InlineOnly, true) | (Self::DeferredOnly, false) => true,
            (Self::InlineOnly, false) | (Self::DeferredOnly, true) => false,
        }
    }
}
/// Result of classifying a transaction view using plugin prefilters only.
pub(crate) struct PrefilteredTransactionDispatch {
    /// Plugins whose prefilter already reported a non-ignore interest.
    pub(crate) dispatch: ClassifiedTransactionDispatch,
    /// Set when at least one in-scope plugin has no prefilter, so the view-only
    /// pass could not decide its interest.
    pub(crate) needs_full_classification: bool,
}

impl PrefilteredTransactionDispatch {
    /// Builds a result with an empty dispatch and no pending full classification.
    const fn empty() -> Self {
        let dispatch = ClassifiedTransactionDispatch::empty();
        Self {
            dispatch,
            needs_full_classification: false,
        }
    }
}
/// Cloneable host that holds the registered observer plugins, the per-hook plugin
/// subsets, and the dispatchers used to forward events to them.
#[derive(Clone)]
pub struct PluginHost {
/// Every registered plugin; backs `len`, `plugin_names`, `startup`, and `shutdown`.
pub(super) plugins: Arc<[Arc<dyn ObserverPlugin>]>,
/// Plugins considered when classifying individual transactions.
pub(super) transaction_plugins: Arc<[Arc<dyn ObserverPlugin>]>,
/// Commitment selectors zipped pairwise with `transaction_plugins` during classification.
pub(super) transaction_plugin_commitments:
Arc<[crate::framework::plugin::TransactionCommitmentSelector]>,
/// Plugins handed to `SelectedTransactionLogDispatch::from_plugins`.
pub(super) transaction_log_plugins: Arc<[Arc<dyn ObserverPlugin>]>,
/// Commitment selectors passed alongside `transaction_log_plugins`.
pub(super) transaction_log_plugin_commitments:
Arc<[crate::framework::plugin::TransactionCommitmentSelector]>,
/// Plugins handed to `SelectedTransactionStatusDispatch::from_plugins`.
pub(super) transaction_status_plugins: Arc<[Arc<dyn ObserverPlugin>]>,
/// Commitment selectors passed alongside `transaction_status_plugins`.
pub(super) transaction_status_plugin_commitments:
Arc<[crate::framework::plugin::TransactionCommitmentSelector]>,
/// Inline-dispatch preference flags zipped pairwise with `transaction_plugins`.
pub(super) transaction_plugin_inline_preferences: Arc<[bool]>,
/// Optional prefilters zipped pairwise with `transaction_plugins`; when present,
/// the prefilter classifies the transaction instead of the plugin itself.
pub(super) transaction_plugin_prefilters: Arc<[Option<crate::framework::TransactionPrefilter>]>,
/// Value reported by `has_transaction_prefilter_at_commitment` for `Processed`.
pub(super) transaction_prefilter_enabled_at_processed: bool,
/// Value reported by `has_transaction_prefilter_at_commitment` for `Confirmed`.
pub(super) transaction_prefilter_enabled_at_confirmed: bool,
/// Value reported by `has_transaction_prefilter_at_commitment` for `Finalized`.
pub(super) transaction_prefilter_enabled_at_finalized: bool,
/// Plugins handed to `ClassifiedTransactionBatchDispatch::from_plugins`.
pub(super) transaction_batch_plugins: Arc<[Arc<dyn ObserverPlugin>]>,
/// Commitment selectors passed alongside `transaction_batch_plugins`.
pub(super) transaction_batch_plugin_commitments:
Arc<[crate::framework::plugin::TransactionCommitmentSelector]>,
/// Inline preference flags passed alongside `transaction_batch_plugins`.
pub(super) transaction_batch_plugin_inline_preferences: Arc<[bool]>,
/// Plugins handed to `ClassifiedTransactionViewBatchDispatch::from_plugins`.
pub(super) transaction_view_batch_plugins: Arc<[Arc<dyn ObserverPlugin>]>,
/// Commitment selectors passed alongside `transaction_view_batch_plugins`.
pub(super) transaction_view_batch_plugin_commitments:
Arc<[crate::framework::plugin::TransactionCommitmentSelector]>,
/// Inline preference flags passed alongside `transaction_view_batch_plugins`.
pub(super) transaction_view_batch_plugin_inline_preferences: Arc<[bool]>,
/// Plugins polled with `accepts_account_touch_ref` during account-touch classification.
pub(super) account_touch_plugins: Arc<[Arc<dyn ObserverPlugin>]>,
/// Plugins handed to `SelectedAccountUpdateDispatch::from_plugins`.
pub(super) account_update_plugins: Arc<[Arc<dyn ObserverPlugin>]>,
/// Plugins handed to `SelectedBlockMetaDispatch::from_plugins`.
pub(super) block_meta_plugins: Arc<[Arc<dyn ObserverPlugin>]>,
/// General event dispatcher; while `None`, the hooks that rely on it do nothing.
pub(super) dispatcher: Option<PluginDispatcher>,
/// Dispatcher for classified per-transaction events; while `None`, transaction
/// classification returns an empty result.
pub(super) transaction_dispatcher: Option<TransactionPluginDispatcher>,
/// Subscription flags that gate each hook.
pub(super) subscriptions: PluginHookSubscriptions,
/// Most recent blockhash state recorded by `on_recent_blockhash`.
pub(super) latest_observed_recent_blockhash: ArcShift<Option<ObservedRecentBlockhashState>>,
/// Most recent leader state recorded by `on_leader_schedule`.
pub(super) latest_observed_tpu_leader: ArcShift<Option<ObservedTpuLeaderState>>,
/// Shared lifecycle state; its `started` flag guards `startup` and `shutdown`.
pub(super) lifecycle: Arc<PluginHostLifecycleState>,
}
impl Default for PluginHost {
fn default() -> Self {
Self {
plugins: Arc::from(Vec::<Arc<dyn ObserverPlugin>>::new()),
transaction_plugins: Arc::from(Vec::<Arc<dyn ObserverPlugin>>::new()),
transaction_plugin_commitments: Arc::from(Vec::<
crate::framework::plugin::TransactionCommitmentSelector,
>::new()),
transaction_log_plugins: Arc::from(Vec::<Arc<dyn ObserverPlugin>>::new()),
transaction_log_plugin_commitments: Arc::from(Vec::<
crate::framework::plugin::TransactionCommitmentSelector,
>::new()),
transaction_status_plugins: Arc::from(Vec::<Arc<dyn ObserverPlugin>>::new()),
transaction_status_plugin_commitments: Arc::from(Vec::<
crate::framework::plugin::TransactionCommitmentSelector,
>::new()),
transaction_plugin_inline_preferences: Arc::from(Vec::<bool>::new()),
transaction_plugin_prefilters: Arc::from(Vec::<
Option<crate::framework::TransactionPrefilter>,
>::new()),
transaction_prefilter_enabled_at_processed: false,
transaction_prefilter_enabled_at_confirmed: false,
transaction_prefilter_enabled_at_finalized: false,
transaction_batch_plugins: Arc::from(Vec::<Arc<dyn ObserverPlugin>>::new()),
transaction_batch_plugin_commitments: Arc::from(Vec::<
crate::framework::plugin::TransactionCommitmentSelector,
>::new()),
transaction_batch_plugin_inline_preferences: Arc::from(Vec::<bool>::new()),
transaction_view_batch_plugins: Arc::from(Vec::<Arc<dyn ObserverPlugin>>::new()),
transaction_view_batch_plugin_commitments: Arc::from(Vec::<
crate::framework::plugin::TransactionCommitmentSelector,
>::new()),
transaction_view_batch_plugin_inline_preferences: Arc::from(Vec::<bool>::new()),
account_touch_plugins: Arc::from(Vec::<Arc<dyn ObserverPlugin>>::new()),
account_update_plugins: Arc::from(Vec::<Arc<dyn ObserverPlugin>>::new()),
block_meta_plugins: Arc::from(Vec::<Arc<dyn ObserverPlugin>>::new()),
dispatcher: None,
transaction_dispatcher: None,
subscriptions: PluginHookSubscriptions::default(),
latest_observed_recent_blockhash: ArcShift::new(None),
latest_observed_tpu_leader: ArcShift::new(None),
lifecycle: Arc::new(PluginHostLifecycleState::default()),
}
}
}
impl PluginHost {
/// Returns a fresh `PluginHostBuilder` created with `PluginHostBuilder::new`.
#[must_use]
pub fn builder() -> PluginHostBuilder {
PluginHostBuilder::new()
}
/// Returns `true` when no plugins are registered with this host.
#[must_use]
pub fn is_empty(&self) -> bool {
self.plugins.is_empty()
}
/// Returns the number of plugins registered with this host.
#[must_use]
pub fn len(&self) -> usize {
self.plugins.len()
}
/// Returns the `dataset` subscription flag; `on_dataset` forwards events only while it is set.
#[must_use]
pub const fn wants_dataset(&self) -> bool {
self.subscriptions.dataset
}
/// Returns the `transaction` subscription flag; `on_classified_transaction`
/// dispatches only while it is set.
#[must_use]
pub const fn wants_transaction(&self) -> bool {
self.subscriptions.transaction
}
/// Returns `true` when transactions are subscribed and `commitment_status`
/// satisfies the subscriptions' minimum transaction commitment.
#[must_use]
pub(crate) fn transaction_enabled_at_commitment(
    &self,
    commitment_status: crate::event::TxCommitmentStatus,
) -> bool {
    if !self.subscriptions.transaction {
        return false;
    }
    let minimum = self.subscriptions.transaction_min_commitment;
    commitment_status.satisfies_minimum(minimum)
}
/// Returns the `transaction_prefilter` subscription flag.
#[must_use]
pub const fn has_transaction_prefilter(&self) -> bool {
self.subscriptions.transaction_prefilter
}
#[must_use]
pub(crate) const fn has_transaction_prefilter_at_commitment(
&self,
commitment_status: crate::event::TxCommitmentStatus,
) -> bool {
match commitment_status {
crate::event::TxCommitmentStatus::Processed => {
self.transaction_prefilter_enabled_at_processed
}
crate::event::TxCommitmentStatus::Confirmed => {
self.transaction_prefilter_enabled_at_confirmed
}
crate::event::TxCommitmentStatus::Finalized => {
self.transaction_prefilter_enabled_at_finalized
}
}
}
/// Returns the `transaction_log` subscription flag; `on_transaction_log` drops
/// events while it is unset.
#[must_use]
pub const fn wants_transaction_log(&self) -> bool {
self.subscriptions.transaction_log
}
/// Returns the `transaction_status` subscription flag; `on_transaction_status`
/// drops events while it is unset.
#[must_use]
pub const fn wants_transaction_status(&self) -> bool {
self.subscriptions.transaction_status
}
/// Returns the `inline_transaction` subscription flag, which is also the answer
/// `wants_transaction_dispatch_in_scope` gives for `InlineOnly`.
#[must_use]
pub const fn wants_inline_transaction_dispatch(&self) -> bool {
self.subscriptions.inline_transaction
}
/// Returns `true` when at least one transaction plugin did not request inline
/// dispatch; an empty preference list yields `false`.
#[must_use]
pub fn wants_deferred_transaction_dispatch(&self) -> bool {
    self.transaction_plugin_inline_preferences.contains(&false)
}
/// Returns the `transaction_batch` subscription flag; `on_transaction_batch`
/// drops events while it is unset.
#[must_use]
pub const fn wants_transaction_batch(&self) -> bool {
self.subscriptions.transaction_batch
}
/// Returns the `inline_transaction_batch` subscription flag.
#[must_use]
pub const fn wants_inline_transaction_batch_dispatch(&self) -> bool {
self.subscriptions.inline_transaction_batch
}
/// Returns the `transaction_view_batch` subscription flag;
/// `on_transaction_view_batch` drops events while it is unset.
#[must_use]
pub const fn wants_transaction_view_batch(&self) -> bool {
self.subscriptions.transaction_view_batch
}
/// Returns the `inline_transaction_view_batch` subscription flag.
#[must_use]
pub const fn wants_inline_transaction_view_batch_dispatch(&self) -> bool {
self.subscriptions.inline_transaction_view_batch
}
/// Returns the `account_touch` subscription flag, which gates account-touch
/// classification and dispatch.
#[must_use]
pub const fn wants_account_touch(&self) -> bool {
self.subscriptions.account_touch
}
/// Returns the `account_update` subscription flag; `on_account_update` drops
/// events while it is unset.
#[must_use]
pub const fn wants_account_update(&self) -> bool {
self.subscriptions.account_update
}
/// Returns the `block_meta` subscription flag; `on_block_meta` drops events
/// while it is unset.
#[must_use]
pub const fn wants_block_meta(&self) -> bool {
self.subscriptions.block_meta
}
/// Returns the `recent_blockhash` subscription flag. It only controls whether
/// `on_recent_blockhash` forwards events; that hook records the latest state
/// regardless of this flag.
#[must_use]
pub const fn wants_recent_blockhash(&self) -> bool {
self.subscriptions.recent_blockhash
}
/// Returns the `raw_packet` subscription flag; `on_raw_packet` forwards events
/// only while it is set.
#[must_use]
pub const fn wants_raw_packet(&self) -> bool {
self.subscriptions.raw_packet
}
/// Returns the `shred` subscription flag; `on_shred` forwards events only while it is set.
#[must_use]
pub const fn wants_shred(&self) -> bool {
self.subscriptions.shred
}
/// Returns the `cluster_topology` subscription flag; `on_cluster_topology`
/// forwards events only while it is set.
#[must_use]
pub const fn wants_cluster_topology(&self) -> bool {
self.subscriptions.cluster_topology
}
/// Returns the `leader_schedule` subscription flag. It only controls whether
/// `on_leader_schedule` forwards events; that hook records the newest leader
/// regardless of this flag.
#[must_use]
pub const fn wants_leader_schedule(&self) -> bool {
self.subscriptions.leader_schedule
}
/// Returns the `slot_status` subscription flag; `on_slot_status` forwards events
/// only while it is set.
#[must_use]
pub const fn wants_slot_status(&self) -> bool {
self.subscriptions.slot_status
}
/// Returns the `reorg` subscription flag; `on_reorg` forwards events only while it is set.
#[must_use]
pub const fn wants_reorg(&self) -> bool {
self.subscriptions.reorg
}
#[must_use]
pub fn general_queue_depth(&self) -> u64 {
self.dispatcher
.as_ref()
.map_or(0, PluginDispatcher::queue_depth)
}
#[must_use]
pub fn general_max_queue_depth(&self) -> u64 {
self.dispatcher
.as_ref()
.map_or(0, PluginDispatcher::max_queue_depth)
}
/// Returns the saturating sum of the general, critical-transaction, and
/// background-transaction dropped-event counters.
#[must_use]
pub fn dropped_event_count(&self) -> u64 {
    let general = self.general_dropped_event_count();
    let critical = self.transaction_dropped_event_count();
    let background = self.background_transaction_dropped_event_count();
    general.saturating_add(critical).saturating_add(background)
}
#[must_use]
pub fn general_dropped_event_count(&self) -> u64 {
self.dispatcher
.as_ref()
.map_or(0, PluginDispatcher::dropped_count)
}
#[must_use]
pub fn transaction_dropped_event_count(&self) -> u64 {
self.transaction_dispatcher
.as_ref()
.map_or(0, TransactionPluginDispatcher::critical_dropped_count)
}
#[must_use]
pub fn background_transaction_dropped_event_count(&self) -> u64 {
self.transaction_dispatcher
.as_ref()
.map_or(0, TransactionPluginDispatcher::background_dropped_count)
}
/// Returns the transaction dispatcher's queue metrics, or the default metrics
/// value when no transaction dispatcher is configured.
#[must_use]
pub(crate) fn transaction_queue_metrics(&self) -> TransactionDispatchQueueMetrics {
    match &self.transaction_dispatcher {
        Some(dispatcher) => dispatcher.queue_metrics(),
        None => TransactionDispatchQueueMetrics::default(),
    }
}
/// Returns the name of every registered plugin, in registration order.
#[must_use]
pub fn plugin_names(&self) -> Vec<&'static str> {
    let mut names = Vec::with_capacity(self.plugins.len());
    names.extend(self.plugins.iter().map(|plugin| plugin.name()));
    names
}
pub async fn startup(&self) -> Result<(), PluginHostStartupError> {
if self
.lifecycle
.started
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
return Ok(());
}
let mut started_plugins: Vec<Arc<dyn ObserverPlugin>> =
Vec::with_capacity(self.plugins.len());
for plugin in self.plugins.iter() {
let plugin_name = plugin.name();
let context = PluginContext { plugin_name };
if let Err(error) = plugin.setup(context.clone()).await {
for started_plugin in started_plugins.iter().rev() {
started_plugin
.shutdown(PluginContext {
plugin_name: started_plugin.name(),
})
.await;
}
self.lifecycle.started.store(false, Ordering::Release);
return Err(PluginHostStartupError {
plugin: plugin_name,
reason: error.to_string(),
});
}
started_plugins.push(Arc::clone(plugin));
}
Ok(())
}
/// Shuts down every registered plugin in reverse registration order.
///
/// Does nothing unless this call is the one that flips the lifecycle `started`
/// flag from `true` back to `false`.
pub async fn shutdown(&self) {
    let was_started = self
        .lifecycle
        .started
        .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
        .is_ok();
    if !was_started {
        return;
    }
    for plugin in self.plugins.iter().rev() {
        let context = PluginContext {
            plugin_name: plugin.name(),
        };
        plugin.shutdown(context).await;
    }
}
/// Returns the `(slot, recent_blockhash)` pair most recently recorded, or `None`
/// when nothing has been recorded yet.
#[must_use]
pub fn latest_observed_recent_blockhash(&self) -> Option<(u64, [u8; 32])> {
    let observed = *self.latest_observed_recent_blockhash.shared_get();
    let state = observed?;
    Some((state.slot, state.recent_blockhash))
}
/// Returns the most recently recorded leader as a `LeaderScheduleEntry`, or
/// `None` when no leader has been recorded yet.
#[must_use]
pub fn latest_observed_tpu_leader(&self) -> Option<LeaderScheduleEntry> {
    let observed = *self.latest_observed_tpu_leader.shared_get();
    let state = observed?;
    Some(LeaderScheduleEntry {
        slot: state.slot,
        leader: pubkey_bytes(state.leader),
    })
}
/// Classifies `event` against every transaction plugin by delegating to
/// `classify_transaction_ref_in_scope` with `TransactionDispatchScope::All`.
#[must_use]
pub(crate) fn classify_transaction_ref(
&self,
event: TransactionEventRef<'_>,
) -> ClassifiedTransactionDispatch {
self.classify_transaction_ref_in_scope(event, TransactionDispatchScope::All)
}
/// Collects the transaction plugins in `scope` that are interested in `event`.
///
/// Returns an empty dispatch when the scope has no subscribers or no transaction
/// dispatcher is configured. Otherwise each plugin is skipped unless it falls in
/// `scope` and its commitment selector matches the event's commitment status.
/// The interest comes from the plugin's prefilter when it has one, and from the
/// plugin itself otherwise; plugins reporting `Ignore` are left out.
#[must_use]
pub(crate) fn classify_transaction_ref_in_scope(
    &self,
    event: TransactionEventRef<'_>,
    scope: TransactionDispatchScope,
) -> ClassifiedTransactionDispatch {
    let mut selected = ClassifiedTransactionDispatch::empty();
    if !self.wants_transaction_dispatch_in_scope(scope) || self.transaction_dispatcher.is_none()
    {
        return selected;
    }
    let candidates = self
        .transaction_plugins
        .iter()
        .zip(self.transaction_plugin_inline_preferences.iter())
        .zip(self.transaction_plugin_prefilters.iter())
        .zip(self.transaction_plugin_commitments.iter());
    for (((plugin, &inline_requested), prefilter), &commitment_selector) in candidates {
        if !scope.includes(inline_requested) {
            continue;
        }
        if !commitment_selector.matches(event.commitment_status) {
            continue;
        }
        let interest = match prefilter {
            Some(filter) => filter.classify_ref(&event),
            None => plugin.transaction_interest_ref(&event),
        };
        if interest == crate::framework::TransactionInterest::Ignore {
            continue;
        }
        selected.push(interest, inline_requested, Arc::clone(plugin));
    }
    selected
}
/// Classifies a sanitized transaction view using plugin prefilters only.
///
/// Returns an empty result when the scope has no subscribers or no transaction
/// dispatcher is configured. Each plugin that falls in `scope` and whose
/// commitment selector matches `commitment_status` is handled one of two ways.
/// A plugin with a prefilter is added to the dispatch when that prefilter
/// reports a non-`Ignore` interest. A plugin without one sets
/// `needs_full_classification`, because it cannot be decided from the view.
#[must_use]
pub(crate) fn classify_transaction_view_in_scope<D: TransactionData>(
    &self,
    view: &SanitizedTransactionView<D>,
    commitment_status: crate::event::TxCommitmentStatus,
    scope: TransactionDispatchScope,
) -> PrefilteredTransactionDispatch {
    let mut outcome = PrefilteredTransactionDispatch::empty();
    if !self.wants_transaction_dispatch_in_scope(scope) || self.transaction_dispatcher.is_none()
    {
        return outcome;
    }
    let candidates = self
        .transaction_plugins
        .iter()
        .zip(self.transaction_plugin_inline_preferences.iter())
        .zip(self.transaction_plugin_prefilters.iter())
        .zip(self.transaction_plugin_commitments.iter());
    for (((plugin, &inline_requested), prefilter), &commitment_selector) in candidates {
        if !scope.includes(inline_requested) {
            continue;
        }
        if !commitment_selector.matches(commitment_status) {
            continue;
        }
        match prefilter {
            None => outcome.needs_full_classification = true,
            Some(filter) => {
                let interest = filter.classify_view_ref(view);
                if interest != crate::framework::TransactionInterest::Ignore {
                    outcome
                        .dispatch
                        .push(interest, inline_requested, Arc::clone(plugin));
                }
            }
        }
    }
    outcome
}
/// Returns whether any transaction dispatch is wanted for `scope`: the
/// `transaction` flag for `All`, the `inline_transaction` flag for `InlineOnly`,
/// and the presence of a non-inline plugin for `DeferredOnly`.
#[must_use]
pub(crate) fn wants_transaction_dispatch_in_scope(
    &self,
    scope: TransactionDispatchScope,
) -> bool {
    match scope {
        TransactionDispatchScope::DeferredOnly => self.wants_deferred_transaction_dispatch(),
        TransactionDispatchScope::InlineOnly => self.wants_inline_transaction_dispatch(),
        TransactionDispatchScope::All => self.wants_transaction(),
    }
}
/// Forwards an already-classified transaction to the transaction dispatcher.
///
/// Does nothing when transactions are not subscribed or no transaction
/// dispatcher is configured. Otherwise `dispatch` is converted into up to three
/// dispatch events, each forwarded if present: inline-critical via
/// `dispatch_inline_critical`, then critical and background via `dispatch` with
/// the matching priority.
#[expect(
    clippy::too_many_arguments,
    reason = "hot-path dispatch keeps scalar timing and dataset metadata explicit"
)]
pub(crate) fn on_classified_transaction(
    &self,
    dispatch: ClassifiedTransactionDispatch,
    event: TransactionEvent,
    completed_at: Instant,
    first_shred_observed_at: Instant,
    last_shred_observed_at: Instant,
    inline_source: InlineTransactionDispatchSource,
    dataset_tx_count: u32,
    dataset_tx_position: u32,
) {
    let dispatcher = match &self.transaction_dispatcher {
        Some(dispatcher) if self.subscriptions.transaction => dispatcher,
        _ => return,
    };
    let (inline_critical_event, critical_event, background_event) = dispatch.into_dispatches(
        event,
        completed_at,
        first_shred_observed_at,
        last_shred_observed_at,
        inline_source,
        dataset_tx_count,
        dataset_tx_position,
    );
    if let Some(queued) = inline_critical_event {
        dispatcher.dispatch_inline_critical(queued);
    }
    if let Some(queued) = critical_event {
        dispatcher.dispatch(TransactionDispatchPriority::Critical, queued);
    }
    if let Some(queued) = background_event {
        dispatcher.dispatch(TransactionDispatchPriority::Background, queued);
    }
}
pub(crate) fn on_transaction_batch(&self, event: TransactionBatchEvent, completed_at: Instant) {
if !self.subscriptions.transaction_batch {
return;
}
let Some(dispatcher) = &self.dispatcher else {
return;
};
let dispatch = ClassifiedTransactionBatchDispatch::from_plugins(
&self.transaction_batch_plugins,
&self.transaction_batch_plugin_commitments,
&self.transaction_batch_plugin_inline_preferences,
event,
completed_at,
);
if let Some(dispatch_event) = dispatch {
dispatcher.dispatch(PluginDispatchEvent::TransactionBatch(dispatch_event));
}
}
pub(crate) fn on_transaction_view_batch(
&self,
event: crate::framework::TransactionViewBatchEvent,
completed_at: Instant,
) {
if !self.subscriptions.transaction_view_batch {
return;
}
let Some(dispatcher) = &self.dispatcher else {
return;
};
let dispatch = ClassifiedTransactionViewBatchDispatch::from_plugins(
&self.transaction_view_batch_plugins,
&self.transaction_view_batch_plugin_commitments,
&self.transaction_view_batch_plugin_inline_preferences,
event,
completed_at,
);
if let Some(dispatch_event) = dispatch {
dispatcher.dispatch(PluginDispatchEvent::TransactionViewBatch(dispatch_event));
}
}
/// Collects the account-touch plugins that accept `event`.
///
/// Returns an empty selection when account touches are not subscribed or no
/// general dispatcher is configured.
#[must_use]
pub(crate) fn classify_account_touch_ref(
    &self,
    event: AccountTouchEventRef<'_>,
) -> ClassifiedAccountTouchDispatch {
    let mut accepted = ClassifiedAccountTouchDispatch::empty();
    if !(self.subscriptions.account_touch && self.dispatcher.is_some()) {
        return accepted;
    }
    let interested = self
        .account_touch_plugins
        .iter()
        .filter(|plugin| plugin.accepts_account_touch_ref(&event));
    for plugin in interested {
        accepted.push(Arc::clone(plugin));
    }
    accepted
}
/// Forwards an already-classified account-touch event to the general dispatcher.
///
/// Does nothing when account touches are not subscribed, when no dispatcher is
/// configured, or when `SelectedAccountTouchDispatch::from_classified` yields `None`.
pub(crate) fn on_selected_account_touch(
    &self,
    dispatch: ClassifiedAccountTouchDispatch,
    event: AccountTouchEvent,
) {
    if !self.subscriptions.account_touch {
        return;
    }
    let Some(dispatcher) = &self.dispatcher else {
        return;
    };
    if let Some(selected) = SelectedAccountTouchDispatch::from_classified(dispatch, event) {
        dispatcher.dispatch(PluginDispatchEvent::SelectedAccountTouch(selected));
    }
}
/// Forwards a raw packet event to the general dispatcher when raw packets are
/// subscribed and a dispatcher is configured.
pub fn on_raw_packet(&self, event: RawPacketEvent) {
    if !self.subscriptions.raw_packet {
        return;
    }
    let Some(dispatcher) = &self.dispatcher else {
        return;
    };
    dispatcher.dispatch(PluginDispatchEvent::RawPacket(event));
}
/// Forwards a shred event to the general dispatcher when shreds are subscribed
/// and a dispatcher is configured.
pub fn on_shred(&self, event: ShredEvent) {
    if !self.subscriptions.shred {
        return;
    }
    let Some(dispatcher) = &self.dispatcher else {
        return;
    };
    dispatcher.dispatch(PluginDispatchEvent::Shred(event));
}
/// Forwards a dataset event to the general dispatcher when datasets are
/// subscribed and a dispatcher is configured.
pub fn on_dataset(&self, event: DatasetEvent) {
    if !self.subscriptions.dataset {
        return;
    }
    let Some(dispatcher) = &self.dispatcher else {
        return;
    };
    dispatcher.dispatch(PluginDispatchEvent::Dataset(event));
}
pub fn on_transaction_log(&self, event: crate::framework::TransactionLogEvent) {
if !self.subscriptions.transaction_log {
return;
}
let Some(dispatcher) = &self.dispatcher else {
return;
};
if let Some(dispatch) = SelectedTransactionLogDispatch::from_plugins(
&self.transaction_log_plugins,
&self.transaction_log_plugin_commitments,
event,
) {
dispatcher.dispatch(PluginDispatchEvent::SelectedTransactionLog(dispatch));
}
}
pub fn on_transaction_status(&self, event: TransactionStatusEvent) {
if !self.subscriptions.transaction_status {
return;
}
let Some(dispatcher) = &self.dispatcher else {
return;
};
if let Some(dispatch) = SelectedTransactionStatusDispatch::from_plugins(
&self.transaction_status_plugins,
&self.transaction_status_plugin_commitments,
event,
) {
dispatcher.dispatch(PluginDispatchEvent::SelectedTransactionStatus(dispatch));
}
}
/// Classifies a single transaction event against every transaction plugin and
/// dispatches it to the interested ones.
///
/// Returns early when classification selects no plugin. Otherwise the current
/// instant is used for all three timing arguments, the inline source is
/// `CompletedDatasetFallback`, and the event is reported as position `0` of a
/// one-transaction dataset.
pub fn on_transaction(&self, event: TransactionEvent) {
    let event_ref = TransactionEventRef {
        slot: event.slot,
        commitment_status: event.commitment_status,
        confirmed_slot: event.confirmed_slot,
        finalized_slot: event.finalized_slot,
        signature: event.signature,
        tx: event.tx.as_ref(),
        kind: event.kind,
    };
    let selected = self.classify_transaction_ref(event_ref);
    if selected.is_empty() {
        return;
    }
    let observed_at = Instant::now();
    self.on_classified_transaction(
        selected,
        event,
        observed_at,
        observed_at,
        observed_at,
        InlineTransactionDispatchSource::CompletedDatasetFallback,
        1,
        0,
    );
}
/// Forwards an account-touch event, wrapped in an `Arc`, to the general
/// dispatcher when account touches are subscribed and a dispatcher is configured.
pub fn on_account_touch(&self, event: AccountTouchEvent) {
    if !self.subscriptions.account_touch {
        return;
    }
    let Some(dispatcher) = &self.dispatcher else {
        return;
    };
    let shared_event = Arc::new(event);
    dispatcher.dispatch(PluginDispatchEvent::AccountTouch(shared_event));
}
pub fn on_account_update(&self, event: AccountUpdateEvent) {
if !self.subscriptions.account_update {
return;
}
let Some(dispatcher) = &self.dispatcher else {
return;
};
if let Some(dispatch) =
SelectedAccountUpdateDispatch::from_plugins(&self.account_update_plugins, event)
{
dispatcher.dispatch(PluginDispatchEvent::SelectedAccountUpdate(dispatch));
}
}
pub fn on_block_meta(&self, event: BlockMetaEvent) {
if !self.subscriptions.block_meta {
return;
}
let Some(dispatcher) = &self.dispatcher else {
return;
};
if let Some(dispatch) =
SelectedBlockMetaDispatch::from_plugins(&self.block_meta_plugins, event)
{
dispatcher.dispatch(PluginDispatchEvent::SelectedBlockMeta(dispatch));
}
}
/// Forwards a slot status event to the general dispatcher when slot statuses are
/// subscribed and a dispatcher is configured.
pub fn on_slot_status(&self, event: SlotStatusEvent) {
    if !self.subscriptions.slot_status {
        return;
    }
    let Some(dispatcher) = &self.dispatcher else {
        return;
    };
    dispatcher.dispatch(PluginDispatchEvent::SlotStatus(event));
}
/// Forwards a reorg event to the general dispatcher when reorgs are subscribed
/// and a dispatcher is configured.
pub fn on_reorg(&self, event: ReorgEvent) {
    if !self.subscriptions.reorg {
        return;
    }
    let Some(dispatcher) = &self.dispatcher else {
        return;
    };
    dispatcher.dispatch(PluginDispatchEvent::Reorg(event));
}
/// Records an observed recent blockhash and forwards it when it is new.
///
/// State handling, relative to the previously recorded state:
/// - nothing recorded yet: record the event and emit it;
/// - event slot is older: ignore it;
/// - same slot and same blockhash: ignore it;
/// - newer slot with the same blockhash: record the new slot, do not emit;
/// - otherwise (the blockhash differs): record slot and blockhash, and emit.
///
/// The state is recorded regardless of subscriptions. Emission also requires the
/// `recent_blockhash` subscription and a configured dispatcher.
pub fn on_recent_blockhash(&self, event: ObservedRecentBlockhashEvent) {
    let mut state_handle = self.latest_observed_recent_blockhash.clone();
    let previous = *state_handle.shared_get();
    let (state_changed, should_emit) = match previous {
        None => (true, true),
        Some(current) if event.slot < current.slot => (false, false),
        Some(current) => {
            // The event slot is at least the recorded slot in this arm.
            let blockhash_changed = event.recent_blockhash != current.recent_blockhash;
            let slot_advanced = event.slot > current.slot;
            (slot_advanced || blockhash_changed, blockhash_changed)
        }
    };
    if state_changed {
        // Every recorded transition ends with the event's slot and blockhash.
        state_handle.update(Some(ObservedRecentBlockhashState {
            slot: event.slot,
            recent_blockhash: event.recent_blockhash,
        }));
    }
    if !(should_emit && self.subscriptions.recent_blockhash) {
        return;
    }
    let Some(dispatcher) = &self.dispatcher else {
        return;
    };
    dispatcher.dispatch(PluginDispatchEvent::ObservedRecentBlockhash(event));
}
/// Forwards a cluster topology event to the general dispatcher when cluster
/// topology is subscribed and a dispatcher is configured.
pub fn on_cluster_topology(&self, event: ClusterTopologyEvent) {
    if !self.subscriptions.cluster_topology {
        return;
    }
    let Some(dispatcher) = &self.dispatcher else {
        return;
    };
    dispatcher.dispatch(PluginDispatchEvent::ClusterTopology(event));
}
/// Records the newest leader carried by a leader schedule event, then forwards
/// the event.
///
/// The entry with the highest slot among the added, updated, and snapshot
/// leaders is the candidate. It replaces the recorded state when nothing is
/// recorded yet, when its slot is newer, or when the slot is equal but the
/// leader differs. The state is recorded regardless of subscriptions. Forwarding
/// requires the `leader_schedule` subscription and a configured dispatcher.
pub fn on_leader_schedule(&self, event: LeaderScheduleEvent) {
    let mut state_handle = self.latest_observed_tpu_leader.clone();
    let newest = event
        .added_leaders
        .iter()
        .chain(event.updated_leaders.iter())
        .chain(event.snapshot_leaders.iter())
        .copied()
        .max_by_key(|entry| entry.slot);
    if let Some(newest_entry) = newest {
        let previous = *state_handle.shared_get();
        let state_changed = match previous {
            None => true,
            Some(current) => {
                newest_entry.slot > current.slot
                    || (newest_entry.slot == current.slot
                        && newest_entry.leader.to_solana() != current.leader)
            }
        };
        if state_changed {
            state_handle.update(Some(ObservedTpuLeaderState {
                slot: newest_entry.slot,
                leader: newest_entry.leader.to_solana(),
            }));
        }
    }
    if !self.subscriptions.leader_schedule {
        return;
    }
    let Some(dispatcher) = &self.dispatcher else {
        return;
    };
    dispatcher.dispatch(PluginDispatchEvent::LeaderSchedule(event));
}
}