#![allow(clippy::missing_docs_in_private_items)]
use agave_transaction_view::{
transaction_data::TransactionData, transaction_view::SanitizedTransactionView,
};
use async_trait::async_trait;
use sof_types::{PubkeyBytes, SignatureBytes};
use std::{cell::RefCell, sync::Arc};
use thiserror::Error;
use crate::{
event::{TxCommitmentStatus, TxKind},
framework::events::{
AccountTouchEvent, AccountTouchEventRef, AccountUpdateEvent, BlockMetaEvent,
ClusterTopologyEvent, DatasetEvent, LeaderScheduleEvent, ObservedRecentBlockhashEvent,
RawPacketEvent, ReorgEvent, ShredEvent, SlotStatusEvent, TransactionBatchEvent,
TransactionEvent, TransactionEventRef, TransactionLogEvent, TransactionStatusEvent,
TransactionViewBatchEvent,
},
};
#[derive(Clone, Copy, Eq, PartialEq)]
struct CachedTransactionEventKey {
slot: u64,
signature: Option<SignatureBytes>,
tx_ptr: *const solana_transaction::versioned::VersionedTransaction,
kind: TxKind,
}
struct CachedTransactionEvent {
key: CachedTransactionEventKey,
event: TransactionEvent,
}
thread_local! {
static CACHED_TRANSACTION_EVENT: RefCell<Option<CachedTransactionEvent>> = const { RefCell::new(None) };
}
const fn cached_transaction_event_key(
event: &TransactionEventRef<'_>,
) -> CachedTransactionEventKey {
CachedTransactionEventKey {
slot: event.slot,
signature: event.signature,
tx_ptr: event.tx as *const _,
kind: event.kind,
}
}
fn with_cached_transaction_event<R>(
event: &TransactionEventRef<'_>,
f: impl FnOnce(&TransactionEvent) -> R,
) -> R {
let key = cached_transaction_event_key(event);
CACHED_TRANSACTION_EVENT.with(|cached| {
let mut cached = cached.borrow_mut();
if !cached
.as_ref()
.is_some_and(|cached_event| cached_event.key == key)
{
*cached = Some(CachedTransactionEvent {
key,
event: event.to_owned(),
});
}
if let Some(cached_event) = cached.as_ref() {
f(&cached_event.event)
} else {
let owned = event.to_owned();
f(&owned)
}
})
}
pub(crate) fn clone_cached_transaction_event(
event: &TransactionEventRef<'_>,
) -> Option<TransactionEvent> {
let key = cached_transaction_event_key(event);
CACHED_TRANSACTION_EVENT.with(|cached| {
cached
.borrow()
.as_ref()
.and_then(|cached_event| (cached_event.key == key).then(|| cached_event.event.clone()))
})
}
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum TransactionInterest {
Ignore,
Background,
Critical,
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct TransactionPrefilter {
matched_interest: TransactionInterest,
signature: Option<SignatureBytes>,
signature_solana: Option<solana_signature::Signature>,
account_include: CompiledAccountMatcher,
account_exclude: CompiledAccountMatcher,
account_required: CompiledAccountMatcher,
}
#[derive(Debug, Clone, Eq, PartialEq)]
enum CompiledAccountMatcher {
Empty,
One(solana_pubkey::Pubkey),
Two(solana_pubkey::Pubkey, solana_pubkey::Pubkey),
Many(Arc<[solana_pubkey::Pubkey]>),
}
impl TransactionPrefilter {
#[must_use]
pub const fn new(matched_interest: TransactionInterest) -> Self {
Self {
matched_interest,
signature: None,
signature_solana: None,
account_include: CompiledAccountMatcher::Empty,
account_exclude: CompiledAccountMatcher::Empty,
account_required: CompiledAccountMatcher::Empty,
}
}
#[must_use]
pub const fn matched_interest(&self) -> TransactionInterest {
self.matched_interest
}
#[must_use]
pub fn with_signature<S>(mut self, signature: S) -> Self
where
S: Into<SignatureBytes>,
{
let signature = signature.into();
self.signature = Some(signature);
self.signature_solana = Some(signature.to_solana());
self
}
#[must_use]
pub fn with_account_include<I>(mut self, keys: I) -> Self
where
I: IntoIterator,
I::Item: Into<PubkeyBytes>,
{
self.account_include = compile_prefilter_keys(keys);
self
}
#[must_use]
pub fn with_account_exclude<I>(mut self, keys: I) -> Self
where
I: IntoIterator,
I::Item: Into<PubkeyBytes>,
{
self.account_exclude = compile_prefilter_keys(keys);
self
}
#[must_use]
pub fn with_account_required<I>(mut self, keys: I) -> Self
where
I: IntoIterator,
I::Item: Into<PubkeyBytes>,
{
self.account_required = compile_prefilter_keys(keys);
self
}
#[must_use]
pub fn classify_ref(&self, event: &TransactionEventRef<'_>) -> TransactionInterest {
if let Some(signature) = self.signature
&& event.signature != Some(signature)
{
return TransactionInterest::Ignore;
}
if !matches!(self.account_include, CompiledAccountMatcher::Empty)
&& !transaction_matches_any_keys(event.tx, &self.account_include)
{
return TransactionInterest::Ignore;
}
if !matches!(self.account_exclude, CompiledAccountMatcher::Empty)
&& transaction_matches_any_keys(event.tx, &self.account_exclude)
{
return TransactionInterest::Ignore;
}
if !transaction_matches_all_keys(event.tx, &self.account_required) {
return TransactionInterest::Ignore;
}
self.matched_interest
}
#[must_use]
pub(crate) fn classify_view_ref<D: TransactionData>(
&self,
view: &SanitizedTransactionView<D>,
) -> TransactionInterest {
if let Some(signature) = self.signature_solana
&& view.signatures().first().copied() != Some(signature)
{
return TransactionInterest::Ignore;
}
if !matches!(self.account_include, CompiledAccountMatcher::Empty)
&& !transaction_view_matches_any_keys(view, &self.account_include)
{
return TransactionInterest::Ignore;
}
if !matches!(self.account_exclude, CompiledAccountMatcher::Empty)
&& transaction_view_matches_any_keys(view, &self.account_exclude)
{
return TransactionInterest::Ignore;
}
if !transaction_view_matches_all_keys(view, &self.account_required) {
return TransactionInterest::Ignore;
}
self.matched_interest
}
}
fn compile_prefilter_keys<I>(keys: I) -> CompiledAccountMatcher
where
I: IntoIterator,
I::Item: Into<PubkeyBytes>,
{
let keys = keys
.into_iter()
.map(Into::into)
.map(PubkeyBytes::to_solana)
.collect::<Vec<_>>();
match keys.as_slice() {
[] => CompiledAccountMatcher::Empty,
[key] => CompiledAccountMatcher::One(*key),
[first, second] => CompiledAccountMatcher::Two(*first, *second),
_ => CompiledAccountMatcher::Many(Arc::from(keys)),
}
}
fn transaction_mentions_account_key(
tx: &solana_transaction::versioned::VersionedTransaction,
key: &solana_pubkey::Pubkey,
) -> bool {
tx.message.static_account_keys().contains(key)
|| tx
.message
.address_table_lookups()
.is_some_and(|lookups| lookups.iter().any(|lookup| lookup.account_key == *key))
}
fn transaction_matches_any_keys(
tx: &solana_transaction::versioned::VersionedTransaction,
matcher: &CompiledAccountMatcher,
) -> bool {
match matcher {
CompiledAccountMatcher::Empty => false,
CompiledAccountMatcher::One(key) => transaction_mentions_account_key(tx, key),
CompiledAccountMatcher::Two(first, second) => {
transaction_mentions_account_key(tx, first)
|| transaction_mentions_account_key(tx, second)
}
CompiledAccountMatcher::Many(keys) => keys
.iter()
.any(|key| transaction_mentions_account_key(tx, key)),
}
}
fn transaction_matches_all_keys(
tx: &solana_transaction::versioned::VersionedTransaction,
matcher: &CompiledAccountMatcher,
) -> bool {
match matcher {
CompiledAccountMatcher::Empty => true,
CompiledAccountMatcher::One(key) => transaction_mentions_account_key(tx, key),
CompiledAccountMatcher::Two(first, second) => {
transaction_mentions_account_key(tx, first)
&& transaction_mentions_account_key(tx, second)
}
CompiledAccountMatcher::Many(keys) => keys
.iter()
.all(|key| transaction_mentions_account_key(tx, key)),
}
}
fn transaction_view_mentions_account_key<D: TransactionData>(
view: &SanitizedTransactionView<D>,
key: &solana_pubkey::Pubkey,
) -> bool {
view.static_account_keys().contains(key)
|| view
.address_table_lookup_iter()
.any(|lookup| lookup.account_key == key)
}
fn transaction_view_matches_any_keys<D: TransactionData>(
view: &SanitizedTransactionView<D>,
matcher: &CompiledAccountMatcher,
) -> bool {
match matcher {
CompiledAccountMatcher::Empty => false,
CompiledAccountMatcher::One(key) => transaction_view_mentions_account_key(view, key),
CompiledAccountMatcher::Two(first, second) => {
transaction_view_mentions_account_key(view, first)
|| transaction_view_mentions_account_key(view, second)
}
CompiledAccountMatcher::Many(keys) => keys
.iter()
.any(|key| transaction_view_mentions_account_key(view, key)),
}
}
fn transaction_view_matches_all_keys<D: TransactionData>(
view: &SanitizedTransactionView<D>,
matcher: &CompiledAccountMatcher,
) -> bool {
match matcher {
CompiledAccountMatcher::Empty => true,
CompiledAccountMatcher::One(key) => transaction_view_mentions_account_key(view, key),
CompiledAccountMatcher::Two(first, second) => {
transaction_view_mentions_account_key(view, first)
&& transaction_view_mentions_account_key(view, second)
}
CompiledAccountMatcher::Many(keys) => keys
.iter()
.all(|key| transaction_view_mentions_account_key(view, key)),
}
}
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub enum TransactionDispatchMode {
#[default]
Standard,
Inline,
}
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum TransactionCommitmentSelector {
AtLeast(TxCommitmentStatus),
Only(TxCommitmentStatus),
}
impl Default for TransactionCommitmentSelector {
fn default() -> Self {
Self::AtLeast(TxCommitmentStatus::Processed)
}
}
impl TransactionCommitmentSelector {
#[must_use]
pub fn matches(self, commitment_status: TxCommitmentStatus) -> bool {
match self {
Self::AtLeast(minimum) => commitment_status.satisfies_minimum(minimum),
Self::Only(expected) => commitment_status == expected,
}
}
#[must_use]
pub const fn minimum_required(self) -> TxCommitmentStatus {
match self {
Self::AtLeast(minimum) | Self::Only(minimum) => minimum,
}
}
}
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct PluginConfig {
pub raw_packet: bool,
pub shred: bool,
pub dataset: bool,
pub transaction: bool,
pub transaction_log: bool,
pub transaction_status: bool,
pub transaction_commitment: TransactionCommitmentSelector,
pub transaction_dispatch_mode: TransactionDispatchMode,
pub transaction_batch: bool,
pub transaction_batch_dispatch_mode: TransactionDispatchMode,
pub transaction_view_batch: bool,
pub transaction_view_batch_dispatch_mode: TransactionDispatchMode,
pub account_touch: bool,
pub account_update: bool,
pub block_meta: bool,
pub slot_status: bool,
pub reorg: bool,
pub recent_blockhash: bool,
pub cluster_topology: bool,
pub leader_schedule: bool,
}
impl PluginConfig {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub const fn with_raw_packet(mut self) -> Self {
self.raw_packet = true;
self
}
#[must_use]
pub const fn with_shred(mut self) -> Self {
self.shred = true;
self
}
#[must_use]
pub const fn with_dataset(mut self) -> Self {
self.dataset = true;
self
}
#[must_use]
pub const fn with_transaction(mut self) -> Self {
self.transaction = true;
self
}
#[must_use]
pub const fn with_transaction_status(mut self) -> Self {
self.transaction_status = true;
self
}
#[must_use]
pub const fn at_commitment(mut self, commitment: TxCommitmentStatus) -> Self {
self.transaction_commitment = TransactionCommitmentSelector::AtLeast(commitment);
self
}
#[must_use]
pub const fn only_at_commitment(mut self, commitment: TxCommitmentStatus) -> Self {
self.transaction_commitment = TransactionCommitmentSelector::Only(commitment);
self
}
#[must_use]
pub const fn with_inline_transaction(mut self) -> Self {
self.transaction = true;
self.transaction_dispatch_mode = TransactionDispatchMode::Inline;
self
}
#[must_use]
pub const fn with_transaction_mode(mut self, mode: TransactionDispatchMode) -> Self {
self.transaction = true;
self.transaction_dispatch_mode = mode;
self
}
#[must_use]
pub const fn with_transaction_batch(mut self) -> Self {
self.transaction_batch = true;
self
}
#[must_use]
pub const fn with_inline_transaction_batch(mut self) -> Self {
self.transaction_batch = true;
self.transaction_batch_dispatch_mode = TransactionDispatchMode::Inline;
self
}
#[must_use]
pub const fn with_transaction_batch_mode(mut self, mode: TransactionDispatchMode) -> Self {
self.transaction_batch = true;
self.transaction_batch_dispatch_mode = mode;
self
}
#[must_use]
pub const fn with_transaction_view_batch(mut self) -> Self {
self.transaction_view_batch = true;
self
}
#[must_use]
pub const fn with_inline_transaction_view_batch(mut self) -> Self {
self.transaction_view_batch = true;
self.transaction_view_batch_dispatch_mode = TransactionDispatchMode::Inline;
self
}
#[must_use]
pub const fn with_transaction_view_batch_mode(mut self, mode: TransactionDispatchMode) -> Self {
self.transaction_view_batch = true;
self.transaction_view_batch_dispatch_mode = mode;
self
}
#[must_use]
pub const fn with_account_touch(mut self) -> Self {
self.account_touch = true;
self
}
#[must_use]
pub const fn with_account_update(mut self) -> Self {
self.account_update = true;
self
}
#[must_use]
pub const fn with_block_meta(mut self) -> Self {
self.block_meta = true;
self
}
#[must_use]
pub const fn with_slot_status(mut self) -> Self {
self.slot_status = true;
self
}
#[must_use]
pub const fn with_reorg(mut self) -> Self {
self.reorg = true;
self
}
#[must_use]
pub const fn with_recent_blockhash(mut self) -> Self {
self.recent_blockhash = true;
self
}
#[must_use]
pub const fn with_cluster_topology(mut self) -> Self {
self.cluster_topology = true;
self
}
#[must_use]
pub const fn with_leader_schedule(mut self) -> Self {
self.leader_schedule = true;
self
}
}
#[derive(Debug, Clone)]
pub struct PluginContext {
pub plugin_name: &'static str,
}
#[derive(Debug, Clone, Error, Eq, PartialEq)]
#[error("{reason}")]
pub struct PluginSetupError {
reason: String,
}
impl PluginSetupError {
#[must_use]
pub fn new(reason: impl Into<String>) -> Self {
Self {
reason: reason.into(),
}
}
}
#[async_trait]
pub trait ObserverPlugin: Send + Sync + 'static {
fn name(&self) -> &'static str {
core::any::type_name::<Self>()
}
fn config(&self) -> PluginConfig {
PluginConfig::default()
}
async fn setup(&self, _ctx: PluginContext) -> Result<(), PluginSetupError> {
Ok(())
}
async fn on_raw_packet(&self, _event: RawPacketEvent) {}
async fn on_shred(&self, _event: ShredEvent) {}
async fn on_dataset(&self, _event: DatasetEvent) {}
fn accepts_transaction(&self, _event: &TransactionEvent) -> bool {
true
}
fn accepts_transaction_ref(&self, event: &TransactionEventRef<'_>) -> bool {
with_cached_transaction_event(event, |owned| self.accepts_transaction(owned))
}
fn transaction_interest(&self, event: &TransactionEvent) -> TransactionInterest {
if self.accepts_transaction(event) {
TransactionInterest::Critical
} else {
TransactionInterest::Ignore
}
}
fn transaction_interest_ref(&self, event: &TransactionEventRef<'_>) -> TransactionInterest {
with_cached_transaction_event(event, |owned| self.transaction_interest(owned))
}
fn transaction_prefilter(&self) -> Option<&TransactionPrefilter> {
None
}
async fn on_transaction(&self, _event: &TransactionEvent) {}
async fn on_transaction_log(&self, _event: &TransactionLogEvent) {}
fn accepts_transaction_log(&self, _event: &TransactionLogEvent) -> bool {
true
}
async fn on_transaction_status(&self, _event: &TransactionStatusEvent) {}
fn accepts_transaction_status(&self, _event: &TransactionStatusEvent) -> bool {
true
}
async fn on_transaction_batch(&self, _event: &TransactionBatchEvent) {}
async fn on_transaction_view_batch(&self, _event: &TransactionViewBatchEvent) {}
async fn on_transaction_with_interest(
&self,
event: &TransactionEvent,
_interest: TransactionInterest,
) {
self.on_transaction(event).await;
}
fn accepts_account_touch_ref(&self, _event: &AccountTouchEventRef<'_>) -> bool {
true
}
async fn on_account_touch(&self, _event: &AccountTouchEvent) {}
async fn on_account_update(&self, _event: &AccountUpdateEvent) {}
fn accepts_block_meta(&self, _event: &BlockMetaEvent) -> bool {
true
}
async fn on_block_meta(&self, _event: &BlockMetaEvent) {}
fn accepts_account_update(&self, _event: &AccountUpdateEvent) -> bool {
true
}
async fn on_slot_status(&self, _event: SlotStatusEvent) {}
async fn on_reorg(&self, _event: ReorgEvent) {}
async fn on_recent_blockhash(&self, _event: ObservedRecentBlockhashEvent) {}
async fn on_cluster_topology(&self, _event: ClusterTopologyEvent) {}
async fn on_leader_schedule(&self, _event: LeaderScheduleEvent) {}
async fn shutdown(&self, _ctx: PluginContext) {}
}