Module provider_stream
Processed provider-stream ingress types, adapters, and surfaces for SOF.
Use this module when the upstream source is already beyond raw shreds, for example Yellowstone gRPC or LaserStream-style processed transaction feeds. These updates bypass SOF’s packet, shred, FEC, and reconstruction stages and enter directly at the plugin/derived-state transaction layer.
Built-in mode capability summary:
- YellowstoneGrpc: built-in Yellowstone transaction feed
- YellowstoneGrpcTransactionStatus: built-in Yellowstone transaction-status feed
- YellowstoneGrpcAccounts: built-in Yellowstone account-update feed
- YellowstoneGrpcBlockMeta: built-in Yellowstone block-meta feed
- YellowstoneGrpcSlots: built-in Yellowstone slot feed
- LaserStream: built-in LaserStream transaction feed
- LaserStreamTransactionStatus: built-in LaserStream transaction-status feed
- LaserStreamAccounts: built-in LaserStream account-update feed
- LaserStreamBlockMeta: built-in LaserStream block-meta feed
- LaserStreamSlots: built-in LaserStream slot feed
- WebsocketTransaction: built-in websocket transactionSubscribe
- WebsocketLogs: built-in websocket logsSubscribe
- WebsocketAccount: built-in websocket accountSubscribe
- WebsocketProgram: built-in websocket programSubscribe
Each built-in source config can report its matching runtime mode directly
through runtime_mode(). ProviderStreamMode::Generic remains the typed
custom-adapter path and the fan-in mode when you want to combine multiple
heterogeneous upstream sources into one runtime ingress.
Generic provider producers may still enqueue TransactionViewBatch,
BlockMeta, RecentBlockhash, SlotStatus, ClusterTopology,
LeaderSchedule, or Reorg updates directly.
ProviderStreamMode::Generic is SOF’s typed custom-adapter mode.
Your producer ingests an upstream format and maps it into one of the
ProviderStreamUpdate variants below before handing it to SOF.
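As a rough illustration of that mapping step, the sketch below converts a hypothetical upstream record into a simplified stand-in for ProviderStreamUpdate. UpstreamRecord, SketchUpdate, and map_upstream are names invented for this example and are not SOF types; the real variants carry richer event payloads.

```rust
/// Hypothetical upstream wire record (not a SOF type).
#[derive(Debug, Clone, PartialEq)]
enum UpstreamRecord {
    Tx { slot: u64, signature: String },
    Slot { slot: u64, status: String },
    Heartbeat,
}

/// Simplified stand-in for two `ProviderStreamUpdate` variants (assumption).
#[derive(Debug, Clone, PartialEq)]
enum SketchUpdate {
    Transaction { slot: u64, signature: String },
    SlotStatus { slot: u64, status: String },
}

/// Maps one upstream record into a typed update.
/// Records with no typed equivalent are dropped by returning `None`.
fn map_upstream(record: UpstreamRecord) -> Option<SketchUpdate> {
    match record {
        UpstreamRecord::Tx { slot, signature } => {
            Some(SketchUpdate::Transaction { slot, signature })
        }
        UpstreamRecord::Slot { slot, status } => {
            Some(SketchUpdate::SlotStatus { slot, status })
        }
        UpstreamRecord::Heartbeat => None,
    }
}
```

In a real producer, the mapped value would be sent through the provider-stream sender instead of being returned.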
Built-in provider source configs extend that same typed surface:
- websocket:
  - [websocket::WebsocketTransactionConfig] can target [websocket::WebsocketPrimaryStream::Transaction], [websocket::WebsocketPrimaryStream::Account], or [websocket::WebsocketPrimaryStream::Program]
  - [websocket::WebsocketLogsConfig] targets logsSubscribe
- Yellowstone:
  - [yellowstone::YellowstoneGrpcConfig] can target [yellowstone::YellowstoneGrpcStream::Transaction], [yellowstone::YellowstoneGrpcStream::TransactionStatus], [yellowstone::YellowstoneGrpcStream::Accounts], or [yellowstone::YellowstoneGrpcStream::BlockMeta]
  - [yellowstone::YellowstoneGrpcSlotsConfig] targets slot updates
- LaserStream:
  - [laserstream::LaserStreamConfig] can target [laserstream::LaserStreamStream::Transaction], [laserstream::LaserStreamStream::TransactionStatus], [laserstream::LaserStreamStream::Accounts], or [laserstream::LaserStreamStream::BlockMeta]
  - [laserstream::LaserStreamSlotsConfig] targets slot updates
Those source selectors do not create a second runtime API. They extend the
existing provider config objects and emit the matching ProviderStreamUpdate
variants into the same runtime dispatch path.
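The relationship between a stream selector and the mode reported by runtime_mode() can be pictured with a small model. This is only a sketch: SketchYellowstoneStream, SketchMode, and SketchYellowstoneConfig are stand-ins invented here, and the pairing of selectors to modes is inferred from the mode names in the capability summary rather than taken from the SOF source.

```rust
/// Stand-in for the Yellowstone stream selector (assumption, not the SOF type).
#[derive(Debug, Clone, Copy, PartialEq)]
enum SketchYellowstoneStream {
    Transaction,
    TransactionStatus,
    Accounts,
    BlockMeta,
}

/// Stand-in for the matching subset of runtime modes (assumption).
#[derive(Debug, Clone, Copy, PartialEq)]
enum SketchMode {
    YellowstoneGrpc,
    YellowstoneGrpcTransactionStatus,
    YellowstoneGrpcAccounts,
    YellowstoneGrpcBlockMeta,
}

/// Hypothetical config holding only the selected stream.
struct SketchYellowstoneConfig {
    stream: SketchYellowstoneStream,
}

impl SketchYellowstoneConfig {
    /// Reports the mode that matches the selected stream.
    fn runtime_mode(&self) -> SketchMode {
        match self.stream {
            SketchYellowstoneStream::Transaction => SketchMode::YellowstoneGrpc,
            SketchYellowstoneStream::TransactionStatus => {
                SketchMode::YellowstoneGrpcTransactionStatus
            }
            SketchYellowstoneStream::Accounts => SketchMode::YellowstoneGrpcAccounts,
            SketchYellowstoneStream::BlockMeta => SketchMode::YellowstoneGrpcBlockMeta,
        }
    }
}
```

The selector changes which mode is reported, while the config type and the ingress path stay the same.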
Built-in configs may also set:
- a stable source instance label for observability via with_source_instance(...)
- whether one source is readiness-gating or optional via with_readiness(...)
- an operational role via with_source_role(...)
- an explicit arbitration priority via with_source_priority(...)
- duplicate handling via with_source_arbitration(...)
  - EmitAll keeps the historical behavior and forwards overlapping provider events from every source
  - FirstSeen suppresses later overlapping duplicates across sources
  - FirstSeenThenPromote keeps the first event immediate, but allows one later higher-priority duplicate to promote through
Generic multi-source producers can reserve one stable source identity with
[ProviderStreamFanIn::sender_for_source]. The returned
[ReservedProviderStreamSender] automatically attributes every update it
sends to that reserved provider source.
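The reservation idea can be modeled with the standard library alone. The sketch below is not the SOF implementation: SketchFanIn, SketchReservedSender, and Attributed are hypothetical types, source identity is reduced to a string label, and the error is a plain String rather than ProviderSourceIdentityRegistrationError.

```rust
use std::collections::HashSet;
use std::sync::mpsc::{channel, Receiver, SendError, Sender};

/// Hypothetical attributed update: a payload plus the source label it came from.
#[derive(Debug, Clone, PartialEq)]
struct Attributed {
    source: String,
    payload: String,
}

/// Toy fan-in: one shared queue plus the set of labels already reserved.
struct SketchFanIn {
    tx: Sender<Attributed>,
    reserved: HashSet<String>,
}

/// Sender bound to one reserved source label.
struct SketchReservedSender {
    source: String,
    tx: Sender<Attributed>,
}

impl SketchFanIn {
    fn new() -> (Self, Receiver<Attributed>) {
        let (tx, rx) = channel();
        (Self { tx, reserved: HashSet::new() }, rx)
    }

    /// Reserves a label once; reserving the same label again is rejected.
    fn sender_for_source(&mut self, source: &str) -> Result<SketchReservedSender, String> {
        if !self.reserved.insert(source.to_string()) {
            return Err(format!("source already reserved: {source}"));
        }
        Ok(SketchReservedSender { source: source.to_string(), tx: self.tx.clone() })
    }
}

impl SketchReservedSender {
    /// Stamps the reserved label on every payload before enqueueing it.
    fn send(&self, payload: &str) -> Result<(), SendError<Attributed>> {
        self.tx.send(Attributed {
            source: self.source.clone(),
            payload: payload.to_string(),
        })
    }
}
```

Because attribution happens inside the sender, a producer cannot forget to tag an update or tag it with another source's label.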
Generic producers can set the same source policy directly on
[ProviderSourceIdentity] with with_role(...), with_priority(...), and
with_arbitration(...).
Generic readiness becomes source-aware only after a custom producer emits
[ProviderStreamUpdate::Health] for that reserved source. Until then,
ProviderStreamMode::Generic falls back to progress-based readiness and
only knows that typed updates are flowing at all.
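One way to picture the fallback is a tracker that prefers an explicit health report and otherwise relies on observed progress. This is a simplified model under stated assumptions: SketchReadiness and SketchHealth are invented for the example, health is reduced to two states, and the real runtime also weighs readiness-gating versus optional sources.

```rust
/// Hypothetical two-state health report for one reserved source.
#[derive(Debug, Clone, Copy, PartialEq)]
enum SketchHealth {
    Healthy,
    Unhealthy,
}

/// Toy readiness tracker for a generic source.
#[derive(Debug, Default)]
struct SketchReadiness {
    updates_seen: u64,
    reported_health: Option<SketchHealth>,
}

impl SketchReadiness {
    /// Records that some typed update arrived.
    fn on_typed_update(&mut self) {
        self.updates_seen += 1;
    }

    /// Records an explicit health report from the producer.
    fn on_health(&mut self, health: SketchHealth) {
        self.reported_health = Some(health);
    }

    /// Source-aware once health was reported; progress-based before that.
    fn is_ready(&self) -> bool {
        match self.reported_health {
            Some(health) => health == SketchHealth::Healthy,
            None => self.updates_seen > 0,
        }
    }
}
```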
Fan-in duplicate arbitration is source-aware and keyed by the logical event, not just by queue position. That means two sources carrying the same transaction or control-plane item can now either:
- both dispatch (EmitAll, the default)
- dispatch once (FirstSeen)
- dispatch once immediately and then allow one later higher-priority source to promote (FirstSeenThenPromote)
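The three outcomes above can be sketched as a small arbiter keyed by a logical event key such as a transaction signature. SketchArbiter and SketchArbitration are hypothetical; the sketch applies one mode to the whole arbiter and assumes a larger number means higher priority, neither of which is confirmed by the SOF documentation.

```rust
use std::collections::HashMap;

/// Stand-in for the three arbitration modes (assumption, not the SOF enum).
#[derive(Debug, Clone, Copy, PartialEq)]
enum SketchArbitration {
    EmitAll,
    FirstSeen,
    FirstSeenThenPromote,
}

/// Toy arbiter keyed by logical event, not by queue position.
struct SketchArbiter {
    mode: SketchArbitration,
    // event key -> (priority of the first dispatch, promotion already used)
    seen: HashMap<String, (u8, bool)>,
}

impl SketchArbiter {
    fn new(mode: SketchArbitration) -> Self {
        Self { mode, seen: HashMap::new() }
    }

    /// Returns true when the event should be dispatched.
    fn admit(&mut self, key: &str, priority: u8) -> bool {
        let mode = self.mode;
        if mode == SketchArbitration::EmitAll {
            return true;
        }
        if let Some((first_priority, promoted)) = self.seen.get_mut(key) {
            // A duplicate passes only once, only in promote mode, and only
            // when it outranks the source that dispatched first.
            let promote = mode == SketchArbitration::FirstSeenThenPromote
                && !*promoted
                && priority > *first_priority;
            if promote {
                *promoted = true;
            }
            return promote;
        }
        // First sighting of this logical event: dispatch and remember it.
        self.seen.insert(key.to_string(), (priority, false));
        true
    }
}
```

A production arbiter would also need to evict old keys so the map does not grow without bound; that is left out here.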
Variant-to-runtime mapping:
- Transaction:
  - drives on_transaction
  - drives derived-state transaction apply when enabled
  - synthesizes on_recent_blockhash from the transaction message when that hook is requested
- SerializedTransaction:
  - same transaction-family path, but lets SOF prefilter before full decode
- TransactionLog:
  - drives on_transaction_log
- TransactionStatus:
  - drives on_transaction_status
- TransactionViewBatch:
  - drives on_transaction_view_batch
- AccountUpdate:
  - drives on_account_update
- BlockMeta:
  - drives on_block_meta
- RecentBlockhash:
  - drives on_recent_blockhash
- SlotStatus:
  - drives on_slot_status
- ClusterTopology:
  - drives on_cluster_topology
- LeaderSchedule:
  - drives on_leader_schedule
- Reorg:
  - drives on_reorg
- Health:
  - updates provider health/readiness/observability
  - does not dispatch into plugin hooks
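The mapping above amounts to a match from update variant to primary plugin hook. The sketch below restates it with a payload-free stand-in enum; SketchVariant and primary_hook are invented for illustration, and side effects such as derived-state apply or synthesized on_recent_blockhash calls are not modeled.

```rust
/// Payload-free stand-in for the `ProviderStreamUpdate` variants (assumption).
#[derive(Debug, Clone, Copy, PartialEq)]
enum SketchVariant {
    Transaction,
    SerializedTransaction,
    TransactionLog,
    TransactionStatus,
    TransactionViewBatch,
    AccountUpdate,
    BlockMeta,
    RecentBlockhash,
    SlotStatus,
    ClusterTopology,
    LeaderSchedule,
    Reorg,
    Health,
}

/// Returns the primary plugin hook a variant drives, or `None` for `Health`,
/// which only updates health, readiness, and observability state.
fn primary_hook(variant: SketchVariant) -> Option<&'static str> {
    match variant {
        // Serialized transactions follow the same transaction-family path.
        SketchVariant::Transaction | SketchVariant::SerializedTransaction => {
            Some("on_transaction")
        }
        SketchVariant::TransactionLog => Some("on_transaction_log"),
        SketchVariant::TransactionStatus => Some("on_transaction_status"),
        SketchVariant::TransactionViewBatch => Some("on_transaction_view_batch"),
        SketchVariant::AccountUpdate => Some("on_account_update"),
        SketchVariant::BlockMeta => Some("on_block_meta"),
        SketchVariant::RecentBlockhash => Some("on_recent_blockhash"),
        SketchVariant::SlotStatus => Some("on_slot_status"),
        SketchVariant::ClusterTopology => Some("on_cluster_topology"),
        SketchVariant::LeaderSchedule => Some("on_leader_schedule"),
        SketchVariant::Reorg => Some("on_reorg"),
        SketchVariant::Health => None,
    }
}
```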
§Feed Provider Transactions Into SOF
    use std::sync::Arc;

    use solana_hash::Hash;
    use solana_keypair::Keypair;
    use solana_message::{Message, VersionedMessage};
    use solana_signer::Signer;
    use solana_transaction::versioned::VersionedTransaction;
    use sof::{
        event::{TxCommitmentStatus, TxKind},
        framework::TransactionEvent,
        provider_stream::{
            create_provider_stream_queue, ProviderStreamMode, ProviderStreamUpdate,
        },
        runtime::ObserverRuntime,
    };

    // The statements below use `.await` and `?`, so they belong inside an
    // async function that returns a `Result`.

    // Bounded provider-stream queue with capacity 128.
    let (tx, rx) = create_provider_stream_queue(128);

    // Build a signed legacy transaction with no instructions as a sample payload.
    let signer = Keypair::new();
    let message = Message::new(&[], Some(&signer.pubkey()));
    let transaction = VersionedTransaction::try_new(VersionedMessage::Legacy(message), &[&signer])?;

    // Enqueue the transaction as a typed provider update.
    tx.send(ProviderStreamUpdate::Transaction(TransactionEvent {
        slot: 1,
        commitment_status: TxCommitmentStatus::Processed,
        confirmed_slot: None,
        finalized_slot: None,
        signature: transaction.signatures.first().copied(),
        tx: Arc::new(transaction),
        kind: TxKind::NonVote,
    }))
    .await?;

    // Run the runtime with the receiver attached as generic provider ingress.
    ObserverRuntime::new()
        .with_provider_stream_ingress(ProviderStreamMode::Generic, rx)
        .run_until(async {})
        .await?;

Structs§
- ProviderSourceHealthEvent: One provider source health transition observed by SOF.
- ProviderSourceIdentity: Stable provider source instance identity used in runtime health and provider-origin events.
- ProviderSourceIdentityRegistrationError: Duplicate provider source identity registration failure for multi-source fan-in.
- ProviderStreamFanIn: Helper for feeding one SOF provider queue from multiple provider sources.
- ReservedProviderStreamSender: One reserved provider source identity plus a sender bound to that reservation.
- SerializedTransactionEvent: One serialized provider-fed transaction that has not yet been materialized.
Enums§
- ProviderReplayMode: Replay policy for processed provider transaction streams.
- ProviderSourceArbitrationMode: Duplicate arbitration mode for one provider source inside fan-in.
- ProviderSourceHealthReason: Typed reason for one provider source health transition.
- ProviderSourceHealthStatus: Health state for one provider source feeding SOF.
- ProviderSourceId: Stable provider source identifier used in runtime health reporting.
- ProviderSourceReadiness: Readiness class for one provider source observed by SOF.
- ProviderSourceRole: Relative operational role for one provider source inside a fan-in graph.
- ProviderStreamMode: Identifies the processed provider family driving SOF’s direct plugin ingress.
- ProviderStreamUpdate: One processed provider-stream update accepted by SOF.
Constants§
- DEFAULT_PROVIDER_STREAM_QUEUE_CAPACITY: Default queue capacity for processed provider-stream ingress.
Functions§
- create_provider_stream_fan_in: Creates one bounded queue plus a typed helper for multi-source provider fan-in.
- create_provider_stream_queue: Creates one bounded queue for processed provider-stream updates.
Type Aliases§
- ProviderSourceRef: Shared provider source identity carried by provider-origin events.
- ProviderStreamReceiver: Receiver type for processed provider-stream ingress.
- ProviderStreamSender: Sender type for processed provider-stream ingress.