//! Yellowstone Block Machine
//!
//! This crate provides a block machine that processes Solana Geyser events so that blocks can be
//! reconstructed correctly.
//!
//! The block machine processes Geyser events and produces blocks that contain all the transactions and account states
//! that were part of the block when it was originally produced by the Solana node.
//!
//! The block machine also detects forks and dead blocks, and emits slot commitment updates.
//!
//! The block machine can be used as a library or as a standalone application (dragonsmouth).
//!
//! # Challenges of rebuilding blocks from Geyser events
//!
//! Rebuilding a block from Geyser events is difficult because of the many rules and edge cases
//! surrounding slot lifecycle events and commitment level updates.
//!
//! The bootstrapping problem: Geyser events are typically received through Dragonsmouth's gRPC interface.
//! However, the first events you receive may belong to the middle, or even the end, of a slot.
//! You have to discard them, because you cannot know whether you missed events at the beginning of that slot.
//!
//! To be sure you have received every event for a slot, your program must have observed
//! either `SLOT_FIRST_SHRED_RECEIVED` or `SLOT_CREATED_BANK` for that slot; otherwise
//! it may have missed some events at the beginning of the slot.
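//!
//! A minimal sketch of that rule, assuming the driver identifies slots by their number (`u64`).
//! `BootstrapFilter` and its methods are hypothetical names for illustration, not part of this crate's API:
//!
//! ```rust
//! use std::collections::HashSet;
//!
//! /// Hypothetical helper: remembers the slots whose start was observed.
//! #[derive(Default)]
//! struct BootstrapFilter {
//!     started: HashSet<u64>,
//! }
//!
//! impl BootstrapFilter {
//!     /// Call when `SLOT_FIRST_SHRED_RECEIVED` or `SLOT_CREATED_BANK` arrives for `slot`.
//!     fn on_slot_start(&mut self, slot: u64) {
//!         self.started.insert(slot);
//!     }
//!
//!     /// Returns `true` if the slot was observed from its start, so its events can be kept;
//!     /// events for any other slot should be discarded.
//!     fn accepts(&self, slot: u64) -> bool {
//!         self.started.contains(&slot)
//!     }
//! }
//! ```
//!
//! Either status is accepted as the start marker because the two can arrive in either order.
//! A long-running driver would also need to evict old slots from the set; this sketch omits that.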
//!
//! To better understand the slot lifecycle, the following section provides a detailed overview of the various
//! slot status events and their significance.
//!
//! # Intra-slot updates
//!
//! ```ignore
//! enum SlotStatus {
//!   ...
//!   SLOT_FIRST_SHRED_RECEIVED = 3;
//!   SLOT_COMPLETED = 4;
//!   SLOT_CREATED_BANK = 5;
//!   SLOT_DEAD = 6;
//! }
//! ```
//!
//! - `SLOT_FIRST_SHRED_RECEIVED`: The remote RPC node you are connected to has received the first shred of the slot. This does not mean the slot has been replayed yet. This event occurs during the retransmit stage of the TVU.
//! - `SLOT_CREATED_BANK`: A bank for the slot has been created on the remote RPC node you are connected to. Within a validator, a Bank is an isolated execution environment used during the replay stage (which follows the retransmit stage). Because forks are inevitable in a decentralized network, a slot can have multiple descendants.
//!   To handle this, validators must be able to replay multiple slots that share the same ancestor without their executions interfering with one another. Each slot is assigned its own Bank instance, and these Banks form a fork graph in which each edge is a parent-child relationship between two banks.
//!   Banks are self-contained execution contexts that hold replay results and essential metadata about the slot and its lineage. Importantly, a Bank is instantiated only once per slot.
//! - `SLOT_COMPLETED`: The RPC node you are connected to has received all the shreds for the slot. This does not necessarily mean that the slot has been fully replayed yet.
//! - `SLOT_DEAD`: Dead slots are slots that the validator has rejected, for reasons such as invalid transaction signatures in the leader's shreds, incorrect entry hashes during Proof of History (PoH) verification, or an unexpected number of entries in the slot. A slot marked as dead is discarded by the network as a whole and is effectively skipped. This can happen at any point during replay, even after the slot has been marked as completed.
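//!
//! The numeric values in the enum above can be decoded into a dedicated lifecycle type. A minimal sketch;
//! `Lifecycle` and `decode_lifecycle` are hypothetical names, and any value outside `3..=6` (the variants
//! elided as `...`) is treated as "not a lifecycle event":
//!
//! ```rust
//! /// Hypothetical mirror of the four intra-slot lifecycle statuses.
//! #[derive(Debug, Clone, Copy, PartialEq, Eq)]
//! enum Lifecycle {
//!     FirstShredReceived,
//!     Completed,
//!     CreatedBank,
//!     Dead,
//! }
//!
//! /// Decodes a raw `SlotStatus` value; returns `None` for statuses that are
//! /// not part of the intra-slot lifecycle.
//! fn decode_lifecycle(raw: i32) -> Option<Lifecycle> {
//!     match raw {
//!         3 => Some(Lifecycle::FirstShredReceived),
//!         4 => Some(Lifecycle::Completed),
//!         5 => Some(Lifecycle::CreatedBank),
//!         6 => Some(Lifecycle::Dead),
//!         _ => None,
//!     }
//! }
//! ```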
//!
//! Here is a simplified overview of the expected lifecycle of a slot:
//!
//! ```text
//!                                     TIME ->
//! ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────►
//! ┌───────────────────────────────────────────────────────┐
//! │ Slot download                                         │
//! │ ┌───────────┐┌──────┐         ┌───────┐┌───────────┐  │
//! │ │FIRST_SHRED││SHRED2│  ...    │SHRED N││ COMPLETED │  │
//! │ │ RECEIVED  │└──────┘         └───────┘└───────────┘  │
//! │ └───────────┘                                         │
//! └──────────────┌───────────────────────────────────────────────────────────────────────────────┐
//!                │ REPLAY STAGE                                                                  │
//!                │┌─────────────┐ ┌──────────────┐ ┌───┌───┐┌──────┐    ┌──────────┐ ┌─────────┐ │
//!                ││BANK_CREATED │ │ACCOUNT UPDATE│ │TX1│TX2││ENTRY1│... │BLOCK_META│ │PROCESSED│ │
//!                │└─────────────┘ └──────────────┘ └───└───┘└──────┘    └──────────┘ └─────────┘ │
//!                │                                                                               │
//!                └───────────────────────────────────────────────────────────────────────────────┘
//!                                                                                       ┌──────────────────────────────────┐
//!                                                                                       │ CONSENSUS                        │
//!                                                                                       │ ┌──────────┐      ┌───────────┐  │
//!                                                                                       │ │CONFIRMED │      │FINALIZED  │  │
//!                                                                                       │ └──────────┘      └───────────┘  │
//!                                                                                       │                                  │
//!                                                                                       └──────────────────────────────────┘
//! ```
//!
//! ## IMPORTANT QUIRKS
//!
//! - `SLOT_CREATED_BANK` is sometimes received before `SLOT_FIRST_SHRED_RECEIVED`. This is due to internal Agave logic, which
//!   sends `SLOT_FIRST_SHRED_RECEIVED` through a queue but sends `SLOT_CREATED_BANK` directly via a callback.
//! - `SLOT_DEAD` can be received at any time, even after `SLOT_COMPLETED`.
//! - `SLOT_COMPLETED` can be received after BlockMeta and after any commitment level status update.
//! - A slot can receive PROCESSED or CONFIRMED before BlockMeta.
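//!
//! Because of these quirks, a driver should not assume a fixed arrival order. A minimal sketch of
//! order-insensitive bookkeeping for a single slot; `SlotProgress`, `Event`, and the `ready` rule are
//! hypothetical and much simpler than this crate's state machine:
//!
//! ```rust
//! /// Hypothetical arrival events for a single slot.
//! #[derive(Debug, Clone, Copy)]
//! enum Event {
//!     FirstShredReceived,
//!     BankCreated,
//!     Completed,
//!     BlockMeta,
//!     Dead,
//! }
//!
//! /// Hypothetical per-slot bookkeeping: each flag is set independently,
//! /// so the order in which events arrive does not matter.
//! #[derive(Debug, Default)]
//! struct SlotProgress {
//!     first_shred: bool,
//!     bank_created: bool,
//!     completed: bool,
//!     block_meta: bool,
//!     dead: bool,
//! }
//!
//! impl SlotProgress {
//!     fn apply(&mut self, ev: Event) {
//!         match ev {
//!             Event::FirstShredReceived => self.first_shred = true,
//!             Event::BankCreated => self.bank_created = true,
//!             Event::Completed => self.completed = true,
//!             Event::BlockMeta => self.block_meta = true,
//!             Event::Dead => self.dead = true,
//!         }
//!     }
//!
//!     /// Either start event proves the slot was observed from its beginning.
//!     fn started(&self) -> bool {
//!         self.first_shred || self.bank_created
//!     }
//!
//!     /// Hypothetical readiness rule: observed from the start, BlockMeta received, and
//!     /// not (yet) dead. `completed` is deliberately not required, because
//!     /// `SLOT_COMPLETED` may arrive after BlockMeta.
//!     fn ready(&self) -> bool {
//!         self.started() && self.block_meta && !self.dead
//!     }
//! }
//! ```
//!
//! Since `SLOT_DEAD` can still arrive later, `ready` only reflects the events seen so far.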
//!
//! # Dragonsmouth integration example
//!
//! The easiest way to use the block machine is through the `dragonsmouth` module, which
//! integrates with the Dragonsmouth gRPC interface.
//!
//! ```ignore
//! use clap::Parser; // `Args` is assumed to derive `clap::Parser`
//! use tokio::sync::mpsc;
//! use tonic::transport::ClientTlsConfig;
//! use yellowstone_grpc_client::GeyserGrpcBuilder;
//! use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequest};
//! // Not shown: `BlockMachineOutput`, `BlockMachineError`, and the `subscribe_block` extension
//! // method are assumed to come from this crate's `dragonsmouth` module; `Args`, `init_tracing`,
//! // and the `hash_map!` macro are helpers defined elsewhere.
//!
//! #[derive(Debug, Clone, serde::Deserialize)]
//! struct Config {
//!     endpoint: String,
//!     x_token: Option<String>,
//! }
//!
//! /// Prints block machine outputs until `sample` blocks have been printed,
//! /// an error is received, or the channel is closed.
//! async fn process_block<W>(
//!     mut block_recv: mpsc::Receiver<Result<BlockMachineOutput, BlockMachineError>>,
//!     sample: usize,
//!     mut out: W,
//! ) where
//!     W: std::io::Write,
//! {
//!     // Only `Block` outputs count toward `sample`.
//!     let mut blocks_seen = 0;
//!     while let Some(result) = block_recv.recv().await {
//!         match result {
//!             Ok(output) => match output {
//!                 BlockMachineOutput::Block(block) => {
//!                     let n = block.len();
//!                     let slot = block.slot;
//!                     let account_cnt = block.account_len();
//!                     let txn_cnt = block.txn_len();
//!                     let entry_cnt = block.entry_len();
//!                     writeln!(out, "Block {slot} len: {n}, {txn_cnt} tx, {account_cnt} accounts, {entry_cnt} entries").expect("write");
//!                     blocks_seen += 1;
//!                 }
//!                 BlockMachineOutput::SlotCommitmentUpdate(slot_commitment_status_update) => {
//!                     writeln!(
//!                         out,
//!                         "SlotCommitmentUpdate: {:?}",
//!                         slot_commitment_status_update
//!                     )
//!                     .expect("write");
//!                 }
//!                 BlockMachineOutput::ForkDetected(fork_detected) => {
//!                     writeln!(out, "ForkDetected: {}", fork_detected.slot).expect("write");
//!                 }
//!                 BlockMachineOutput::DeadBlockDetect(dead_block_detected) => {
//!                     writeln!(out, "DeadBlockDetect: {}", dead_block_detected.slot).expect("write");
//!                 }
//!             },
//!             Err(e) => {
//!                 writeln!(out, "BlockMachineError: {:?}", e).expect("write");
//!                 break;
//!             }
//!         }
//!         if blocks_seen >= sample {
//!             break;
//!         }
//!     }
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     init_tracing();
//!     let args = Args::parse();
//!     let config_file = std::fs::File::open(args.config).expect("open config file");
//!     let config: Config = serde_yaml::from_reader(config_file).expect("parse config");
//!     let endpoint = config.endpoint;
//!     let x_token = config.x_token;
//!     let mut geyser = GeyserGrpcBuilder::from_shared(endpoint)
//!         .expect("Failed to parse endpoint")
//!         .x_token(x_token)
//!         .expect("x_token")
//!         .tls_config(ClientTlsConfig::new().with_native_roots())
//!         .expect("tls_config")
//!         .accept_compressed(tonic::codec::CompressionEncoding::Zstd)
//!         .connect()
//!         .await
//!         .expect("Failed to connect to geyser");
//!
//!     // Subscribe to all account, transaction, and entry updates.
//!     let request = SubscribeRequest {
//!         accounts: hash_map! {
//!             "test".to_string() => Default::default(),
//!         },
//!         transactions: hash_map! {
//!             "test".to_string() => Default::default(),
//!         },
//!         entry: hash_map! {
//!             "test".to_string() => Default::default(),
//!         },
//!         commitment: Some(CommitmentLevel::Confirmed as i32),
//!         ..Default::default()
//!     };
//!
//!     // Yields a channel of `Result<BlockMachineOutput, BlockMachineError>`.
//!     let block_machine_rx = geyser
//!         .subscribe_block(request)
//!         .await
//!         .expect("subscribe_block");
//!     process_block(block_machine_rx, args.samples, std::io::stdout()).await;
//! }
//! ```
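//!
//! The loop in `process_block` counts only `Block` outputs toward `sample`, stops on the first error,
//! and ends when the channel closes. A synchronous sketch of the same loop, using `std::sync::mpsc`
//! instead of Tokio's channel, a hypothetical `Output` enum standing in for `BlockMachineOutput`,
//! and `String` errors; `take_blocks` is likewise a hypothetical name:
//!
//! ```rust
//! use std::sync::mpsc::Receiver;
//!
//! /// Hypothetical stand-in for `BlockMachineOutput`, reduced to what the loop needs.
//! #[derive(Debug)]
//! enum Output {
//!     Block { slot: u64 },
//!     SlotCommitmentUpdate { slot: u64 },
//!     ForkDetected { slot: u64 },
//!     DeadBlockDetect { slot: u64 },
//! }
//!
//! /// Drains `rx` until `sample` blocks were seen, an error arrives, or the
//! /// channel closes. Returns the slots of the blocks that were counted.
//! fn take_blocks(rx: Receiver<Result<Output, String>>, sample: usize) -> Vec<u64> {
//!     let mut blocks = Vec::new();
//!     // `Receiver::iter` ends once every sender has been dropped.
//!     for result in rx.iter() {
//!         match result {
//!             Ok(Output::Block { slot }) => blocks.push(slot),
//!             // Non-block outputs are observed but do not count toward `sample`.
//!             Ok(_) => {}
//!             // Like `process_block`, stop on the first error.
//!             Err(_) => break,
//!         }
//!         if blocks.len() >= sample {
//!             break;
//!         }
//!     }
//!     blocks
//! }
//! ```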
//!
//! # How to build a custom block machine integration
//!
//! You can build your own block machine integration by writing a driver that feeds events into a [`state_machine::BlockSM`] instance.
//!
//! The driver must feed the state machine every one of the following:
//! 1. Block entries
//! 2. Block meta summaries (the final message, which freezes the block)
//! 3. Slot lifecycle events (`SLOT_FIRST_SHRED_RECEIVED`, `SLOT_COMPLETED`, `SLOT_CREATED_BANK`, `SLOT_DEAD`)
//! 4. Commitment level updates (PROCESSED, CONFIRMED, FINALIZED)
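//!
//! To make the role of the block summary concrete, here is a toy model of how these inputs could drive
//! block freezing. It rests on an assumption, not on this crate's `BlockSM` logic: a slot freezes once
//! its summary has arrived and every entry index in `0..entry_count` has been seen. `Input` and
//! `ToyMachine` are hypothetical names:
//!
//! ```rust
//! use std::collections::{HashMap, HashSet};
//!
//! /// Hypothetical, simplified versions of the four input kinds listed above.
//! enum Input {
//!     Entry { slot: u64, index: u64 },
//!     BlockSummary { slot: u64, entry_count: u64 },
//!     Lifecycle { slot: u64 },
//!     Commitment { slot: u64 },
//! }
//!
//! /// Toy model (assumed freezing rule, see above), not this crate's `BlockSM`.
//! #[derive(Default)]
//! struct ToyMachine {
//!     entries: HashMap<u64, HashSet<u64>>, // entry indices seen per slot
//!     expected: HashMap<u64, u64>,         // entry_count from each slot's summary
//!     frozen: Vec<u64>,                    // slots frozen so far, in order
//! }
//!
//! impl ToyMachine {
//!     fn process(&mut self, input: Input) {
//!         let slot = match input {
//!             Input::Entry { slot, index } => {
//!                 self.entries.entry(slot).or_default().insert(index);
//!                 slot
//!             }
//!             Input::BlockSummary { slot, entry_count } => {
//!                 self.expected.insert(slot, entry_count);
//!                 slot
//!             }
//!             // A real machine also tracks lifecycle and commitment; the toy ignores them.
//!             Input::Lifecycle { .. } | Input::Commitment { .. } => return,
//!         };
//!         self.try_freeze(slot);
//!     }
//!
//!     fn try_freeze(&mut self, slot: u64) {
//!         let Some(&count) = self.expected.get(&slot) else {
//!             return; // no summary yet, so the block cannot be frozen
//!         };
//!         let complete = match self.entries.get(&slot) {
//!             Some(seen) => (0..count).all(|i| seen.contains(&i)),
//!             None => count == 0,
//!         };
//!         if complete {
//!             self.expected.remove(&slot);
//!             self.entries.remove(&slot);
//!             self.frozen.push(slot);
//!         }
//!     }
//! }
//! ```
//!
//! Because the freezing check runs whenever an entry or a summary arrives, the summary may come
//! before or after the entries.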
//!
//! ```ignore
//! use std::cmp::Ordering;
//! // Not shown: the Geyser protobuf types (`SubscribeUpdate`, `SubscribeUpdateSlot`, `SlotStatus`,
//! // `UpdateOneof`, ...), Solana's `Hash`, this crate's state machine types (`BlockSM`, `EntryInfo`,
//! // `BlockSummary`, `SlotLifecycleUpdate`, ...), `InMemoryBlockStore`, and `RESERVED_FILTER_NAME`.
//!
//! struct DragonsmouthBlockMachine {
//!     // Slot statuses below this level do not release blocks or commitment updates.
//!     minimum_commitment_level: CommitmentLevel,
//!     // Buffers account, transaction, and entry updates per slot until the block is released.
//!     block_storage: InMemoryBlockStore,
//!     sm: BlockSM,
//! }
//!
//! impl From<SubscribeUpdateEntry> for EntryInfo {
//!     fn from(value: SubscribeUpdateEntry) -> Self {
//!         Self {
//!             entry_hash: Hash::new_from_array(value.hash.try_into().expect("entry format")),
//!             slot: value.slot,
//!             entry_index: value.index,
//!             starting_txn_index: value.starting_transaction_index,
//!             executed_txn_count: value.executed_transaction_count,
//!         }
//!     }
//! }
//!
//! /// Orders commitment levels: Processed < Confirmed < Finalized.
//! fn compare_commitment(cl1: CommitmentLevel, cl2: CommitmentLevel) -> Ordering {
//!     match (cl1, cl2) {
//!         (CommitmentLevel::Processed, CommitmentLevel::Processed) => Ordering::Equal,
//!         (CommitmentLevel::Confirmed, CommitmentLevel::Confirmed) => Ordering::Equal,
//!         (CommitmentLevel::Finalized, CommitmentLevel::Finalized) => Ordering::Equal,
//!         (CommitmentLevel::Processed, _) => Ordering::Less,
//!         (CommitmentLevel::Confirmed, CommitmentLevel::Processed) => Ordering::Greater,
//!         (CommitmentLevel::Finalized, CommitmentLevel::Processed) => Ordering::Greater,
//!         (CommitmentLevel::Finalized, CommitmentLevel::Confirmed) => Ordering::Greater,
//!         (CommitmentLevel::Confirmed, CommitmentLevel::Finalized) => Ordering::Less,
//!     }
//! }
//!
//! impl DragonsmouthBlockMachine {
//!     fn handle_block_entry(&mut self, entry: SubscribeUpdateEntry) {
//!         let entry_info: EntryInfo = entry.into();
//!         self.sm.process_event(entry_info.into());
//!     }
//!
//!     /// Routes a slot update to the state machine, either as a lifecycle event
//!     /// or as a commitment level update.
//!     fn handle_slot_update(&mut self, slot_update: &SubscribeUpdateSlot) {
//!         let slot_status = slot_update.status();
//!         const LIFE_CYCLE_STATUS: [SlotStatus; 4] = [
//!             SlotStatus::SlotFirstShredReceived,
//!             SlotStatus::SlotCompleted,
//!             SlotStatus::SlotCreatedBank,
//!             SlotStatus::SlotDead,
//!         ];
//!
//!         if LIFE_CYCLE_STATUS.contains(&slot_status) {
//!             let lifecycle_update = SlotLifecycleUpdate {
//!                 slot: slot_update.slot,
//!                 parent_slot: slot_update.parent,
//!                 stage: match slot_status {
//!                     SlotStatus::SlotFirstShredReceived => SlotLifecycle::FirstShredReceived,
//!                     SlotStatus::SlotCompleted => SlotLifecycle::Completed,
//!                     SlotStatus::SlotCreatedBank => SlotLifecycle::CreatedBank,
//!                     SlotStatus::SlotDead => SlotLifecycle::Dead,
//!                     _ => unreachable!(),
//!                 },
//!             };
//!             self.sm.process_event(lifecycle_update.into());
//!         } else {
//!             // The remaining statuses are commitment levels.
//!             let commitment_level_update = SlotCommitmentStatusUpdate {
//!                 parent_slot: slot_update.parent,
//!                 slot: slot_update.slot,
//!                 commitment: match slot_status {
//!                     SlotStatus::SlotProcessed => CommitmentLevel::Processed,
//!                     SlotStatus::SlotConfirmed => CommitmentLevel::Confirmed,
//!                     SlotStatus::SlotFinalized => CommitmentLevel::Finalized,
//!                     _ => unreachable!(),
//!                 },
//!             };
//!
//!             self.sm.process_event(commitment_level_update.into());
//!         }
//!     }
//!
//!     /// Converts BlockMeta into a `BlockSummary`, the final event that lets the
//!     /// state machine freeze the block.
//!     fn handle_block_meta(&mut self, block_meta: SubscribeUpdateBlockMeta) {
//!         let bh = bs58::decode(block_meta.blockhash)
//!             .into_vec()
//!             .expect("blockhash format");
//!         let block_summary = BlockSummary {
//!             slot: block_meta.slot,
//!             entry_count: block_meta.entries_count,
//!             executed_transaction_count: block_meta.executed_transaction_count,
//!             blockhash: Hash::new_from_array(bh.try_into().expect("blockhash length")),
//!         };
//!         self.sm.process_event(block_summary.into());
//!         // The BlockMeta update itself is not stored in `block_storage`.
//!     }
//!
//!     /// Main logic: handles an incoming Geyser event and feeds the block state machine.
//!     fn handle_new_geyser_event(&mut self, event: SubscribeUpdate) {
//!         let SubscribeUpdate {
//!             filters,
//!             created_at,
//!             update_oneof,
//!         } = event;
//!         let Some(update_oneof) = update_oneof else {
//!             return;
//!         };
//!         match update_oneof {
//!             // Account and transaction updates are only buffered; the state machine does not see them.
//!             UpdateOneof::Account(acc) => {
//!                 let slot = acc.slot;
//!                 let subscribe_update = SubscribeUpdate {
//!                     filters,
//!                     created_at,
//!                     update_oneof: Some(UpdateOneof::Account(acc)),
//!                 };
//!                 self.block_storage.insert_block_data(slot, subscribe_update);
//!             }
//!             UpdateOneof::Slot(subscribe_update_slot) => {
//!                 self.handle_slot_update(&subscribe_update_slot);
//!             }
//!             UpdateOneof::Transaction(tx) => {
//!                 let slot = tx.slot;
//!                 let subscribe_update = SubscribeUpdate {
//!                     filters,
//!                     created_at,
//!                     update_oneof: Some(UpdateOneof::Transaction(tx)),
//!                 };
//!                 self.block_storage.insert_block_data(slot, subscribe_update);
//!             }
//!             UpdateOneof::BlockMeta(subscribe_update_block_meta) => {
//!                 self.handle_block_meta(subscribe_update_block_meta);
//!             }
//!             UpdateOneof::Entry(subscribe_update_entry) => {
//!                 let slot = subscribe_update_entry.slot;
//!                 self.handle_block_entry(subscribe_update_entry.clone());
//!                 // Buffer the entry only if a filter other than the reserved one matched it.
//!                 if filters.iter().any(|f| f.as_str() != RESERVED_FILTER_NAME) {
//!                     let subscribe_update = SubscribeUpdate {
//!                         filters,
//!                         created_at,
//!                         update_oneof: Some(UpdateOneof::Entry(subscribe_update_entry)),
//!                     };
//!                     self.block_storage.insert_block_data(slot, subscribe_update);
//!                 }
//!             }
//!             _ => {
//!                 tracing::trace!("Unsupported update type received: {:?}", update_oneof);
//!             }
//!         }
//!     }
//!
//!     /// Handles an output event from the block state machine and produces
//!     /// the corresponding `BlockMachineOutput` events.
//!     fn handle_blockstore_output<Ext>(&mut self, ev: BlockStateMachineOuput, out: &mut Ext)
//!     where
//!         Ext: Extend<BlockMachineOutput>,
//!     {
//!         match ev {
//!             BlockStateMachineOuput::FrozenBlock(info) => {
//!                 self.block_storage.mark_block_as_frozen(info.slot);
//!             }
//!             BlockStateMachineOuput::SlotStatus(st) => {
//!                 // Only act on statuses at or above the configured minimum commitment level.
//!                 if compare_commitment(st.commitment, self.minimum_commitment_level).is_ge() {
//!                     // Release the buffered block for this slot, if one is still stored.
//!                     let block = self.block_storage.remove_slot(st.slot);
//!                     if let Some(block_replay) = block {
//!                         out.extend([block_replay.into()]);
//!                     } else {
//!                         tracing::trace!("No block replay found for slot {}", st.slot);
//!                     }
//!                     let commitment_update = SlotCommitmentStatusUpdate {
//!                         slot: st.slot,
//!                         parent_slot: st.parent_slot,
//!                         commitment: st.commitment,
//!                     };
//!                     out.extend([commitment_update.into()]);
//!                 }
//!             }
//!             BlockStateMachineOuput::ForksDetected(slot) => {
//!                 out.extend([BlockMachineOutput::ForkDetected(slot)]);
//!             }
//!             BlockStateMachineOuput::DeadSlotDetected(info) => {
//!                 out.extend([BlockMachineOutput::DeadBlockDetect(info)]);
//!             }
//!         }
//!     }
//!
//!     /// Drains every pending output from the state machine.
//!     fn drain_unprocess_bm_output<Ext>(&mut self, out: &mut Ext)
//!     where
//!         Ext: Extend<BlockMachineOutput>,
//!     {
//!         while let Some(ev) = self.sm.pop_next_unprocess_blockstore_update() {
//!             self.handle_blockstore_output(ev, out);
//!         }
//!     }
//! }
//! ```
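//!
//! `compare_commitment` spells out a total order by hand. For an enum whose variants are declared from
//! weakest to strongest, deriving `Ord` yields the same order. A sketch with a hypothetical local
//! `Commitment` enum and `meets_minimum` helper; whether the `CommitmentLevel` type used above already
//! implements `Ord` depends on where it comes from, so check before relying on it:
//!
//! ```rust
//! use std::cmp::Ordering;
//!
//! /// Hypothetical local commitment enum. Deriving `PartialOrd`/`Ord` orders the
//! /// variants by declaration order: Processed < Confirmed < Finalized.
//! #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
//! enum Commitment {
//!     Processed,
//!     Confirmed,
//!     Finalized,
//! }
//!
//! /// Same check as the `minimum_commitment_level` filter in `handle_blockstore_output`:
//! /// a status passes when it is at or above the configured minimum.
//! fn meets_minimum(status: Commitment, minimum: Commitment) -> bool {
//!     status.cmp(&minimum) != Ordering::Less
//! }
//! ```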
//!
//! # Feature flags
//!
//! - `dragonsmouth`: Enables the Dragonsmouth subscription of Geyser events as the block machine's input source.
//!
#[cfg(feature = "dragonsmouth-thin")]
pub mod dragonsmouth;
pub mod forks;
pub mod state_machine;
#[cfg(test)]
pub mod testkit;