use std::{
any::Any,
future::Future,
sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
},
thread,
};
use crate::event::TxCommitmentStatus;
use crate::framework::{
events::{
ClusterTopologyEvent, DatasetEvent, LeaderScheduleEntry, LeaderScheduleEvent,
ObservedRecentBlockhashEvent, RawPacketEvent, ReorgEvent, ShredEvent, SlotStatusEvent,
TransactionBatchEvent, TransactionEvent,
},
plugin::{ObserverPlugin, PluginConfig, TransactionDispatchMode},
};
use arcshift::ArcShift;
use solana_pubkey::Pubkey;
use thiserror::Error;
mod builder;
mod core;
mod dispatch;
mod state;
#[cfg(test)]
mod tests;
pub use builder::PluginHostBuilder;
pub use core::PluginHost;
pub(crate) use core::TransactionDispatchScope;
pub(crate) use dispatch::ClassifiedTransactionDispatch;
pub(crate) use dispatch::TransactionDispatchMetricsBatch;
/// Default bound for the plugin host's event queue.
/// NOTE(review): consumers of this value live outside this chunk — confirm in `builder`/`core`.
const DEFAULT_EVENT_QUEUE_CAPACITY: usize = 8_192;
/// Upper cap applied when deriving the transaction-dispatch worker count from
/// the machine's available parallelism (see `default_transaction_dispatch_workers`).
const DEFAULT_TRANSACTION_DISPATCH_WORKERS_CAP: usize = 8;
/// Number of initial dropped-event occurrences that are always logged before
/// sampling begins. NOTE(review): drop-log usage is outside this chunk — confirm in `dispatch`.
const INITIAL_DROP_LOG_LIMIT: u64 = 5;
/// After the initial limit, a drop is logged only once per this many occurrences.
const DROP_LOG_SAMPLE_EVERY: u64 = 1_000;
/// Lifecycle bookkeeping for a plugin host.
#[derive(Debug, Default)]
struct PluginHostLifecycleState {
    // Whether the host's startup has been performed; atomic so it can be
    // checked/flipped across threads without a lock.
    // NOTE(review): the exact transition rules live in `core` — confirm there.
    started: AtomicBool,
}
/// Error produced when a plugin fails during host startup.
///
/// The `Display` form (via `thiserror`) is:
/// `plugin `<name>` startup failed: <reason>`.
#[derive(Debug, Clone, Error, Eq, PartialEq)]
#[error("plugin `{plugin}` startup failed: {reason}")]
pub struct PluginHostStartupError {
    /// Static name of the plugin that failed to start.
    pub plugin: &'static str,
    /// Human-readable description of why startup failed.
    pub reason: String,
}
/// Picks a default transaction-dispatch worker count: the machine's available
/// parallelism, bounded to `1..=DEFAULT_TRANSACTION_DISPATCH_WORKERS_CAP`.
fn default_transaction_dispatch_workers() -> usize {
    // `available_parallelism` is fallible (e.g. unsupported platform or
    // restricted environment); fall back to a single worker in that case.
    let detected = thread::available_parallelism().map_or(1, usize::from);
    detected.clamp(1, DEFAULT_TRANSACTION_DISPATCH_WORKERS_CAP)
}
/// How the host fans events out across its registered plugins.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
pub enum PluginDispatchMode {
    /// Dispatch to plugins one at a time (the default).
    #[default]
    Sequential,
    /// Dispatch with at most the contained number of plugins in flight at once.
    BoundedConcurrent(usize),
}

impl PluginDispatchMode {
    /// Returns an equivalent mode with a usable concurrency limit:
    /// `BoundedConcurrent(0)` is lifted to `BoundedConcurrent(1)`; every other
    /// value passes through unchanged.
    #[must_use]
    pub fn normalized(self) -> Self {
        // Only the bounded variant carries a limit that can be invalid.
        if let Self::BoundedConcurrent(limit) = self {
            Self::BoundedConcurrent(limit.max(1))
        } else {
            self
        }
    }
}
/// Identifies which code path performed an inline transaction dispatch.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum InlineTransactionDispatchSource {
    /// Dispatched from the early-prefix path.
    EarlyPrefix,
    /// Dispatched from the completed-dataset fallback path.
    CompletedDatasetFallback,
}

impl InlineTransactionDispatchSource {
    /// Stable snake_case label for this source, suitable for metrics/log tags.
    pub(crate) const fn as_str(self) -> &'static str {
        match self {
            Self::CompletedDatasetFallback => "completed_dataset_fallback",
            Self::EarlyPrefix => "early_prefix",
        }
    }
}
/// Extracts a human-readable message from a caught panic payload.
///
/// Panic payloads are `&str` (from `panic!("literal")`) or `String` (from
/// `panic!("{}", …)`); anything else yields a fixed placeholder message.
fn panic_payload_to_string(payload: &(dyn Any + Send)) -> String {
    if let Some(message) = payload.downcast_ref::<&str>() {
        return (*message).to_owned();
    }
    if let Some(message) = payload.downcast_ref::<String>() {
        return message.clone();
    }
    "non-string panic payload".to_owned()
}
/// Per-plugin hook subscription flags, snapshotted from a [`PluginConfig`]
/// (see the `From<&PluginConfig>` impl below) so dispatch code can branch on
/// plain bools instead of re-reading the config.
#[derive(Debug, Clone, Copy, Default)]
struct PluginHookSubscriptions {
    // Ingestion-level hooks.
    raw_packet: bool,
    shred: bool,
    dataset: bool,
    // Transaction hooks. `transaction_min_commitment` mirrors
    // `PluginConfig::transaction_commitment.minimum_required()`.
    transaction: bool,
    transaction_min_commitment: TxCommitmentStatus,
    // Always initialized to `false` by the `From<&PluginConfig>` conversion.
    // NOTE(review): presumably toggled elsewhere — confirm in `core`/`dispatch`.
    transaction_prefilter: bool,
    transaction_log: bool,
    transaction_status: bool,
    // `inline_*` flags are true only when the matching base subscription is on
    // AND its configured dispatch mode is `TransactionDispatchMode::Inline`.
    inline_transaction: bool,
    transaction_batch: bool,
    inline_transaction_batch: bool,
    transaction_view_batch: bool,
    inline_transaction_view_batch: bool,
    // Account-level hooks.
    account_touch: bool,
    account_update: bool,
    // Block/slot lifecycle hooks.
    block_meta: bool,
    slot_status: bool,
    reorg: bool,
    recent_blockhash: bool,
    // Cluster-level hooks.
    cluster_topology: bool,
    leader_schedule: bool,
}
impl From<&PluginConfig> for PluginHookSubscriptions {
fn from(config: &PluginConfig) -> Self {
Self {
raw_packet: config.raw_packet,
shred: config.shred,
dataset: config.dataset,
transaction: config.transaction,
transaction_min_commitment: config.transaction_commitment.minimum_required(),
transaction_prefilter: false,
transaction_log: config.transaction_log,
transaction_status: config.transaction_status,
inline_transaction: config.transaction
&& matches!(
config.transaction_dispatch_mode,
TransactionDispatchMode::Inline
),
transaction_batch: config.transaction_batch,
inline_transaction_batch: config.transaction_batch
&& matches!(
config.transaction_batch_dispatch_mode,
TransactionDispatchMode::Inline
),
transaction_view_batch: config.transaction_view_batch,
inline_transaction_view_batch: config.transaction_view_batch
&& matches!(
config.transaction_view_batch_dispatch_mode,
TransactionDispatchMode::Inline
),
account_touch: config.account_touch,
account_update: config.account_update,
block_meta: config.block_meta,
slot_status: config.slot_status,
reorg: config.reorg,
recent_blockhash: config.recent_blockhash,
cluster_topology: config.cluster_topology,
leader_schedule: config.leader_schedule,
}
}
}