use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use thiserror::Error;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::SendError;
#[cfg(any(feature = "provider-grpc", feature = "provider-websocket"))]
use std::sync::atomic::{AtomicU64, Ordering};
use crate::event::TxCommitmentStatus;
use crate::event::TxKind;
use crate::framework::{
AccountUpdateEvent, BlockMetaEvent, ClusterTopologyEvent, LeaderScheduleEvent,
ObservedRecentBlockhashEvent, ReorgEvent, SlotStatusEvent, TransactionEvent,
TransactionLogEvent, TransactionStatusEvent, TransactionViewBatchEvent,
};
use agave_transaction_view::{
transaction_data::TransactionData, transaction_view::SanitizedTransactionView,
};
use sof_types::SignatureBytes;
use solana_sdk_ids::{compute_budget, vote};
use solana_transaction::versioned::VersionedTransaction;
/// Default bound for a provider update queue when the caller does not size it.
pub const DEFAULT_PROVIDER_STREAM_QUEUE_CAPACITY: usize = 8_192;
/// Subscription shape a provider stream was opened with.
///
/// Each mode maps to a stable snake_case label (see [`Self::as_str`]) used in
/// logs and metrics; do not change existing label strings.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ProviderStreamMode {
    /// Caller-defined provider with no specialized subscription shape.
    Generic,
    /// Yellowstone gRPC full-transaction stream.
    YellowstoneGrpc,
    /// Yellowstone gRPC transaction-status stream.
    YellowstoneGrpcTransactionStatus,
    /// Yellowstone gRPC account-update stream.
    YellowstoneGrpcAccounts,
    /// Yellowstone gRPC block-meta stream.
    YellowstoneGrpcBlockMeta,
    /// Yellowstone gRPC slot-status stream.
    YellowstoneGrpcSlots,
    /// LaserStream full-transaction stream.
    LaserStream,
    /// LaserStream transaction-status stream.
    LaserStreamTransactionStatus,
    /// LaserStream account-update stream.
    LaserStreamAccounts,
    /// LaserStream block-meta stream.
    LaserStreamBlockMeta,
    /// LaserStream slot-status stream.
    LaserStreamSlots,
    /// Websocket transaction subscription.
    #[cfg(feature = "provider-websocket")]
    WebsocketTransaction,
    /// Websocket logs subscription.
    #[cfg(feature = "provider-websocket")]
    WebsocketLogs,
    /// Websocket single-account subscription.
    #[cfg(feature = "provider-websocket")]
    WebsocketAccount,
    /// Websocket program-accounts subscription.
    #[cfg(feature = "provider-websocket")]
    WebsocketProgram,
}
impl ProviderStreamMode {
    /// Stable snake_case label for this mode, for logs and metrics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Generic => "generic_provider",
            Self::YellowstoneGrpc => "yellowstone_grpc",
            Self::YellowstoneGrpcTransactionStatus => "yellowstone_grpc_transaction_status",
            Self::YellowstoneGrpcAccounts => "yellowstone_grpc_accounts",
            Self::YellowstoneGrpcBlockMeta => "yellowstone_grpc_block_meta",
            Self::YellowstoneGrpcSlots => "yellowstone_grpc_slots",
            Self::LaserStream => "laserstream",
            Self::LaserStreamTransactionStatus => "laserstream_transaction_status",
            Self::LaserStreamAccounts => "laserstream_accounts",
            Self::LaserStreamBlockMeta => "laserstream_block_meta",
            Self::LaserStreamSlots => "laserstream_slots",
            #[cfg(feature = "provider-websocket")]
            Self::WebsocketTransaction => "websocket_transaction",
            #[cfg(feature = "provider-websocket")]
            Self::WebsocketLogs => "websocket_logs",
            #[cfg(feature = "provider-websocket")]
            Self::WebsocketAccount => "websocket_account",
            #[cfg(feature = "provider-websocket")]
            Self::WebsocketProgram => "websocket_program",
        }
    }
}
/// Where a provider stream should start when (re)connecting.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ProviderReplayMode {
    /// Start at the live tip without replaying missed data.
    Live,
    /// Resume replay from prior progress (default).
    /// NOTE(review): exact resume semantics live in the provider
    /// implementations — confirm there.
    #[default]
    Resume,
    /// Replay starting at the given slot.
    FromSlot(u64),
}
/// One item flowing through a provider stream queue: a domain event from
/// upstream, or a health update about the producing source itself.
#[derive(Debug, Clone)]
pub enum ProviderStreamUpdate {
    /// Decoded transaction event.
    Transaction(TransactionEvent),
    /// Transaction still in serialized (wire) form.
    SerializedTransaction(SerializedTransactionEvent),
    /// Log output attached to a transaction.
    TransactionLog(TransactionLogEvent),
    /// Transaction status/commitment change.
    TransactionStatus(TransactionStatusEvent),
    /// Batch of transaction views.
    TransactionViewBatch(TransactionViewBatchEvent),
    /// Account state change.
    AccountUpdate(AccountUpdateEvent),
    /// Per-block metadata.
    BlockMeta(BlockMetaEvent),
    /// Recent blockhash observation.
    RecentBlockhash(ObservedRecentBlockhashEvent),
    /// Slot status transition.
    SlotStatus(SlotStatusEvent),
    /// Cluster topology update.
    ClusterTopology(ClusterTopologyEvent),
    /// Leader schedule update.
    LeaderSchedule(LeaderScheduleEvent),
    /// Fork/reorg notification.
    Reorg(ReorgEvent),
    /// Health transition of the producing source (not a chain event).
    Health(ProviderSourceHealthEvent),
}
impl ProviderStreamUpdate {
    /// Attaches `source` to the wrapped event, replacing any provider source
    /// already present.
    ///
    /// Domain events share the identity behind an `Arc` (see
    /// [`ProviderSourceRef`]); health events store it by value. Only one
    /// match arm ever executes, so `source` is moved straight into that arm.
    /// (The previous implementation pre-wrapped it in an `Arc` and bumped the
    /// refcount per arm, which also forced an `Arc::unwrap_or_clone` on the
    /// health path; both are unnecessary.)
    #[must_use]
    pub fn with_provider_source(mut self, source: ProviderSourceIdentity) -> Self {
        match &mut self {
            Self::Transaction(event) => event.provider_source = Some(Arc::new(source)),
            Self::SerializedTransaction(event) => event.provider_source = Some(Arc::new(source)),
            Self::TransactionLog(event) => event.provider_source = Some(Arc::new(source)),
            Self::TransactionStatus(event) => event.provider_source = Some(Arc::new(source)),
            Self::TransactionViewBatch(event) => event.provider_source = Some(Arc::new(source)),
            Self::AccountUpdate(event) => event.provider_source = Some(Arc::new(source)),
            Self::BlockMeta(event) => event.provider_source = Some(Arc::new(source)),
            Self::RecentBlockhash(event) => event.provider_source = Some(Arc::new(source)),
            Self::SlotStatus(event) => event.provider_source = Some(Arc::new(source)),
            Self::ClusterTopology(event) => event.provider_source = Some(Arc::new(source)),
            Self::LeaderSchedule(event) => event.provider_source = Some(Arc::new(source)),
            Self::Reorg(event) => event.provider_source = Some(Arc::new(source)),
            // Health events carry the identity by value, no Arc needed.
            Self::Health(event) => event.source = source,
        }
        self
    }
}
impl From<TransactionEvent> for ProviderStreamUpdate {
fn from(event: TransactionEvent) -> Self {
Self::Transaction(event)
}
}
impl From<SerializedTransactionEvent> for ProviderStreamUpdate {
fn from(event: SerializedTransactionEvent) -> Self {
Self::SerializedTransaction(event)
}
}
impl From<TransactionLogEvent> for ProviderStreamUpdate {
fn from(event: TransactionLogEvent) -> Self {
Self::TransactionLog(event)
}
}
impl From<TransactionStatusEvent> for ProviderStreamUpdate {
fn from(event: TransactionStatusEvent) -> Self {
Self::TransactionStatus(event)
}
}
impl From<TransactionViewBatchEvent> for ProviderStreamUpdate {
fn from(event: TransactionViewBatchEvent) -> Self {
Self::TransactionViewBatch(event)
}
}
impl From<AccountUpdateEvent> for ProviderStreamUpdate {
fn from(event: AccountUpdateEvent) -> Self {
Self::AccountUpdate(event)
}
}
impl From<BlockMetaEvent> for ProviderStreamUpdate {
fn from(event: BlockMetaEvent) -> Self {
Self::BlockMeta(event)
}
}
impl From<ObservedRecentBlockhashEvent> for ProviderStreamUpdate {
fn from(event: ObservedRecentBlockhashEvent) -> Self {
Self::RecentBlockhash(event)
}
}
impl From<SlotStatusEvent> for ProviderStreamUpdate {
fn from(event: SlotStatusEvent) -> Self {
Self::SlotStatus(event)
}
}
impl From<ClusterTopologyEvent> for ProviderStreamUpdate {
fn from(event: ClusterTopologyEvent) -> Self {
Self::ClusterTopology(event)
}
}
impl From<LeaderScheduleEvent> for ProviderStreamUpdate {
fn from(event: LeaderScheduleEvent) -> Self {
Self::LeaderSchedule(event)
}
}
impl From<ReorgEvent> for ProviderStreamUpdate {
fn from(event: ReorgEvent) -> Self {
Self::Reorg(event)
}
}
impl From<ProviderSourceHealthEvent> for ProviderStreamUpdate {
fn from(event: ProviderSourceHealthEvent) -> Self {
Self::Health(event)
}
}
/// Health transition emitted on the provider stream about a source itself.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ProviderSourceHealthEvent {
    /// Identity of the source this event describes (stored by value, unlike
    /// domain events which share it behind an `Arc`).
    pub source: ProviderSourceIdentity,
    /// Whether the source is required for overall readiness.
    pub readiness: ProviderSourceReadiness,
    /// New health state.
    pub status: ProviderSourceHealthStatus,
    /// Machine-readable cause of the transition.
    pub reason: ProviderSourceHealthReason,
    /// Human-readable detail for logs.
    pub message: String,
}
/// Identifies one upstream provider source instance.
///
/// Equality and hashing deliberately cover only `(kind, instance)` (see the
/// manual `PartialEq`/`Hash` impls below); `priority`, `role`, and
/// `arbitration` are tuning attributes of the same identity.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProviderSourceIdentity {
    /// Provider family (gRPC, websocket, generic label, ...).
    pub kind: ProviderSourceId,
    /// Instance label distinguishing multiple sources of the same kind.
    pub instance: Arc<str>,
    /// Arbitration priority; starts at the role's default priority.
    pub priority: u16,
    /// Role this source plays in a multi-source setup.
    pub role: ProviderSourceRole,
    /// How events from this source are arbitrated against other sources.
    pub arbitration: ProviderSourceArbitrationMode,
}
impl ProviderSourceIdentity {
    /// Creates an identity with `Primary`-role defaults and `EmitAll`
    /// arbitration.
    #[must_use]
    pub fn new(kind: ProviderSourceId, instance: impl Into<Arc<str>>) -> Self {
        Self {
            kind,
            instance: instance.into(),
            priority: ProviderSourceRole::Primary.default_priority(),
            role: ProviderSourceRole::Primary,
            arbitration: ProviderSourceArbitrationMode::EmitAll,
        }
    }
    /// Label of the source kind.
    #[must_use]
    pub fn kind_str(&self) -> &str {
        self.kind.as_str()
    }
    /// Instance label.
    #[must_use]
    pub fn instance_str(&self) -> &str {
        self.instance.as_ref()
    }
    /// Current arbitration priority.
    #[must_use]
    pub const fn priority(&self) -> u16 {
        self.priority
    }
    /// Current role.
    #[must_use]
    pub const fn role(&self) -> ProviderSourceRole {
        self.role
    }
    /// Current arbitration mode.
    #[must_use]
    pub const fn arbitration(&self) -> ProviderSourceArbitrationMode {
        self.arbitration
    }
    /// Overrides the priority explicitly.
    #[must_use]
    pub const fn with_priority(mut self, priority: u16) -> Self {
        self.priority = priority;
        self
    }
    /// Changes the role. The priority follows the new role's default unless
    /// it was explicitly overridden (i.e. it no longer equals the current
    /// role's default).
    #[must_use]
    pub const fn with_role(mut self, role: ProviderSourceRole) -> Self {
        if self.priority == self.role.default_priority() {
            self.priority = role.default_priority();
        }
        self.role = role;
        self
    }
    /// Overrides the arbitration mode.
    #[must_use]
    pub const fn with_arbitration(mut self, arbitration: ProviderSourceArbitrationMode) -> Self {
        self.arbitration = arbitration;
        self
    }
    /// Builds an identity using either the caller's label or a generated
    /// `<kind>-<n>` instance name from a process-wide counter.
    #[cfg(any(feature = "provider-grpc", feature = "provider-websocket"))]
    #[must_use]
    pub(crate) fn generated(kind: ProviderSourceId, label: Option<&str>) -> Self {
        static NEXT_PROVIDER_SOURCE_INSTANCE: AtomicU64 = AtomicU64::new(1);
        if let Some(label) = label {
            return Self::new(kind, label);
        }
        let instance = NEXT_PROVIDER_SOURCE_INSTANCE.fetch_add(1, Ordering::Relaxed);
        // Build the label before moving `kind` into the constructor; this
        // drops the `kind.clone()` the original needed only to satisfy
        // argument evaluation order.
        let label = format!("{}-{instance}", kind.as_str());
        Self::new(kind, label)
    }
}
impl std::fmt::Display for ProviderSourceIdentity {
    /// Renders as `<kind>:<instance>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.kind.as_str())?;
        f.write_str(":")?;
        f.write_str(&self.instance)
    }
}
/// Equality and hashing intentionally cover only `(kind, instance)`:
/// priority, role, and arbitration are tuning knobs on the same source and
/// must not create distinct identities in fan-in tracking sets.
impl PartialEq for ProviderSourceIdentity {
    fn eq(&self, other: &Self) -> bool {
        (&self.kind, &self.instance) == (&other.kind, &other.instance)
    }
}
impl Eq for ProviderSourceIdentity {}
impl Hash for ProviderSourceIdentity {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Tuple hashing feeds the fields in order, matching field-by-field
        // hashing and the manual `eq` above.
        (&self.kind, &self.instance).hash(state);
    }
}
/// Kind of upstream provider a source identity belongs to.
///
/// `Generic` carries an arbitrary caller-supplied label; the remaining
/// variants mirror the specialized `ProviderStreamMode` variants.
#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub enum ProviderSourceId {
    /// Caller-defined provider, labeled by the contained string.
    Generic(Arc<str>),
    YellowstoneGrpc,
    YellowstoneGrpcTransactionStatus,
    YellowstoneGrpcAccounts,
    YellowstoneGrpcBlockMeta,
    YellowstoneGrpcSlots,
    LaserStream,
    LaserStreamTransactionStatus,
    LaserStreamAccounts,
    LaserStreamBlockMeta,
    LaserStreamSlots,
    #[cfg(feature = "provider-websocket")]
    WebsocketTransaction,
    #[cfg(feature = "provider-websocket")]
    WebsocketLogs,
    #[cfg(feature = "provider-websocket")]
    WebsocketAccount,
    #[cfg(feature = "provider-websocket")]
    WebsocketProgram,
}
/// Role a source plays when several sources feed the same fan-in.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub enum ProviderSourceRole {
    /// Main data source.
    Primary,
    /// Additional source alongside the primary.
    Secondary,
    /// Used when preferred sources are unavailable.
    Fallback,
    /// Used only to confirm data, not as a primary feed.
    ConfirmOnly,
}
impl ProviderSourceRole {
    /// Default arbitration priority assigned when an identity takes this
    /// role without an explicit priority override.
    #[must_use]
    pub const fn default_priority(self) -> u16 {
        match self {
            Self::Primary => 300,
            Self::Secondary => 200,
            Self::Fallback => 100,
            Self::ConfirmOnly => 400,
        }
    }
    /// Stable snake_case label for logs and metrics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Primary => "primary",
            Self::Secondary => "secondary",
            Self::Fallback => "fallback",
            Self::ConfirmOnly => "confirm_only",
        }
    }
}
/// How events from multiple sources with the same identity kind are
/// arbitrated before reaching consumers.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub enum ProviderSourceArbitrationMode {
    /// Forward every source's events without deduplication.
    EmitAll,
    /// Forward only the first source to observe an event.
    FirstSeen,
    /// First-seen forwarding with later promotion.
    /// NOTE(review): promotion semantics live in the arbitration logic
    /// elsewhere — confirm there.
    FirstSeenThenPromote,
}
impl ProviderSourceArbitrationMode {
    /// Stable snake_case label for logs and metrics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::FirstSeen => "first_seen",
            Self::FirstSeenThenPromote => "first_seen_then_promote",
            Self::EmitAll => "emit_all",
        }
    }
}
impl ProviderSourceId {
    /// Stable snake_case label for this source kind; `Generic` returns the
    /// caller-supplied label verbatim.
    #[must_use]
    pub fn as_str(&self) -> &str {
        match self {
            Self::Generic(label) => label.as_ref(),
            Self::YellowstoneGrpc => "yellowstone_grpc",
            Self::YellowstoneGrpcTransactionStatus => "yellowstone_grpc_transaction_status",
            Self::YellowstoneGrpcAccounts => "yellowstone_grpc_accounts",
            Self::YellowstoneGrpcBlockMeta => "yellowstone_grpc_block_meta",
            Self::YellowstoneGrpcSlots => "yellowstone_grpc_slots",
            Self::LaserStream => "laserstream",
            Self::LaserStreamTransactionStatus => "laserstream_transaction_status",
            Self::LaserStreamAccounts => "laserstream_accounts",
            Self::LaserStreamBlockMeta => "laserstream_block_meta",
            Self::LaserStreamSlots => "laserstream_slots",
            #[cfg(feature = "provider-websocket")]
            Self::WebsocketTransaction => "websocket_transaction",
            #[cfg(feature = "provider-websocket")]
            Self::WebsocketLogs => "websocket_logs",
            #[cfg(feature = "provider-websocket")]
            Self::WebsocketAccount => "websocket_account",
            #[cfg(feature = "provider-websocket")]
            Self::WebsocketProgram => "websocket_program",
        }
    }
}
/// Coarse health state reported for a provider source.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ProviderSourceHealthStatus {
    /// Source is connected and delivering.
    Healthy,
    /// Source is attempting to re-establish its connection.
    Reconnecting,
    /// Source is not delivering and not currently recovering.
    Unhealthy,
    /// Source has been removed from tracking; terminal for this identity
    /// (reserved senders refuse to emit it — see
    /// `ReservedProviderStreamSender::send`).
    Removed,
}
/// Whether a provider source counts toward overall pipeline readiness.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
pub enum ProviderSourceReadiness {
    /// Readiness requires this source.
    Required,
    /// Source is optional for readiness.
    Optional,
}
impl ProviderSourceReadiness {
    /// Stable snake_case label for logs and metrics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Optional => "optional",
            Self::Required => "required",
        }
    }
}
/// Machine-readable cause attached to a provider source health transition.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
pub enum ProviderSourceHealthReason {
    /// First connection attempt has not completed yet.
    InitialConnectPending,
    /// Upstream acknowledged the subscription request.
    SubscriptionAckReceived,
    /// Upstream closed the stream without the client asking.
    UpstreamStreamClosedUnexpectedly,
    /// Transport-level failure talking to upstream.
    UpstreamTransportFailure,
    /// Protocol-level failure in upstream data.
    UpstreamProtocolFailure,
    /// Replay/backfill attempt failed.
    ReplayBackfillFailure,
    /// Reconnect attempts exhausted their budget.
    ReconnectBudgetExhausted,
    /// The source was removed from tracking.
    SourceRemoved,
}
impl ProviderSourceHealthReason {
    /// Stable snake_case label for logs and metrics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        use ProviderSourceHealthReason as Reason;
        match self {
            Reason::SourceRemoved => "source_removed",
            Reason::ReconnectBudgetExhausted => "reconnect_budget_exhausted",
            Reason::ReplayBackfillFailure => "replay_backfill_failure",
            Reason::UpstreamProtocolFailure => "upstream_protocol_failure",
            Reason::UpstreamTransportFailure => "upstream_transport_failure",
            Reason::UpstreamStreamClosedUnexpectedly => "upstream_stream_closed_unexpectedly",
            Reason::SubscriptionAckReceived => "subscription_ack_received",
            Reason::InitialConnectPending => "initial_connect_pending",
        }
    }
}
/// Sending half of a provider update queue.
pub type ProviderStreamSender = mpsc::Sender<ProviderStreamUpdate>;
/// Receiving half of a provider update queue.
pub type ProviderStreamReceiver = mpsc::Receiver<ProviderStreamUpdate>;
/// Shared handle to a provider source identity as attached to events.
pub type ProviderSourceRef = Arc<ProviderSourceIdentity>;
/// Returned when a source identity is already reserved in a fan-in (or the
/// identity set is unusable).
#[derive(Debug, Error)]
#[error("provider fan-in already contains source identity {0}")]
pub struct ProviderSourceIdentityRegistrationError(pub ProviderSourceIdentity);
/// Tracks a reserved source identity inside a fan-in. Dropping it releases
/// the identity — after first emitting a `Removed` health update when a
/// removal sender is attached (see the `Drop` impl).
#[derive(Debug)]
pub(crate) struct ProviderSourceReservation {
    /// Fan-in whose identity set holds this reservation.
    fan_in: ProviderStreamFanIn,
    /// The reserved identity.
    source: ProviderSourceIdentity,
    /// When set, drop emits a `Removed` health update through this sender.
    removal_sender: Option<ProviderStreamSender>,
}
/// Releases a reserved identity when dropped; used to defer the release
/// until a related `Removed` health update has actually been enqueued.
#[derive(Debug)]
struct ProviderSourceDeferredRelease {
    fan_in: ProviderStreamFanIn,
    source: ProviderSourceIdentity,
}
impl Drop for ProviderSourceDeferredRelease {
    fn drop(&mut self) {
        // Frees the identity for re-reservation on the owning fan-in.
        self.fan_in.release_source_identity(&self.source);
    }
}
/// RAII bundle holding either a deferred identity release or a whole shared
/// reservation. Moving the guard into a background task/thread delays
/// identity reuse until that task finishes enqueuing its removal update.
#[derive(Debug, Default)]
struct ProviderSourceReleaseGuard {
    _identity: Option<ProviderSourceDeferredRelease>,
    _reservation: Option<Arc<ProviderSourceReservation>>,
}
impl ProviderSourceReleaseGuard {
    /// Guard that releases a single identity on drop.
    const fn for_identity(release: ProviderSourceDeferredRelease) -> Self {
        Self {
            _identity: Some(release),
            _reservation: None,
        }
    }
    /// Guard that keeps a shared reservation alive until drop.
    #[cfg(any(feature = "provider-grpc", feature = "provider-websocket"))]
    const fn for_reservation(reservation: Arc<ProviderSourceReservation>) -> Self {
        Self {
            _identity: None,
            _reservation: Some(reservation),
        }
    }
}
/// Bounded retry budget used when a `Removed` update must be flushed from a
/// plain thread because no tokio runtime is available.
const SOURCE_REMOVED_NO_RUNTIME_RETRY_ATTEMPTS: usize = 64;
/// Delay between those retries.
const SOURCE_REMOVED_NO_RUNTIME_RETRY_DELAY: Duration = Duration::from_millis(5);
impl Drop for ProviderSourceReservation {
    /// Emits a `Removed` health update (when a removal sender was attached)
    /// and releases the identity — but only after the removal event is
    /// enqueued, so the identity cannot be re-reserved while the queue still
    /// lacks the `Removed` marker.
    fn drop(&mut self) {
        // The actual release happens when this guard drops; the guard may be
        // handed to a background sender if the queue is currently full.
        let release = ProviderSourceReleaseGuard::for_identity(ProviderSourceDeferredRelease {
            fan_in: self.fan_in.clone(),
            source: self.source.clone(),
        });
        if let Some(sender) = self.removal_sender.clone() {
            drop(emit_provider_source_removed(
                &sender,
                self.source.clone(),
                ProviderSourceReadiness::Optional,
                "reserved provider source sender dropped and was removed from tracking".to_owned(),
                Some(release),
            ));
        } else {
            // No health reporting requested: release immediately.
            drop(release);
        }
    }
}
/// Queue sender bound to a reserved source identity: every update sent
/// through it is stamped with that identity, and dropping the last clone
/// triggers the reservation's removal/release path.
#[derive(Clone, Debug)]
pub struct ReservedProviderStreamSender {
    sender: ProviderStreamSender,
    reservation: Arc<ProviderSourceReservation>,
}
impl ReservedProviderStreamSender {
    /// Identity this sender was reserved for.
    #[must_use]
    pub fn source(&self) -> &ProviderSourceIdentity {
        &self.reservation.source
    }
    /// Stamps the reserved identity onto `update`, overriding whatever the
    /// caller may have set.
    fn bind_update(&self, update: ProviderStreamUpdate) -> ProviderStreamUpdate {
        let identity = self.reservation.source.clone();
        update.with_provider_source(identity)
    }
    /// Forwards `update` into the fan-in queue under the reserved identity.
    ///
    /// `Removed` health updates are rejected with a `SendError` carrying the
    /// bound update back: removal is emitted exclusively by the
    /// reservation's drop path so identity bookkeeping stays consistent.
    pub async fn send(
        &self,
        update: ProviderStreamUpdate,
    ) -> Result<(), SendError<ProviderStreamUpdate>> {
        let is_removed_health = matches!(
            &update,
            ProviderStreamUpdate::Health(event)
                if matches!(event.status, ProviderSourceHealthStatus::Removed)
        );
        if is_removed_health {
            Err(SendError(self.bind_update(update)))
        } else {
            self.sender.send(self.bind_update(update)).await
        }
    }
}
/// Drop guard owned by a provider source task: when the task stops (normally
/// or by unwinding), dropping the guard emits a `Removed` health update and
/// releases any held reservation.
#[cfg(any(feature = "provider-grpc", feature = "provider-websocket"))]
#[derive(Debug)]
pub(crate) struct ProviderSourceTaskGuard {
    /// Queue the removal update is emitted on.
    sender: ProviderStreamSender,
    /// Identity of the task's source.
    source: ProviderSourceIdentity,
    /// Readiness classification forwarded into the removal update.
    readiness: ProviderSourceReadiness,
    /// Reservation released once the removal update is enqueued.
    reservation: Option<Arc<ProviderSourceReservation>>,
}
#[cfg(any(feature = "provider-grpc", feature = "provider-websocket"))]
impl ProviderSourceTaskGuard {
    /// Bundles everything the drop path needs; construction emits nothing.
    #[must_use]
    pub(crate) const fn new(
        sender: ProviderStreamSender,
        source: ProviderSourceIdentity,
        readiness: ProviderSourceReadiness,
        reservation: Option<Arc<ProviderSourceReservation>>,
    ) -> Self {
        Self {
            sender,
            source,
            readiness,
            reservation,
        }
    }
}
#[cfg(any(feature = "provider-grpc", feature = "provider-websocket"))]
impl Drop for ProviderSourceTaskGuard {
    fn drop(&mut self) {
        // Emit a `Removed` health update when the owning task stops; any held
        // reservation rides along as a release guard so the identity is not
        // reusable before the removal event is enqueued.
        drop(emit_provider_source_removed(
            &self.sender,
            self.source.clone(),
            self.readiness,
            "provider source task stopped and was removed from tracking".to_owned(),
            self.reservation
                .take()
                .map(ProviderSourceReleaseGuard::for_reservation),
        ));
    }
}
/// Cloneable fan-in handle: a shared queue sender plus the set of source
/// identities currently reserved on it.
#[derive(Clone, Debug)]
pub struct ProviderStreamFanIn {
    sender: ProviderStreamSender,
    identities: Arc<Mutex<HashSet<ProviderSourceIdentity>>>,
}
impl ProviderStreamFanIn {
    /// Reserves `source` and returns a sender that stamps it on every update.
    ///
    /// # Errors
    /// Fails when the identity is already reserved (or the identity set's
    /// lock is poisoned).
    pub fn sender_for_source(
        &self,
        source: ProviderSourceIdentity,
    ) -> Result<ReservedProviderStreamSender, ProviderSourceIdentityRegistrationError> {
        let reservation = self.reserve_source_identity_generic(source)?;
        Ok(ReservedProviderStreamSender {
            sender: self.sender.clone(),
            reservation: Arc::new(reservation),
        })
    }
    /// Raw (identity-unbound) clone of the underlying queue sender.
    #[cfg(any(feature = "provider-grpc", feature = "provider-websocket"))]
    #[must_use]
    pub(crate) fn sender(&self) -> ProviderStreamSender {
        self.sender.clone()
    }
    /// Inserts `source` into the reserved set; the returned reservation
    /// releases it on drop (without emitting a removal health update).
    pub(crate) fn reserve_source_identity(
        &self,
        source: ProviderSourceIdentity,
    ) -> Result<ProviderSourceReservation, ProviderSourceIdentityRegistrationError> {
        // A poisoned lock is surfaced as a registration failure rather than a
        // panic: the identity cannot be tracked reliably in that state.
        let Ok(mut identities) = self.identities.lock() else {
            return Err(ProviderSourceIdentityRegistrationError(source));
        };
        if !identities.insert(source.clone()) {
            return Err(ProviderSourceIdentityRegistrationError(source));
        }
        Ok(ProviderSourceReservation {
            fan_in: self.clone(),
            source,
            removal_sender: None,
        })
    }
    /// Like `reserve_source_identity`, but the reservation will also emit a
    /// `Removed` health update through this fan-in's sender when dropped.
    fn reserve_source_identity_generic(
        &self,
        source: ProviderSourceIdentity,
    ) -> Result<ProviderSourceReservation, ProviderSourceIdentityRegistrationError> {
        let mut reservation = self.reserve_source_identity(source)?;
        reservation.removal_sender = Some(self.sender.clone());
        Ok(reservation)
    }
    /// Frees `source` for re-reservation; a poisoned lock is ignored.
    pub(crate) fn release_source_identity(&self, source: &ProviderSourceIdentity) {
        if let Ok(mut identities) = self.identities.lock() {
            identities.remove(source);
        }
    }
}
/// Transaction event still in wire (serialized) form, with commitment
/// bookkeeping attached by the provider.
#[derive(Debug, Clone)]
pub struct SerializedTransactionEvent {
    /// Slot the transaction was observed in.
    pub slot: u64,
    /// Commitment level at observation time.
    pub commitment_status: TxCommitmentStatus,
    /// Confirmed-commitment watermark supplied by the provider, if any.
    pub confirmed_slot: Option<u64>,
    /// Finalized-commitment watermark supplied by the provider, if any.
    pub finalized_slot: Option<u64>,
    /// Transaction signature, when the provider supplied one.
    pub signature: Option<SignatureBytes>,
    /// Source that produced this event; set by the fan-in machinery.
    pub provider_source: Option<ProviderSourceRef>,
    /// Raw serialized transaction bytes.
    pub bytes: Box<[u8]>,
}
/// Monotonic per-provider watermarks for confirmed/finalized slots; the
/// observers below only ever raise them.
#[cfg(any(feature = "provider-grpc", feature = "provider-websocket"))]
#[derive(Clone, Copy, Debug, Default)]
pub(crate) struct ProviderCommitmentWatermarks {
    /// Highest slot observed at confirmed (or finalized) commitment.
    pub(crate) confirmed_slot: Option<u64>,
    /// Highest slot observed at finalized commitment.
    pub(crate) finalized_slot: Option<u64>,
}
#[cfg(any(feature = "provider-grpc", feature = "provider-websocket"))]
impl ProviderCommitmentWatermarks {
    /// Folds a transaction's commitment level into the watermarks.
    ///
    /// Delegates to the slot observers so the monotonic-max logic lives in
    /// exactly one place (the original duplicated it per match arm).
    #[inline]
    pub(crate) fn observe_transaction_commitment(
        &mut self,
        slot: u64,
        commitment_status: TxCommitmentStatus,
    ) {
        match commitment_status {
            // Processed carries no watermark information.
            TxCommitmentStatus::Processed => {}
            TxCommitmentStatus::Confirmed => self.observe_confirmed_slot(slot),
            TxCommitmentStatus::Finalized => self.observe_finalized_slot(slot),
        }
    }
    /// Raises the confirmed watermark to at least `slot`; never lowers it.
    #[inline]
    pub(crate) fn observe_confirmed_slot(&mut self, slot: u64) {
        self.confirmed_slot = Some(self.confirmed_slot.unwrap_or(slot).max(slot));
    }
    /// Raises the finalized watermark to at least `slot`; finalization
    /// implies confirmation, so the confirmed watermark advances too.
    #[inline]
    pub(crate) fn observe_finalized_slot(&mut self, slot: u64) {
        self.observe_confirmed_slot(slot);
        self.finalized_slot = Some(self.finalized_slot.unwrap_or(slot).max(slot));
    }
}
/// Builds the bounded provider update channel. A zero capacity is clamped to
/// one slot so the bounded channel is always usable.
#[must_use]
pub fn create_provider_stream_queue(
    capacity: usize,
) -> (ProviderStreamSender, ProviderStreamReceiver) {
    let bounded_capacity = capacity.max(1);
    mpsc::channel(bounded_capacity)
}
/// Builds a fan-in handle plus its receiver. The fan-in starts with an empty
/// reserved-identity set.
#[must_use]
pub fn create_provider_stream_fan_in(
    capacity: usize,
) -> (ProviderStreamFanIn, ProviderStreamReceiver) {
    let (sender, receiver) = create_provider_stream_queue(capacity);
    let fan_in = ProviderStreamFanIn {
        sender,
        identities: Arc::new(Mutex::new(HashSet::new())),
    };
    (fan_in, receiver)
}
/// Classifies a transaction as vote-only, non-vote, or mixed.
///
/// Compute-budget instructions are neutral: they count as neither vote nor
/// non-vote payload. Returns `Mixed` as soon as both a vote and a
/// non-vote/non-budget instruction have been seen.
pub(crate) fn classify_provider_transaction_kind(tx: &VersionedTransaction) -> TxKind {
    let mut saw_vote = false;
    let mut saw_other = false;
    let keys = tx.message.static_account_keys();
    for instruction in tx.message.instructions() {
        // Out-of-range program indices are skipped rather than treated as errors.
        let Some(program_id) = keys.get(usize::from(instruction.program_id_index)) else {
            continue;
        };
        if *program_id == vote::id() {
            saw_vote = true;
        } else if *program_id != compute_budget::id() {
            saw_other = true;
        }
        if saw_vote && saw_other {
            return TxKind::Mixed;
        }
    }
    match (saw_vote, saw_other) {
        (true, false) => TxKind::VoteOnly,
        (true, true) => TxKind::Mixed,
        (false, _) => TxKind::NonVote,
    }
}
/// View-based variant of `classify_provider_transaction_kind` that walks a
/// sanitized transaction view instead of a decoded transaction.
pub(crate) fn classify_provider_transaction_kind_view<D: TransactionData>(
    view: &SanitizedTransactionView<D>,
) -> TxKind {
    let mut saw_vote = false;
    let mut saw_other = false;
    for (program_id, _) in view.program_instructions_iter() {
        if *program_id == vote::id() {
            saw_vote = true;
        } else if *program_id != compute_budget::id() {
            // Compute-budget instructions are neutral; everything else that
            // is not a vote counts as non-vote payload.
            saw_other = true;
        }
        if saw_vote && saw_other {
            return TxKind::Mixed;
        }
    }
    match (saw_vote, saw_other) {
        (true, false) => TxKind::VoteOnly,
        (true, true) => TxKind::Mixed,
        (false, _) => TxKind::NonVote,
    }
}
/// Enqueues a `Removed` health update for `source` and sequences the optional
/// identity-release guard behind that enqueue.
///
/// Fast path: `try_send` succeeds (or the queue is closed, in which case no
/// receiver cares) and the guard is returned to the caller to drop. If the
/// queue is full, both the update and the guard move into a background
/// sender — a spawned task when a tokio runtime is available, otherwise a
/// short-lived thread retrying `try_send` on a bounded budget — and `None`
/// is returned.
fn emit_provider_source_removed(
    sender: &ProviderStreamSender,
    source: ProviderSourceIdentity,
    readiness: ProviderSourceReadiness,
    message: String,
    release: Option<ProviderSourceReleaseGuard>,
) -> Option<ProviderSourceReleaseGuard> {
    let event = ProviderStreamUpdate::Health(ProviderSourceHealthEvent {
        source,
        readiness,
        status: ProviderSourceHealthStatus::Removed,
        reason: ProviderSourceHealthReason::SourceRemoved,
        message,
    });
    match sender.try_send(event) {
        // Delivered, or nobody is listening any more: hand the guard back.
        Ok(()) | Err(mpsc::error::TrySendError::Closed(_)) => release,
        Err(mpsc::error::TrySendError::Full(update)) => {
            let sender = sender.clone();
            if let Ok(handle) = tokio::runtime::Handle::try_current() {
                // Async path: an awaited send will not lose the update; the
                // guard drops (releasing the identity) only after it lands.
                handle.spawn(async move {
                    drop(sender.send(update).await);
                    drop(release);
                });
                None
            } else {
                // No runtime: retry from a plain thread on a bounded budget
                // so shutdown cannot hang forever. The guard is dropped even
                // when the budget is exhausted, trading a possibly missing
                // `Removed` event for guaranteed identity reuse.
                std::thread::spawn(move || {
                    let mut pending_update = update;
                    for _ in 0..SOURCE_REMOVED_NO_RUNTIME_RETRY_ATTEMPTS {
                        match sender.try_send(pending_update) {
                            Ok(()) | Err(mpsc::error::TrySendError::Closed(_)) => {
                                drop(release);
                                return;
                            }
                            Err(mpsc::error::TrySendError::Full(retried_update)) => {
                                pending_update = retried_update;
                                std::thread::sleep(SOURCE_REMOVED_NO_RUNTIME_RETRY_DELAY);
                            }
                        }
                    }
                    drop(release);
                });
                None
            }
        }
    }
}
/// Emits a `Removed` health update for `source`, keeping the optional
/// reservation alive until the update is actually enqueued so the identity
/// is not reusable beforehand.
#[cfg(any(feature = "provider-grpc", feature = "provider-websocket"))]
pub(crate) fn emit_provider_source_removed_with_reservation(
    sender: &ProviderStreamSender,
    source: ProviderSourceIdentity,
    readiness: ProviderSourceReadiness,
    message: String,
    reservation: Option<Arc<ProviderSourceReservation>>,
) {
    let release = reservation.map(ProviderSourceReleaseGuard::for_reservation);
    let returned_guard = emit_provider_source_removed(sender, source, readiness, message, release);
    // Any guard handed back here is released immediately; deferral only
    // matters on the background-send paths inside `emit_provider_source_removed`.
    drop(returned_guard);
}
/// Yellowstone gRPC provider implementation.
#[cfg(feature = "provider-grpc")]
pub mod yellowstone;
/// LaserStream provider implementation.
#[cfg(feature = "provider-grpc")]
pub mod laserstream;
/// Websocket provider implementation.
#[cfg(feature = "provider-websocket")]
pub mod websocket;
#[cfg(all(test, any(feature = "provider-grpc", feature = "provider-websocket")))]
mod tests {
    use super::*;
    use solana_instruction::Instruction;
    use solana_keypair::Keypair;
    use solana_message::{Message, VersionedMessage};
    use solana_sdk_ids::system_program;
    use solana_signer::Signer;
    use std::{sync::Arc, time::Instant};
    use tokio::runtime::Runtime;
    use tokio::time::{Duration, sleep, timeout};
    // Iteration count for the profiling fixtures, overridable via
    // SOF_PROFILE_ITERATIONS (ignored when unset, unparsable, or zero).
    fn profile_iterations(default: usize) -> usize {
        std::env::var("SOF_PROFILE_ITERATIONS")
            .ok()
            .and_then(|value| value.parse::<usize>().ok())
            .filter(|value| *value > 0)
            .unwrap_or(default)
    }
    // Transaction mixing one vote, one system-program, and many neutral
    // compute-budget instructions — must classify as `Mixed`.
    fn sample_mixed_transaction() -> VersionedTransaction {
        let signer = Keypair::new();
        let mut instructions = Vec::with_capacity(34);
        instructions.push(Instruction::new_with_bytes(vote::id(), &[], vec![]));
        instructions.push(Instruction::new_with_bytes(
            system_program::id(),
            &[],
            vec![],
        ));
        for _ in 0..32 {
            instructions.push(Instruction::new_with_bytes(
                compute_budget::id(),
                &[],
                vec![],
            ));
        }
        let message = Message::new(&instructions, Some(&signer.pubkey()));
        VersionedTransaction::try_new(VersionedMessage::Legacy(message), &[&signer]).expect("tx")
    }
    // Reference implementation without the early `Mixed` exit, kept for the
    // A/B profiling fixtures below.
    fn classify_provider_transaction_kind_baseline(tx: &VersionedTransaction) -> TxKind {
        let mut has_vote = false;
        let mut has_non_vote_non_budget = false;
        let keys = tx.message.static_account_keys();
        for instruction in tx.message.instructions() {
            if let Some(program_id) = keys.get(usize::from(instruction.program_id_index)) {
                if *program_id == vote::id() {
                    has_vote = true;
                    continue;
                }
                if *program_id != compute_budget::id() {
                    has_non_vote_non_budget = true;
                }
            }
        }
        if has_vote && !has_non_vote_non_budget {
            TxKind::VoteOnly
        } else if has_vote {
            TxKind::Mixed
        } else {
            TxKind::NonVote
        }
    }
    // Mixed vote + non-vote payload must be classified as `Mixed`.
    #[test]
    fn classify_provider_transaction_kind_detects_mixed() {
        let tx = sample_mixed_transaction();
        assert_eq!(classify_provider_transaction_kind(&tx), TxKind::Mixed);
    }
    // A reserved sender must override any caller-set provider source with its
    // own reserved identity.
    #[test]
    fn reserved_provider_sender_binds_reserved_source_identity() {
        let (fan_in, mut rx) = create_provider_stream_fan_in(4);
        let sender = fan_in
            .sender_for_source(ProviderSourceIdentity::new(
                ProviderSourceId::Generic(Arc::<str>::from("custom")),
                "source-a",
            ))
            .expect("reserve source");
        let other_source = ProviderSourceIdentity::new(
            ProviderSourceId::Generic(Arc::<str>::from("other")),
            "source-b",
        );
        Runtime::new().expect("runtime").block_on(async move {
            sender
                .send(ProviderStreamUpdate::RecentBlockhash(
                    ObservedRecentBlockhashEvent {
                        slot: 7,
                        recent_blockhash: solana_hash::Hash::new_unique().to_bytes(),
                        dataset_tx_count: 0,
                        provider_source: Some(Arc::new(other_source)),
                    },
                ))
                .await
                .expect("send");
            let update = rx.recv().await.expect("provider update");
            let ProviderStreamUpdate::RecentBlockhash(event) = update else {
                panic!("expected recent blockhash update");
            };
            let source = event.provider_source.expect("bound provider source");
            assert_eq!(source.kind_str(), "custom");
            assert_eq!(source.instance_str(), "source-a");
        });
    }
    // Dropping a reserved sender must enqueue a `Removed` health update after
    // any previously sent updates.
    #[test]
    fn reserved_provider_sender_emits_removed_health_on_drop() {
        let (fan_in, mut rx) = create_provider_stream_fan_in(4);
        let source = ProviderSourceIdentity::new(
            ProviderSourceId::Generic(Arc::<str>::from("custom")),
            "source-a",
        );
        let sender = fan_in
            .sender_for_source(source.clone())
            .expect("reserve source");
        Runtime::new().expect("runtime").block_on(async move {
            sender
                .send(ProviderStreamUpdate::Health(ProviderSourceHealthEvent {
                    source: source.clone(),
                    readiness: ProviderSourceReadiness::Required,
                    status: ProviderSourceHealthStatus::Healthy,
                    reason: ProviderSourceHealthReason::SubscriptionAckReceived,
                    message: "source subscription acknowledged".to_owned(),
                }))
                .await
                .expect("send health");
            drop(sender);
            let first = rx.recv().await.expect("health update");
            let ProviderStreamUpdate::Health(first) = first else {
                panic!("expected first health update");
            };
            assert_eq!(first.source, source);
            assert_eq!(first.status, ProviderSourceHealthStatus::Healthy);
            let second = rx.recv().await.expect("removed update");
            let ProviderStreamUpdate::Health(second) = second else {
                panic!("expected removed health update");
            };
            assert_eq!(second.source, source);
            assert_eq!(second.status, ProviderSourceHealthStatus::Removed);
            assert_eq!(second.reason, ProviderSourceHealthReason::SourceRemoved);
        });
    }
    // With a full (capacity-1) queue, the identity must stay reserved until
    // the background task manages to enqueue the `Removed` update.
    #[test]
    fn reserved_provider_sender_defers_identity_reuse_until_removed_is_enqueued() {
        let (fan_in, mut rx) = create_provider_stream_fan_in(1);
        let source = ProviderSourceIdentity::new(
            ProviderSourceId::Generic(Arc::<str>::from("custom")),
            "source-a",
        );
        let sender = fan_in
            .sender_for_source(source.clone())
            .expect("reserve source");
        Runtime::new().expect("runtime").block_on(async move {
            sender
                .send(ProviderStreamUpdate::Health(ProviderSourceHealthEvent {
                    source: source.clone(),
                    readiness: ProviderSourceReadiness::Required,
                    status: ProviderSourceHealthStatus::Healthy,
                    reason: ProviderSourceHealthReason::SubscriptionAckReceived,
                    message: "source subscription acknowledged".to_owned(),
                }))
                .await
                .expect("send health");
            drop(sender);
            assert!(
                fan_in.sender_for_source(source.clone()).is_err(),
                "identity should stay reserved until removed is queued"
            );
            let first = rx.recv().await.expect("health update");
            let ProviderStreamUpdate::Health(first) = first else {
                panic!("expected first health update");
            };
            assert_eq!(first.status, ProviderSourceHealthStatus::Healthy);
            let second = timeout(Duration::from_secs(1), rx.recv())
                .await
                .expect("removed update should arrive")
                .expect("removed health update");
            let ProviderStreamUpdate::Health(second) = second else {
                panic!("expected removed health update");
            };
            assert_eq!(second.status, ProviderSourceHealthStatus::Removed);
            assert_eq!(second.reason, ProviderSourceHealthReason::SourceRemoved);
            // Poll until the deferred release makes the identity reusable.
            timeout(Duration::from_secs(1), async {
                loop {
                    if fan_in.sender_for_source(source.clone()).is_ok() {
                        break;
                    }
                    sleep(Duration::from_millis(10)).await;
                }
            })
            .await
            .expect("identity released after removed is enqueued");
        });
    }
    // Dropping the reserved sender with no active runtime and a full queue
    // must take the thread-based retry path without panicking.
    #[test]
    fn reserved_provider_sender_drop_outside_runtime_does_not_panic_when_queue_is_full() {
        let (fan_in, mut rx) = create_provider_stream_fan_in(1);
        let source = ProviderSourceIdentity::new(
            ProviderSourceId::Generic(Arc::<str>::from("custom")),
            "source-a",
        );
        let sender = fan_in
            .sender_for_source(source.clone())
            .expect("reserve source");
        Runtime::new().expect("runtime").block_on(async {
            sender
                .send(ProviderStreamUpdate::Health(ProviderSourceHealthEvent {
                    source: source.clone(),
                    readiness: ProviderSourceReadiness::Required,
                    status: ProviderSourceHealthStatus::Healthy,
                    reason: ProviderSourceHealthReason::SubscriptionAckReceived,
                    message: "source subscription acknowledged".to_owned(),
                }))
                .await
                .expect("send health");
        });
        let mut sender = Some(sender);
        let drop_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            drop(sender.take().expect("reserved sender"));
        }));
        assert!(
            drop_result.is_ok(),
            "dropping a reserved generic sender outside a tokio runtime should not panic"
        );
        Runtime::new().expect("runtime").block_on(async move {
            let first = rx.recv().await.expect("health update");
            let ProviderStreamUpdate::Health(first) = first else {
                panic!("expected first health update");
            };
            assert_eq!(first.status, ProviderSourceHealthStatus::Healthy);
            let second = timeout(Duration::from_secs(1), rx.recv())
                .await
                .expect("removed update should arrive")
                .expect("removed health update");
            let ProviderStreamUpdate::Health(second) = second else {
                panic!("expected removed health update");
            };
            assert_eq!(second.status, ProviderSourceHealthStatus::Removed);
            assert_eq!(second.reason, ProviderSourceHealthReason::SourceRemoved);
            timeout(Duration::from_secs(1), async {
                loop {
                    if fan_in.sender_for_source(source.clone()).is_ok() {
                        break;
                    }
                    sleep(Duration::from_millis(10)).await;
                }
            })
            .await
            .expect("identity released after background removal send");
        });
    }
    // `Removed` may only come from the drop path; a live sender must bounce
    // it back in the error and leave the queue untouched.
    #[test]
    fn reserved_provider_sender_rejects_removed_health_while_alive() {
        let (fan_in, mut rx) = create_provider_stream_fan_in(4);
        let source = ProviderSourceIdentity::new(
            ProviderSourceId::Generic(Arc::<str>::from("custom")),
            "source-a",
        );
        let sender = fan_in
            .sender_for_source(source.clone())
            .expect("reserve source");
        Runtime::new().expect("runtime").block_on(async move {
            let error = sender
                .send(ProviderStreamUpdate::Health(ProviderSourceHealthEvent {
                    source: source.clone(),
                    readiness: ProviderSourceReadiness::Required,
                    status: ProviderSourceHealthStatus::Removed,
                    reason: ProviderSourceHealthReason::SourceRemoved,
                    message: "should be rejected".to_owned(),
                }))
                .await
                .expect_err("reserved sender should reject removed health while alive");
            let removed_update = error.0;
            let ProviderStreamUpdate::Health(removed) = removed_update else {
                panic!("expected removed health update in send error");
            };
            assert_eq!(removed.source, source);
            assert_eq!(removed.status, ProviderSourceHealthStatus::Removed);
            assert!(
                timeout(Duration::from_millis(50), rx.recv()).await.is_err(),
                "rejected removed update must not reach the provider queue"
            );
        });
    }
    // With no runtime and a permanently full queue, the retry thread's bounded
    // budget must still release the identity eventually.
    #[test]
    fn reserved_provider_sender_releases_identity_after_bounded_no_runtime_cleanup_retry() {
        let (fan_in, _rx) = create_provider_stream_fan_in(1);
        let source = ProviderSourceIdentity::new(
            ProviderSourceId::Generic(Arc::<str>::from("custom")),
            "source-a",
        );
        let sender = fan_in
            .sender_for_source(source.clone())
            .expect("reserve source");
        fan_in
            .sender()
            .try_send(ProviderStreamUpdate::Health(ProviderSourceHealthEvent {
                source: ProviderSourceIdentity::new(
                    ProviderSourceId::Generic(Arc::<str>::from("other")),
                    "other-source",
                ),
                readiness: ProviderSourceReadiness::Optional,
                status: ProviderSourceHealthStatus::Healthy,
                reason: ProviderSourceHealthReason::SubscriptionAckReceived,
                message: "occupied".to_owned(),
            }))
            .expect("fill provider queue");
        let drop_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            drop(sender);
        }));
        assert!(drop_result.is_ok(), "drop outside runtime should not panic");
        // Wait out the full retry budget (attempts x 5ms delay) plus slack.
        std::thread::sleep(Duration::from_millis(
            (SOURCE_REMOVED_NO_RUNTIME_RETRY_ATTEMPTS as u64 * 5) + 100,
        ));
        fan_in
            .sender_for_source(source)
            .expect("identity released after bounded no-runtime cleanup retry");
    }
    // Task guard drop must emit a `Removed` health update for its source.
    #[test]
    fn provider_source_task_guard_emits_removed_health_on_drop() {
        let (tx, mut rx) = create_provider_stream_queue(4);
        let source = ProviderSourceIdentity::new(
            ProviderSourceId::Generic(Arc::<str>::from("custom")),
            "source-a",
        );
        {
            let _guard = ProviderSourceTaskGuard::new(
                tx,
                source.clone(),
                ProviderSourceReadiness::Required,
                None,
            );
        }
        Runtime::new().expect("runtime").block_on(async move {
            let update = rx.recv().await.expect("provider update");
            let ProviderStreamUpdate::Health(event) = update else {
                panic!("expected health update");
            };
            assert_eq!(event.source, source);
            assert_eq!(event.status, ProviderSourceHealthStatus::Removed);
            assert_eq!(event.reason, ProviderSourceHealthReason::SourceRemoved);
        });
    }
    // A/B timing of baseline vs optimized classification in one run.
    #[test]
    #[ignore = "profiling fixture for provider transaction kind classification A/B"]
    fn provider_transaction_kind_profile_fixture() {
        let iterations = profile_iterations(1_000_000);
        let tx = sample_mixed_transaction();
        let baseline_started = Instant::now();
        for _ in 0..iterations {
            std::hint::black_box(classify_provider_transaction_kind_baseline(&tx));
        }
        let baseline_elapsed = baseline_started.elapsed();
        let optimized_started = Instant::now();
        for _ in 0..iterations {
            std::hint::black_box(classify_provider_transaction_kind(&tx));
        }
        let optimized_elapsed = optimized_started.elapsed();
        eprintln!(
            "provider_transaction_kind_profile_fixture iterations={} baseline_us={} optimized_us={}",
            iterations,
            baseline_elapsed.as_micros(),
            optimized_elapsed.as_micros(),
        );
    }
    // Baseline-only profiling loop (pair with the optimized fixture below).
    #[test]
    #[ignore = "profiling fixture for baseline provider tx kind classification"]
    fn provider_transaction_kind_baseline_profile_fixture() {
        let iterations = profile_iterations(1_000_000);
        let tx = sample_mixed_transaction();
        for _ in 0..iterations {
            std::hint::black_box(classify_provider_transaction_kind_baseline(&tx));
        }
    }
    // Optimized-only profiling loop.
    #[test]
    #[ignore = "profiling fixture for optimized provider tx kind classification"]
    fn provider_transaction_kind_optimized_profile_fixture() {
        let iterations = profile_iterations(1_000_000);
        let tx = sample_mixed_transaction();
        for _ in 0..iterations {
            std::hint::black_box(classify_provider_transaction_kind(&tx));
        }
    }
}