Module provider_stream
Processed provider-stream ingress types and adapters for SOF.
Use this module when the upstream source is already beyond raw shreds, for example Yellowstone gRPC or LaserStream-style processed transaction feeds. These updates bypass SOF’s packet, shred, FEC, and reconstruction stages and enter directly at the plugin/derived-state transaction layer.
Built-in mode capability summary:
- YellowstoneGrpc: built-in adapter emits on_transaction
- LaserStream: built-in adapter emits on_transaction
- WebsocketTransaction: built-in adapter emits on_transaction
Generic provider producers may still enqueue TransactionViewBatch,
RecentBlockhash, SlotStatus, ClusterTopology, LeaderSchedule, or
Reorg updates directly.
ProviderStreamMode::Generic is SOF’s typed custom-adapter mode.
Your producer ingests an upstream format and maps it into one of the
ProviderStreamUpdate variants below before handing it to SOF.
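As a rough illustration of that mapping step, the sketch below parses a made-up upstream line format into a typed update. Everything in it is an assumption for illustration: `SketchUpdate`, `map_upstream`, and the `kind,slot,value` wire format are hypothetical, and the payloads are far simpler than SOF's real ProviderStreamUpdate variants.

```rust
// Illustrative stand-ins only: these types are NOT SOF's definitions.
// The variant names echo ProviderStreamUpdate, but the payloads are simplified.
#[derive(Debug, PartialEq)]
enum SketchUpdate {
    SlotStatus { slot: u64, status: String },
    RecentBlockhash { slot: u64, blockhash: String },
}

/// Maps one line of a hypothetical upstream format ("kind,slot,value")
/// into a typed update, or returns None when the line cannot be mapped.
fn map_upstream(line: &str) -> Option<SketchUpdate> {
    let mut parts = line.splitn(3, ',');
    let kind = parts.next()?;
    let slot = parts.next()?.trim().parse::<u64>().ok()?;
    let value = parts.next()?.trim().to_string();
    match kind.trim() {
        "slot" => Some(SketchUpdate::SlotStatus { slot, status: value }),
        "blockhash" => Some(SketchUpdate::RecentBlockhash { slot, blockhash: value }),
        // Unknown upstream kinds are dropped rather than guessed at.
        _ => None,
    }
}
```

In a real producer, the value returned here would be the ProviderStreamUpdate that gets sent on the provider-stream queue; malformed or unrecognized upstream records never reach SOF.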
Variant-to-runtime mapping:
- Transaction:
  - drives on_transaction
  - drives derived-state transaction apply when enabled
  - synthesizes on_recent_blockhash from the transaction message when that hook is requested
- SerializedTransaction:
  - same transaction-family path, but lets SOF prefilter before full decode
- TransactionLog:
  - drives on_transaction_log
- TransactionViewBatch:
  - drives on_transaction_view_batch
- RecentBlockhash:
  - drives on_recent_blockhash
- SlotStatus:
  - drives on_slot_status
- ClusterTopology:
  - drives on_cluster_topology
- LeaderSchedule:
  - drives on_leader_schedule
- Reorg:
  - drives on_reorg
- Health:
  - updates provider health/readiness/observability
  - does not dispatch into plugin hooks
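The variant-to-runtime mapping can be restated as a small lookup table. This is a sketch, not SOF's dispatch code: `UpdateKind` and `primary_hook` are hypothetical names, and treating SerializedTransaction as ending in on_transaction is an assumption based on the "same transaction-family path" wording.

```rust
// Illustrative model of the documented mapping; not SOF's dispatch code.
#[derive(Debug, Clone, Copy, PartialEq)]
enum UpdateKind {
    Transaction,
    SerializedTransaction,
    TransactionLog,
    TransactionViewBatch,
    RecentBlockhash,
    SlotStatus,
    ClusterTopology,
    LeaderSchedule,
    Reorg,
    Health,
}

/// Returns the primary plugin hook for an update kind,
/// or None for Health, which does not dispatch into plugin hooks.
fn primary_hook(kind: UpdateKind) -> Option<&'static str> {
    match kind {
        // Assumption: the serialized form reaches the same hook after decode.
        UpdateKind::Transaction | UpdateKind::SerializedTransaction => Some("on_transaction"),
        UpdateKind::TransactionLog => Some("on_transaction_log"),
        UpdateKind::TransactionViewBatch => Some("on_transaction_view_batch"),
        UpdateKind::RecentBlockhash => Some("on_recent_blockhash"),
        UpdateKind::SlotStatus => Some("on_slot_status"),
        UpdateKind::ClusterTopology => Some("on_cluster_topology"),
        UpdateKind::LeaderSchedule => Some("on_leader_schedule"),
        UpdateKind::Reorg => Some("on_reorg"),
        UpdateKind::Health => None,
    }
}
```

The table covers only the primary hook per variant. Secondary effects, such as the derived-state apply and the synthesized on_recent_blockhash for Transaction, are left out.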
§Feed Provider Transactions Into SOF
use std::sync::Arc;

use solana_hash::Hash;
use solana_keypair::Keypair;
use solana_message::{Message, VersionedMessage};
use solana_signer::Signer;
use solana_transaction::versioned::VersionedTransaction;
use sof::{
    event::{TxCommitmentStatus, TxKind},
    framework::TransactionEvent,
    provider_stream::{
        create_provider_stream_queue, ProviderStreamMode, ProviderStreamUpdate,
    },
    runtime::ObserverRuntime,
};

// The statements below use `.await` and `?`, so they must run inside an
// async function that returns a `Result`.

// Bounded queue with room for 128 pending updates.
let (tx, rx) = create_provider_stream_queue(128);

// Build a signed transaction with no instructions to use as a sample payload.
let signer = Keypair::new();
let message = Message::new(&[], Some(&signer.pubkey()));
let transaction = VersionedTransaction::try_new(VersionedMessage::Legacy(message), &[&signer])?;

// Enqueue the transaction as a processed provider update.
tx.send(ProviderStreamUpdate::Transaction(TransactionEvent {
    slot: 1,
    commitment_status: TxCommitmentStatus::Processed,
    confirmed_slot: None,
    finalized_slot: None,
    signature: transaction.signatures.first().copied(),
    tx: Arc::new(transaction),
    kind: TxKind::NonVote,
}))
.await?;

// Attach the receiver to the runtime in Generic mode and run it.
ObserverRuntime::new()
    .with_provider_stream_ingress(ProviderStreamMode::Generic, rx)
    .run_until(async {})
    .await?;

Structs§
- ProviderSourceHealthEvent - One provider source health transition observed by SOF.
- SerializedTransactionEvent - One serialized provider-fed transaction that has not yet been materialized.
Enums§
- ProviderReplayMode - Replay policy for processed provider transaction streams.
- ProviderSourceHealthReason - Typed reason for one provider source health transition.
- ProviderSourceHealthStatus - Health state for one provider source feeding SOF.
- ProviderSourceId - Stable provider source identifier used in runtime health reporting.
- ProviderStreamMode - Identifies the processed provider family driving SOF’s direct plugin ingress.
- ProviderStreamUpdate - One processed provider-stream update accepted by SOF.
Constants§
- DEFAULT_PROVIDER_STREAM_QUEUE_CAPACITY - Default queue capacity for processed provider-stream ingress.
Functions§
- create_provider_stream_queue - Creates one bounded queue for processed provider-stream updates.
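To show what "bounded" means for a producer, the sketch below uses the standard library's `sync_channel` as a stand-in. This is an assumption for illustration only: the example on this page awaits `send`, so SOF's queue is presumably an async channel with its own types, and `accepted_before_full` is a hypothetical helper.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Tries to enqueue `attempts` items into a bounded channel of `capacity`
/// without blocking, and returns how many were accepted before the queue
/// reported that it was full. Nothing is drained during the loop.
fn accepted_before_full(capacity: usize, attempts: usize) -> usize {
    // Keep the receiver alive so sends fail with Full, not Disconnected.
    let (tx, _rx) = sync_channel::<u64>(capacity);
    let mut accepted = 0;
    for i in 0..attempts {
        match tx.try_send(i as u64) {
            Ok(()) => accepted += 1,
            // The queue is at capacity: a producer must wait, drop, or retry.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

With a capacity of 2 and 5 attempts, only 2 items are accepted. A producer that outpaces the consumer therefore sees backpressure instead of unbounded memory growth, which is the usual reason to size the queue deliberately.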
Type Aliases§
- ProviderStreamReceiver - Receiver type for processed provider-stream ingress.
- ProviderStreamSender - Sender type for processed provider-stream ingress.