1pub mod constants;
46pub mod error;
47pub mod fork_choice;
48pub mod mempool;
49pub mod payload;
50pub mod tracing;
51pub mod vm;
52
53use ::tracing::{debug, error, info, instrument, warn};
54use constants::{AMSTERDAM_MAX_INITCODE_SIZE, MAX_INITCODE_SIZE, POST_OSAKA_GAS_LIMIT_CAP};
55use error::MempoolError;
56use error::{ChainError, InvalidBlockError};
57use ethrex_common::constants::{EMPTY_TRIE_HASH, MIN_BASE_FEE_PER_BLOB_GAS};
58
59use crossbeam::channel::{self as cb, TryRecvError, select};
60#[cfg(feature = "c-kzg")]
62use ethrex_common::types::EIP4844Transaction;
63#[cfg(feature = "c-kzg")]
64use ethrex_common::types::MAX_BLOB_TX_SIZE;
65use ethrex_common::types::MAX_TX_SIZE;
66use ethrex_common::types::block_access_list::BlockAccessList;
67use ethrex_common::types::block_execution_witness::ExecutionWitness;
68use ethrex_common::types::fee_config::FeeConfig;
69use ethrex_common::types::{
70 AccountInfo, AccountState, AccountUpdate, BalSynthesisItem, Block, BlockHash, BlockHeader,
71 BlockNumber, ChainConfig, Code, Receipt, Transaction, WrappedEIP4844Transaction,
72 synthesize_bal_updates, validate_block_body,
73};
74use ethrex_common::types::{ELASTICITY_MULTIPLIER, P2PTransaction};
75use ethrex_common::types::{Fork, MempoolTransaction};
76use ethrex_common::utils::keccak;
77use ethrex_common::{Address, H256, TrieLogger, U256};
78pub use ethrex_common::{
79 get_total_blob_gas, validate_block_access_list_hash, validate_block_pre_execution,
80 validate_gas_used, validate_receipts_root_and_logs_bloom, validate_requests_hash,
81};
82use ethrex_crypto::NativeCrypto;
83use ethrex_metrics::metrics;
84use ethrex_rlp::constants::RLP_NULL;
85use ethrex_rlp::decode::RLPDecode;
86use ethrex_rlp::encode::RLPEncode;
87use ethrex_storage::{
88 AccountUpdatesList, Store, UpdateBatch, error::StoreError, hash_address, hash_key,
89};
90use ethrex_trie::node::{BranchNode, ExtensionNode, LeafNode};
91use ethrex_trie::{Nibbles, Node, NodeRef, Trie, TrieError, TrieNode};
92use ethrex_vm::backends::CachingDatabase;
93#[cfg(all(feature = "rayon", not(feature = "eip-8025")))]
94use ethrex_vm::backends::levm::LEVM;
95use ethrex_vm::backends::levm::db::DatabaseLogger;
96use ethrex_vm::{BlockExecutionResult, DynVmDatabase, Evm, EvmError};
97use mempool::Mempool;
98use payload::PayloadOrTask;
99use rustc_hash::{FxHashMap, FxHashSet};
100use std::collections::hash_map::Entry;
101use std::collections::{BTreeMap, HashMap, HashSet};
102use std::sync::LazyLock;
103use std::sync::mpsc::Sender;
104use std::sync::{
105 Arc, RwLock,
106 atomic::{AtomicBool, AtomicUsize, Ordering},
107 mpsc::{Receiver, channel},
108};
109use std::time::{Duration, Instant};
110use tokio::sync::Mutex as TokioMutex;
111use tokio_util::sync::CancellationToken;
112
113use vm::StoreVmDatabase;
114
115#[cfg(feature = "metrics")]
116use ethrex_metrics::bal::METRICS_BAL;
117#[cfg(feature = "metrics")]
118use ethrex_metrics::blocks::METRICS_BLOCKS;
119
120#[cfg(feature = "c-kzg")]
121use ethrex_common::types::BlobsBundle;
122
/// Payload-related capacity constant.
/// NOTE(review): not referenced in the visible part of this file; presumably
/// bounds the length of `Blockchain::payloads` — confirm against its users.
const MAX_PAYLOADS: usize = 10;
/// Mempool capacity used by `BlockchainOptions::default()` and by the
/// `Blockchain::default_with_store*` constructors.
const MAX_MEMPOOL_SIZE_DEFAULT: usize = 10_000;
125
/// Lazily-initialised channel to a dedicated background thread named
/// `drop_thread`.
///
/// Any boxed `Send` value pushed through this sender is received by that
/// thread and dropped there, so the sending thread does not pay for the
/// destructor. The thread runs until every sender is gone.
///
/// # Panics
///
/// The first access panics if the background thread cannot be spawned.
static DROP_SENDER: LazyLock<Sender<Box<dyn Send>>> = LazyLock::new(|| {
    let (sender, receiver) = channel::<Box<dyn Send>>();

    // Receive values one at a time and drop each immediately; `recv` fails
    // (ending the loop) only once all senders have been dropped.
    let drain = move || {
        while let Ok(garbage) = receiver.recv() {
            drop(garbage);
        }
    };

    let builder = std::thread::Builder::new().name(String::from("drop_thread"));
    // The join handle is discarded on purpose: the thread is detached.
    builder.spawn(drain).expect("failed to spawn drop thread");

    sender
});
137
/// Value returned by `Blockchain::execute_block_pipeline`.
///
/// Tuple elements, in order (as assembled at the end of that function):
/// 1. the block execution result,
/// 2. the trie/code updates produced by merkleization,
/// 3. the account updates accumulated by the streaming merkleizer
///    (`Some` only when witness collection was requested on that path),
/// 4. the block access list produced by execution, if any,
/// 5. the maximum observed length of the execution -> merkleization queue,
/// 6. seven timestamps: start, block validated, exec+merkle start,
///    merkle start, exec end, merkle end, exec+merkle end,
/// 7. the time spent by the warmer thread (`Duration::ZERO` when it did not run).
type BlockExecutionPipelineResult = (
    BlockExecutionResult,
    AccountUpdatesList,
    Option<Vec<AccountUpdate>>,
    Option<BlockAccessList>,
    usize,
    [Instant; 7],
    Duration,
);
148
/// Triple of an optional block access list, an optional execution witness and
/// the `Result` of the operation itself.
/// NOTE(review): the producer of this alias is outside the visible part of the
/// file; presumably the inner "add block" pipeline returns it so that the BAL
/// and witness survive even when the operation fails — confirm.
type AddBlockPipelineInnerResult = (
    Option<BlockAccessList>,
    Option<ExecutionWitness>,
    Result<(), ChainError>,
);
154
/// Kind of chain a [`Blockchain`] instance is operating on.
#[derive(Debug, Clone, Default)]
pub enum BlockchainType {
    /// The default variant. `Blockchain::validate_l1_transaction_types`
    /// rejects L2-only transaction types only when this variant is selected.
    #[default]
    L1,
    /// L2 operation, carrying its [`L2Config`].
    L2(L2Config),
}
167
/// Configuration attached to [`BlockchainType::L2`].
#[derive(Debug, Clone, Default)]
pub struct L2Config {
    /// Fee configuration behind a shared read/write lock, so clones of this
    /// config observe the same `FeeConfig` value.
    pub fee_config: Arc<RwLock<FeeConfig>>,
}
176
/// Core chain object: owns the store handle, the mempool and the options that
/// drive block validation, execution and merkleization.
#[derive(Debug)]
pub struct Blockchain {
    /// Backing store used to look up headers, chain config and tries.
    storage: Store,
    /// Transaction pool; sized from `BlockchainOptions::max_mempool_size`
    /// (or `MAX_MEMPOOL_SIZE_DEFAULT` in the default constructors).
    pub mempool: Mempool,
    /// Sync flag; every constructor in this file initialises it to `false`.
    is_synced: AtomicBool,
    /// Runtime options this instance was created with.
    pub options: BlockchainOptions,
    /// Shared, async-mutex-protected list of `(u64, PayloadOrTask)` entries.
    /// NOTE(review): the `u64` is presumably a payload id — confirm.
    pub payloads: Arc<TokioMutex<Vec<(u64, PayloadOrTask)>>>,
    /// Rayon pool on which `handle_merkleization` runs its shard workers and
    /// watcher task (see `build_merkle_pool`).
    merkle_pool: Arc<rayon::ThreadPool>,
}
229
/// Tunables for a [`Blockchain`] instance. See the `Default` impl for the
/// default value of each field.
#[derive(Debug, Clone)]
pub struct BlockchainOptions {
    /// Capacity passed to `Mempool::new` by `Blockchain::new`.
    pub max_mempool_size: usize,
    /// NOTE(review): not read in the visible part of the file; presumably
    /// toggles performance logging — confirm.
    pub perf_logs_enabled: bool,
    /// Chain kind; consulted by `validate_l1_transaction_types`.
    pub r#type: BlockchainType,
    /// NOTE(review): not read in the visible part of the file; presumably an
    /// optional cap on blobs per built block — confirm.
    pub max_blobs_per_block: Option<u32>,
    /// NOTE(review): not read in the visible part of the file; presumably
    /// requests witness generation while importing blocks — confirm.
    pub precompute_witnesses: bool,
    /// Forwarded to `CachingDatabase::new` in `execute_block_pipeline`.
    pub precompile_cache_enabled: bool,
    /// Forwarded to the VM's `execute_block_pipeline` (forced to `false`
    /// when a witness is being collected).
    pub bal_parallel_exec_enabled: bool,
    /// Gates the BAL storage-slot prefetch and BAL-based cache warming in
    /// `execute_block_pipeline`.
    pub bal_prefetch_enabled: bool,
    /// When set (and no witness is being collected), merkleization is driven
    /// by updates synthesized from the block access list instead of by
    /// updates streamed from execution.
    pub bal_parallel_trie_enabled: bool,
}
262
263impl Default for BlockchainOptions {
264 fn default() -> Self {
265 Self {
266 max_mempool_size: MAX_MEMPOOL_SIZE_DEFAULT,
267 perf_logs_enabled: false,
268 r#type: BlockchainType::default(),
269 max_blobs_per_block: None,
270 precompute_witnesses: false,
271 precompile_cache_enabled: true,
272 bal_parallel_exec_enabled: true,
273 bal_prefetch_enabled: true,
274 bal_parallel_trie_enabled: true,
275 }
276 }
277}
278
/// Pair of block hashes describing where processing of a block batch stopped.
/// NOTE(review): no user of this type is visible in this part of the file; the
/// field descriptions below follow the field names — confirm against callers.
#[derive(Debug, Clone)]
pub struct BatchBlockProcessingFailure {
    /// Presumably the hash of the last block that was processed successfully.
    pub last_valid_hash: H256,
    /// Presumably the hash of the block whose processing failed.
    pub failed_block_hash: H256,
}
284
285fn log_batch_progress(batch_size: u32, current_block: u32) {
286 let progress_needed = batch_size > 10;
287 const PERCENT_MARKS: [u32; 4] = [20, 40, 60, 80];
288 if progress_needed {
289 PERCENT_MARKS.iter().for_each(|mark| {
290 if (batch_size * mark) / 100 == current_block {
291 info!("[SYNCING] {mark}% of batch processed");
292 }
293 });
294 }
295}
296
/// Messages delivered to the merkleization shard workers.
///
/// `handle_merkleization` sends `ProcessAccount`, `FinishRouting`,
/// `MerklizeAccounts` and `CollectState`. The remaining variants are not sent
/// from the visible part of the file; every worker is handed a clone of all
/// worker senders, so they are presumably exchanged between workers — confirm
/// in `handle_subtrie`.
enum WorkerRequest {
    /// One account update, routed to the worker owning the first nibble of
    /// the hashed address.
    ProcessAccount {
        /// Hashed account address.
        prefix: H256,
        /// New account info, if any. For removed accounts the sender passes
        /// `Some(Default::default())`.
        info: Option<AccountInfo>,
        /// Storage writes for this account (empty for removed accounts).
        storage: FxHashMap<H256, U256>,
        /// Copied from `AccountUpdate::removed`.
        removed: bool,
        /// Copied from `AccountUpdate::removed_storage`.
        removed_storage: bool,
    },
    /// Sent to every worker once all account updates have been routed.
    FinishRouting,
    /// Batch of hashed accounts that had no storage changes in this block.
    MerklizeAccounts {
        accounts: Vec<H256>,
    },
    /// Asks the worker to report its results through `tx` as a
    /// [`CollectedStateMsg`].
    CollectState {
        tx: Sender<CollectedStateMsg>,
    },
    /// NOTE(review): semantics not visible here; carries an account prefix,
    /// a storage key/value pair and a storage root.
    MerklizeStorage {
        prefix: H256,
        key: H256,
        value: U256,
        storage_root: H256,
    },
    /// NOTE(review): semantics not visible here; carries a single hash,
    /// presumably the hashed account whose storage is to be deleted.
    DeleteStorage(H256),
    /// NOTE(review): semantics not visible here; `from` is presumably the
    /// index of the worker that finished routing.
    RoutingDone {
        from: u8,
    },
    /// NOTE(review): semantics not visible here; carries a storage-trie shard
    /// (sub-root branch plus nodes) for the account `prefix`.
    StorageShard {
        prefix: H256,
        index: u8,
        subroot: Box<BranchNode>,
        nodes: Vec<TrieNode>,
    },
}
334
/// Reply to `WorkerRequest::CollectState`, consumed by `handle_merkleization`.
struct CollectedStateMsg {
    /// Worker index; also the slot of `subroot.choices` that the receiver
    /// copies into the assembled state root branch.
    index: u8,
    /// Branch node holding this worker's part of the state trie.
    subroot: Box<BranchNode>,
    /// State-trie nodes produced by this worker.
    state_nodes: Vec<TrieNode>,
    /// Storage-trie nodes produced by this worker, each list paired with an `H256`.
    /// NOTE(review): the `H256` is presumably the hashed account address — confirm.
    storage_nodes: Vec<(H256, Vec<TrieNode>)>,
}
341
/// Partially merkleized per-account storage state.
/// NOTE(review): not used in the visible part of the file; field descriptions
/// follow the field names and types — confirm against `handle_subtrie`.
#[derive(Default)]
struct PreMerkelizedAccountState {
    /// Root branch of the account's storage trie, if one has been built.
    storage_root: Option<Box<BranchNode>>,
    /// Storage-trie nodes collected so far.
    nodes: Vec<TrieNode>,
}
347
/// Per-account work item for the state-trie phase of
/// `handle_merkleization_bal_from_updates`.
///
/// Each `Some` field overwrites the corresponding field of the account state
/// read from the parent state trie; `None` leaves that field untouched.
struct BalStateWorkItem {
    /// Hashed account address, used as the state-trie path.
    hashed_address: H256,
    /// New nonce, if changed.
    nonce: Option<u64>,
    /// New balance, if changed.
    balance: Option<U256>,
    /// New code hash, if changed.
    code_hash: Option<H256>,
    /// New storage root computed by the storage phase, if storage changed.
    storage_root: Option<H256>,
}
357
358impl Blockchain {
359 pub fn build_merkle_pool() -> Arc<rayon::ThreadPool> {
363 Arc::new(
364 rayon::ThreadPoolBuilder::new()
365 .num_threads(17)
366 .thread_name(|i| format!("merkle-worker-{i}"))
367 .build()
368 .expect("Failed to create merkle thread pool"),
369 )
370 }
371
372 pub fn new(store: Store, blockchain_opts: BlockchainOptions) -> Self {
373 Self {
374 storage: store,
375 mempool: Mempool::new(blockchain_opts.max_mempool_size),
376 is_synced: AtomicBool::new(false),
377 payloads: Arc::new(TokioMutex::new(Vec::new())),
378 options: blockchain_opts,
379 merkle_pool: Self::build_merkle_pool(),
380 }
381 }
382
383 pub fn default_with_store_and_pool(store: Store, pool: Arc<rayon::ThreadPool>) -> Self {
393 Self {
394 storage: store,
395 mempool: Mempool::new(MAX_MEMPOOL_SIZE_DEFAULT),
396 is_synced: AtomicBool::new(false),
397 payloads: Arc::new(TokioMutex::new(Vec::new())),
398 options: BlockchainOptions::default(),
399 merkle_pool: pool,
400 }
401 }
402
403 pub fn default_with_store(store: Store) -> Self {
404 Self {
405 storage: store,
406 mempool: Mempool::new(MAX_MEMPOOL_SIZE_DEFAULT),
407 is_synced: AtomicBool::new(false),
408 payloads: Arc::new(TokioMutex::new(Vec::new())),
409 options: BlockchainOptions::default(),
410 merkle_pool: Self::build_merkle_pool(),
411 }
412 }
413
414 fn validate_l1_transaction_types(&self, block: &Block) -> Result<(), ChainError> {
421 if !matches!(self.options.r#type, BlockchainType::L1) {
422 return Ok(());
423 }
424 for tx in &block.body.transactions {
425 if tx.tx_type().is_l2_only() {
426 return Err(ChainError::InvalidBlock(
427 InvalidBlockError::UnsupportedTransactionType(tx.tx_type() as u8),
428 ));
429 }
430 }
431 Ok(())
432 }
433
434 fn execute_block(
436 &self,
437 block: &Block,
438 ) -> Result<(BlockExecutionResult, Vec<AccountUpdate>), ChainError> {
439 let Ok(parent_header) = find_parent_header(&block.header, &self.storage) else {
441 self.storage.add_pending_block(block.clone())?;
443 return Err(ChainError::ParentNotFound);
444 };
445
446 let chain_config = self.storage.get_chain_config();
447
448 validate_block_pre_execution(block, &parent_header, &chain_config, ELASTICITY_MULTIPLIER)?;
450 self.validate_l1_transaction_types(block)?;
451
452 let vm_db = StoreVmDatabase::new(self.storage.clone(), parent_header)?;
453 let mut vm = self.new_evm(vm_db)?;
454
455 let (execution_result, bal) = vm.execute_block(block)?;
456 let account_updates = vm.get_state_transitions()?;
457
458 if let Err(e) = validate_gas_used(execution_result.block_gas_used, &block.header) {
460 ethrex_vm::log_gas_used_mismatch(
461 &execution_result.tx_gas_breakdowns,
462 block.header.number,
463 execution_result.block_gas_used,
464 block.header.gas_used,
465 );
466 return Err(e.into());
467 }
468 validate_receipts_root_and_logs_bloom(
469 &block.header,
470 &execution_result.receipts,
471 &NativeCrypto,
472 )?;
473 validate_requests_hash(&block.header, &chain_config, &execution_result.requests)?;
474 if let Some(bal) = &bal {
475 validate_block_access_list_hash(
476 &block.header,
477 &chain_config,
478 bal,
479 block.body.transactions.len(),
480 )?;
481 }
482
483 Ok((execution_result, account_updates))
484 }
485
486 pub fn generate_bal_for_block(
490 &self,
491 block: &Block,
492 ) -> Result<Option<BlockAccessList>, ChainError> {
493 let chain_config = self.storage.get_chain_config();
494
495 if !chain_config.is_amsterdam_activated(block.header.timestamp) {
497 return Ok(None);
498 }
499
500 let parent_header = find_parent_header(&block.header, &self.storage)?;
502
503 let vm_db = StoreVmDatabase::new(self.storage.clone(), parent_header)?;
505 let mut vm = self.new_evm(vm_db)?;
506
507 let (_execution_result, bal) = vm.execute_block(block)?;
508
509 Ok(bal)
510 }
511
/// Validates `block`, then executes it and merkleizes the resulting state
/// changes concurrently on scoped threads.
///
/// Up to three threads run inside one `std::thread::scope`:
/// * `block_executor_warmer` (only with the `rayon` feature, without
///   `eip-8025`, and when no witness is collected) warms `caching_store`;
/// * `block_executor_execution` runs the VM and the post-execution checks;
/// * `block_executor_merkleizer` computes the trie updates, either from
///   updates synthesized from `bal` or from updates streamed by the VM.
///
/// `vm.db.store` is replaced by a `CachingDatabase` wrapping the original store.
///
/// # Parameters
///
/// * `bal` - block access list for the block, if one is available.
/// * `collect_witness` - disables BAL parallel execution, the BAL-driven
///   trie path, prefetching and warming, and makes the streaming merkleizer
///   accumulate the account updates it sees.
///
/// # Errors
///
/// Returns validation, execution or thread-spawn failures as `ChainError`.
/// A merkleization error is reported in preference to an execution error
/// when both occur, because it is unwrapped first.
#[instrument(
    level = "trace",
    name = "Execute Block",
    skip_all,
    fields(namespace = "block_execution")
)]
fn execute_block_pipeline(
    &self,
    block: &Block,
    parent_header: &BlockHeader,
    vm: &mut Evm,
    bal: Option<&BlockAccessList>,
    collect_witness: bool,
) -> Result<BlockExecutionPipelineResult, ChainError> {
    let start_instant = Instant::now();

    let chain_config = self.storage.get_chain_config();

    // Checks that do not require execution.
    validate_block_pre_execution(block, parent_header, &chain_config, ELASTICITY_MULTIPLIER)?;
    self.validate_l1_transaction_types(block)?;
    validate_block_body(&block.header, &block.body, &NativeCrypto)
        .map_err(|e| ChainError::InvalidBlock(InvalidBlockError::InvalidBody(e)))?;
    let block_validated_instant = Instant::now();

    let exec_merkle_start = Instant::now();
    // Counter shared between the VM (producer) and the streaming merkleizer
    // (consumer, which decrements it once per received batch).
    let queue_length = AtomicUsize::new(0);
    let queue_length_ref = &queue_length;
    let mut max_queue_length = 0;

    // Wrap the VM's store in a caching layer shared with the warmer thread.
    let original_store = vm.db.store.clone();
    let caching_store: Arc<dyn ethrex_vm::backends::LevmDatabase> = Arc::new(
        CachingDatabase::new(original_store, self.options.precompile_cache_enabled),
    );

    vm.db.store = caching_store.clone();

    // Set to `true` by the execution thread once the VM returns; handed to the warmers.
    let cancelled = AtomicBool::new(false);
    let bal_parallel_exec_enabled = self.options.bal_parallel_exec_enabled && !collect_witness;

    // `Some` only when the BAL-driven trie path is enabled, no witness is
    // being collected and a BAL was supplied.
    let optimistic_updates: Option<FxHashMap<Address, BalSynthesisItem>> =
        if self.options.bal_parallel_trie_enabled && !collect_witness {
            bal.map(synthesize_bal_updates)
        } else {
            None
        };

    // Best-effort prefetch of the storage slots listed in the BAL; the result is ignored.
    #[cfg(all(feature = "rayon", not(feature = "eip-8025")))]
    if self.options.bal_prefetch_enabled
        && !collect_witness
        && let Some(bal) = bal
    {
        let slots = LEVM::bal_storage_slots(bal);
        if !slots.is_empty() {
            let _ = caching_store.prefetch_storage(&slots);
        }
    }

    let (execution_result, merkleization_result, warmer_duration) =
        std::thread::scope(|s| -> Result<_, ChainError> {
            #[cfg(all(feature = "rayon", not(feature = "eip-8025")))]
            let vm_type = vm.vm_type;
            let cancelled_ref = &cancelled;
            #[cfg(all(feature = "rayon", not(feature = "eip-8025")))]
            let bal_prefetch_enabled = self.options.bal_prefetch_enabled;
            // Warmer thread. Its failures are logged and never abort the block.
            // With a BAL: warm from the BAL if prefetch is enabled, otherwise
            // warm from the block only when parallel execution is off.
            // Without a BAL: always warm from the block.
            #[cfg(all(feature = "rayon", not(feature = "eip-8025")))]
            let warm_handle = (!collect_witness)
                .then(|| {
                    std::thread::Builder::new()
                        .name("block_executor_warmer".to_string())
                        .spawn_scoped(s, move || {
                            let start = Instant::now();
                            if let Some(bal) = bal {
                                if bal_prefetch_enabled {
                                    if let Err(e) = LEVM::warm_block_from_bal(
                                        bal,
                                        caching_store,
                                        cancelled_ref,
                                    ) {
                                        debug!("BAL warming failed (non-fatal): {e}");
                                    }
                                } else if !bal_parallel_exec_enabled {
                                    if let Err(e) = LEVM::warm_block(
                                        block,
                                        caching_store,
                                        vm_type,
                                        &NativeCrypto,
                                        cancelled_ref,
                                    ) {
                                        debug!("Block warming failed (non-fatal): {e}");
                                    }
                                }
                            } else {
                                if let Err(e) = LEVM::warm_block(
                                    block,
                                    caching_store,
                                    vm_type,
                                    &NativeCrypto,
                                    cancelled_ref,
                                ) {
                                    debug!("Block warming failed (non-fatal): {e}");
                                }
                            }
                            start.elapsed()
                        })
                        .map_err(|e| {
                            ChainError::Custom(format!("Failed to spawn warmer thread: {e}"))
                        })
                })
                .transpose()?;
            let max_queue_length_ref = &mut max_queue_length;
            // Channel from execution to the streaming merkleizer. It is skipped
            // only when the BAL-driven trie path is taken AND BAL parallel
            // execution is on. If the BAL-driven path is taken with parallel
            // execution off, the channel still exists but the merkleizer never
            // reads from the receiver.
            let (tx, rx_for_merkle) =
                if optimistic_updates.is_some() && bal_parallel_exec_enabled {
                    (None, None)
                } else {
                    let (tx, rx) = channel();
                    (Some(tx), Some(rx))
                };

            let execution_handle = std::thread::Builder::new()
                .name("block_executor_execution".to_string())
                .spawn_scoped(s, move || -> Result<_, ChainError> {
                    let result = vm.execute_block_pipeline(
                        block,
                        tx,
                        queue_length_ref,
                        bal,
                        bal_parallel_exec_enabled,
                    );
                    // Signal the warmer whether or not execution succeeded.
                    cancelled_ref.store(true, Ordering::Relaxed);
                    let (execution_result, produced_bal) = result?;

                    if let Err(e) =
                        validate_gas_used(execution_result.block_gas_used, &block.header)
                    {
                        ethrex_vm::log_gas_used_mismatch(
                            &execution_result.tx_gas_breakdowns,
                            block.header.number,
                            execution_result.block_gas_used,
                            block.header.gas_used,
                        );
                        return Err(e.into());
                    }
                    validate_receipts_root_and_logs_bloom(
                        &block.header,
                        &execution_result.receipts,
                        &NativeCrypto,
                    )?;
                    validate_requests_hash(
                        &block.header,
                        &chain_config,
                        &execution_result.requests,
                    )?;
                    // Validate the BAL the VM produced; if it produced none but
                    // one was supplied and Amsterdam is active, check the supplied
                    // BAL against the header commitment instead.
                    if let Some(bal) = &produced_bal {
                        validate_block_access_list_hash(
                            &block.header,
                            &chain_config,
                            bal,
                            block.body.transactions.len(),
                        )?;
                    } else if let Some(header_bal) = bal
                        && chain_config.is_amsterdam_activated(block.header.timestamp)
                        && !header_bal.matches_commitment(block.header.block_access_list_hash)
                    {
                        return Err(InvalidBlockError::BlockAccessListHashMismatch.into());
                    }

                    let exec_end_instant = Instant::now();
                    Ok((execution_result, produced_bal, exec_end_instant))
                })
                .map_err(|e| {
                    ChainError::Custom(format!("Failed to spawn execution thread: {e}"))
                })?;
            let parent_header_ref = &parent_header;
            type MerkleResult = Result<
                (
                    AccountUpdatesList,
                    Option<Vec<AccountUpdate>>,
                    Instant,
                    Instant,
                ),
                StoreError,
            >;
            let merkleize_handle = std::thread::Builder::new()
                .name("block_executor_merkleizer".to_string())
                .spawn_scoped(s, move || -> MerkleResult {
                    let merkle_start_instant = Instant::now();
                    // BAL-driven path when updates were synthesized up front,
                    // streaming path (fed by the execution thread) otherwise.
                    let (account_updates_list, streaming_witness) =
                        if let Some(prepared) = optimistic_updates {
                            let list = self.handle_merkleization_bal_from_updates(
                                prepared,
                                parent_header_ref,
                            )?;
                            (list, None)
                        } else {
                            self.handle_merkleization(
                                rx_for_merkle.expect("rx is Some on non-BAL path"),
                                parent_header_ref,
                                queue_length_ref,
                                max_queue_length_ref,
                                collect_witness,
                            )?
                        };
                    let merkle_end_instant = Instant::now();
                    Ok((
                        account_updates_list,
                        streaming_witness,
                        merkle_start_instant,
                        merkle_end_instant,
                    ))
                })
                .map_err(|e| {
                    ChainError::Custom(format!("Failed to spawn merkleizer thread: {e}"))
                })?;
            // A panic in either thread is converted into an error value.
            let execution_result = execution_handle.join().unwrap_or_else(|_| {
                Err(ChainError::Custom("execution thread panicked".to_string()))
            });
            let merkleization_result = merkleize_handle.join().unwrap_or_else(|_| {
                Err(StoreError::Custom(
                    "merkleization thread panicked".to_string(),
                ))
            });
            // A panicked or absent warmer counts as zero warming time.
            #[cfg(all(feature = "rayon", not(feature = "eip-8025")))]
            let warmer_duration = warm_handle
                .map(|handle| {
                    handle
                        .join()
                        .inspect_err(|e| warn!("Warming thread error: {e:?}"))
                        .ok()
                        .unwrap_or(Duration::ZERO)
                })
                .unwrap_or(Duration::ZERO);
            #[cfg(any(not(feature = "rayon"), feature = "eip-8025"))]
            let warmer_duration = Duration::ZERO;
            Ok((execution_result, merkleization_result, warmer_duration))
        })?;
    // The merkleization error is surfaced first if both threads failed.
    let (account_updates_list, streaming_witness, merkle_start_instant, merkle_end_instant) =
        merkleization_result?;
    let (execution_result, produced_bal, exec_end_instant) = execution_result?;

    let accumulated_updates = streaming_witness;

    let exec_merkle_end_instant = Instant::now();

    Ok((
        execution_result,
        account_updates_list,
        accumulated_updates,
        produced_bal,
        max_queue_length,
        [
            start_instant,
            block_validated_instant,
            exec_merkle_start,
            merkle_start_instant,
            exec_end_instant,
            merkle_end_instant,
            exec_merkle_end_instant,
        ],
        warmer_duration,
    ))
}
868
/// Streaming merkleization: consumes batches of account updates from `rx`
/// and computes the resulting state-trie, storage-trie and code updates.
///
/// Work is sharded over 16 workers running `handle_subtrie` on
/// `self.merkle_pool`, one per value of the first nibble of the hashed
/// account address. A 17th task watches the workers' results and records
/// the first error.
///
/// # Parameters
///
/// * `rx` - batches of account updates produced by block execution.
/// * `queue_length` - shared counter, decremented once per received batch.
/// * `max_queue_length` - updated with the largest counter value observed.
/// * `collect_witness` - when `true`, all received updates are also merged
///   per address and returned as the second tuple element.
///
/// # Errors
///
/// Returns the error recorded by the watcher if any worker failed or
/// panicked; otherwise any error raised while routing requests or
/// assembling the root.
#[instrument(
    level = "trace",
    name = "Trie update",
    skip_all,
    fields(namespace = "block_execution")
)]
fn handle_merkleization(
    &self,
    rx: Receiver<Vec<AccountUpdate>>,
    parent_header: &BlockHeader,
    queue_length: &AtomicUsize,
    max_queue_length: &mut usize,
    collect_witness: bool,
) -> Result<(AccountUpdatesList, Option<Vec<AccountUpdate>>), StoreError> {
    let parent_state_root = parent_header.state_root;

    // One unbounded request channel per shard worker.
    let mut workers_tx = Vec::with_capacity(16);
    let mut workers_rx = Vec::with_capacity(16);
    for _ in 0..16 {
        let (tx, rx) = cb::unbounded();
        workers_tx.push(tx);
        workers_rx.push(rx);
    }

    // `shutdown_tx` is owned by the watcher task below; `done_tx` carries each
    // worker's final result to the watcher.
    let (shutdown_tx, shutdown_rx) = cb::bounded::<()>(0);
    let (done_tx, done_rx) = cb::unbounded::<Result<(), StoreError>>();

    // First worker error seen by the watcher, inspected after the scope ends.
    let watcher_error: Arc<std::sync::Mutex<Option<StoreError>>> = Default::default();
    let result = self.merkle_pool.in_place_scope(|s| {
        for (i, rx) in workers_rx.into_iter().enumerate() {
            // Every worker gets a clone of all worker senders.
            let all_senders = workers_tx.clone();
            let storage_clone = self.storage.clone();
            let shutdown_rx = shutdown_rx.clone();
            let done_tx = done_tx.clone();
            s.spawn(move |_| {
                // A worker panic is turned into an error result.
                let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    handle_subtrie(
                        storage_clone,
                        rx,
                        parent_state_root,
                        i as u8,
                        all_senders,
                        shutdown_rx,
                    )
                }));
                let result = match result {
                    Ok(r) => r,
                    Err(_) => Err(StoreError::Custom(format!("shard worker {i} panicked"))),
                };
                // Only log when an *error* result could not be delivered.
                if let Err(cb::SendError(Err(e))) = done_tx.send(result) {
                    error!("Failed to send worker {i} error to watcher: {e}");
                }
            });
        }
        // Drop the originals so only the workers' clones remain.
        drop(done_tx);
        drop(shutdown_rx);
        let watcher_error = watcher_error.clone();
        // Watcher: stores the first worker error and returns. `shutdown_tx` is
        // dropped whenever this task ends (on error, or after every worker has
        // reported and `done_rx` is exhausted).
        // NOTE(review): workers presumably treat the closed shutdown channel as
        // a stop signal — confirm in `handle_subtrie`.
        s.spawn(move |_| {
            let _shutdown = shutdown_tx;
            for result in done_rx {
                if let Err(e) = result {
                    *watcher_error.lock().expect("watcher mutex poisoned") = Some(e);
                    return;
                }
            }
        });

        let mut code_updates: Vec<(H256, Code)> = vec![];
        // Address -> keccak(address), so each address is hashed once per block.
        let mut hashed_address_cache: FxHashMap<Address, H256> = Default::default();
        // Hashed accounts whose storage was touched (or which were removed).
        let mut has_storage: FxHashSet<H256> = Default::default();

        let mut accumulator: Option<FxHashMap<Address, AccountUpdate>> =
            collect_witness.then(FxHashMap::default);

        // Runs until the sending side of `rx` is dropped.
        for updates in rx {
            let current_length = queue_length.fetch_sub(1, Ordering::Acquire);
            *max_queue_length = current_length.max(*max_queue_length);
            if let Some(acc) = &mut accumulator {
                // Merge every update into one entry per address.
                for update in updates.clone() {
                    match acc.entry(update.address) {
                        Entry::Vacant(e) => {
                            e.insert(update);
                        }
                        Entry::Occupied(mut e) => {
                            e.get_mut().merge(update);
                        }
                    }
                }
            }

            for update in updates {
                let hashed_address = *hashed_address_cache
                    .entry(update.address)
                    .or_insert_with(|| keccak(update.address));

                // Removed accounts are forwarded with default info, no code
                // and no storage writes.
                let (info, code, storage) = if update.removed {
                    (Some(Default::default()), None, Default::default())
                } else {
                    (update.info, update.code, update.added_storage)
                };

                if let Some(ref info) = info
                    && let Some(code) = code
                {
                    code_updates.push((info.code_hash, code));
                }

                if update.removed || update.removed_storage || !storage.is_empty() {
                    has_storage.insert(hashed_address);
                }

                // Route by the first nibble of the hashed address.
                let bucket = hashed_address.as_fixed_bytes()[0] >> 4;
                workers_tx[bucket as usize]
                    .send(WorkerRequest::ProcessAccount {
                        prefix: hashed_address,
                        info,
                        storage,
                        removed: update.removed,
                        removed_storage: update.removed_storage,
                    })
                    .map_err(|e| StoreError::Custom(format!("send error: {e}")))?;
            }
        }

        // Tell every worker that no more account updates will be routed.
        for tx in &workers_tx {
            tx.send(WorkerRequest::FinishRouting)
                .map_err(|e| StoreError::Custom(format!("send error: {e}")))?;
        }

        // Accounts without storage changes are sent to their worker in one
        // batch per bucket.
        let mut early_batches: [Vec<H256>; 16] = Default::default();
        for hashed_account in hashed_address_cache.values() {
            if !has_storage.contains(hashed_account) {
                let bucket = hashed_account.as_fixed_bytes()[0] >> 4;
                early_batches[bucket as usize].push(*hashed_account);
            }
        }
        for (i, batch) in early_batches.into_iter().enumerate() {
            if !batch.is_empty() {
                workers_tx[i]
                    .send(WorkerRequest::MerklizeAccounts { accounts: batch })
                    .map_err(|e| StoreError::Custom(format!("send error: {e}")))?;
            }
        }

        // Ask every worker for its results, then drop our senders so the
        // gather loop below ends once all workers have replied.
        let mut storage_updates: Vec<(H256, Vec<TrieNode>)> = Default::default();
        let (gatherer_tx, gatherer_rx) = channel();
        for tx in &workers_tx {
            tx.send(WorkerRequest::CollectState {
                tx: gatherer_tx.clone(),
            })
            .map_err(|e| StoreError::Custom(format!("send error: {e}")))?;
        }
        drop(gatherer_tx);
        drop(workers_tx);

        // Assemble the root branch: each worker contributes only the child at
        // its own index.
        let mut root = BranchNode::default();
        let mut state_updates = Vec::new();
        for CollectedStateMsg {
            index,
            subroot,
            state_nodes,
            storage_nodes,
        } in gatherer_rx
        {
            storage_updates.extend(storage_nodes);
            state_updates.extend(state_nodes);
            root.choices[index as usize] = subroot.choices[index as usize].clone();
        }

        let collapsed = self.collapse_root_node(parent_header, None, root)?;
        let state_trie_hash = if let Some(root) = collapsed {
            let mut root = NodeRef::from(root);
            let hash = root.commit(Nibbles::default(), &mut state_updates, &NativeCrypto);
            // Hand the node tree to the drop thread for deallocation.
            let _ = DROP_SENDER.send(Box::new(root));
            hash.finalize(&NativeCrypto)
        } else {
            // No root left: record the null node at the root path and use the
            // empty-trie hash.
            state_updates.push((Nibbles::default(), vec![RLP_NULL]));
            *EMPTY_TRIE_HASH
        };

        let accumulated_updates = accumulator.map(|acc| acc.into_values().collect());

        Ok((
            AccountUpdatesList {
                state_trie_hash,
                state_updates,
                storage_updates,
                code_updates,
            },
            accumulated_updates,
        ))
    });

    // A worker error takes precedence over whatever the routing closure returned.
    if let Some(err) = watcher_error.lock().expect("watcher mutex poisoned").take() {
        return Err(err);
    }

    result
}
1087
/// BAL-driven merkleization: computes the trie and code updates directly
/// from per-account items synthesized from a block access list, without
/// waiting for updates streamed from execution.
///
/// Runs in two phases, each on its own set of scoped threads:
/// 1. **Storage phase** - accounts with storage writes are distributed
///    over up to 16 workers, balanced by number of written slots; each
///    worker updates the account's storage trie and reports the new root.
/// 2. **State phase** - accounts are sharded by the first nibble of their
///    hashed address into 16 shards; each shard applies the new
///    nonce/balance/code hash/storage root to the state trie.
///
/// The 16 shard sub-roots are then merged and collapsed into the new
/// state root.
///
/// # Errors
///
/// Returns a `StoreError` on trie/store failures, if a thread cannot be
/// spawned, or if a worker thread panics.
#[instrument(
    level = "trace",
    name = "Trie update (BAL)",
    skip_all,
    fields(namespace = "block_execution")
)]
fn handle_merkleization_bal_from_updates(
    &self,
    prepared: FxHashMap<Address, BalSynthesisItem>,
    parent_header: &BlockHeader,
) -> Result<AccountUpdatesList, StoreError> {
    const NUM_WORKERS: usize = 16;
    let parent_state_root = parent_header.state_root;

    // Hash every address once and collect code that comes with a code hash.
    let mut code_updates: Vec<(H256, Code)> = Vec::new();
    let mut accounts: Vec<(H256, BalSynthesisItem)> = Vec::with_capacity(prepared.len());
    for (addr, item) in prepared {
        let hashed = keccak(addr);
        if let Some(ch) = item.code_hash
            && let Some(ref code) = item.code
        {
            code_updates.push((ch, code.clone()));
        }
        accounts.push((hashed, item));
    }

    // Weight = number of storage writes (0 for accounts without any).
    // NOTE(review): `1.max(len)` on a non-empty map always equals `len`.
    let mut work_indices: Vec<(usize, usize)> = accounts
        .iter()
        .enumerate()
        .map(|(i, (_, item))| {
            let weight = if !item.added_storage.is_empty() {
                1.max(item.added_storage.len())
            } else {
                0
            };
            (i, weight)
        })
        .collect();
    // Heaviest first, so the greedy assignment below balances well.
    work_indices.sort_unstable_by(|a, b| b.1.cmp(&a.1));

    // Greedy balancing: each account goes to the currently lightest bin.
    let mut bins: Vec<Vec<usize>> = (0..NUM_WORKERS).map(|_| Vec::new()).collect();
    let mut bin_weights: Vec<usize> = vec![0; NUM_WORKERS];
    for (idx, weight) in work_indices {
        let min_bin = bin_weights
            .iter()
            .enumerate()
            .min_by_key(|(_, w)| **w)
            .expect("bin_weights is non-empty")
            .0;
        bins[min_bin].push(idx);
        bin_weights[min_bin] += weight;
    }

    // New storage root per account index; stays `None` when storage is untouched.
    let mut storage_roots: Vec<Option<H256>> = vec![None; accounts.len()];
    let mut storage_updates: Vec<(H256, Vec<TrieNode>)> = Vec::new();

    // Phase 1: storage tries, one thread per non-empty bin.
    std::thread::scope(|s| -> Result<(), StoreError> {
        let accounts_ref = &accounts;
        let handles: Vec<_> = bins
            .into_iter()
            .enumerate()
            .filter_map(|(worker_id, bin)| {
                if bin.is_empty() {
                    return None;
                }
                Some(
                    std::thread::Builder::new()
                        .name(format!("bal_storage_worker_{worker_id}"))
                        .spawn_scoped(
                            s,
                            move || -> Result<Vec<(usize, H256, Vec<TrieNode>)>, StoreError> {
                                let mut results: Vec<(usize, H256, Vec<TrieNode>)> = Vec::new();
                                let state_trie =
                                    self.storage.open_state_trie(parent_state_root)?;
                                for idx in bin {
                                    let (hashed_address, item) = &accounts_ref[idx];
                                    if item.added_storage.is_empty() {
                                        continue;
                                    }

                                    // Start from the account's storage root in the
                                    // parent state, or the empty trie if the account
                                    // does not exist there.
                                    let storage_root = match state_trie
                                        .get(hashed_address.as_bytes())?
                                    {
                                        Some(rlp) => AccountState::decode(&rlp)?.storage_root,
                                        None => *EMPTY_TRIE_HASH,
                                    };
                                    let mut trie = self.storage.open_storage_trie(
                                        *hashed_address,
                                        parent_state_root,
                                        storage_root,
                                    )?;

                                    // Apply the writes in hashed-key order; a zero
                                    // value removes the slot.
                                    let mut hashed_storage: Vec<(H256, U256)> = item
                                        .added_storage
                                        .iter()
                                        .map(|(k, v)| (keccak(k), *v))
                                        .collect();
                                    hashed_storage.sort_unstable_by(|a, b| a.0.cmp(&b.0));
                                    for (hashed_key, value) in &hashed_storage {
                                        if value.is_zero() {
                                            trie.remove(hashed_key.as_bytes())?;
                                        } else {
                                            trie.insert(
                                                hashed_key.as_bytes().to_vec(),
                                                value.encode_to_vec(),
                                            )?;
                                        }
                                    }

                                    let (root_hash, nodes) =
                                        trie.collect_changes_since_last_hash(&NativeCrypto);
                                    results.push((idx, root_hash, nodes));
                                }
                                Ok(results)
                            },
                        )
                        .map_err(|e| StoreError::Custom(format!("spawn failed: {e}"))),
                )
            })
            .collect::<Result<Vec<_>, _>>()?;

        // The outer `?` handles a panicked worker, the inner one its own error.
        for handle in handles {
            let results = handle
                .join()
                .map_err(|_| StoreError::Custom("storage worker panicked".to_string()))??;
            for (idx, root_hash, nodes) in results {
                storage_roots[idx] = Some(root_hash);
                storage_updates.push((accounts_ref[idx].0, nodes));
            }
        }
        Ok(())
    })?;

    // Phase 2 input: shard accounts by the first nibble of the hashed address.
    let mut shards: Vec<Vec<BalStateWorkItem>> = (0..NUM_WORKERS).map(|_| Vec::new()).collect();
    for (idx, (hashed_address, item)) in accounts.iter().enumerate() {
        let bucket = (hashed_address.as_fixed_bytes()[0] >> 4) as usize;
        shards[bucket].push(BalStateWorkItem {
            hashed_address: *hashed_address,
            nonce: item.nonce,
            balance: item.balance,
            code_hash: item.code_hash,
            storage_root: storage_roots[idx],
        });
    }

    let mut root = BranchNode::default();
    let mut state_updates = Vec::new();

    // Phase 2: state trie, one thread per shard (empty shards included).
    std::thread::scope(|s| -> Result<(), StoreError> {
        let handles: Vec<_> = shards
            .into_iter()
            .enumerate()
            .map(|(index, shard_items)| {
                std::thread::Builder::new()
                    .name(format!("bal_state_shard_{index}"))
                    .spawn_scoped(
                        s,
                        move || -> Result<(Box<BranchNode>, Vec<TrieNode>), StoreError> {
                            let mut state_trie =
                                self.storage.open_state_trie(parent_state_root)?;

                            for item in &shard_items {
                                let path = item.hashed_address.as_bytes();

                                let mut account_state = match state_trie.get(path)? {
                                    Some(rlp) => {
                                        let state = AccountState::decode(&rlp)?;
                                        // Re-insert the unchanged value.
                                        // NOTE(review): presumably done so the trie
                                        // materialises the nodes along this path before
                                        // the update below — confirm.
                                        state_trie.insert(path.to_vec(), rlp)?;
                                        state
                                    }
                                    None => AccountState::default(),
                                };

                                // Overwrite only the fields the work item carries.
                                if let Some(n) = item.nonce {
                                    account_state.nonce = n;
                                }
                                if let Some(b) = item.balance {
                                    account_state.balance = b;
                                }
                                if let Some(ch) = item.code_hash {
                                    account_state.code_hash = ch;
                                }
                                if let Some(storage_root) = item.storage_root {
                                    account_state.storage_root = storage_root;
                                }

                                // An account equal to the default state is removed
                                // from the trie instead of being stored.
                                if account_state != AccountState::default() {
                                    state_trie
                                        .insert(path.to_vec(), account_state.encode_to_vec())?;
                                } else {
                                    state_trie.remove(path)?;
                                }
                            }

                            collect_trie(index as u8, state_trie)
                                .map_err(|e| StoreError::Custom(format!("{e}")))
                        },
                    )
                    .map_err(|e| StoreError::Custom(format!("spawn failed: {e}")))
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Handles are in shard order, so `i` is the shard index; each shard
        // contributes only the root child at its own index.
        for (i, handle) in handles.into_iter().enumerate() {
            let (subroot, state_nodes) = handle
                .join()
                .map_err(|_| StoreError::Custom("state shard worker panicked".to_string()))??;
            state_updates.extend(state_nodes);
            root.choices[i] = subroot.choices[i].clone();
        }
        Ok(())
    })?;

    let state_trie_hash =
        if let Some(root) = self.collapse_root_node(parent_header, None, root)? {
            let mut root = NodeRef::from(root);
            let hash = root.commit(Nibbles::default(), &mut state_updates, &NativeCrypto);
            // Hand the node tree to the drop thread for deallocation.
            let _ = DROP_SENDER.send(Box::new(root));
            hash.finalize(&NativeCrypto)
        } else {
            // No root left: record the null node at the root path and use the
            // empty-trie hash.
            state_updates.push((Nibbles::default(), vec![RLP_NULL]));
            *EMPTY_TRIE_HASH
        };

    Ok(AccountUpdatesList {
        state_trie_hash,
        state_updates,
        storage_updates,
        code_updates,
    })
}
1359
1360 fn collapse_root_node(
1361 &self,
1362 parent_header: &BlockHeader,
1363 prefix: Option<H256>,
1364 root: BranchNode,
1365 ) -> Result<Option<Node>, StoreError> {
1366 collapse_root_node(&self.storage, parent_header.state_root, prefix, root)
1367 }
1368
1369 fn execute_block_from_state(
1371 &self,
1372 parent_header: &BlockHeader,
1373 block: &Block,
1374 chain_config: &ChainConfig,
1375 vm: &mut Evm,
1376 ) -> Result<BlockExecutionResult, ChainError> {
1377 validate_block_pre_execution(block, parent_header, chain_config, ELASTICITY_MULTIPLIER)?;
1379 self.validate_l1_transaction_types(block)?;
1380 let (execution_result, bal) = vm.execute_block(block)?;
1381 if let Err(e) = validate_gas_used(execution_result.block_gas_used, &block.header) {
1383 ethrex_vm::log_gas_used_mismatch(
1384 &execution_result.tx_gas_breakdowns,
1385 block.header.number,
1386 execution_result.block_gas_used,
1387 block.header.gas_used,
1388 );
1389 return Err(e.into());
1390 }
1391 validate_receipts_root_and_logs_bloom(
1392 &block.header,
1393 &execution_result.receipts,
1394 &NativeCrypto,
1395 )?;
1396 validate_requests_hash(&block.header, chain_config, &execution_result.requests)?;
1397 if let Some(bal) = &bal {
1398 validate_block_access_list_hash(
1399 &block.header,
1400 chain_config,
1401 bal,
1402 block.body.transactions.len(),
1403 )?;
1404 }
1405
1406 Ok(execution_result)
1407 }
1408
1409 pub async fn generate_witness_for_blocks(
1410 &self,
1411 blocks: &[Block],
1412 ) -> Result<ExecutionWitness, ChainError> {
1413 self.generate_witness_for_blocks_with_fee_configs(blocks, None)
1414 .await
1415 }
1416
/// Re-executes `blocks` in order and assembles one [`ExecutionWitness`] covering the
/// whole batch.
///
/// For each block the VM runs on top of a `DatabaseLogger`, the logged account and
/// storage accesses are replayed against `TrieLogger`-wrapped tries, the accessed
/// bytecodes are fetched from the store, the account updates are applied, and the
/// block is persisted through `store_block` so the next iteration can open the state
/// trie at that block's hash.
///
/// `fee_configs` is looked up by block index and is mandatory for
/// `BlockchainType::L2`; it is not read for `BlockchainType::L1`.
///
/// # Errors
/// Returns `ChainError::WitnessGeneration` for an empty batch, a missing fee config,
/// a lock that cannot be taken, a failed trie read or a missing bytecode/header;
/// `ChainError::ParentStateNotFound` / `ChainError::ParentNotFound` when the needed
/// state trie or parent header is absent; `ChainError::Custom` when a withdrawal
/// address cannot be read from the trie; and propagates VM, store and decoding errors.
///
/// NOTE(review): the header walk near the end follows `parent_hash` links starting at
/// the last block, so it presumably expects `blocks` to form one parent-linked chain —
/// confirm with callers.
1417 pub async fn generate_witness_for_blocks_with_fee_configs(
1418 &self,
1419 blocks: &[Block],
1420 fee_configs: Option<&[FeeConfig]>,
1421 ) -> Result<ExecutionWitness, ChainError> {
1422 let first_block_header = &blocks
1423 .first()
1424 .ok_or(ChainError::WitnessGeneration(
1425 "Empty block batch".to_string(),
1426 ))?
1427 .header;
1428
// Pre-state of the batch: the state trie as of the first block's parent.
1429 let trie = self
1431 .storage
1432 .state_trie(first_block_header.parent_hash)
1433 .map_err(|_| ChainError::ParentStateNotFound)?
1434 .ok_or(ChainError::ParentStateNotFound)?;
// Root hash of that pre-state; used at the end to find the state root node among
// the collected nodes.
1435 let initial_state_root = trie.hash_no_commit(&NativeCrypto);
1436
// Wrap the state trie in a TrieLogger. `current_trie_witness` is the lockable node map
// that comes with it (presumably filled with the nodes read through `trie` — confirm
// in TrieLogger).
1437 let (mut current_trie_witness, mut trie) = TrieLogger::open_trie(trie);
1438
// Running union of the per-block state-trie witnesses, seeded with a copy of whatever
// the logger holds right after opening.
1439 let mut accumulated_state_trie_witness = current_trie_witness
1443 .lock()
1444 .map_err(|_| {
1445 ChainError::WitnessGeneration("Failed to lock state trie witness".to_string())
1446 })?
1447 .clone();
1448
// address -> storage keys seen across the batch. Only the key set (the addresses) is
// consulted when the result is assembled.
1449 let mut touched_account_storage_slots = BTreeMap::new();
// Every trie node (state and storage) that ends up in the witness.
1450 let mut used_trie_nodes = Vec::new();
1452
// Kept as a fallback: it is pushed into `used_trie_nodes` only when nothing else was
// collected.
1453 let root_node = trie.root_node().map_err(|_| {
1455 ChainError::WitnessGeneration("Failed to get root state node".to_string())
1456 })?;
1457
// Block number -> hash pairs looked up during execution, and the accessed bytecodes.
1458 let mut blockhash_opcode_references = HashMap::new();
1459 let mut codes = Vec::new();
1460
// Process the blocks in order; each iteration executes one block and persists it.
1461 for (i, block) in blocks.iter().enumerate() {
1462 let parent_hash = block.header.parent_hash;
1463 let parent_header = self
1464 .storage
1465 .get_block_header_by_hash(parent_hash)
1466 .map_err(ChainError::StoreError)?
1467 .ok_or(ChainError::ParentNotFound)?;
1468
// VM database rooted at this block's parent, wrapped in a logger that tracks the
// accessed state, code hashes and block hashes.
1469 let vm_db: DynVmDatabase =
1475 Box::new(StoreVmDatabase::new(self.storage.clone(), parent_header)?);
1476
1477 let logger = Arc::new(DatabaseLogger::new(Arc::new(vm_db)));
1478
// L2 execution needs the fee config at this block's position in the batch; L1 needs none.
1479 let mut vm = match self.options.r#type {
1480 BlockchainType::L1 => {
1481 Evm::new_from_db_for_l1(logger.clone(), Arc::new(NativeCrypto))
1482 }
1483 BlockchainType::L2(_) => {
1484 let l2_config = match fee_configs {
1485 Some(fee_configs) => {
1486 fee_configs.get(i).ok_or(ChainError::WitnessGeneration(
1487 "FeeConfig not found for witness generation".to_string(),
1488 ))?
1489 }
1490 None => Err(ChainError::WitnessGeneration(
1491 "L2Config not found for witness generation".to_string(),
1492 ))?,
1493 };
1494 Evm::new_from_db_for_l2(logger.clone(), *l2_config, Arc::new(NativeCrypto))
1495 }
1496 };
1497
// The block access list returned by execution is not used here.
1498 let (execution_result, _bal) = vm.execute_block(block)?;
1500
1501 let account_updates = vm.get_state_transitions()?;
1503
// Copy of the accesses logged during execution (account -> storage keys).
1504 let mut state_accessed = logger
1505 .state_accessed
1506 .lock()
1507 .map_err(|_e| {
1508 ChainError::WitnessGeneration("Failed to execute with witness".to_string())
1509 })?
1510 .clone();
1511
// Drop duplicate keys per account, keeping the first occurrence of each.
1512 for keys in state_accessed.values_mut() {
1514 let mut seen = HashSet::new();
1515 keys.retain(|k| seen.insert(*k));
1516 }
1517
// Fold this block's accesses into the batch-wide map.
1518 for (account, acc_keys) in state_accessed.iter() {
1519 let slots: &mut Vec<H256> =
1520 touched_account_storage_slots.entry(*account).or_default();
1521 slots.extend(acc_keys.iter().copied());
1522 }
1523
1524 let logger_block_hashes = logger
1526 .block_hashes_accessed
1527 .lock()
1528 .map_err(|_e| {
1529 ChainError::WitnessGeneration("Failed to get block hashes".to_string())
1530 })?
1531 .clone();
1532
1533 blockhash_opcode_references.extend(logger_block_hashes);
1534
// Read every withdrawal recipient through the logged state trie. The value is
// discarded; only the read itself matters here.
1535 if let Some(withdrawals) = block.body.withdrawals.as_ref() {
1537 for withdrawal in withdrawals {
1538 trie.get(&hash_address(&withdrawal.address)).map_err(|_e| {
1539 ChainError::Custom("Failed to access account from trie".to_string())
1540 })?;
1541 }
1542 }
1543
// Logged storage tries opened for this block, keyed by account address.
1544 let mut used_storage_tries = HashMap::new();
1545
1546 for (account, acc_keys) in state_accessed.iter() {
// Read the account through the logged state trie (value discarded).
1549 trie.get(&hash_address(account)).map_err(|_e| {
1551 ChainError::WitnessGeneration("Failed to access account from trie".to_string())
1552 })?;
// Only accounts with accessed keys whose storage trie can be opened at the parent
// are replayed; a lookup error or a missing trie silently skips the account.
1553 if !acc_keys.is_empty()
1555 && let Ok(Some(storage_trie)) = self.storage.storage_trie(parent_hash, *account)
1556 {
1557 let (storage_trie_witness, storage_trie) = TrieLogger::open_trie(storage_trie);
1558 for storage_key in acc_keys {
1560 let hashed_key = hash_key(storage_key);
1561 storage_trie.get(&hashed_key).map_err(|_e| {
1562 ChainError::WitnessGeneration(
1563 "Failed to access storage key".to_string(),
1564 )
1565 })?;
1566 }
1567 used_storage_tries.insert(*account, (storage_trie_witness, storage_trie));
1569 }
1570 }
1571
// Fetch the bytecode for every code hash accessed during execution; missing code is
// an error.
1572 for code_hash in logger
1574 .code_accessed
1575 .lock()
1576 .map_err(|_e| {
1577 ChainError::WitnessGeneration("Failed to gather used bytecodes".to_string())
1578 })?
1579 .iter()
1580 {
1581 let code = self
1582 .storage
1583 .get_account_code(*code_hash)
1584 .map_err(|_e| {
1585 ChainError::WitnessGeneration("Failed to get account code".to_string())
1586 })?
1587 .ok_or(ChainError::WitnessGeneration(
1588 "Failed to get account code".to_string(),
1589 ))?;
1590 codes.push(code.code().to_vec());
1591 }
1592
// Apply the block's account updates on top of the logged tries. `trie` is moved into
// the call and reassigned further down.
1593 let (storage_tries_after_update, account_updates_list) =
1595 self.storage.apply_account_updates_from_trie_with_witness(
1596 trie,
1597 &account_updates,
1598 used_storage_tries,
1599 )?;
1600
// Persist the block now; the state trie at this block's hash is opened right below.
1601 self.store_block(block.clone(), account_updates_list, execution_result)?;
1605
// Move the storage-trie witnesses into the node list and make sure each address is
// registered as touched.
1606 for (address, (witness, _storage_trie)) in storage_tries_after_update {
1607 let mut witness = witness.lock().map_err(|_| {
1608 ChainError::WitnessGeneration("Failed to lock storage trie witness".to_string())
1609 })?;
1610 let witness = std::mem::take(&mut *witness);
1611 let witness = witness.into_values().collect::<Vec<_>>();
1612 used_trie_nodes.extend_from_slice(&witness);
1613 touched_account_storage_slots.entry(address).or_default();
1614 }
1615
// Open a fresh logged state trie at the block that was just stored, for the next
// iteration.
1616 let (new_state_trie_witness, updated_trie) = TrieLogger::open_trie(
1617 self.storage
1618 .state_trie(block.header.hash())
1619 .map_err(|_| ChainError::ParentStateNotFound)?
1620 .ok_or(ChainError::ParentStateNotFound)?,
1621 );
1622
1623 trie = updated_trie;
1625
// Merge this block's state-trie witness into the running union before replacing it.
1626 for state_trie_witness in current_trie_witness
1627 .lock()
1628 .map_err(|_| {
1629 ChainError::WitnessGeneration("Failed to lock state trie witness".to_string())
1630 })?
1631 .iter()
1632 {
1633 accumulated_state_trie_witness
1634 .insert(*state_trie_witness.0, state_trie_witness.1.clone());
1635 }
1636
1637 current_trie_witness = new_state_trie_witness;
1638 }
1639
// Add the accumulated state-trie nodes to the storage-trie nodes collected above.
1640 used_trie_nodes.extend_from_slice(&Vec::from_iter(
1641 accumulated_state_trie_witness.into_values(),
1642 ));
1643
// If no node was recorded at all, fall back to the pre-state root node when there is one.
1644 if used_trie_nodes.is_empty()
1646 && let Some(root) = root_node
1647 {
1648 used_trie_nodes.push((*root).clone());
1649 }
1650
1651 let mut block_headers_bytes = Vec::new();
1653
// Oldest header the witness must include: the lowest block number looked up during
// execution when it is older than the first block's parent, otherwise that parent.
1654 let first_blockhash_opcode_number = blockhash_opcode_references.keys().min();
1655 let first_needed_block_hash = first_blockhash_opcode_number
1656 .and_then(|n| {
1657 (*n < first_block_header.number.saturating_sub(1))
1658 .then(|| blockhash_opcode_references.get(n))?
1659 .copied()
1660 })
1661 .unwrap_or(first_block_header.parent_hash);
1662
1663 let mut current_header = blocks
1665 .last()
1666 .ok_or_else(|| ChainError::WitnessGeneration("Empty batch".to_string()))?
1667 .header
1668 .clone();
1669
// Walk back from the last block through parent links, pushing each ancestor's encoded
// header (newest first) until the needed hash is reached. The last block's own header
// is not pushed.
// NOTE(review): `current_number` is only used in the error message, but the plain
// `- 1` would underflow if the walk ever reached block 0 without a match — consider
// `saturating_sub(1)`.
1670 while current_header.hash() != first_needed_block_hash {
1673 let parent_hash = current_header.parent_hash;
1674 let current_number = current_header.number - 1;
1675
1676 current_header = self
1677 .storage
1678 .get_block_header_by_hash(parent_hash)?
1679 .ok_or_else(|| {
1680 ChainError::WitnessGeneration(format!(
1681 "Failed to get block {current_number} header"
1682 ))
1683 })?;
1684
1685 block_headers_bytes.push(current_header.encode_to_vec());
1686 }
1687
// Index the collected nodes by their hash.
1688 let nodes: BTreeMap<H256, Node> = used_trie_nodes
1690 .into_iter()
1691 .map(|node| {
1692 (
1693 node.compute_hash(&NativeCrypto).finalize(&NativeCrypto),
1694 node,
1695 )
1696 })
1697 .collect();
// Resolve the pre-state root inside the node set; any result other than
// `NodeRef::Node` yields `None`.
1698 let state_trie_root = if let NodeRef::Node(state_trie_root, _) =
1699 Trie::get_embedded_root(&nodes, initial_state_root)?
1700 {
1701 Some((*state_trie_root).clone())
1702 } else {
1703 None
1704 };
1705
// Temporary trie over the resolved root, used only to read account entries below.
1706 let state_trie = if let Some(state_trie_root) = &state_trie_root {
1708 Trie::new_temp_with_root(state_trie_root.clone().into())
1709 } else {
1710 Trie::new_temp()
1711 };
// For every touched address, resolve its storage root node from the node set. Addresses
// are skipped when the account is absent from the state trie, its storage is empty, or
// its storage root node was not collected.
1712 let mut storage_trie_roots = BTreeMap::new();
1713 for address in touched_account_storage_slots.keys() {
1714 let hashed_address = hash_address(address);
1715 let hashed_address_h256 = H256::from_slice(&hashed_address);
1716 let Some(encoded_account) = state_trie.get(&hashed_address)? else {
1717 continue; };
1719 let storage_root_hash = AccountState::decode(&encoded_account)?.storage_root;
1720 if storage_root_hash == *EMPTY_TRIE_HASH {
1721 continue; }
1723 if !nodes.contains_key(&storage_root_hash) {
1724 continue; }
1726 let node = Trie::get_embedded_root(&nodes, storage_root_hash)?;
1727 let NodeRef::Node(node, _) = node else {
1728 return Err(ChainError::Custom(
1729 "execution witness does not contain non-empty storage trie".to_string(),
1730 ));
1731 };
1732 storage_trie_roots.insert(hashed_address_h256, (*node).clone());
1733 }
1734
1735 Ok(ExecutionWitness {
1736 codes,
1737 block_headers_bytes,
1738 first_block_number: first_block_header.number,
1739 chain_config: self.storage.get_chain_config(),
1740 state_trie_root,
1741 storage_trie_roots,
1742 })
1743 }
1744
/// Builds the [`ExecutionWitness`] for a single `block` from its `account_updates` and
/// the accesses recorded in `logger`.
///
/// This function does not run the VM and does not store the block: it replays the
/// logged account/storage accesses against `TrieLogger`-wrapped tries opened at
/// `parent_header`, fetches the accessed bytecodes, applies `account_updates` to the
/// logged tries and packages the recorded nodes together with the needed ancestor
/// headers.
///
/// `logger` presumably wrapped the database `block` was executed against — confirm with
/// the caller.
///
/// # Errors
/// Returns `ChainError::ParentStateNotFound` when the parent state trie is unavailable;
/// `ChainError::WitnessGeneration` for a lock that cannot be taken, a failed trie read
/// or a missing bytecode/header; `ChainError::Custom` when a withdrawal address cannot
/// be read from the trie; and propagates store and decoding errors.
1745 pub fn generate_witness_from_account_updates(
1746 &self,
1747 account_updates: Vec<AccountUpdate>,
1748 block: &Block,
1749 parent_header: BlockHeader,
1750 logger: &DatabaseLogger,
1751 ) -> Result<ExecutionWitness, ChainError> {
// Pre-state: the state trie as of the parent block.
1752 let trie = self
1754 .storage
1755 .state_trie(parent_header.hash())
1756 .map_err(|_| ChainError::ParentStateNotFound)?
1757 .ok_or(ChainError::ParentStateNotFound)?;
// Root hash of the pre-state; used at the end to find the state root node among the
// collected nodes.
1758 let initial_state_root = trie.hash_no_commit(&NativeCrypto);
1759
// Wrap the state trie in a TrieLogger; `trie_witness` is the lockable node map that
// comes with it.
1760 let (trie_witness, trie) = TrieLogger::open_trie(trie);
1761
// address -> storage keys. Only the key set (the addresses) is consulted when the
// result is assembled.
1762 let mut touched_account_storage_slots = BTreeMap::new();
1763 let mut used_trie_nodes = Vec::new();
1765
// Kept as a fallback: it is pushed into `used_trie_nodes` only when nothing else was
// collected.
1766 let root_node = trie.root_node().map_err(|_| {
1768 ChainError::WitnessGeneration("Failed to get root state node".to_string())
1769 })?;
1770
1771 let mut codes = Vec::new();
1772
// Register every updated account together with the storage keys it added.
1773 for account_update in &account_updates {
1774 touched_account_storage_slots.insert(
1775 account_update.address,
1776 account_update
1777 .added_storage
1778 .keys()
1779 .cloned()
1780 .collect::<Vec<H256>>(),
1781 );
1782 }
1783
// Copy of the block number -> hash pairs recorded by the logger.
1784 let blockhash_opcode_references = logger
1786 .block_hashes_accessed
1787 .lock()
1788 .map_err(|_e| ChainError::WitnessGeneration("Failed to get block hashes".to_string()))?
1789 .clone();
1790
// Read every withdrawal recipient through the logged state trie. The value is
// discarded; only the read itself matters here.
1791 if let Some(withdrawals) = block.body.withdrawals.as_ref() {
1793 for withdrawal in withdrawals {
1794 trie.get(&hash_address(&withdrawal.address)).map_err(|_e| {
1795 ChainError::Custom("Failed to access account from trie".to_string())
1796 })?;
1797 }
1798 }
1799
// Logged storage tries opened for this block, keyed by account address.
1800 let mut used_storage_tries = HashMap::new();
1801
// Replay the logged accesses. The logger's lock guard is a temporary of the loop
// header, so it stays held for the duration of the loop.
1802 for (account, acc_keys) in logger
1805 .state_accessed
1806 .lock()
1807 .map_err(|_e| {
1808 ChainError::WitnessGeneration("Failed to execute with witness".to_string())
1809 })?
1810 .iter()
1811 {
// Read the account through the logged state trie (value discarded).
1812 trie.get(&hash_address(account)).map_err(|_e| {
1814 ChainError::WitnessGeneration("Failed to access account from trie".to_string())
1815 })?;
// Only accounts with accessed keys whose storage trie can be opened at the parent
// are replayed; a lookup error or a missing trie silently skips the account.
1816 if !acc_keys.is_empty()
1818 && let Ok(Some(storage_trie)) =
1819 self.storage.storage_trie(parent_header.hash(), *account)
1820 {
1821 let (storage_trie_witness, storage_trie) = TrieLogger::open_trie(storage_trie);
1822 for storage_key in acc_keys {
1824 let hashed_key = hash_key(storage_key);
1825 storage_trie.get(&hashed_key).map_err(|_e| {
1826 ChainError::WitnessGeneration("Failed to access storage key".to_string())
1827 })?;
1828 }
1829 used_storage_tries.insert(*account, (storage_trie_witness, storage_trie));
1831 }
1832 }
1833
// Fetch the bytecode for every code hash the logger recorded; missing code is an error.
1834 for code_hash in logger
1836 .code_accessed
1837 .lock()
1838 .map_err(|_e| {
1839 ChainError::WitnessGeneration("Failed to gather used bytecodes".to_string())
1840 })?
1841 .iter()
1842 {
1843 let code = self
1844 .storage
1845 .get_account_code(*code_hash)
1846 .map_err(|_e| {
1847 ChainError::WitnessGeneration("Failed to get account code".to_string())
1848 })?
1849 .ok_or(ChainError::WitnessGeneration(
1850 "Failed to get account code".to_string(),
1851 ))?;
1852 codes.push(code.code().to_vec());
1853 }
1854
// Apply the updates on top of the logged tries. The resulting update list is not
// needed here; only the storage tries (with their witnesses) are kept.
1855 let (storage_tries_after_update, _account_updates_list) =
1857 self.storage.apply_account_updates_from_trie_with_witness(
1858 trie,
1859 &account_updates,
1860 used_storage_tries,
1861 )?;
1862
// Move the storage-trie witnesses into the node list and make sure each address is
// registered as touched.
1863 for (address, (witness, _storage_trie)) in storage_tries_after_update {
1864 let mut witness = witness.lock().map_err(|_| {
1865 ChainError::WitnessGeneration("Failed to lock storage trie witness".to_string())
1866 })?;
1867 let witness = std::mem::take(&mut *witness);
1868 let witness = witness.into_values().collect::<Vec<_>>();
1869 used_trie_nodes.extend_from_slice(&witness);
1870 touched_account_storage_slots.entry(address).or_default();
1871 }
1872
// Add a copy of the state-trie witness nodes.
1873 used_trie_nodes.extend_from_slice(&Vec::from_iter(
1874 trie_witness
1875 .lock()
1876 .map_err(|_| {
1877 ChainError::WitnessGeneration("Failed to lock state trie witness".to_string())
1878 })?
1879 .clone()
1880 .into_values(),
1881 ));
1882
// If no node was recorded at all, fall back to the pre-state root node when there is one.
1883 if used_trie_nodes.is_empty()
1885 && let Some(root) = root_node
1886 {
1887 used_trie_nodes.push((*root).clone());
1888 }
1889
1890 let mut block_headers_bytes = Vec::new();
1892
// Oldest header the witness must include: the lowest recorded block number when it is
// older than the block's parent, otherwise that parent.
1893 let first_blockhash_opcode_number = blockhash_opcode_references.keys().min();
1894 let first_needed_block_hash = first_blockhash_opcode_number
1895 .and_then(|n| {
1896 (*n < block.header.number.saturating_sub(1))
1897 .then(|| blockhash_opcode_references.get(n))?
1898 .copied()
1899 })
1900 .unwrap_or(block.header.parent_hash);
1901
1902 let mut current_header = block.header.clone();
1903
// Walk back from `block` through parent links, pushing each ancestor's encoded header
// (newest first) until the needed hash is reached. The block's own header is not pushed.
// NOTE(review): `current_number` is only used in the error message, but the plain
// `- 1` would underflow if the walk ever reached block 0 without a match — consider
// `saturating_sub(1)`.
1904 while current_header.hash() != first_needed_block_hash {
1907 let parent_hash = current_header.parent_hash;
1908 let current_number = current_header.number - 1;
1909
1910 current_header = self
1911 .storage
1912 .get_block_header_by_hash(parent_hash)?
1913 .ok_or_else(|| {
1914 ChainError::WitnessGeneration(format!(
1915 "Failed to get block {current_number} header"
1916 ))
1917 })?;
1918
1919 block_headers_bytes.push(current_header.encode_to_vec());
1920 }
1921
// Index the collected nodes by their hash.
1922 let nodes: BTreeMap<H256, Node> = used_trie_nodes
1924 .into_iter()
1925 .map(|node| {
1926 (
1927 node.compute_hash(&NativeCrypto).finalize(&NativeCrypto),
1928 node,
1929 )
1930 })
1931 .collect();
// Resolve the pre-state root inside the node set; any result other than
// `NodeRef::Node` yields `None`.
1932 let state_trie_root = if let NodeRef::Node(state_trie_root, _) =
1933 Trie::get_embedded_root(&nodes, initial_state_root)?
1934 {
1935 Some((*state_trie_root).clone())
1936 } else {
1937 None
1938 };
1939
// Temporary trie over the resolved root, used only to read account entries below.
1940 let state_trie = if let Some(state_trie_root) = &state_trie_root {
1942 Trie::new_temp_with_root(state_trie_root.clone().into())
1943 } else {
1944 Trie::new_temp()
1945 };
// For every touched address, resolve its storage root node from the node set. Addresses
// are skipped when the account is absent from the state trie, its storage is empty, or
// its storage root node was not collected.
1946 let mut storage_trie_roots = BTreeMap::new();
1947 for address in touched_account_storage_slots.keys() {
1948 let hashed_address = hash_address(address);
1949 let hashed_address_h256 = H256::from_slice(&hashed_address);
1950 let Some(encoded_account) = state_trie.get(&hashed_address)? else {
1951 continue; };
1953 let storage_root_hash = AccountState::decode(&encoded_account)?.storage_root;
1954 if storage_root_hash == *EMPTY_TRIE_HASH {
1955 continue; }
1957 if !nodes.contains_key(&storage_root_hash) {
1958 continue; }
1960 let node = Trie::get_embedded_root(&nodes, storage_root_hash)?;
1961 let NodeRef::Node(node, _) = node else {
1962 return Err(ChainError::Custom(
1963 "execution witness does not contain non-empty storage trie".to_string(),
1964 ));
1965 };
1966 storage_trie_roots.insert(hashed_address_h256, (*node).clone());
1967 }
1968
// NOTE(review): `first_block_number` is set to the parent's number here, whereas
// `generate_witness_for_blocks_with_fee_configs` uses the first block's own number —
// confirm which one consumers of `ExecutionWitness` expect.
1969 Ok(ExecutionWitness {
1970 codes,
1971 block_headers_bytes,
1972 first_block_number: parent_header.number,
1973 chain_config: self.storage.get_chain_config(),
1974 state_trie_root,
1975 storage_trie_roots,
1976 })
1977 }
1978
1979 #[instrument(
1980 level = "trace",
1981 name = "Block DB update",
1982 skip_all,
1983 fields(namespace = "block_execution")
1984 )]
1985 pub fn store_block(
1986 &self,
1987 block: Block,
1988 account_updates_list: AccountUpdatesList,
1989 execution_result: BlockExecutionResult,
1990 ) -> Result<(), ChainError> {
1991 validate_state_root(&block.header, account_updates_list.state_trie_hash)?;
1993
1994 let update_batch = UpdateBatch {
1995 account_updates: account_updates_list.state_updates,
1996 storage_updates: account_updates_list.storage_updates,
1997 receipts: vec![(block.hash(), execution_result.receipts)],
1998 blocks: vec![block],
1999 code_updates: account_updates_list.code_updates,
2000 batch_mode: false,
2001 };
2002
2003 self.storage
2004 .store_block_updates(update_batch)
2005 .map_err(|e| e.into())
2006 }
2007
2008 pub fn add_block(&self, block: Block) -> Result<(), ChainError> {
2009 let since = Instant::now();
2010 let (res, updates) = self.execute_block(&block)?;
2011 let executed = Instant::now();
2012
2013 let account_updates_list = self
2015 .storage
2016 .apply_account_updates_batch(block.header.parent_hash, &updates)?
2017 .ok_or(ChainError::ParentStateNotFound)?;
2018
2019 let (gas_used, gas_limit, block_number, transactions_count) = (
2020 block.header.gas_used,
2021 block.header.gas_limit,
2022 block.header.number,
2023 block.body.transactions.len(),
2024 );
2025
2026 let merkleized = Instant::now();
2027 let result = self.store_block(block, account_updates_list, res);
2028 let stored = Instant::now();
2029
2030 if self.options.perf_logs_enabled {
2031 Self::print_add_block_logs(
2032 gas_used,
2033 gas_limit,
2034 block_number,
2035 transactions_count,
2036 since,
2037 executed,
2038 merkleized,
2039 stored,
2040 );
2041 }
2042 result
2043 }
2044
2045 pub fn add_block_pipeline(
2046 &self,
2047 block: Block,
2048 bal: Option<&BlockAccessList>,
2049 ) -> Result<(), ChainError> {
2050 let (_, _, result) = self.add_block_pipeline_inner(block, bal, false)?;
2051 result
2052 }
2053
2054 pub fn add_block_pipeline_bal(
2061 &self,
2062 block: Block,
2063 bal: Option<&BlockAccessList>,
2064 ) -> Result<Option<BlockAccessList>, ChainError> {
2065 let (produced_bal, _, result) = self.add_block_pipeline_inner(block, bal, false)?;
2066 result?;
2067 Ok(produced_bal)
2068 }
2069
2070 pub fn add_block_pipeline_with_witness(
2073 &self,
2074 block: Block,
2075 bal: Option<&BlockAccessList>,
2076 ) -> Result<ExecutionWitness, ChainError> {
2077 let (_, witness, result) = self.add_block_pipeline_inner(block, bal, true)?;
2078 result?;
2079 witness.ok_or_else(|| {
2080 ChainError::WitnessGeneration(
2081 "forced witness collection completed without producing a witness".to_string(),
2082 )
2083 })
2084 }
2085
/// Shared implementation behind the `add_block_pipeline*` entry points.
///
/// Executes `block` through `execute_block_pipeline`, optionally generates (and
/// optionally stores) an execution witness, stores the block access list on a
/// best-effort basis, stores the block, and emits perf logs / metrics.
///
/// * `bal` — an externally supplied block access list; forwarded to the pipeline and
///   used for storage/metrics when the pipeline did not produce one itself.
/// * `force_witness` — collect and return a witness even when witness precomputation
///   is not active.
///
/// # Returns
/// `Ok((produced_bal, witness, result))`, where `result` is the outcome of
/// `store_block`. A failure to store the block is therefore reported inside the `Ok`
/// tuple, not as the outer `Err`.
///
/// # Errors
/// Returns `ChainError::ParentNotFound` (after queueing the block as pending) when the
/// parent header cannot be found, and propagates errors from VM construction, the
/// execution pipeline, witness generation and witness storage.
2086 fn add_block_pipeline_inner(
2095 &self,
2096 block: Block,
2097 bal: Option<&BlockAccessList>,
2098 force_witness: bool,
2099 ) -> Result<AddBlockPipelineInnerResult, ChainError> {
// Any failure of `find_parent_header` takes this path: the block is queued as pending
// and `ParentNotFound` is returned.
2100 let Ok(parent_header) = find_parent_header(&block.header, &self.storage) else {
2102 self.storage.add_pending_block(block)?;
2104 return Err(ChainError::ParentNotFound);
2105 };
2106
// Witnesses are stored only when precomputation is enabled and the node is synced;
// they are collected whenever they will be stored or the caller forces it.
2107 let should_store_witness = self.options.precompute_witnesses && self.is_synced();
2108 let collect_witness = should_store_witness || force_witness;
2109
// With witness collection the VM runs on a `DatabaseLogger`-wrapped database so the
// accesses can be replayed later; otherwise a plain VM from `new_evm` is used.
2110 let (mut vm, logger) = if collect_witness {
2111 let vm_db: DynVmDatabase = Box::new(StoreVmDatabase::new(
2115 self.storage.clone(),
2116 parent_header.clone(),
2117 )?);
2118
2119 let logger = Arc::new(DatabaseLogger::new(Arc::new(vm_db)));
2120
2121 let vm = match self.options.r#type.clone() {
2122 BlockchainType::L1 => {
2123 Evm::new_from_db_for_l1(logger.clone(), Arc::new(NativeCrypto))
2124 }
2125 BlockchainType::L2(l2_config) => Evm::new_from_db_for_l2(
2126 logger.clone(),
2127 *l2_config.fee_config.read().map_err(|_| {
2128 EvmError::Custom("Fee config lock was poisoned".to_string())
2129 })?,
2130 Arc::new(NativeCrypto),
2131 ),
2132 };
2133 (vm, Some(logger))
2134 } else {
2135 let vm_db = StoreVmDatabase::new(self.storage.clone(), parent_header.clone())?;
2136 let vm = self.new_evm(vm_db)?;
2137 (vm, None)
2138 };
2139
2140 let (
2141 res,
2142 account_updates_list,
2143 accumulated_updates,
2144 produced_bal,
2145 merkle_queue_length,
2146 instants,
2147 warmer_duration,
2148 ) = { self.execute_block_pipeline(&block, &parent_header, &mut vm, bal, collect_witness)? };
2149
// Read what the logs need now; `block` is moved into `store_block` further down.
2150 let (gas_used, gas_limit, block_number, transactions_count) = (
2151 block.header.gas_used,
2152 block.header.gas_limit,
2153 block.header.number,
2154 block.body.transactions.len(),
2155 );
2156 let block_hash = block.hash();
2157
// A witness is generated only when a logger exists and the pipeline returned the
// accumulated account updates.
2158 let mut witness = None;
2159 if let Some(logger) = logger
2160 && let Some(account_updates) = accumulated_updates
2161 {
2162 let block_hash = block.hash();
2163 let generated_witness = self.generate_witness_from_account_updates(
2164 account_updates,
2165 &block,
2166 parent_header,
2167 &logger,
2168 )?;
// (store?, return?): store the witness and/or hand it back to the caller.
2169 match (should_store_witness, force_witness) {
2170 (true, true) => {
2171 witness = Some(generated_witness.clone());
2172 self.storage
2173 .store_witness(block_hash, block_number, generated_witness)?;
2174 }
2175 (true, false) => {
2176 self.storage
2177 .store_witness(block_hash, block_number, generated_witness)?;
2178 }
2179 (false, true) => {
2180 witness = Some(generated_witness);
2181 }
2182 (false, false) => {}
2183 }
2184 };
2185
// Best effort: prefer the access list produced by the pipeline, fall back to the one
// supplied by the caller; a storage failure is only logged.
2186 if let Some(bal) = produced_bal.as_ref().or(bal)
2191 && let Err(err) = self.storage.store_block_access_list(block_hash, bal)
2192 {
2193 warn!("Failed to store block access list for block {block_hash}: {err}");
2194 }
2195
// The store outcome is kept as a value so that logging and metrics below still run
// when it failed.
2196 let result = self.store_block(block, account_updates_list, res);
2197
2198 let stored = Instant::now();
2199
// Extend the pipeline's instants with the store timestamp: indices beyond the
// pipeline's own entries are filled with `stored`.
2200 let instants = std::array::from_fn(move |i| {
2201 if i < instants.len() {
2202 instants[i]
2203 } else {
2204 stored
2205 }
2206 });
2207
2208 if self.options.perf_logs_enabled {
2209 Self::print_add_block_pipeline_logs(
2210 gas_used,
2211 gas_limit,
2212 block_number,
2213 block_hash,
2214 transactions_count,
2215 merkle_queue_length,
2216 warmer_duration,
2217 instants,
2218 );
2219 }
2220
// Access-list metrics, using the same produced-then-supplied preference as above.
2221 metrics!(if let Some(bal_ref) = produced_bal.as_ref().or(bal) {
2222 let account_count = bal_ref.accounts().len() as u64;
2223 let slot_count = bal_ref.item_count().saturating_sub(account_count);
2224 let size_bytes = bal_ref.length() as f64;
2225 METRICS_BAL.blocks_total.inc();
2226 METRICS_BAL.size_bytes.set(size_bytes);
2227 METRICS_BAL.size_bytes_histogram.observe(size_bytes);
2228 METRICS_BAL.account_count.set(account_count as i64);
2229 METRICS_BAL.slot_count.set(slot_count as i64);
2230 });
2231
2232 Ok((produced_bal, witness, result))
2233 }
2234
2235 #[allow(clippy::too_many_arguments)]
2236 fn print_add_block_logs(
2237 gas_used: u64,
2238 gas_limit: u64,
2239 block_number: u64,
2240 transactions_count: usize,
2241 since: Instant,
2242 executed: Instant,
2243 merkleized: Instant,
2244 stored: Instant,
2245 ) {
2246 let interval = stored.duration_since(since).as_millis() as f64;
2247 if interval != 0f64 {
2248 let as_gigas = gas_used as f64 / 10_f64.powf(9_f64);
2249 let throughput = as_gigas / interval * 1000_f64;
2250
2251 metrics!(
2252 METRICS_BLOCKS.set_block_number(block_number);
2253 METRICS_BLOCKS.set_latest_gas_used(gas_used as f64);
2254 METRICS_BLOCKS.set_latest_block_gas_limit(gas_limit as f64);
2255 METRICS_BLOCKS.set_latest_gigagas(throughput);
2256 METRICS_BLOCKS.set_execution_ms(executed.duration_since(since).as_secs_f64() * 1000.0);
2257 METRICS_BLOCKS.set_merkle_ms(merkleized.duration_since(executed).as_secs_f64() * 1000.0);
2258 METRICS_BLOCKS.set_store_ms(stored.duration_since(merkleized).as_secs_f64() * 1000.0);
2259 METRICS_BLOCKS.set_transaction_count(transactions_count as i64);
2260 );
2261
2262 let base_log = format!(
2263 "[METRIC] BLOCK EXECUTION THROUGHPUT ({}): {:.3} Ggas/s TIME SPENT: {:.0} ms. Gas Used: {:.3} ({:.0}%), #Txs: {}.",
2264 block_number,
2265 throughput,
2266 interval,
2267 as_gigas,
2268 (gas_used as f64 / gas_limit as f64) * 100.0,
2269 transactions_count
2270 );
2271
2272 fn percentage(init: Instant, end: Instant, total: f64) -> f64 {
2273 (end.duration_since(init).as_millis() as f64 / total * 100.0).round()
2274 }
2275 let extra_log = if as_gigas > 0.0 {
2276 format!(
2277 " exec: {}% merkle: {}% store: {}%",
2278 percentage(since, executed, interval),
2279 percentage(executed, merkleized, interval),
2280 percentage(merkleized, stored, interval)
2281 )
2282 } else {
2283 "".to_string()
2284 };
2285 info!("{}{}", base_log, extra_log);
2286 }
2287 }
2288
2289 #[allow(clippy::too_many_arguments)]
2290 fn print_add_block_pipeline_logs(
2291 gas_used: u64,
2292 gas_limit: u64,
2293 block_number: u64,
2294 block_hash: H256,
2295 transactions_count: usize,
2296 merkle_queue_length: usize,
2297 warmer_duration: Duration,
2298 [
2299 start_instant,
2300 block_validated_instant,
2301 exec_merkle_start,
2302 merkle_start_instant,
2303 exec_end_instant,
2304 merkle_end_instant,
2305 exec_merkle_end_instant,
2306 stored_instant,
2307 ]: [Instant; 8],
2308 ) {
2309 let total_ms = stored_instant.duration_since(start_instant).as_secs_f64() * 1000.0;
2310 if total_ms == 0.0 {
2311 return;
2312 }
2313
2314 let as_mgas = gas_used as f64 / 1e6;
2315 let throughput = (gas_used as f64 / 1e9) / (total_ms / 1000.0);
2316
2317 let validate_ms = block_validated_instant
2319 .duration_since(start_instant)
2320 .as_secs_f64()
2321 * 1000.0;
2322 let exec_ms = exec_end_instant
2323 .duration_since(exec_merkle_start)
2324 .as_secs_f64()
2325 * 1000.0;
2326 let store_ms = stored_instant
2327 .duration_since(exec_merkle_end_instant)
2328 .as_secs_f64()
2329 * 1000.0;
2330 let warmer_ms = warmer_duration.as_secs_f64() * 1000.0;
2331
2332 let _merkle_total_ms = exec_merkle_end_instant
2336 .duration_since(exec_merkle_start)
2337 .as_secs_f64()
2338 * 1000.0;
2339
2340 let merkle_concurrent_ms = (merkle_end_instant
2342 .duration_since(exec_merkle_start)
2343 .as_secs_f64()
2344 * 1000.0)
2345 .min(exec_ms);
2346
2347 let merkle_drain_ms = exec_merkle_end_instant
2349 .saturating_duration_since(exec_end_instant)
2350 .as_secs_f64()
2351 * 1000.0;
2352
2353 let actual_merkle_ms = merkle_concurrent_ms + merkle_drain_ms;
2355 let overlap_pct = if actual_merkle_ms > 0.0 {
2356 (merkle_concurrent_ms / actual_merkle_ms) * 100.0
2357 } else {
2358 0.0
2359 };
2360
2361 let warmer_early_ms = exec_ms - warmer_ms;
2363
2364 let phases = [
2367 ("validate", validate_ms),
2368 ("exec", exec_ms),
2369 ("merkle", merkle_drain_ms),
2370 ("store", store_ms),
2371 ];
2372 let bottleneck = phases
2373 .iter()
2374 .max_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
2375 .map(|(name, _)| *name)
2376 .unwrap_or("exec");
2377
2378 let pct = |ms: f64| (ms / total_ms * 100.0).round() as u64;
2380
2381 let header = format!(
2383 "[METRIC] BLOCK {} {:#x} | {:.3} Ggas/s | {:.2} ms | {} txs | {:.0} Mgas ({}%)",
2384 block_number,
2385 block_hash,
2386 throughput,
2387 total_ms,
2388 transactions_count,
2389 as_mgas,
2390 (gas_used as f64 / gas_limit as f64 * 100.0).round() as u64
2391 );
2392
2393 let bottleneck_marker = |name: &str| {
2394 if name == bottleneck {
2395 " << BOTTLENECK"
2396 } else {
2397 ""
2398 }
2399 };
2400
2401 let warmer_relation = if warmer_early_ms >= 0.0 {
2402 "before exec"
2403 } else {
2404 "after exec"
2405 };
2406
2407 let merkle_start_delay_ms = merkle_start_instant
2408 .duration_since(exec_merkle_start)
2409 .as_secs_f64()
2410 * 1000.0;
2411
2412 info!("{}", header);
2413 info!(
2414 " |- validate: {:>7.2} ms ({:>2}%){}",
2415 validate_ms,
2416 pct(validate_ms),
2417 bottleneck_marker("validate")
2418 );
2419 info!(
2420 " |- exec: {:>7.2} ms ({:>2}%){}",
2421 exec_ms,
2422 pct(exec_ms),
2423 bottleneck_marker("exec")
2424 );
2425 info!(
2426 " |- merkle: {:>7.2} ms ({:>2}%){} [concurrent: {:.2} ms, drain: {:.2} ms, overlap: {:.0}%, queue: {}, start_delay: {:.2} ms]",
2427 merkle_drain_ms,
2428 pct(merkle_drain_ms),
2429 bottleneck_marker("merkle"),
2430 merkle_concurrent_ms,
2431 merkle_drain_ms,
2432 overlap_pct,
2433 merkle_queue_length,
2434 merkle_start_delay_ms,
2435 );
2436 info!(
2437 " |- store: {:>7.2} ms ({:>2}%){}",
2438 store_ms,
2439 pct(store_ms),
2440 bottleneck_marker("store")
2441 );
2442 info!(
2443 " `- warmer: {:>7.2} ms [finished: {:.2} ms {}]",
2444 warmer_ms,
2445 warmer_early_ms.abs(),
2446 warmer_relation,
2447 );
2448
2449 metrics!(
2451 METRICS_BLOCKS.set_block_number(block_number);
2452 METRICS_BLOCKS.set_latest_gas_used(gas_used as f64);
2453 METRICS_BLOCKS.set_latest_block_gas_limit(gas_limit as f64);
2454 METRICS_BLOCKS.set_latest_gigagas(throughput);
2455 METRICS_BLOCKS.set_transaction_count(transactions_count as i64);
2456 METRICS_BLOCKS.set_validate_ms(validate_ms);
2457 METRICS_BLOCKS.set_execution_ms(exec_ms);
2458 METRICS_BLOCKS.set_merkle_concurrent_ms(merkle_concurrent_ms);
2459 METRICS_BLOCKS.set_merkle_drain_ms(merkle_drain_ms);
2460 METRICS_BLOCKS.set_merkle_ms(_merkle_total_ms);
2461 METRICS_BLOCKS.set_merkle_overlap_pct(overlap_pct);
2462 METRICS_BLOCKS.set_store_ms(store_ms);
2463 METRICS_BLOCKS.set_warmer_ms(warmer_ms);
2464 METRICS_BLOCKS.set_warmer_early_ms(warmer_early_ms);
2465 );
2466 }
2467
2468 pub async fn add_blocks_in_batch(
2480 &self,
2481 blocks: Vec<Block>,
2482 bals: &[Option<BlockAccessList>],
2483 cancellation_token: CancellationToken,
2484 ) -> Result<(), (ChainError, Option<BatchBlockProcessingFailure>)> {
2485 let mut last_valid_hash = H256::default();
2486
2487 debug_assert!(
2491 bals.is_empty() || bals.len() == blocks.len(),
2492 "bals must be empty or aligned with blocks (bals={}, blocks={})",
2493 bals.len(),
2494 blocks.len(),
2495 );
2496
2497 let Some(first_block_header) = blocks.first().map(|e| e.header.clone()) else {
2498 return Err((ChainError::Custom("First block not found".into()), None));
2499 };
2500
2501 let chain_config: ChainConfig = self.storage.get_chain_config();
2502
2503 let mut block_hash_cache: BTreeMap<BlockNumber, BlockHash> =
2506 blocks.iter().map(|b| (b.header.number, b.hash())).collect();
2507
2508 let parent_header = self
2509 .storage
2510 .get_block_header_by_hash(first_block_header.parent_hash)
2511 .map_err(|e| (ChainError::StoreError(e), None))?
2512 .ok_or((ChainError::ParentNotFound, None))?;
2513
2514 block_hash_cache
2518 .entry(parent_header.number)
2519 .or_insert_with(|| parent_header.hash());
2520 let mut hash = parent_header.parent_hash;
2521 let mut number = parent_header.number.saturating_sub(1);
2522 let lookback = first_block_header.number.saturating_sub(256);
2523 while number > lookback {
2524 block_hash_cache.entry(number).or_insert(hash);
2525 match self.storage.get_block_header_by_hash(hash) {
2526 Ok(Some(header)) => {
2527 hash = header.parent_hash;
2528 number = number.saturating_sub(1);
2529 }
2530 Ok(None) => break,
2531 Err(e) => {
2532 warn!("Failed to fetch block header by hash during BLOCKHASH cache walk: {e}");
2533 break;
2534 }
2535 }
2536 }
2537 let vm_db = StoreVmDatabase::new_with_block_hash_cache(
2538 self.storage.clone(),
2539 parent_header,
2540 block_hash_cache,
2541 )
2542 .map_err(|e| (ChainError::EvmError(e), None))?;
2543 let mut vm = self.new_evm(vm_db).map_err(|e| (e.into(), None))?;
2544
2545 let blocks_len = blocks.len();
2546 let mut all_receipts: Vec<(BlockHash, Vec<Receipt>)> = Vec::with_capacity(blocks_len);
2547 let mut total_gas_used = 0;
2548 let mut transactions_count = 0;
2549
2550 let interval = Instant::now();
2551 for (i, block) in blocks.iter().enumerate() {
2552 if cancellation_token.is_cancelled() {
2553 info!("Received shutdown signal, aborting");
2554 return Err((ChainError::Custom(String::from("shutdown signal")), None));
2555 }
2556 let parent_header = if i == 0 {
2558 find_parent_header(&block.header, &self.storage).map_err(|err| {
2559 (
2560 err,
2561 Some(BatchBlockProcessingFailure {
2562 failed_block_hash: block.hash(),
2563 last_valid_hash,
2564 }),
2565 )
2566 })?
2567 } else {
2568 blocks[i - 1].header.clone()
2570 };
2571
2572 let BlockExecutionResult { receipts, .. } = self
2573 .execute_block_from_state(&parent_header, block, &chain_config, &mut vm)
2574 .map_err(|err| {
2575 (
2576 err,
2577 Some(BatchBlockProcessingFailure {
2578 failed_block_hash: block.hash(),
2579 last_valid_hash,
2580 }),
2581 )
2582 })?;
2583 debug!("Executed block with hash {}", block.hash());
2584 last_valid_hash = block.hash();
2585 total_gas_used += block.header.gas_used;
2586 transactions_count += block.body.transactions.len();
2587 all_receipts.push((block.hash(), receipts));
2588
2589 log_batch_progress(blocks_len as u32, i as u32);
2591 tokio::task::yield_now().await;
2592 }
2593
2594 let account_updates = vm
2595 .get_state_transitions()
2596 .map_err(|err| (ChainError::EvmError(err), None))?;
2597
2598 let last_block = blocks
2599 .last()
2600 .ok_or_else(|| (ChainError::Custom("Last block not found".into()), None))?;
2601
2602 let last_block_number = last_block.header.number;
2603 let last_block_gas_limit = last_block.header.gas_limit;
2604
2605 let account_updates_list = self
2607 .storage
2608 .apply_account_updates_batch(first_block_header.parent_hash, &account_updates)
2609 .map_err(|e| (e.into(), None))?
2610 .ok_or((ChainError::ParentStateNotFound, None))?;
2611
2612 let new_state_root = account_updates_list.state_trie_hash;
2613 let state_updates = account_updates_list.state_updates;
2614 let accounts_updates = account_updates_list.storage_updates;
2615 let code_updates = account_updates_list.code_updates;
2616
2617 validate_state_root(&last_block.header, new_state_root).map_err(|e| (e, None))?;
2619
2620 let bals_to_store: Vec<(BlockHash, BlockAccessList)> = blocks
2627 .iter()
2628 .zip(bals.iter())
2629 .filter_map(|(block, bal)| {
2630 let bal = bal.as_ref()?;
2631 bal.matches_commitment(block.header.block_access_list_hash)
2632 .then(|| (block.hash(), bal.clone()))
2633 })
2634 .collect();
2635
2636 let update_batch = UpdateBatch {
2637 account_updates: state_updates,
2638 storage_updates: accounts_updates,
2639 blocks,
2640 receipts: all_receipts,
2641 code_updates,
2642 batch_mode: true,
2643 };
2644
2645 self.storage
2646 .store_block_updates(update_batch)
2647 .map_err(|e| (e.into(), None))?;
2648
2649 for (block_hash, bal) in &bals_to_store {
2650 if let Err(err) = self.storage.store_block_access_list(*block_hash, bal) {
2651 warn!(
2652 "Failed to persist block access list for {block_hash} during batch sync: {err}"
2653 );
2654 }
2655 }
2656
2657 let elapsed_seconds = interval.elapsed().as_secs_f64();
2658 let throughput = if elapsed_seconds > 0.0 && total_gas_used != 0 {
2659 let as_gigas = (total_gas_used as f64) / 1e9;
2660 as_gigas / elapsed_seconds
2661 } else {
2662 0.0
2663 };
2664
2665 metrics!(
2666 METRICS_BLOCKS.set_block_number(last_block_number);
2667 METRICS_BLOCKS.set_latest_block_gas_limit(last_block_gas_limit as f64);
2668 METRICS_BLOCKS.set_latest_gas_used(total_gas_used as f64 / blocks_len as f64);
2670 METRICS_BLOCKS.set_latest_gigagas(throughput);
2671 );
2672
2673 if self.options.perf_logs_enabled {
2674 info!(
2675 "[METRICS] Executed and stored: Range: {}, Last block num: {}, Last block gas limit: {}, Total transactions: {}, Total Gas: {}, Throughput: {} Gigagas/s",
2676 blocks_len,
2677 last_block_number,
2678 last_block_gas_limit,
2679 transactions_count,
2680 total_gas_used,
2681 throughput
2682 );
2683 }
2684
2685 Ok(())
2686 }
2687
/// Validates a blob (EIP-4844) transaction together with its blobs bundle and
/// inserts both into the mempool.
///
/// Returns the transaction hash. If the hash is already present in the mempool
/// the function returns early without re-validating anything.
///
/// # Errors
/// - [`MempoolError::TxSizeExceeded`] when the canonical encoding length plus
///   the bundle length exceeds `MAX_BLOB_TX_SIZE`.
/// - Any error produced by bundle validation, sender recovery,
///   `validate_transaction`, or the mempool itself.
#[cfg(feature = "c-kzg")]
pub async fn add_blob_transaction_to_pool(
    &self,
    transaction: EIP4844Transaction,
    blobs_bundle: BlobsBundle,
) -> Result<H256, MempoolError> {
    let active_fork = self.current_fork().await?;

    let wrapped_tx = Transaction::EIP4844Transaction(transaction);
    let tx_hash = wrapped_tx.hash();
    let already_pooled = self.mempool.contains_tx(tx_hash)?;
    if already_pooled {
        return Ok(tx_hash);
    }

    // Size limit applies to the transaction plus its blobs bundle.
    let total_len = wrapped_tx.encode_canonical_len() + blobs_bundle.length();
    if total_len > MAX_BLOB_TX_SIZE {
        return Err(MempoolError::TxSizeExceeded {
            actual: total_len,
            limit: MAX_BLOB_TX_SIZE,
        });
    }

    // The bundle is validated against the inner EIP-4844 payload.
    if let Transaction::EIP4844Transaction(blob_tx) = &wrapped_tx {
        blobs_bundle.validate(blob_tx, active_fork)?;
    }

    let sender = wrapped_tx.sender(&NativeCrypto)?;

    // A successful validation may name a pooled transaction to be replaced.
    let replaced = self.validate_transaction(&wrapped_tx, sender).await?;
    if let Some(replaced_hash) = replaced {
        self.remove_transaction_from_pool(&replaced_hash)?;
    }

    // The bundle is stored before the transaction itself.
    self.mempool.add_blobs_bundle(tx_hash, blobs_bundle)?;
    let pooled_tx = MempoolTransaction::new(wrapped_tx, sender);
    self.mempool.add_transaction(tx_hash, sender, pooled_tx)?;
    Ok(tx_hash)
}
2736
2737 pub async fn add_transaction_to_pool(
2739 &self,
2740 transaction: Transaction,
2741 ) -> Result<H256, MempoolError> {
2742 if matches!(transaction, Transaction::EIP4844Transaction(_)) {
2744 return Err(MempoolError::BlobTxNoBlobsBundle);
2745 }
2746 let encoded_len = transaction.encode_canonical_len();
2752 if encoded_len > MAX_TX_SIZE {
2753 return Err(MempoolError::TxSizeExceeded {
2754 actual: encoded_len,
2755 limit: MAX_TX_SIZE,
2756 });
2757 }
2758 let hash = transaction.hash();
2759 if self.mempool.contains_tx(hash)? {
2760 return Ok(hash);
2761 }
2762 let sender = transaction.sender(&NativeCrypto)?;
2763 if let Some(tx_to_replace) = self.validate_transaction(&transaction, sender).await? {
2765 self.remove_transaction_from_pool(&tx_to_replace)?;
2766 }
2767
2768 self.mempool
2770 .add_transaction(hash, sender, MempoolTransaction::new(transaction, sender))?;
2771
2772 Ok(hash)
2773 }
2774
2775 pub fn remove_transaction_from_pool(&self, hash: &H256) -> Result<(), StoreError> {
2777 self.mempool.remove_transaction(hash)
2778 }
2779
2780 pub fn remove_block_transactions_from_pool(&self, block: &Block) -> Result<(), StoreError> {
2782 for tx in &block.body.transactions {
2783 self.mempool.remove_transaction(&tx.hash())?;
2784 }
2785 Ok(())
2786 }
2787
2788 pub fn remove_stale_blob_txs(&self, head_hash: BlockHash) -> Result<(), StoreError> {
2793 let blob_txs = self.mempool.blob_txs()?;
2794 if blob_txs.is_empty() {
2795 return Ok(());
2796 }
2797 let mut nonce_by_sender: HashMap<Address, u64> = HashMap::new();
2799 for (hash, sender, tx_nonce) in blob_txs {
2800 let state_nonce = match nonce_by_sender.entry(sender) {
2801 Entry::Occupied(e) => *e.get(),
2802 Entry::Vacant(e) => {
2803 let nonce = self
2804 .storage
2805 .get_account_info_by_hash(head_hash, sender)?
2806 .map(|info| info.nonce)
2807 .unwrap_or(0);
2808 *e.insert(nonce)
2809 }
2810 };
2811 if tx_nonce < state_nonce {
2812 self.mempool.remove_transaction(&hash)?;
2813 }
2814 }
2815 Ok(())
2816 }
2817
2818 pub async fn validate_transaction(
2851 &self,
2852 tx: &Transaction,
2853 sender: Address,
2854 ) -> Result<Option<H256>, MempoolError> {
2855 let nonce = tx.nonce();
2856
2857 if matches!(tx, &Transaction::PrivilegedL2Transaction(_)) {
2858 return Ok(None);
2859 }
2860
2861 let header_no = self.storage.get_latest_block_number().await?;
2862 let header = self
2863 .storage
2864 .get_block_header(header_no)?
2865 .ok_or(MempoolError::NoBlockHeaderError)?;
2866 let config = self.storage.get_chain_config();
2867
2868 if !matches!(tx, Transaction::EIP4844Transaction(_)) {
2875 let encoded_len = tx.encode_canonical_len();
2876 if encoded_len > MAX_TX_SIZE {
2877 return Err(MempoolError::TxSizeExceeded {
2878 actual: encoded_len,
2879 limit: MAX_TX_SIZE,
2880 });
2881 }
2882 }
2883
2884 let max_initcode_size = if config.is_amsterdam_activated(header.timestamp) {
2887 AMSTERDAM_MAX_INITCODE_SIZE
2888 } else {
2889 MAX_INITCODE_SIZE
2890 };
2891 if config.is_shanghai_activated(header.timestamp)
2892 && tx.is_contract_creation()
2893 && tx.data().len() > max_initcode_size as usize
2894 {
2895 return Err(MempoolError::TxMaxInitCodeSizeError);
2896 }
2897
2898 if config.is_osaka_activated(header.timestamp)
2899 && !config.is_amsterdam_activated(header.timestamp)
2900 && tx.gas_limit() > POST_OSAKA_GAS_LIMIT_CAP
2901 {
2902 return Err(MempoolError::TxMaxGasLimitExceededError(
2904 tx.hash(),
2905 tx.gas_limit(),
2906 ));
2907 }
2908
2909 if header.gas_limit < tx.gas_limit() {
2911 return Err(MempoolError::TxGasLimitExceededError);
2912 }
2913
2914 if tx.max_priority_fee().unwrap_or(0) > tx.max_fee_per_gas().unwrap_or(0) {
2916 return Err(MempoolError::TxTipAboveFeeCapError);
2917 }
2918
2919 if tx.gas_limit() < mempool::transaction_intrinsic_gas(tx, &header, &config)? {
2921 return Err(MempoolError::TxIntrinsicGasCostAboveLimitError);
2922 }
2923
2924 if let Some(fee) = tx.max_fee_per_blob_gas() {
2926 if fee < MIN_BASE_FEE_PER_BLOB_GAS.into() {
2928 return Err(MempoolError::TxBlobBaseFeeTooLowError);
2929 }
2930 };
2931
2932 let maybe_sender_acc_info = self.storage.get_account_info(header_no, sender).await?;
2933
2934 if let Some(sender_acc_info) = maybe_sender_acc_info {
2935 if nonce < sender_acc_info.nonce || nonce == u64::MAX {
2936 return Err(MempoolError::NonceTooLow);
2937 }
2938
2939 let tx_cost = tx
2940 .cost_without_base_fee()
2941 .ok_or(MempoolError::InvalidTxGasvalues)?;
2942
2943 if tx_cost > sender_acc_info.balance {
2944 return Err(MempoolError::NotEnoughBalance);
2945 }
2946 } else {
2947 return Err(MempoolError::NotEnoughBalance);
2949 }
2950
2951 let tx_to_replace_hash = self.mempool.find_tx_to_replace(sender, nonce, tx)?;
2954
2955 if tx
2956 .chain_id()
2957 .is_some_and(|chain_id| chain_id != config.chain_id)
2958 {
2959 return Err(MempoolError::InvalidChainId(config.chain_id));
2960 }
2961
2962 Ok(tx_to_replace_hash)
2963 }
2964
2965 pub fn set_synced(&self) {
2968 self.is_synced.store(true, Ordering::Relaxed);
2969 }
2970
2971 pub fn set_not_synced(&self) {
2974 self.is_synced.store(false, Ordering::Relaxed);
2975 }
2976
2977 pub fn is_synced(&self) -> bool {
2981 self.is_synced.load(Ordering::Relaxed)
2982 }
2983
2984 pub fn get_p2p_transaction_by_hash(&self, hash: &H256) -> Result<P2PTransaction, StoreError> {
2985 let Some(tx) = self.mempool.get_transaction_by_hash(*hash)? else {
2986 return Err(StoreError::Custom(format!(
2987 "Hash {hash} not found in the mempool",
2988 )));
2989 };
2990 let result = match tx {
2991 Transaction::LegacyTransaction(itx) => P2PTransaction::LegacyTransaction(itx),
2992 Transaction::EIP2930Transaction(itx) => P2PTransaction::EIP2930Transaction(itx),
2993 Transaction::EIP1559Transaction(itx) => P2PTransaction::EIP1559Transaction(itx),
2994 Transaction::EIP4844Transaction(itx) => {
2995 let Some(bundle) = self.mempool.get_blobs_bundle(*hash)? else {
2996 return Err(StoreError::Custom(format!(
2997 "Blob transaction present without its bundle: hash {hash}",
2998 )));
2999 };
3000
3001 P2PTransaction::EIP4844TransactionWithBlobs(WrappedEIP4844Transaction {
3002 tx: itx,
3003 wrapper_version: (bundle.version != 0).then_some(bundle.version),
3004 blobs_bundle: bundle,
3005 })
3006 }
3007 Transaction::EIP7702Transaction(itx) => P2PTransaction::EIP7702Transaction(itx),
3008 Transaction::PrivilegedL2Transaction(_) => {
3012 return Err(StoreError::Custom(
3013 "Privileged Transactions are not supported in P2P".to_string(),
3014 ));
3015 }
3016 Transaction::FeeTokenTransaction(itx) => P2PTransaction::FeeTokenTransaction(itx),
3017 };
3018
3019 Ok(result)
3020 }
3021
3022 pub fn new_evm(&self, vm_db: StoreVmDatabase) -> Result<Evm, EvmError> {
3023 new_evm(&self.options.r#type, vm_db)
3024 }
3025
3026 pub async fn current_fork(&self) -> Result<Fork, StoreError> {
3028 let chain_config = self.storage.get_chain_config();
3029 let latest_block_number = self.storage.get_latest_block_number().await?;
3030 let latest_block = self
3031 .storage
3032 .get_block_header(latest_block_number)?
3033 .ok_or(StoreError::Custom("Latest block not in DB".to_string()))?;
3034 Ok(chain_config.fork(latest_block.timestamp))
3035 }
3036}
3037
3038fn load_trie(
3040 storage: &Store,
3041 parent_state_root: H256,
3042 prefix: Option<H256>,
3043) -> Result<Trie, StoreError> {
3044 Ok(match prefix {
3045 Some(account_hash) => {
3046 let state_trie = storage.open_state_trie(parent_state_root)?;
3047 let storage_root = match state_trie.get(account_hash.as_bytes())? {
3048 Some(rlp) => AccountState::decode(&rlp)?.storage_root,
3049 None => *EMPTY_TRIE_HASH,
3050 };
3051 storage.open_storage_trie(account_hash, parent_state_root, storage_root)?
3052 }
3053 None => storage.open_state_trie(parent_state_root)?,
3054 })
3055}
3056
3057fn collapse_root_node(
3060 storage: &Store,
3061 parent_state_root: H256,
3062 prefix: Option<H256>,
3063 root: BranchNode,
3064) -> Result<Option<Node>, StoreError> {
3065 let children: Vec<(usize, &NodeRef)> = root
3066 .choices
3067 .iter()
3068 .enumerate()
3069 .filter(|(_, choice)| choice.is_valid())
3070 .take(2)
3071 .collect();
3072 if children.len() > 1 {
3073 return Ok(Some(Node::Branch(Box::from(root))));
3074 }
3075 let Some((choice, only_child)) = children.first() else {
3076 return Ok(None);
3077 };
3078 let only_child = Arc::unwrap_or_clone(match only_child {
3079 NodeRef::Node(node, _) => node.clone(),
3080 noderef @ NodeRef::Hash(_) => {
3081 let trie = load_trie(storage, parent_state_root, prefix)?;
3082 let Some(node) = noderef.get_node(trie.db(), Nibbles::from_hex(vec![*choice as u8]))?
3083 else {
3084 return Ok(None);
3085 };
3086 node
3087 }
3088 });
3089 Ok(Some(match only_child {
3090 Node::Branch(_) => {
3091 ExtensionNode::new(Nibbles::from_hex(vec![*choice as u8]), only_child.into()).into()
3092 }
3093 Node::Extension(mut extension_node) => {
3094 extension_node.prefix.prepend(*choice as u8);
3095 extension_node.into()
3096 }
3097 Node::Leaf(mut leaf) => {
3098 leaf.partial.prepend(*choice as u8);
3099 leaf.into()
3100 }
3101 }))
3102}
3103
3104fn collect_and_send(
3106 index: u8,
3107 state_trie: &mut Trie,
3108 pre_collected_state: &mut Vec<TrieNode>,
3109 storage_nodes: &mut Vec<(H256, Vec<TrieNode>)>,
3110 tx: Sender<CollectedStateMsg>,
3111) -> Result<(), StoreError> {
3112 let (subroot, mut state_nodes) = collect_trie(index, std::mem::take(state_trie))?;
3113 if !pre_collected_state.is_empty() {
3114 let mut pre = std::mem::take(pre_collected_state);
3115 pre.extend(state_nodes);
3116 state_nodes = pre;
3117 }
3118 tx.send(CollectedStateMsg {
3119 index,
3120 subroot,
3121 state_nodes,
3122 storage_nodes: std::mem::take(storage_nodes),
3123 })
3124 .map_err(|e| StoreError::Custom(format!("send error: {e}")))?;
3125 Ok(())
3126}
3127
3128fn get_or_open_storage_trie<'a>(
3130 storage_tries: &'a mut FxHashMap<H256, Trie>,
3131 storage: &Store,
3132 parent_state_root: H256,
3133 prefix: H256,
3134 storage_root: H256,
3135) -> Result<&'a mut Trie, StoreError> {
3136 match storage_tries.entry(prefix) {
3137 Entry::Occupied(e) => Ok(e.into_mut()),
3138 Entry::Vacant(e) => {
3139 Ok(e.insert(storage.open_storage_trie(prefix, parent_state_root, storage_root)?))
3140 }
3141 }
3142}
3143
/// Message loop of the worker that owns shard `index`.
///
/// Opens the state trie at `parent_state_root` and then handles
/// `WorkerRequest` messages from `rx` until it has sent its
/// `CollectedStateMsg` (through `collect_and_send`) or `rx` disconnects.
/// `worker_senders` is indexed by shard number; slots 0..16 are used below.
/// NOTE(review): presumably `worker_senders[i]` feeds worker `i`'s `rx`
/// (including this worker's own slot) — confirm against the spawning code.
///
/// # Errors
/// Returns an error when a trie/store operation fails, when a channel send
/// fails, or when `shutdown_rx` signals (see the receive logic below).
///
/// # Panics
/// Panics through `expect` if `ProcessAccount` or `FinishRouting` arrives after
/// the senders were dropped, or if a completed `StorageShard` refers to an
/// account that was never loaded by `ProcessAccount`.
3144fn handle_subtrie(
3145 storage: Store,
3146 rx: cb::Receiver<WorkerRequest>,
3147 parent_state_root: H256,
3148 index: u8,
3149 worker_senders: Vec<cb::Sender<WorkerRequest>>,
3150 shutdown_rx: cb::Receiver<()>,
3151) -> Result<(), StoreError> {
3152 let mut state_trie = storage.open_state_trie(parent_state_root)?;
3153 let mut storage_nodes: Vec<(H256, Vec<TrieNode>)> = vec![];
3154 let mut accounts: FxHashMap<H256, AccountState> = Default::default();
     // Per-account bitmask of the shards a StorageShard is expected from.
3155 let mut expected_shards: FxHashMap<H256, u16> = Default::default();
3156 let mut storage_state: FxHashMap<H256, PreMerkelizedAccountState> = Default::default();
     // Per-account bitmask of the shards already received.
3157 let mut received_shards: FxHashMap<H256, u16> = Default::default();
3158 let mut pending_storage_accounts: usize = 0;
     // Reply channel of a CollectState request that could not be answered yet.
3159 let mut pending_collect_tx: Option<Sender<CollectedStateMsg>> = None;
3160 let mut pre_collected_state: Vec<TrieNode> = vec![];
3161 let mut storage_tries: FxHashMap<H256, Trie> = Default::default();
3162 let mut pre_collected_storage: FxHashMap<H256, Vec<TrieNode>> = Default::default();
3163
     // Held in an Option so the senders can be dropped (set to None) once
     // storage collection has finished.
3164 let mut worker_senders: Option<Vec<cb::Sender<WorkerRequest>>> = Some(worker_senders);
     // True when a trie was modified since the last idle-time commit.
3166 let mut dirty = false;
3167 let mut collecting_storages = false;
3170 let mut routing_complete = false;
     // Bit `i` is set once RoutingDone { from: i } has been received.
3171 let mut routing_done_mask: u16 = 0;
3172 let mut storage_to_collect: Vec<(H256, Trie)> = vec![];
3173
3174 loop {
3175 if collecting_storages {
     // Collect one queued storage trie per loop iteration, then fall
     // through to the message poll below.
3178 if let Some((prefix, trie)) = storage_to_collect.pop() {
3179 let senders = worker_senders
3180 .as_ref()
3181 .expect("collecting after senders dropped");
3182 let (root, mut nodes) = collect_trie(index, trie)?;
     // Nodes committed earlier for this account go first.
3183 if let Some(mut pre_nodes) = pre_collected_storage.remove(&prefix) {
3184 pre_nodes.extend(nodes);
3185 nodes = pre_nodes;
3186 }
     // The top nibble of the account hash selects the destination sender.
3187 let bucket = prefix.as_fixed_bytes()[0] >> 4;
3188 senders[bucket as usize]
3189 .send(WorkerRequest::StorageShard {
3190 prefix,
3191 index,
3192 subroot: root,
3193 nodes,
3194 })
3195 .map_err(|e| StoreError::Custom(format!("send error: {e}")))?;
3196 } else {
     // Every queued storage trie has been sent: drop our senders.
3197 worker_senders = None;
3199 collecting_storages = false;
     // Answer a deferred CollectState if nothing else is outstanding.
3200 if pending_storage_accounts == 0
3202 && let Some(tx) = pending_collect_tx.take()
3203 {
3204 collect_and_send(
3205 index,
3206 &mut state_trie,
3207 &mut pre_collected_state,
3208 &mut storage_nodes,
3209 tx,
3210 )?;
3211 break;
3212 }
3213 }
3214 }
3215
     // While there is background work (collection in progress or uncommitted
     // changes) poll without blocking; otherwise block until a request or a
     // shutdown event arrives.
3216 let msg = if collecting_storages || dirty {
3219 match rx.try_recv() {
3220 Ok(msg) => msg,
3221 Err(TryRecvError::Disconnected) => break,
3222 Err(TryRecvError::Empty) => {
     // In this non-blocking path only a disconnected shutdown channel aborts.
3223 if matches!(shutdown_rx.try_recv(), Err(TryRecvError::Disconnected)) {
3225 return Err(StoreError::Custom("shard worker shutdown".into()));
3226 }
3227 if dirty {
     // Idle: commit the state trie (without storing) and keep only
     // the nodes whose path starts with this shard's nibble.
3228 let mut nodes = state_trie.commit_without_storing(&NativeCrypto);
3232 nodes.retain(|(nib, _)| nib.as_ref().first() == Some(&index));
3233 pre_collected_state.extend(nodes);
     // Storage tries are pre-committed only while collection is not running.
3234 if !collecting_storages {
3235 for (prefix, trie) in storage_tries.iter_mut() {
3237 let mut nodes = trie.commit_without_storing(&NativeCrypto);
3238 nodes.retain(|(nib, _)| nib.as_ref().first() == Some(&index));
3239 if !nodes.is_empty() {
3240 pre_collected_storage
3241 .entry(*prefix)
3242 .or_default()
3243 .extend(nodes);
3244 }
3245 }
3246 }
3247 dirty = false;
3248 }
3249 continue;
3250 }
3251 }
3252 } else {
3253 select! {
3254 recv(rx) -> msg => match msg {
3255 Ok(msg) => msg,
3256 Err(_) => break,
3257 },
     // In the blocking path any event on shutdown_rx (message or
     // disconnect) aborts the worker.
3258 recv(shutdown_rx) -> _ => {
3259 return Err(StoreError::Custom("shard worker shutdown".into()));
3260 }
3261 }
3262 };
3263
3264 match msg {
3265 WorkerRequest::ProcessAccount {
3266 prefix,
3267 info,
3268 storage: account_storage,
3269 removed,
3270 removed_storage,
3271 } => {
3272 let senders = worker_senders
3273 .as_ref()
3274 .expect("ProcessAccount after collection started");
3275
     // Load the account's current state the first time it is seen.
3276 match accounts.entry(prefix) {
3278 Entry::Occupied(_) => {}
3279 Entry::Vacant(vacant_entry) => {
3280 let account_state = match state_trie.get(prefix.as_bytes())? {
3281 Some(rlp) => {
3282 let state = AccountState::decode(&rlp)?;
     // NOTE(review): re-inserts the unchanged RLP; presumably to
     // materialize the path in the in-memory trie — confirm.
3283 state_trie.insert(prefix.as_bytes().to_vec(), rlp)?;
3284 state
3285 }
3286 None => AccountState::default(),
3287 };
3288 vacant_entry.insert(account_state);
3289 }
3290 }
3291
3292 if let Some(info) = info {
3294 let acct = accounts.get_mut(&prefix).expect("just loaded");
3295 acct.nonce = info.nonce;
3296 acct.balance = info.balance;
3297 acct.code_hash = info.code_hash;
3298 let path = prefix.as_bytes();
     // An account equal to the default state is removed from the trie.
3299 if *acct != AccountState::default() {
3300 state_trie.insert(path.to_vec(), acct.encode_to_vec())?;
3301 } else {
3302 state_trie.remove(path)?;
3303 }
3304 }
3305
3306 if removed || removed_storage {
     // Reset the local storage trie for this account and ask every
     // other worker to do the same.
3307 pre_collected_storage.remove(&prefix);
3309 storage_tries.insert(prefix, Trie::new_temp());
3310 for (i, tx) in senders.iter().enumerate() {
3311 if i as u8 != index {
3312 tx.send(WorkerRequest::DeleteStorage(prefix))
3313 .map_err(|e| StoreError::Custom(format!("send error: {e}")))?;
3314 }
3315 }
3316 accounts.get_mut(&prefix).expect("just loaded").storage_root = *EMPTY_TRIE_HASH;
     // 0xFFFF: a shard is now expected from all 16 workers. The account
     // is counted as pending only if it was not tracked before.
3317 if expected_shards.insert(prefix, 0xFFFF).is_none() {
3318 pending_storage_accounts += 1;
3319 }
     // A removed account skips the storage writes of this message.
3320 if removed {
3321 dirty = true;
3322 continue;
3323 }
3324 }
3325
3326 if !account_storage.is_empty() {
3327 let storage_root = accounts
3328 .get(&prefix)
3329 .map(|a| a.storage_root)
3330 .unwrap_or(*EMPTY_TRIE_HASH);
3331
3332 let is_new = !expected_shards.contains_key(&prefix);
3333 for (key, value) in account_storage {
     // Route each slot by the top nibble of its hashed key: apply it
     // locally when it belongs to this shard, otherwise forward it.
3334 let hashed_key = keccak(key);
3335 let bucket = hashed_key.as_fixed_bytes()[0] >> 4;
3336 *expected_shards.entry(prefix).or_insert(0u16) |= 1 << bucket;
3337 if bucket == index {
3338 let trie = get_or_open_storage_trie(
3340 &mut storage_tries,
3341 &storage,
3342 parent_state_root,
3343 prefix,
3344 storage_root,
3345 )?;
     // A zero value deletes the slot.
3346 if value.is_zero() {
3347 trie.remove(hashed_key.as_bytes())?;
3348 } else {
3349 trie.insert(hashed_key.as_bytes().to_vec(), value.encode_to_vec())?;
3350 }
3351 } else {
3352 senders[bucket as usize]
3353 .send(WorkerRequest::MerklizeStorage {
3354 prefix,
3355 key: hashed_key,
3356 value,
3357 storage_root,
3358 })
3359 .map_err(|e| StoreError::Custom(format!("send error: {e}")))?;
3360 }
3361 }
3362 if is_new {
3363 pending_storage_accounts += 1;
3364 }
3365 }
3366 dirty = true;
3367 }
     // A storage write forwarded by another worker for a key in this shard.
3368 WorkerRequest::MerklizeStorage {
3369 prefix,
3370 key,
3371 value,
3372 storage_root,
3373 } => {
3374 let trie = get_or_open_storage_trie(
3375 &mut storage_tries,
3376 &storage,
3377 parent_state_root,
3378 prefix,
3379 storage_root,
3380 )?;
3381 if value.is_zero() {
3382 trie.remove(key.as_bytes())?;
3383 } else {
3384 trie.insert(key.as_bytes().to_vec(), value.encode_to_vec())?;
3385 }
3386 dirty = true;
3387 }
     // Discard everything held locally for this account's storage.
3388 WorkerRequest::DeleteStorage(prefix) => {
3389 pre_collected_storage.remove(&prefix);
3390 storage_tries.insert(prefix, Trie::new_temp());
3391 dirty = true;
3392 }
3393 WorkerRequest::FinishRouting => {
3394 let senders = worker_senders
3396 .as_ref()
3397 .expect("FinishRouting after senders dropped");
     // Broadcast RoutingDone to all 16 sender slots, this shard's included.
3398 for i in 0..16u8 {
3399 senders[i as usize]
3400 .send(WorkerRequest::RoutingDone { from: index })
3401 .map_err(|e| StoreError::Custom(format!("send error: {e}")))?;
3402 }
3403 }
3404 WorkerRequest::RoutingDone { from } => {
3405 routing_done_mask |= 1u16 << from;
     // Once all 16 bits are set, start collecting storage tries (at most once).
3406 if routing_done_mask == 0xFFFF && !collecting_storages && !routing_complete {
3407 collecting_storages = true;
3408 routing_complete = true;
3409 storage_to_collect = storage_tries.drain().collect();
3410 }
3411 }
3412 WorkerRequest::MerklizeAccounts { accounts: batch } => {
     // Each account in the batch is recorded with an empty node list.
3413 for hashed_account in batch {
3415 storage_nodes.push((hashed_account, vec![]));
3416 }
3417 }
3418 WorkerRequest::StorageShard {
3419 prefix,
3420 index: shard_index,
3421 mut subroot,
3422 nodes,
3423 } => {
3424 let state = storage_state.entry(prefix).or_default();
     // The first shard received becomes the root; later shards contribute
     // only their own child slot.
3425 match &mut state.storage_root {
3426 Some(root) => {
3427 root.choices[shard_index as usize] =
3428 std::mem::take(&mut subroot.choices[shard_index as usize]);
3429 }
3430 rootptr => {
3431 *rootptr = Some(subroot);
3432 }
3433 }
3434 state.nodes.extend(nodes);
3435
3436 let received = received_shards.entry(prefix).or_insert(0u16);
3437 *received |= 1 << shard_index;
     // All expected shards have arrived: compute the account's storage root.
3438 if *received == expected_shards.get(&prefix).copied().unwrap_or(0) {
3439 let mut state = storage_state.remove(&prefix).expect("shard without state");
3441 let new_storage_root = if let Some(mut root) = state.storage_root {
3442 root.choices.iter_mut().for_each(NodeRef::clear_hash);
3444 let collapsed =
3445 collapse_root_node(&storage, parent_state_root, Some(prefix), *root)?;
3446 if let Some(root) = collapsed {
3447 let mut root = NodeRef::from(root);
3448 let hash =
3449 root.commit(Nibbles::default(), &mut state.nodes, &NativeCrypto);
     // The committed root is handed to DROP_SENDER; a send failure is ignored.
3450 let _ = DROP_SENDER.send(Box::new(root));
3451 hash.finalize(&NativeCrypto)
3452 } else {
     // No root node left: record an RLP-null node at the empty path.
3453 state.nodes.push((Nibbles::default(), vec![RLP_NULL]));
3454 *EMPTY_TRIE_HASH
3455 }
3456 } else {
3457 *EMPTY_TRIE_HASH
3458 };
3459 storage_nodes.push((prefix, state.nodes));
3460
     // Write the new storage root back into the account in the state trie.
3461 let old_state = accounts.get_mut(&prefix).expect("loaded in ProcessAccount");
3463 old_state.storage_root = new_storage_root;
3464 let path = prefix.as_bytes();
3465 if *old_state != AccountState::default() {
3466 state_trie.insert(path.to_vec(), old_state.encode_to_vec())?;
3467 } else {
3468 state_trie.remove(path)?;
3469 }
3470
3471 dirty = true;
3472 pending_storage_accounts -= 1;
     // Answer a deferred CollectState once this was the last pending
     // account and storage collection has already finished.
3473 if pending_storage_accounts == 0
3474 && !collecting_storages
3475 && routing_complete
3476 && let Some(tx) = pending_collect_tx.take()
3477 {
3478 collect_and_send(
3479 index,
3480 &mut state_trie,
3481 &mut pre_collected_state,
3482 &mut storage_nodes,
3483 tx,
3484 )?;
3485 break;
3486 }
3487 }
3488 }
3489 WorkerRequest::CollectState { tx } => {
     // Reply right away when everything is finished; otherwise keep the
     // sender so the reply is sent when the remaining work completes.
3490 if pending_storage_accounts == 0 && !collecting_storages && routing_complete {
3491 collect_and_send(
3492 index,
3493 &mut state_trie,
3494 &mut pre_collected_state,
3495 &mut storage_nodes,
3496 tx,
3497 )?;
3498 break;
3499 }
3500 pending_collect_tx = Some(tx);
3502 }
3503 }
3504 }
3505 Ok(())
3506}
3507
3508pub fn new_evm(blockchain_type: &BlockchainType, vm_db: StoreVmDatabase) -> Result<Evm, EvmError> {
3509 let evm = match blockchain_type {
3510 BlockchainType::L1 => Evm::new_for_l1(vm_db, Arc::new(NativeCrypto)),
3511 BlockchainType::L2(l2_config) => {
3512 let fee_config = *l2_config
3513 .fee_config
3514 .read()
3515 .map_err(|_| EvmError::Custom("Fee config lock was poisoned".to_string()))?;
3516
3517 Evm::new_for_l2(vm_db, fee_config, Arc::new(NativeCrypto))?
3518 }
3519 };
3520 Ok(evm)
3521}
3522
3523pub fn validate_state_root(
3525 block_header: &BlockHeader,
3526 new_state_root: H256,
3527) -> Result<(), ChainError> {
3528 if new_state_root == block_header.state_root {
3530 Ok(())
3531 } else {
3532 Err(ChainError::InvalidBlock(
3533 InvalidBlockError::StateRootMismatch,
3534 ))
3535 }
3536}
3537
3538pub async fn latest_canonical_block_hash(storage: &Store) -> Result<H256, ChainError> {
3540 let latest_block_number = storage.get_latest_block_number().await?;
3541 if let Some(latest_valid_header) = storage.get_block_header(latest_block_number)? {
3542 let latest_valid_hash = latest_valid_header.hash();
3543 return Ok(latest_valid_hash);
3544 }
3545 Err(ChainError::StoreError(StoreError::Custom(
3546 "Could not find latest valid hash".to_string(),
3547 )))
3548}
3549
3550pub fn find_parent_header(
3553 block_header: &BlockHeader,
3554 storage: &Store,
3555) -> Result<BlockHeader, ChainError> {
3556 match storage.get_block_header_by_hash(block_header.parent_hash)? {
3557 Some(parent_header) => Ok(parent_header),
3558 None => Err(ChainError::ParentNotFound),
3559 }
3560}
3561
3562pub async fn is_canonical(
3563 store: &Store,
3564 block_number: BlockNumber,
3565 block_hash: BlockHash,
3566) -> Result<bool, StoreError> {
3567 match store.get_canonical_block_hash(block_number).await? {
3568 Some(hash) if hash == block_hash => Ok(true),
3569 _ => Ok(false),
3570 }
3571}
3572
3573fn branchify(node: Node) -> Box<BranchNode> {
3574 match node {
3575 Node::Branch(branch_node) => branch_node,
3576 Node::Extension(extension_node) => {
3577 let index = extension_node.prefix.as_ref()[0];
3578 let noderef = if extension_node.prefix.len() == 1 {
3579 extension_node.child
3580 } else {
3581 let prefix = extension_node.prefix.offset(1);
3582 let node = ExtensionNode::new(prefix, extension_node.child);
3583 NodeRef::from(Arc::new(node.into()))
3584 };
3585 let mut choices = BranchNode::EMPTY_CHOICES;
3586 choices[index as usize] = noderef;
3587 Box::new(BranchNode::new(choices))
3588 }
3589 Node::Leaf(leaf_node) => {
3590 let index = leaf_node.partial.as_ref()[0];
3591 let node = LeafNode::new(leaf_node.partial.offset(1), leaf_node.value);
3592 let mut choices = BranchNode::EMPTY_CHOICES;
3593 choices[index as usize] = NodeRef::from(Arc::new(node.into()));
3594 Box::new(BranchNode::new(choices))
3595 }
3596 }
3597}
3598
3599fn collect_trie(index: u8, mut trie: Trie) -> Result<(Box<BranchNode>, Vec<TrieNode>), TrieError> {
3600 let root = branchify(
3601 trie.root_node()?
3602 .map(Arc::unwrap_or_clone)
3603 .unwrap_or_else(|| Node::Branch(Box::default())),
3604 );
3605 trie.root = Node::Branch(root).into();
3606 let (_, mut nodes) = trie.collect_changes_since_last_hash(&NativeCrypto);
3607 nodes.retain(|(nib, _)| nib.as_ref().first() == Some(&index));
3608
3609 let Some(Node::Branch(root)) = trie.root_node()?.map(Arc::unwrap_or_clone) else {
3610 return Err(TrieError::InvalidInput);
3611 };
3612 Ok((root, nodes))
3613}