Skip to main content

ethrex_blockchain/
blockchain.rs

1//! # ethrex Blockchain
2//!
3//! Core blockchain logic for the ethrex Ethereum client.
4//!
5//! ## Overview
6//!
7//! This module implements the blockchain layer, which is responsible for:
8//! - Block validation and execution
9//! - State management and transitions
10//! - Fork choice rule implementation
11//! - Transaction mempool management
12//! - Payload building for block production
13//!
14//! ## Key Components
15//!
16//! - [`Blockchain`]: Main interface for blockchain operations
17//! - [`Mempool`]: Transaction pool for pending transactions
18//! - [`fork_choice`]: Fork choice rule implementation
19//! - [`payload`]: Block payload building for consensus
20//!
21//! ## Block Execution Flow
22//!
23//! ```text
24//! 1. Receive block from consensus/P2P
25//! 2. Validate block header (parent, timestamp, gas limit, etc.)
26//! 3. Execute transactions in EVM
27//! 4. Verify state root matches header
28//! 5. Store block and update canonical chain
29//! ```
30//!
31//! ## Usage
32//!
33//! ```ignore
34//! use ethrex_blockchain::Blockchain;
35//!
36//! let blockchain = Blockchain::new(store, BlockchainOptions::default());
37//!
38//! // Add a block
39//! blockchain.add_block(&block)?;
40//!
41//! // Add transaction to mempool
42//! blockchain.add_transaction_to_mempool(tx).await?;
43//! ```
44
45pub mod constants;
46pub mod error;
47pub mod fork_choice;
48pub mod mempool;
49pub mod payload;
50pub mod tracing;
51pub mod vm;
52
53use ::tracing::{debug, error, info, instrument, warn};
54use constants::{AMSTERDAM_MAX_INITCODE_SIZE, MAX_INITCODE_SIZE, POST_OSAKA_GAS_LIMIT_CAP};
55use error::MempoolError;
56use error::{ChainError, InvalidBlockError};
57use ethrex_common::constants::{EMPTY_TRIE_HASH, MIN_BASE_FEE_PER_BLOB_GAS};
58
59use crossbeam::channel::{self as cb, TryRecvError, select};
60// Re-export stateless validation functions for backwards compatibility
61#[cfg(feature = "c-kzg")]
62use ethrex_common::types::EIP4844Transaction;
63#[cfg(feature = "c-kzg")]
64use ethrex_common::types::MAX_BLOB_TX_SIZE;
65use ethrex_common::types::MAX_TX_SIZE;
66use ethrex_common::types::block_access_list::BlockAccessList;
67use ethrex_common::types::block_execution_witness::ExecutionWitness;
68use ethrex_common::types::fee_config::FeeConfig;
69use ethrex_common::types::{
70    AccountInfo, AccountState, AccountUpdate, BalSynthesisItem, Block, BlockHash, BlockHeader,
71    BlockNumber, ChainConfig, Code, Receipt, Transaction, WrappedEIP4844Transaction,
72    synthesize_bal_updates, validate_block_body,
73};
74use ethrex_common::types::{ELASTICITY_MULTIPLIER, P2PTransaction};
75use ethrex_common::types::{Fork, MempoolTransaction};
76use ethrex_common::utils::keccak;
77use ethrex_common::{Address, H256, TrieLogger, U256};
78pub use ethrex_common::{
79    get_total_blob_gas, validate_block_access_list_hash, validate_block_pre_execution,
80    validate_gas_used, validate_receipts_root_and_logs_bloom, validate_requests_hash,
81};
82use ethrex_crypto::NativeCrypto;
83use ethrex_metrics::metrics;
84use ethrex_rlp::constants::RLP_NULL;
85use ethrex_rlp::decode::RLPDecode;
86use ethrex_rlp::encode::RLPEncode;
87use ethrex_storage::{
88    AccountUpdatesList, Store, UpdateBatch, error::StoreError, hash_address, hash_key,
89};
90use ethrex_trie::node::{BranchNode, ExtensionNode, LeafNode};
91use ethrex_trie::{Nibbles, Node, NodeRef, Trie, TrieError, TrieNode};
92use ethrex_vm::backends::CachingDatabase;
93#[cfg(all(feature = "rayon", not(feature = "eip-8025")))]
94use ethrex_vm::backends::levm::LEVM;
95use ethrex_vm::backends::levm::db::DatabaseLogger;
96use ethrex_vm::{BlockExecutionResult, DynVmDatabase, Evm, EvmError};
97use mempool::Mempool;
98use payload::PayloadOrTask;
99use rustc_hash::{FxHashMap, FxHashSet};
100use std::collections::hash_map::Entry;
101use std::collections::{BTreeMap, HashMap, HashSet};
102use std::sync::LazyLock;
103use std::sync::mpsc::Sender;
104use std::sync::{
105    Arc, RwLock,
106    atomic::{AtomicBool, AtomicUsize, Ordering},
107    mpsc::{Receiver, channel},
108};
109use std::time::{Duration, Instant};
110use tokio::sync::Mutex as TokioMutex;
111use tokio_util::sync::CancellationToken;
112
113use vm::StoreVmDatabase;
114
115#[cfg(feature = "metrics")]
116use ethrex_metrics::bal::METRICS_BAL;
117#[cfg(feature = "metrics")]
118use ethrex_metrics::blocks::METRICS_BLOCKS;
119
120#[cfg(feature = "c-kzg")]
121use ethrex_common::types::BlobsBundle;
122
/// Upper bound on the number of payload entries.
// NOTE(review): presumably caps the `Blockchain::payloads` cache; the use site
// is not visible in this section — confirm.
const MAX_PAYLOADS: usize = 10;
/// Mempool capacity (in transactions) applied when no explicit limit is given;
/// this is the value `BlockchainOptions::default()` uses for `max_mempool_size`.
const MAX_MEMPOOL_SIZE_DEFAULT: usize = 10_000;
125
/// Sender half of a channel whose receiver lives on a dedicated `drop_thread`.
///
/// Any boxed `Send` value pushed through this sender is dropped on that
/// thread instead of on the caller's, which keeps the cost of recursively
/// deallocating large tree structures (~500us for state trie roots) off hot
/// paths. The thread is spawned lazily on first access.
///
/// # Panics
///
/// First access panics if the OS refuses to spawn the thread.
static DROP_SENDER: LazyLock<Sender<Box<dyn Send>>> = LazyLock::new(|| {
    let (sender, receiver) = channel::<Box<dyn Send>>();
    // Receiving a value and immediately dropping it is the thread's whole job;
    // the loop ends once every sender is gone.
    let drain = move || receiver.into_iter().for_each(drop);
    std::thread::Builder::new()
        .name(String::from("drop_thread"))
        .spawn(drain)
        .expect("failed to spawn drop thread");
    sender
});
137
138// Result type for execute_block_pipeline
139type BlockExecutionPipelineResult = (
140    BlockExecutionResult,
141    AccountUpdatesList,
142    Option<Vec<AccountUpdate>>,
143    Option<BlockAccessList>, // produced BAL (Some on Amsterdam+ blocks)
144    usize,                   // max queue length
145    [Instant; 7],            // timing instants
146    Duration,                // warmer duration
147);
148
149type AddBlockPipelineInnerResult = (
150    Option<BlockAccessList>,
151    Option<ExecutionWitness>,
152    Result<(), ChainError>,
153);
154
155//TODO: Implement a struct Chain or BlockChain to encapsulate
156//functionality and canonical chain state and config
157
158/// Specifies whether the blockchain operates as L1 (mainnet/testnet) or L2 (rollup).
159#[derive(Debug, Clone, Default)]
160pub enum BlockchainType {
161    /// Standard Ethereum L1 blockchain.
162    #[default]
163    L1,
164    /// Layer 2 rollup with additional fee configuration.
165    L2(L2Config),
166}
167
168/// Configuration for L2 rollup operation.
169#[derive(Debug, Clone, Default)]
170pub struct L2Config {
171    /// Fee configuration for L2 transactions.
172    ///
173    /// Uses `RwLock` because the Watcher updates L1 fee config periodically.
174    pub fee_config: Arc<RwLock<FeeConfig>>,
175}
176
177/// Core blockchain implementation for block validation and execution.
178///
179/// The `Blockchain` struct is the main entry point for all blockchain operations:
180/// - Adding and validating blocks
181/// - Managing the transaction mempool
182/// - Building payloads for block production
183/// - Handling fork choice updates
184///
185/// # Thread Safety
186///
187/// `Blockchain` uses interior mutability for thread-safe access to shared state.
188/// The mempool and payload storage are protected by appropriate synchronization primitives.
189///
190/// # Example
191///
192/// ```ignore
193/// let blockchain = Blockchain::new(store, BlockchainOptions::default());
194///
195/// // Validate and add a block
196/// blockchain.add_block(&block)?;
197///
198/// // Check sync status
199/// if blockchain.is_synced() {
200///     // Process transactions from mempool
201/// }
202/// ```
203#[derive(Debug)]
204pub struct Blockchain {
205    /// Underlying storage for blocks and state.
206    storage: Store,
207    /// Transaction mempool for pending transactions.
208    pub mempool: Mempool,
209    /// Whether the node has completed initial sync.
210    ///
211    /// Set to true after initial sync completes, never reset to false.
212    /// Does not reflect whether an ongoing sync is in progress.
213    is_synced: AtomicBool,
214    /// Configuration options for blockchain behavior.
215    pub options: BlockchainOptions,
216    /// Cache of recently built payloads.
217    ///
218    /// Maps payload IDs to either completed payloads or in-progress build tasks.
219    /// Kept around in case consensus requests the same payload twice.
220    pub payloads: Arc<TokioMutex<Vec<(u64, PayloadOrTask)>>>,
221    /// Persistent thread pool for merkleization workers.
222    /// 17 threads: 16 shard workers + 1 watcher/coordination.
223    ///
224    /// `Arc` for sharing in test harnesses that build many `Blockchain`s; the
225    /// production path keeps the original semantics (one fresh pool per call
226    /// to `Blockchain::new` / `default_with_store`).
227    merkle_pool: Arc<rayon::ThreadPool>,
228}
229
230/// Configuration options for the blockchain.
231#[derive(Debug, Clone)]
232pub struct BlockchainOptions {
233    /// Maximum number of transactions in the mempool.
234    pub max_mempool_size: usize,
235    /// Whether to emit performance logging.
236    pub perf_logs_enabled: bool,
237    /// Blockchain type (L1 or L2).
238    pub r#type: BlockchainType,
239    /// EIP-7872: User-configured maximum blobs per block for local building.
240    /// If None, uses the protocol maximum for the current fork.
241    pub max_blobs_per_block: Option<u32>,
242    /// If true, computes execution witnesses upon receiving newPayload messages and stores them in local storage
243    pub precompute_witnesses: bool,
244    /// If true (default), per-block execution caches precompile results between the
245    /// warmer thread and the executor. Set to false (via `--no-precompile-cache`) to
246    /// disable the cache for benchmarking purposes.
247    pub precompile_cache_enabled: bool,
248    /// If true (default), Amsterdam+ validation runs transactions in parallel
249    /// using the header BAL to seed per-tx databases. Set to false (via
250    /// `--no-bal-parallel-exec`) to fall back to sequential execution.
251    pub bal_parallel_exec_enabled: bool,
252    /// If true (default), Amsterdam+ validation spawns a warmer thread that
253    /// prefetches accounts, storage slots, and codes listed in the header BAL.
254    /// Set to false (via `--no-bal-prefetch`) to skip prefetching on the BAL path.
255    pub bal_prefetch_enabled: bool,
256    /// If true (default), Amsterdam+ validation merkleizes optimistically from
257    /// `synthesize_bal_updates` in parallel with execution. Set to false (via
258    /// `--no-bal-parallel-trie`) to fall back to streaming `AccountUpdate`s from
259    /// the executor and merkleizing post-execution.
260    pub bal_parallel_trie_enabled: bool,
261}
262
263impl Default for BlockchainOptions {
264    fn default() -> Self {
265        Self {
266            max_mempool_size: MAX_MEMPOOL_SIZE_DEFAULT,
267            perf_logs_enabled: false,
268            r#type: BlockchainType::default(),
269            max_blobs_per_block: None,
270            precompute_witnesses: false,
271            precompile_cache_enabled: true,
272            bal_parallel_exec_enabled: true,
273            bal_prefetch_enabled: true,
274            bal_parallel_trie_enabled: true,
275        }
276    }
277}
278
279#[derive(Debug, Clone)]
280pub struct BatchBlockProcessingFailure {
281    pub last_valid_hash: H256,
282    pub failed_block_hash: H256,
283}
284
285fn log_batch_progress(batch_size: u32, current_block: u32) {
286    let progress_needed = batch_size > 10;
287    const PERCENT_MARKS: [u32; 4] = [20, 40, 60, 80];
288    if progress_needed {
289        PERCENT_MARKS.iter().for_each(|mark| {
290            if (batch_size * mark) / 100 == current_block {
291                info!("[SYNCING] {mark}% of batch processed");
292            }
293        });
294    }
295}
296
297enum WorkerRequest {
298    // From main thread (routed by account bucket)
299    ProcessAccount {
300        prefix: H256,
301        info: Option<AccountInfo>,
302        storage: FxHashMap<H256, U256>,
303        removed: bool,
304        removed_storage: bool,
305    },
306    // From main thread (broadcast to all workers)
307    FinishRouting,
308    MerklizeAccounts {
309        accounts: Vec<H256>,
310    },
311    CollectState {
312        tx: Sender<CollectedStateMsg>,
313    },
314    // Cross-worker storage messages (routed by storage key bucket)
315    MerklizeStorage {
316        prefix: H256,
317        key: H256,
318        value: U256,
319        storage_root: H256,
320    },
321    DeleteStorage(H256),
322    // Cross-worker: signals this worker finished routing all MerklizeStorage
323    RoutingDone {
324        from: u8,
325    },
326    // Cross-worker storage results (routed by account bucket)
327    StorageShard {
328        prefix: H256,
329        index: u8,
330        subroot: Box<BranchNode>,
331        nodes: Vec<TrieNode>,
332    },
333}
334
335struct CollectedStateMsg {
336    index: u8,
337    subroot: Box<BranchNode>,
338    state_nodes: Vec<TrieNode>,
339    storage_nodes: Vec<(H256, Vec<TrieNode>)>,
340}
341
342#[derive(Default)]
343struct PreMerkelizedAccountState {
344    storage_root: Option<Box<BranchNode>>,
345    nodes: Vec<TrieNode>,
346}
347
348/// Work item for BAL state trie shard workers.
349struct BalStateWorkItem {
350    hashed_address: H256,
351    nonce: Option<u64>,
352    balance: Option<U256>,
353    code_hash: Option<H256>,
354    /// Pre-computed storage root from Stage B, or None to keep existing.
355    storage_root: Option<H256>,
356}
357
358impl Blockchain {
359    /// Build a fresh 17-thread merkleization pool. Used by the default
360    /// constructors; tests that build many `Blockchain`s should share one pool
361    /// via `default_with_store_and_pool` to avoid spawning the pool repeatedly.
362    pub fn build_merkle_pool() -> Arc<rayon::ThreadPool> {
363        Arc::new(
364            rayon::ThreadPoolBuilder::new()
365                .num_threads(17)
366                .thread_name(|i| format!("merkle-worker-{i}"))
367                .build()
368                .expect("Failed to create merkle thread pool"),
369        )
370    }
371
372    pub fn new(store: Store, blockchain_opts: BlockchainOptions) -> Self {
373        Self {
374            storage: store,
375            mempool: Mempool::new(blockchain_opts.max_mempool_size),
376            is_synced: AtomicBool::new(false),
377            payloads: Arc::new(TokioMutex::new(Vec::new())),
378            options: blockchain_opts,
379            merkle_pool: Self::build_merkle_pool(),
380        }
381    }
382
383    /// Like `default_with_store`, but reuses an externally-owned merkleization
384    /// pool. Intended for test harnesses that build many short-lived
385    /// `Blockchain` instances; sharing the pool avoids spawning 17 fresh OS
386    /// threads per instance.
387    ///
388    /// SAFETY: the caller must ensure each pool has only one concurrent
389    /// `in_place_scope` user at a time. The internal merkle protocol requires
390    /// all 16 worker jobs to run concurrently (they cross-communicate via
391    /// channels); sharing a pool across simultaneous callers deadlocks.
392    pub fn default_with_store_and_pool(store: Store, pool: Arc<rayon::ThreadPool>) -> Self {
393        Self {
394            storage: store,
395            mempool: Mempool::new(MAX_MEMPOOL_SIZE_DEFAULT),
396            is_synced: AtomicBool::new(false),
397            payloads: Arc::new(TokioMutex::new(Vec::new())),
398            options: BlockchainOptions::default(),
399            merkle_pool: pool,
400        }
401    }
402
403    pub fn default_with_store(store: Store) -> Self {
404        Self {
405            storage: store,
406            mempool: Mempool::new(MAX_MEMPOOL_SIZE_DEFAULT),
407            is_synced: AtomicBool::new(false),
408            payloads: Arc::new(TokioMutex::new(Vec::new())),
409            options: BlockchainOptions::default(),
410            merkle_pool: Self::build_merkle_pool(),
411        }
412    }
413
414    /// L1 blocks must not contain L2-only transaction types (`FeeToken` 0x7d,
415    /// `Privileged` 0x7e). Both are L2-only types unknown to other L1 clients, so
416    /// accepting one on L1 diverges consensus. `Privileged` additionally takes its
417    /// sender from an unsigned, caller-chosen `from` (no signature recovery), so it
418    /// would also let a block forge a sender. On L2 these types are valid, so this
419    /// check only applies to L1.
420    fn validate_l1_transaction_types(&self, block: &Block) -> Result<(), ChainError> {
421        if !matches!(self.options.r#type, BlockchainType::L1) {
422            return Ok(());
423        }
424        for tx in &block.body.transactions {
425            if tx.tx_type().is_l2_only() {
426                return Err(ChainError::InvalidBlock(
427                    InvalidBlockError::UnsupportedTransactionType(tx.tx_type() as u8),
428                ));
429            }
430        }
431        Ok(())
432    }
433
434    /// Executes a block withing a new vm instance and state
435    fn execute_block(
436        &self,
437        block: &Block,
438    ) -> Result<(BlockExecutionResult, Vec<AccountUpdate>), ChainError> {
439        // Validate if it can be the new head and find the parent
440        let Ok(parent_header) = find_parent_header(&block.header, &self.storage) else {
441            // If the parent is not present, we store it as pending.
442            self.storage.add_pending_block(block.clone())?;
443            return Err(ChainError::ParentNotFound);
444        };
445
446        let chain_config = self.storage.get_chain_config();
447
448        // Validate the block pre-execution
449        validate_block_pre_execution(block, &parent_header, &chain_config, ELASTICITY_MULTIPLIER)?;
450        self.validate_l1_transaction_types(block)?;
451
452        let vm_db = StoreVmDatabase::new(self.storage.clone(), parent_header)?;
453        let mut vm = self.new_evm(vm_db)?;
454
455        let (execution_result, bal) = vm.execute_block(block)?;
456        let account_updates = vm.get_state_transitions()?;
457
458        // Validate execution went alright
459        if let Err(e) = validate_gas_used(execution_result.block_gas_used, &block.header) {
460            ethrex_vm::log_gas_used_mismatch(
461                &execution_result.tx_gas_breakdowns,
462                block.header.number,
463                execution_result.block_gas_used,
464                block.header.gas_used,
465            );
466            return Err(e.into());
467        }
468        validate_receipts_root_and_logs_bloom(
469            &block.header,
470            &execution_result.receipts,
471            &NativeCrypto,
472        )?;
473        validate_requests_hash(&block.header, &chain_config, &execution_result.requests)?;
474        if let Some(bal) = &bal {
475            validate_block_access_list_hash(
476                &block.header,
477                &chain_config,
478                bal,
479                block.body.transactions.len(),
480            )?;
481        }
482
483        Ok((execution_result, account_updates))
484    }
485
486    /// Generates Block Access List by re-executing a block.
487    /// Returns None for pre-Amsterdam blocks.
488    /// This is used by engine_getPayloadBodiesByHashV2 and engine_getPayloadBodiesByRangeV2.
489    pub fn generate_bal_for_block(
490        &self,
491        block: &Block,
492    ) -> Result<Option<BlockAccessList>, ChainError> {
493        let chain_config = self.storage.get_chain_config();
494
495        // Pre-Amsterdam blocks don't have BAL
496        if !chain_config.is_amsterdam_activated(block.header.timestamp) {
497            return Ok(None);
498        }
499
500        // Find parent header
501        let parent_header = find_parent_header(&block.header, &self.storage)?;
502
503        // Create VM and execute block with BAL recording
504        let vm_db = StoreVmDatabase::new(self.storage.clone(), parent_header)?;
505        let mut vm = self.new_evm(vm_db)?;
506
507        let (_execution_result, bal) = vm.execute_block(block)?;
508
509        Ok(bal)
510    }
511
    /// Executes a block within a new VM instance and state.
513    #[instrument(
514        level = "trace",
515        name = "Execute Block",
516        skip_all,
517        fields(namespace = "block_execution")
518    )]
519    fn execute_block_pipeline(
520        &self,
521        block: &Block,
522        parent_header: &BlockHeader,
523        vm: &mut Evm,
524        bal: Option<&BlockAccessList>,
525        collect_witness: bool,
526    ) -> Result<BlockExecutionPipelineResult, ChainError> {
527        let start_instant = Instant::now();
528
529        let chain_config = self.storage.get_chain_config();
530
531        // Validate the block pre-execution
532        validate_block_pre_execution(block, parent_header, &chain_config, ELASTICITY_MULTIPLIER)?;
533        self.validate_l1_transaction_types(block)?;
534        validate_block_body(&block.header, &block.body, &NativeCrypto)
535            .map_err(|e| ChainError::InvalidBlock(InvalidBlockError::InvalidBody(e)))?;
536        let block_validated_instant = Instant::now();
537
538        let exec_merkle_start = Instant::now();
539        let queue_length = AtomicUsize::new(0);
540        let queue_length_ref = &queue_length;
541        let mut max_queue_length = 0;
542
543        // Wrap the store with CachingDatabase so both warming and execution
544        // can benefit from shared caching of state lookups
545        let original_store = vm.db.store.clone();
546        let caching_store: Arc<dyn ethrex_vm::backends::LevmDatabase> = Arc::new(
547            CachingDatabase::new(original_store, self.options.precompile_cache_enabled),
548        );
549
550        // Replace the VM's store with the caching version
551        vm.db.store = caching_store.clone();
552
553        let cancelled = AtomicBool::new(false);
554        // Witness collection also forces sequential execution: parallel lanes
555        // re-read in-block-created state (e.g. a code deployed by an earlier
556        // tx) from the logged store, while sequential execution serves it from
557        // VM caches — recording accesses the canonical execution never makes.
558        let bal_parallel_exec_enabled = self.options.bal_parallel_exec_enabled && !collect_witness;
559
560        // Synthesize BAL updates pre-scope so the merkleizer thread can start
561        // trie work immediately, in parallel with execution.
562        // `--no-bal-parallel-trie` opts out: leave `optimistic_updates = None` so
563        // the merkleizer takes the streaming branch (fed by the EVM-side
564        // `bal_to_account_updates` send over the channel below).
565        // Witness collection forces the streaming branch too: the sequential
566        // executor (see `bal_parallel_exec_enabled` below) streams per-tx
567        // updates over the channel, which only the streaming merkleizer
568        // consumes — the synthesized path would leave the receiver dropped.
569        let optimistic_updates: Option<FxHashMap<Address, BalSynthesisItem>> =
570            if self.options.bal_parallel_trie_enabled && !collect_witness {
571                bal.map(synthesize_bal_updates)
572            } else {
573                None
574            };
575
576        // Synchronously warm all BAL storage slots before the executor thread starts.
577        //
578        // The warmer and executor share one CachingDatabase; `prefetch_storage`
579        // populates the cache only after its whole parallel fetch completes, so when
580        // the warmer ran storage concurrently the executor raced it to the trie for
581        // SSTORE original values and lost (~22% of CPU on cold-cache import-bench).
582        // Doing the storage prefetch up front (parallel, on all cores) lets execution
583        // run fully warm and removes the warmer's CPU/lock contention with it.
584        //
585        // Measured on bal-devnet-7-mainnet-mix-460 (import-bench --with-bal, vs main):
586        //   - concurrent storage warming (any ordering/chunking): ~ -7% to -13%
587        //   - this synchronous full-storage prefetch:             ~ -24%
588        // DO NOT move storage back into the concurrent warmer; the race is the whole
589        // problem. DO NOT add account prefetch here too: that regressed (~ +150 ms),
590        // because account reads already overlap exec well and a synchronous pass both
591        // adds serial latency and double-fetches against the warmer's Phase 1. Slots
592        // are warmed in natural account order; an execution-order sort gave no benefit
593        // once every slot is warm before exec.
594        //
595        // Live-node tradeoff: this prefetch is on the critical path before exec, no
596        // longer overlapped with it. With a warm cache the reads hit and it is a
597        // no-op; on a genuinely cold block (first slot after restart, account-heavy
598        // block) it adds serial latency the old overlapped warmer would have hidden.
599        // The benchmarks above are cold-cache batch import, not single-block live
600        // tail latency; the tradeoff is deliberate and favors throughput.
601        //
602        // Gated by `--no-bal-prefetch`: when the operator disables BAL-driven
603        // prefetching, skip the synchronous storage warm too. The warmer thread
604        // below already honors the same toggle.
605        // Witness collection records every read that reaches the store-backed
606        // logger beneath the shared cache. The warmer's speculative reads would
607        // be recorded as state accesses the canonical execution never makes,
608        // polluting the witness (e.g. `engine_newPayloadWithWitnessV5`), so
609        // warming is skipped entirely when a witness is being collected.
610        #[cfg(all(feature = "rayon", not(feature = "eip-8025")))]
611        if self.options.bal_prefetch_enabled
612            && !collect_witness
613            && let Some(bal) = bal
614        {
615            let slots = LEVM::bal_storage_slots(bal);
616            if !slots.is_empty() {
617                let _ = caching_store.prefetch_storage(&slots);
618            }
619        }
620
621        let (execution_result, merkleization_result, warmer_duration) =
622            std::thread::scope(|s| -> Result<_, ChainError> {
623                #[cfg(all(feature = "rayon", not(feature = "eip-8025")))]
624                let vm_type = vm.vm_type;
625                let cancelled_ref = &cancelled;
626                #[cfg(all(feature = "rayon", not(feature = "eip-8025")))]
627                let bal_prefetch_enabled = self.options.bal_prefetch_enabled;
628                #[cfg(all(feature = "rayon", not(feature = "eip-8025")))]
629                let warm_handle = (!collect_witness)
630                    .then(|| {
631                        std::thread::Builder::new()
632                            .name("block_executor_warmer".to_string())
633                            .spawn_scoped(s, move || {
634                                // Warming uses the same caching store, sharing cached state with execution.
635                                // Precompile cache lives inside CachingDatabase, shared automatically.
636                                let start = Instant::now();
637                                if let Some(bal) = bal {
638                                    if bal_prefetch_enabled {
639                                        // Amsterdam+: BAL-based precise prefetching (no tx re-execution).
640                                        if let Err(e) = LEVM::warm_block_from_bal(
641                                            bal,
642                                            caching_store,
643                                            cancelled_ref,
644                                        ) {
645                                            debug!("BAL warming failed (non-fatal): {e}");
646                                        }
647                                    } else if !bal_parallel_exec_enabled {
648                                        // --no-bal-prefetch combined with --no-bal-parallel-exec:
649                                        // mirror the pre-Amsterdam setup where a parallel speculative
650                                        // warmer races ahead of the serial executor. With parallel
651                                        // exec still on, we skip warming instead — two parallel passes
652                                        // over the same txs would just fight for cores.
653                                        if let Err(e) = LEVM::warm_block(
654                                            block,
655                                            caching_store,
656                                            vm_type,
657                                            &NativeCrypto,
658                                            cancelled_ref,
659                                        ) {
660                                            debug!("Block warming failed (non-fatal): {e}");
661                                        }
662                                    }
663                                } else {
664                                    // Pre-Amsterdam / P2P sync: speculative tx re-execution
665                                    if let Err(e) = LEVM::warm_block(
666                                        block,
667                                        caching_store,
668                                        vm_type,
669                                        &NativeCrypto,
670                                        cancelled_ref,
671                                    ) {
672                                        debug!("Block warming failed (non-fatal): {e}");
673                                    }
674                                }
675                                start.elapsed()
676                            })
677                            .map_err(|e| {
678                                ChainError::Custom(format!("Failed to spawn warmer thread: {e}"))
679                            })
680                    })
681                    .transpose()?;
682                let max_queue_length_ref = &mut max_queue_length;
683                // Channel is needed whenever the merkleizer takes the streaming
684                // branch OR LEVM falls into the sequential path:
685                // - sequential LEVM (`!bal_parallel_exec_enabled`) sends per-tx
686                //   updates via `send_state_transitions_tx`; errors if Sender is None.
687                // - streaming merkleizer (`!bal_parallel_trie_enabled` or no BAL)
688                //   reads updates from `rx`.
689                // Only the default `bal=Some && parallel_exec && parallel_trie` case
690                // can skip both: parallel LEVM doesn't stream when its Sender is None,
691                // and the merkleizer uses the synthesized optimistic map directly.
692                let (tx, rx_for_merkle) =
693                    if optimistic_updates.is_some() && bal_parallel_exec_enabled {
694                        (None, None)
695                    } else {
696                        let (tx, rx) = channel();
697                        (Some(tx), Some(rx))
698                    };
699
700                let execution_handle = std::thread::Builder::new()
701                    .name("block_executor_execution".to_string())
702                    .spawn_scoped(s, move || -> Result<_, ChainError> {
703                        let result = vm.execute_block_pipeline(
704                            block,
705                            tx,
706                            queue_length_ref,
707                            bal,
708                            bal_parallel_exec_enabled,
709                        );
710                        cancelled_ref.store(true, Ordering::Relaxed);
711                        let (execution_result, produced_bal) = result?;
712
713                        // Validate execution went alright
714                        if let Err(e) =
715                            validate_gas_used(execution_result.block_gas_used, &block.header)
716                        {
717                            ethrex_vm::log_gas_used_mismatch(
718                                &execution_result.tx_gas_breakdowns,
719                                block.header.number,
720                                execution_result.block_gas_used,
721                                block.header.gas_used,
722                            );
723                            return Err(e.into());
724                        }
725                        validate_receipts_root_and_logs_bloom(
726                            &block.header,
727                            &execution_result.receipts,
728                            &NativeCrypto,
729                        )?;
730                        validate_requests_hash(
731                            &block.header,
732                            &chain_config,
733                            &execution_result.requests,
734                        )?;
735                        // EIP-7928 block_access_list_hash commitment check.
736                        //
737                        // Sequential Amsterdam path: rebuilds a BAL and returns
738                        // Some(produced_bal), so the full hash+index+size check runs here.
739                        //
740                        // Parallel Amsterdam path: uses the header BAL directly to drive
741                        // execution and returns produced_bal = None. The header BAL's
742                        // index/size are already validated inside execute_block_pipeline,
743                        // and content-equivalence (unread_storage_reads /
744                        // unaccessed_pure_accounts) plus the state_root comparison prove the
745                        // header BAL is the canonical one. The one thing those checks do NOT
746                        // bind is the header commitment itself, so we must compare
747                        // keccak(rlp(header_bal)) against header.block_access_list_hash here;
748                        // otherwise a block with a content-valid BAL but a forged commitment
749                        // is accepted on this path while every spec-conformant client (and
750                        // our own sequential/batch paths) rejects it. This is a pure hash
751                        // compare on a BAL already in memory; the parallel exec optimization
752                        // (no BAL rebuild) is preserved.
753                        //
754                        // Pre-Amsterdam blocks never record a BAL, so both arms are skipped.
755                        if let Some(bal) = &produced_bal {
756                            validate_block_access_list_hash(
757                                &block.header,
758                                &chain_config,
759                                bal,
760                                block.body.transactions.len(),
761                            )?;
762                        } else if let Some(header_bal) = bal
763                            && chain_config.is_amsterdam_activated(block.header.timestamp)
764                            && !header_bal.matches_commitment(block.header.block_access_list_hash)
765                        {
766                            return Err(InvalidBlockError::BlockAccessListHashMismatch.into());
767                        }
768
769                        let exec_end_instant = Instant::now();
770                        Ok((execution_result, produced_bal, exec_end_instant))
771                    })
772                    .map_err(|e| {
773                        ChainError::Custom(format!("Failed to spawn execution thread: {e}"))
774                    })?;
775                let parent_header_ref = &parent_header; // Avoid moving to thread
776                // Merkleizer returns (list, streaming witness or None on BAL path, merkle_start, merkle_end).
777                type MerkleResult = Result<
778                    (
779                        AccountUpdatesList,
780                        Option<Vec<AccountUpdate>>,
781                        Instant,
782                        Instant,
783                    ),
784                    StoreError,
785                >;
786                let merkleize_handle = std::thread::Builder::new()
787                    .name("block_executor_merkleizer".to_string())
788                    .spawn_scoped(s, move || -> MerkleResult {
789                        let merkle_start_instant = Instant::now();
790                        let (account_updates_list, streaming_witness) =
791                            if let Some(prepared) = optimistic_updates {
792                                let list = self.handle_merkleization_bal_from_updates(
793                                    prepared,
794                                    parent_header_ref,
795                                )?;
796                                (list, None)
797                            } else {
798                                self.handle_merkleization(
799                                    rx_for_merkle.expect("rx is Some on non-BAL path"),
800                                    parent_header_ref,
801                                    queue_length_ref,
802                                    max_queue_length_ref,
803                                    collect_witness,
804                                )?
805                            };
806                        let merkle_end_instant = Instant::now();
807                        Ok((
808                            account_updates_list,
809                            streaming_witness,
810                            merkle_start_instant,
811                            merkle_end_instant,
812                        ))
813                    })
814                    .map_err(|e| {
815                        ChainError::Custom(format!("Failed to spawn merkleizer thread: {e}"))
816                    })?;
817                let execution_result = execution_handle.join().unwrap_or_else(|_| {
818                    Err(ChainError::Custom("execution thread panicked".to_string()))
819                });
820                let merkleization_result = merkleize_handle.join().unwrap_or_else(|_| {
821                    Err(StoreError::Custom(
822                        "merkleization thread panicked".to_string(),
823                    ))
824                });
825                #[cfg(all(feature = "rayon", not(feature = "eip-8025")))]
826                let warmer_duration = warm_handle
827                    .map(|handle| {
828                        handle
829                            .join()
830                            .inspect_err(|e| warn!("Warming thread error: {e:?}"))
831                            .ok()
832                            .unwrap_or(Duration::ZERO)
833                    })
834                    .unwrap_or(Duration::ZERO);
835                #[cfg(any(not(feature = "rayon"), feature = "eip-8025"))]
836                let warmer_duration = Duration::ZERO;
837                Ok((execution_result, merkleization_result, warmer_duration))
838            })?;
839        let (account_updates_list, streaming_witness, merkle_start_instant, merkle_end_instant) =
840            merkleization_result?;
841        let (execution_result, produced_bal, exec_end_instant) = execution_result?;
842
843        // Witness collection forces the streaming merkleizer (synthesized
844        // updates are disabled above), so the streaming witness is the only
845        // possible source of accumulated updates.
846        let accumulated_updates = streaming_witness;
847
848        let exec_merkle_end_instant = Instant::now();
849
850        Ok((
851            execution_result,
852            account_updates_list,
853            accumulated_updates,
854            produced_bal,
855            max_queue_length,
856            [
857                start_instant,
858                block_validated_instant,
859                exec_merkle_start,
860                merkle_start_instant,
861                exec_end_instant,
862                merkle_end_instant,
863                exec_merkle_end_instant,
864            ],
865            warmer_duration,
866        ))
867    }
868
869    #[instrument(
870        level = "trace",
871        name = "Trie update",
872        skip_all,
873        fields(namespace = "block_execution")
874    )]
875    fn handle_merkleization(
876        &self,
877        rx: Receiver<Vec<AccountUpdate>>,
878        parent_header: &BlockHeader,
879        queue_length: &AtomicUsize,
880        max_queue_length: &mut usize,
881        collect_witness: bool,
882    ) -> Result<(AccountUpdatesList, Option<Vec<AccountUpdate>>), StoreError> {
883        let parent_state_root = parent_header.state_root;
884
885        // Create 16 worker channels (crossbeam for select! support)
886        let mut workers_tx = Vec::with_capacity(16);
887        let mut workers_rx = Vec::with_capacity(16);
888        for _ in 0..16 {
889            let (tx, rx) = cb::unbounded();
890            workers_tx.push(tx);
891            workers_rx.push(rx);
892        }
893
894        // Shutdown channel: dropping shutdown_tx signals all workers to exit.
895        let (shutdown_tx, shutdown_rx) = cb::bounded::<()>(0);
896        // Done channel: workers report completion status.
897        let (done_tx, done_rx) = cb::unbounded::<Result<(), StoreError>>();
898
899        // Run workers + coordination on the persistent pool.
900        // Workers and watcher are spawned as pool tasks; the coordination logic
901        // (dispatching messages, collecting results) runs on the calling thread
902        // via in_place_scope, so it executes concurrently with the pool tasks.
903        let watcher_error: Arc<std::sync::Mutex<Option<StoreError>>> = Default::default();
904        let result = self.merkle_pool.in_place_scope(|s| {
905            // Spawn 16 unified workers (each gets clone of all 16 senders)
906            for (i, rx) in workers_rx.into_iter().enumerate() {
907                let all_senders = workers_tx.clone();
908                let storage_clone = self.storage.clone();
909                let shutdown_rx = shutdown_rx.clone();
910                let done_tx = done_tx.clone();
911                s.spawn(move |_| {
912                    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
913                        handle_subtrie(
914                            storage_clone,
915                            rx,
916                            parent_state_root,
917                            i as u8,
918                            all_senders,
919                            shutdown_rx,
920                        )
921                    }));
922                    let result = match result {
923                        Ok(r) => r,
924                        Err(_) => Err(StoreError::Custom(format!("shard worker {i} panicked"))),
925                    };
926                    if let Err(cb::SendError(Err(e))) = done_tx.send(result) {
927                        error!("Failed to send worker {i} error to watcher: {e}");
928                    }
929                });
930            }
931            drop(done_tx); // Only workers hold senders
932            drop(shutdown_rx); // Only workers hold receivers
933
934            // Watcher task: drops shutdown_tx on first worker error to signal
935            // all remaining workers, preventing deadlock on gatherer_rx.
936            let watcher_error = watcher_error.clone();
937            s.spawn(move |_| {
938                let _shutdown = shutdown_tx;
939                for result in done_rx {
940                    if let Err(e) = result {
941                        // Store error for the caller, then drop _shutdown to signal workers.
942                        *watcher_error.lock().expect("watcher mutex poisoned") = Some(e);
943                        return;
944                    }
945                }
946            });
947
948            // Coordination runs on the calling thread, concurrently with pool tasks.
949            let mut code_updates: Vec<(H256, Code)> = vec![];
950            let mut hashed_address_cache: FxHashMap<Address, H256> = Default::default();
951            let mut has_storage: FxHashSet<H256> = Default::default();
952
953            let mut accumulator: Option<FxHashMap<Address, AccountUpdate>> =
954                collect_witness.then(FxHashMap::default);
955
956            for updates in rx {
957                let current_length = queue_length.fetch_sub(1, Ordering::Acquire);
958                *max_queue_length = current_length.max(*max_queue_length);
959                // Accumulate updates for witness generation if enabled
960                if let Some(acc) = &mut accumulator {
961                    for update in updates.clone() {
962                        match acc.entry(update.address) {
963                            Entry::Vacant(e) => {
964                                e.insert(update);
965                            }
966                            Entry::Occupied(mut e) => {
967                                e.get_mut().merge(update);
968                            }
969                        }
970                    }
971                }
972
973                for update in updates {
974                    let hashed_address = *hashed_address_cache
975                        .entry(update.address)
976                        .or_insert_with(|| keccak(update.address));
977
978                    let (info, code, storage) = if update.removed {
979                        (Some(Default::default()), None, Default::default())
980                    } else {
981                        (update.info, update.code, update.added_storage)
982                    };
983
984                    // Extract code for dispatcher-local collection
985                    if let Some(ref info) = info
986                        && let Some(code) = code
987                    {
988                        code_updates.push((info.code_hash, code));
989                    }
990
991                    if update.removed || update.removed_storage || !storage.is_empty() {
992                        has_storage.insert(hashed_address);
993                    }
994
995                    let bucket = hashed_address.as_fixed_bytes()[0] >> 4;
996                    workers_tx[bucket as usize]
997                        .send(WorkerRequest::ProcessAccount {
998                            prefix: hashed_address,
999                            info,
1000                            storage,
1001                            removed: update.removed,
1002                            removed_storage: update.removed_storage,
1003                        })
1004                        .map_err(|e| StoreError::Custom(format!("send error: {e}")))?;
1005                }
1006            }
1007
1008            // Send FinishRouting — workers self-synchronize via RoutingDone exchange.
1009            for tx in &workers_tx {
1010                tx.send(WorkerRequest::FinishRouting)
1011                    .map_err(|e| StoreError::Custom(format!("send error: {e}")))?;
1012            }
1013
1014            // Send MerklizeAccounts for no-storage accounts.
1015            let mut early_batches: [Vec<H256>; 16] = Default::default();
1016            for hashed_account in hashed_address_cache.values() {
1017                if !has_storage.contains(hashed_account) {
1018                    let bucket = hashed_account.as_fixed_bytes()[0] >> 4;
1019                    early_batches[bucket as usize].push(*hashed_account);
1020                }
1021            }
1022            for (i, batch) in early_batches.into_iter().enumerate() {
1023                if !batch.is_empty() {
1024                    workers_tx[i]
1025                        .send(WorkerRequest::MerklizeAccounts { accounts: batch })
1026                        .map_err(|e| StoreError::Custom(format!("send error: {e}")))?;
1027                }
1028            }
1029
1030            // Send CollectState immediately — workers defer until collection is done.
1031            let mut storage_updates: Vec<(H256, Vec<TrieNode>)> = Default::default();
1032            let (gatherer_tx, gatherer_rx) = channel();
1033            for tx in &workers_tx {
1034                tx.send(WorkerRequest::CollectState {
1035                    tx: gatherer_tx.clone(),
1036                })
1037                .map_err(|e| StoreError::Custom(format!("send error: {e}")))?;
1038            }
1039            drop(gatherer_tx);
1040            drop(workers_tx);
1041
1042            let mut root = BranchNode::default();
1043            let mut state_updates = Vec::new();
1044            for CollectedStateMsg {
1045                index,
1046                subroot,
1047                state_nodes,
1048                storage_nodes,
1049            } in gatherer_rx
1050            {
1051                storage_updates.extend(storage_nodes);
1052                state_updates.extend(state_nodes);
1053                root.choices[index as usize] = subroot.choices[index as usize].clone();
1054            }
1055
1056            let collapsed = self.collapse_root_node(parent_header, None, root)?;
1057            let state_trie_hash = if let Some(root) = collapsed {
1058                let mut root = NodeRef::from(root);
1059                let hash = root.commit(Nibbles::default(), &mut state_updates, &NativeCrypto);
1060                let _ = DROP_SENDER.send(Box::new(root));
1061                hash.finalize(&NativeCrypto)
1062            } else {
1063                state_updates.push((Nibbles::default(), vec![RLP_NULL]));
1064                *EMPTY_TRIE_HASH
1065            };
1066
1067            let accumulated_updates = accumulator.map(|acc| acc.into_values().collect());
1068
1069            Ok((
1070                AccountUpdatesList {
1071                    state_trie_hash,
1072                    state_updates,
1073                    storage_updates,
1074                    code_updates,
1075                },
1076                accumulated_updates,
1077            ))
1078        });
1079
1080        // Surface any worker errors captured by the watcher task.
1081        if let Some(err) = watcher_error.lock().expect("watcher mutex poisoned").take() {
1082            return Err(err);
1083        }
1084
1085        result
1086    }
1087
1088    /// Validation path synthesizes `BalSynthesisItem`s from the input BAL pre-execution and
1089    /// merkleizes optimistically in parallel with EVM execution. Two gates guard the result:
1090    /// (1) the EIP-7928 `block_access_list_hash` commitment check, and
1091    /// (2) the downstream `state_root` comparison against the block header. The parallel
1092    /// path returns `produced_bal = None` (the header BAL drives execution rather than being
1093    /// rebuilt), so gate (1) compares `keccak(rlp(header_bal))` against the header commitment
1094    /// directly in the execution thread; the sequential path runs the same gate against the
1095    /// rebuilt BAL. On any mismatch the optimistic merkle output is discarded via `?` on the
1096    /// execution thread's join result.
1097    #[instrument(
1098        level = "trace",
1099        name = "Trie update (BAL)",
1100        skip_all,
1101        fields(namespace = "block_execution")
1102    )]
1103    fn handle_merkleization_bal_from_updates(
1104        &self,
1105        prepared: FxHashMap<Address, BalSynthesisItem>,
1106        parent_header: &BlockHeader,
1107    ) -> Result<AccountUpdatesList, StoreError> {
1108        const NUM_WORKERS: usize = 16;
1109        let parent_state_root = parent_header.state_root;
1110
1111        // Build code updates and work items with pre-hashed addresses from the
1112        // pre-synthesized map. No Stage A drain needed: the synthesis happened
1113        // pre-scope at the call site.
1114        let mut code_updates: Vec<(H256, Code)> = Vec::new();
1115        let mut accounts: Vec<(H256, BalSynthesisItem)> = Vec::with_capacity(prepared.len());
1116        for (addr, item) in prepared {
1117            let hashed = keccak(addr);
1118            if let Some(ch) = item.code_hash
1119                && let Some(ref code) = item.code
1120            {
1121                code_updates.push((ch, code.clone()));
1122            }
1123            accounts.push((hashed, item));
1124        }
1125
1126        // === Stage B: Parallel per-account storage root computation ===
1127
1128        // Sort by storage weight (descending) for greedy bin packing.
1129        // Every item with real Stage B work MUST have weight >= 1: the greedy
1130        // algorithm does `bin_weights[min] += weight`, so weight-0 items never
1131        // change the bin weight and `min_by_key` keeps returning the same bin,
1132        // piling ALL of them into a single worker.
1133        // Synthesis never sets `removed`/`removed_storage`, so weight is purely
1134        // based on storage slot count.
1135        let mut work_indices: Vec<(usize, usize)> = accounts
1136            .iter()
1137            .enumerate()
1138            .map(|(i, (_, item))| {
1139                let weight = if !item.added_storage.is_empty() {
1140                    1.max(item.added_storage.len())
1141                } else {
1142                    0
1143                };
1144                (i, weight)
1145            })
1146            .collect();
1147        work_indices.sort_unstable_by(|a, b| b.1.cmp(&a.1));
1148
1149        // Greedy bin packing into NUM_WORKERS bins
1150        let mut bins: Vec<Vec<usize>> = (0..NUM_WORKERS).map(|_| Vec::new()).collect();
1151        let mut bin_weights: Vec<usize> = vec![0; NUM_WORKERS];
1152        for (idx, weight) in work_indices {
1153            let min_bin = bin_weights
1154                .iter()
1155                .enumerate()
1156                .min_by_key(|(_, w)| **w)
1157                .expect("bin_weights is non-empty")
1158                .0;
1159            bins[min_bin].push(idx);
1160            bin_weights[min_bin] += weight;
1161        }
1162
1163        // Compute storage roots in parallel
1164        let mut storage_roots: Vec<Option<H256>> = vec![None; accounts.len()];
1165        let mut storage_updates: Vec<(H256, Vec<TrieNode>)> = Vec::new();
1166
1167        std::thread::scope(|s| -> Result<(), StoreError> {
1168            let accounts_ref = &accounts;
1169            let handles: Vec<_> = bins
1170                .into_iter()
1171                .enumerate()
1172                .filter_map(|(worker_id, bin)| {
1173                    if bin.is_empty() {
1174                        return None;
1175                    }
1176                    Some(
1177                        std::thread::Builder::new()
1178                            .name(format!("bal_storage_worker_{worker_id}"))
1179                            .spawn_scoped(
1180                                s,
1181                                move || -> Result<Vec<(usize, H256, Vec<TrieNode>)>, StoreError> {
1182                                    let mut results: Vec<(usize, H256, Vec<TrieNode>)> = Vec::new();
1183                                    // Open one state trie per worker for storage root lookups
1184                                    let state_trie =
1185                                        self.storage.open_state_trie(parent_state_root)?;
1186                                    for idx in bin {
1187                                        let (hashed_address, item) = &accounts_ref[idx];
1188                                        if item.added_storage.is_empty() {
1189                                            continue;
1190                                        }
1191
1192                                        let storage_root = match state_trie
1193                                            .get(hashed_address.as_bytes())?
1194                                        {
1195                                            Some(rlp) => AccountState::decode(&rlp)?.storage_root,
1196                                            None => *EMPTY_TRIE_HASH,
1197                                        };
1198                                        let mut trie = self.storage.open_storage_trie(
1199                                            *hashed_address,
1200                                            parent_state_root,
1201                                            storage_root,
1202                                        )?;
1203
1204                                        // Pre-hash and sort by trie path so per-slot inserts
1205                                        // walk the node arena in order, improving cache locality.
1206                                        let mut hashed_storage: Vec<(H256, U256)> = item
1207                                            .added_storage
1208                                            .iter()
1209                                            .map(|(k, v)| (keccak(k), *v))
1210                                            .collect();
1211                                        hashed_storage.sort_unstable_by(|a, b| a.0.cmp(&b.0));
1212                                        for (hashed_key, value) in &hashed_storage {
1213                                            if value.is_zero() {
1214                                                trie.remove(hashed_key.as_bytes())?;
1215                                            } else {
1216                                                trie.insert(
1217                                                    hashed_key.as_bytes().to_vec(),
1218                                                    value.encode_to_vec(),
1219                                                )?;
1220                                            }
1221                                        }
1222
1223                                        let (root_hash, nodes) =
1224                                            trie.collect_changes_since_last_hash(&NativeCrypto);
1225                                        results.push((idx, root_hash, nodes));
1226                                    }
1227                                    Ok(results)
1228                                },
1229                            )
1230                            .map_err(|e| StoreError::Custom(format!("spawn failed: {e}"))),
1231                    )
1232                })
1233                .collect::<Result<Vec<_>, _>>()?;
1234
1235            for handle in handles {
1236                let results = handle
1237                    .join()
1238                    .map_err(|_| StoreError::Custom("storage worker panicked".to_string()))??;
1239                for (idx, root_hash, nodes) in results {
1240                    storage_roots[idx] = Some(root_hash);
1241                    storage_updates.push((accounts_ref[idx].0, nodes));
1242                }
1243            }
1244            Ok(())
1245        })?;
1246
1247        // === Stage C: State trie update via 16 shard workers ===
1248
1249        // Build per-shard work items
1250        let mut shards: Vec<Vec<BalStateWorkItem>> = (0..NUM_WORKERS).map(|_| Vec::new()).collect();
1251        for (idx, (hashed_address, item)) in accounts.iter().enumerate() {
1252            let bucket = (hashed_address.as_fixed_bytes()[0] >> 4) as usize;
1253            shards[bucket].push(BalStateWorkItem {
1254                hashed_address: *hashed_address,
1255                nonce: item.nonce,
1256                balance: item.balance,
1257                code_hash: item.code_hash,
1258                storage_root: storage_roots[idx],
1259            });
1260        }
1261
1262        let mut root = BranchNode::default();
1263        let mut state_updates = Vec::new();
1264
1265        // All 16 shard threads must run, even for empty shards: each worker
1266        // opens the parent state trie and returns its existing subtree so the
1267        // root can be correctly assembled via `collect_trie`. Skipping unchanged
1268        // shards (unlike Stage B's filter_map) would leave holes in the root.
1269        std::thread::scope(|s| -> Result<(), StoreError> {
1270            let handles: Vec<_> = shards
1271                .into_iter()
1272                .enumerate()
1273                .map(|(index, shard_items)| {
1274                    std::thread::Builder::new()
1275                        .name(format!("bal_state_shard_{index}"))
1276                        .spawn_scoped(
1277                            s,
1278                            move || -> Result<(Box<BranchNode>, Vec<TrieNode>), StoreError> {
1279                                let mut state_trie =
1280                                    self.storage.open_state_trie(parent_state_root)?;
1281
1282                                for item in &shard_items {
1283                                    let path = item.hashed_address.as_bytes();
1284
1285                                    // Load existing account state
1286                                    let mut account_state = match state_trie.get(path)? {
1287                                        Some(rlp) => {
1288                                            let state = AccountState::decode(&rlp)?;
1289                                            // Re-insert to materialize the trie path so
1290                                            // collect_changes_since_last_hash includes this
1291                                            // node in the diff (needed for both updates and
1292                                            // removals via collect_trie).
1293                                            state_trie.insert(path.to_vec(), rlp)?;
1294                                            state
1295                                        }
1296                                        None => AccountState::default(),
1297                                    };
1298
1299                                    if let Some(n) = item.nonce {
1300                                        account_state.nonce = n;
1301                                    }
1302                                    if let Some(b) = item.balance {
1303                                        account_state.balance = b;
1304                                    }
1305                                    if let Some(ch) = item.code_hash {
1306                                        account_state.code_hash = ch;
1307                                    }
1308                                    if let Some(storage_root) = item.storage_root {
1309                                        account_state.storage_root = storage_root;
1310                                    }
1311
1312                                    // EIP-161: remove empty accounts (zero nonce, zero balance,
1313                                    // empty code, empty storage) from the state trie.
1314                                    if account_state != AccountState::default() {
1315                                        state_trie
1316                                            .insert(path.to_vec(), account_state.encode_to_vec())?;
1317                                    } else {
1318                                        state_trie.remove(path)?;
1319                                    }
1320                                }
1321
1322                                collect_trie(index as u8, state_trie)
1323                                    .map_err(|e| StoreError::Custom(format!("{e}")))
1324                            },
1325                        )
1326                        .map_err(|e| StoreError::Custom(format!("spawn failed: {e}")))
1327                })
1328                .collect::<Result<Vec<_>, _>>()?;
1329
1330            for (i, handle) in handles.into_iter().enumerate() {
1331                let (subroot, state_nodes) = handle
1332                    .join()
1333                    .map_err(|_| StoreError::Custom("state shard worker panicked".to_string()))??;
1334                state_updates.extend(state_nodes);
1335                root.choices[i] = subroot.choices[i].clone();
1336            }
1337            Ok(())
1338        })?;
1339
1340        // === Stage D: Finalize root ===
1341        let state_trie_hash =
1342            if let Some(root) = self.collapse_root_node(parent_header, None, root)? {
1343                let mut root = NodeRef::from(root);
1344                let hash = root.commit(Nibbles::default(), &mut state_updates, &NativeCrypto);
1345                let _ = DROP_SENDER.send(Box::new(root));
1346                hash.finalize(&NativeCrypto)
1347            } else {
1348                state_updates.push((Nibbles::default(), vec![RLP_NULL]));
1349                *EMPTY_TRIE_HASH
1350            };
1351
1352        Ok(AccountUpdatesList {
1353            state_trie_hash,
1354            state_updates,
1355            storage_updates,
1356            code_updates,
1357        })
1358    }
1359
1360    fn collapse_root_node(
1361        &self,
1362        parent_header: &BlockHeader,
1363        prefix: Option<H256>,
1364        root: BranchNode,
1365    ) -> Result<Option<Node>, StoreError> {
1366        collapse_root_node(&self.storage, parent_header.state_root, prefix, root)
1367    }
1368
1369    /// Executes a block from a given vm instance an does not clear its state
1370    fn execute_block_from_state(
1371        &self,
1372        parent_header: &BlockHeader,
1373        block: &Block,
1374        chain_config: &ChainConfig,
1375        vm: &mut Evm,
1376    ) -> Result<BlockExecutionResult, ChainError> {
1377        // Validate the block pre-execution
1378        validate_block_pre_execution(block, parent_header, chain_config, ELASTICITY_MULTIPLIER)?;
1379        self.validate_l1_transaction_types(block)?;
1380        let (execution_result, bal) = vm.execute_block(block)?;
1381        // Validate execution went alright
1382        if let Err(e) = validate_gas_used(execution_result.block_gas_used, &block.header) {
1383            ethrex_vm::log_gas_used_mismatch(
1384                &execution_result.tx_gas_breakdowns,
1385                block.header.number,
1386                execution_result.block_gas_used,
1387                block.header.gas_used,
1388            );
1389            return Err(e.into());
1390        }
1391        validate_receipts_root_and_logs_bloom(
1392            &block.header,
1393            &execution_result.receipts,
1394            &NativeCrypto,
1395        )?;
1396        validate_requests_hash(&block.header, chain_config, &execution_result.requests)?;
1397        if let Some(bal) = &bal {
1398            validate_block_access_list_hash(
1399                &block.header,
1400                chain_config,
1401                bal,
1402                block.body.transactions.len(),
1403            )?;
1404        }
1405
1406        Ok(execution_result)
1407    }
1408
1409    pub async fn generate_witness_for_blocks(
1410        &self,
1411        blocks: &[Block],
1412    ) -> Result<ExecutionWitness, ChainError> {
1413        self.generate_witness_for_blocks_with_fee_configs(blocks, None)
1414            .await
1415    }
1416
1417    pub async fn generate_witness_for_blocks_with_fee_configs(
1418        &self,
1419        blocks: &[Block],
1420        fee_configs: Option<&[FeeConfig]>,
1421    ) -> Result<ExecutionWitness, ChainError> {
1422        let first_block_header = &blocks
1423            .first()
1424            .ok_or(ChainError::WitnessGeneration(
1425                "Empty block batch".to_string(),
1426            ))?
1427            .header;
1428
1429        // Get state at previous block
1430        let trie = self
1431            .storage
1432            .state_trie(first_block_header.parent_hash)
1433            .map_err(|_| ChainError::ParentStateNotFound)?
1434            .ok_or(ChainError::ParentStateNotFound)?;
1435        let initial_state_root = trie.hash_no_commit(&NativeCrypto);
1436
1437        let (mut current_trie_witness, mut trie) = TrieLogger::open_trie(trie);
1438
1439        // For each block, a new TrieLogger will be opened, each containing the
1440        // witness accessed during the block execution. We need to accumulate
1441        // all the nodes accessed during the entire batch execution.
1442        let mut accumulated_state_trie_witness = current_trie_witness
1443            .lock()
1444            .map_err(|_| {
1445                ChainError::WitnessGeneration("Failed to lock state trie witness".to_string())
1446            })?
1447            .clone();
1448
1449        let mut touched_account_storage_slots = BTreeMap::new();
1450        // This will become the state trie + storage trie
1451        let mut used_trie_nodes = Vec::new();
1452
1453        // Store the root node in case the block is empty and the witness does not record any nodes
1454        let root_node = trie.root_node().map_err(|_| {
1455            ChainError::WitnessGeneration("Failed to get root state node".to_string())
1456        })?;
1457
1458        let mut blockhash_opcode_references = HashMap::new();
1459        let mut codes = Vec::new();
1460
1461        for (i, block) in blocks.iter().enumerate() {
1462            let parent_hash = block.header.parent_hash;
1463            let parent_header = self
1464                .storage
1465                .get_block_header_by_hash(parent_hash)
1466                .map_err(ChainError::StoreError)?
1467                .ok_or(ChainError::ParentNotFound)?;
1468
1469            // This assumes that the user has the necessary state stored already,
1470            // so if the user only has the state previous to the first block, it
1471            // will fail in the second iteration of this for loop. To ensure this,
1472            // doesn't fail, later in this function we store the new state after
1473            // re-execution.
1474            let vm_db: DynVmDatabase =
1475                Box::new(StoreVmDatabase::new(self.storage.clone(), parent_header)?);
1476
1477            let logger = Arc::new(DatabaseLogger::new(Arc::new(vm_db)));
1478
1479            let mut vm = match self.options.r#type {
1480                BlockchainType::L1 => {
1481                    Evm::new_from_db_for_l1(logger.clone(), Arc::new(NativeCrypto))
1482                }
1483                BlockchainType::L2(_) => {
1484                    let l2_config = match fee_configs {
1485                        Some(fee_configs) => {
1486                            fee_configs.get(i).ok_or(ChainError::WitnessGeneration(
1487                                "FeeConfig not found for witness generation".to_string(),
1488                            ))?
1489                        }
1490                        None => Err(ChainError::WitnessGeneration(
1491                            "L2Config not found for witness generation".to_string(),
1492                        ))?,
1493                    };
1494                    Evm::new_from_db_for_l2(logger.clone(), *l2_config, Arc::new(NativeCrypto))
1495                }
1496            };
1497
1498            // Re-execute block with logger
1499            let (execution_result, _bal) = vm.execute_block(block)?;
1500
1501            // Gather account updates
1502            let account_updates = vm.get_state_transitions()?;
1503
1504            let mut state_accessed = logger
1505                .state_accessed
1506                .lock()
1507                .map_err(|_e| {
1508                    ChainError::WitnessGeneration("Failed to execute with witness".to_string())
1509                })?
1510                .clone();
1511
1512            // Deduplicate storage keys while preserving access order
1513            for keys in state_accessed.values_mut() {
1514                let mut seen = HashSet::new();
1515                keys.retain(|k| seen.insert(*k));
1516            }
1517
1518            for (account, acc_keys) in state_accessed.iter() {
1519                let slots: &mut Vec<H256> =
1520                    touched_account_storage_slots.entry(*account).or_default();
1521                slots.extend(acc_keys.iter().copied());
1522            }
1523
1524            // Get the used block hashes from the logger
1525            let logger_block_hashes = logger
1526                .block_hashes_accessed
1527                .lock()
1528                .map_err(|_e| {
1529                    ChainError::WitnessGeneration("Failed to get block hashes".to_string())
1530                })?
1531                .clone();
1532
1533            blockhash_opcode_references.extend(logger_block_hashes);
1534
1535            // Access all the accounts needed for withdrawals
1536            if let Some(withdrawals) = block.body.withdrawals.as_ref() {
1537                for withdrawal in withdrawals {
1538                    trie.get(&hash_address(&withdrawal.address)).map_err(|_e| {
1539                        ChainError::Custom("Failed to access account from trie".to_string())
1540                    })?;
1541                }
1542            }
1543
1544            let mut used_storage_tries = HashMap::new();
1545
1546            // Access all the accounts from the initial trie
1547            // Record all the storage nodes for the initial state
1548            for (account, acc_keys) in state_accessed.iter() {
1549                // Access the account from the state trie to record the nodes used to access it
1550                trie.get(&hash_address(account)).map_err(|_e| {
1551                    ChainError::WitnessGeneration("Failed to access account from trie".to_string())
1552                })?;
1553                // Get storage trie at before updates
1554                if !acc_keys.is_empty()
1555                    && let Ok(Some(storage_trie)) = self.storage.storage_trie(parent_hash, *account)
1556                {
1557                    let (storage_trie_witness, storage_trie) = TrieLogger::open_trie(storage_trie);
1558                    // Access all the keys
1559                    for storage_key in acc_keys {
1560                        let hashed_key = hash_key(storage_key);
1561                        storage_trie.get(&hashed_key).map_err(|_e| {
1562                            ChainError::WitnessGeneration(
1563                                "Failed to access storage key".to_string(),
1564                            )
1565                        })?;
1566                    }
1567                    // Store the tries to reuse when applying account updates
1568                    used_storage_tries.insert(*account, (storage_trie_witness, storage_trie));
1569                }
1570            }
1571
1572            // Store all the accessed evm bytecodes
1573            for code_hash in logger
1574                .code_accessed
1575                .lock()
1576                .map_err(|_e| {
1577                    ChainError::WitnessGeneration("Failed to gather used bytecodes".to_string())
1578                })?
1579                .iter()
1580            {
1581                let code = self
1582                    .storage
1583                    .get_account_code(*code_hash)
1584                    .map_err(|_e| {
1585                        ChainError::WitnessGeneration("Failed to get account code".to_string())
1586                    })?
1587                    .ok_or(ChainError::WitnessGeneration(
1588                        "Failed to get account code".to_string(),
1589                    ))?;
1590                codes.push(code.code().to_vec());
1591            }
1592
1593            // Apply account updates to the trie recording all the necessary nodes to do so
1594            let (storage_tries_after_update, account_updates_list) =
1595                self.storage.apply_account_updates_from_trie_with_witness(
1596                    trie,
1597                    &account_updates,
1598                    used_storage_tries,
1599                )?;
1600
1601            // We cannot ensure that the users of this function have the necessary
1602            // state stored, so in order for it to not assume anything, we update
1603            // the storage with the new state after re-execution
1604            self.store_block(block.clone(), account_updates_list, execution_result)?;
1605
1606            for (address, (witness, _storage_trie)) in storage_tries_after_update {
1607                let mut witness = witness.lock().map_err(|_| {
1608                    ChainError::WitnessGeneration("Failed to lock storage trie witness".to_string())
1609                })?;
1610                let witness = std::mem::take(&mut *witness);
1611                let witness = witness.into_values().collect::<Vec<_>>();
1612                used_trie_nodes.extend_from_slice(&witness);
1613                touched_account_storage_slots.entry(address).or_default();
1614            }
1615
1616            let (new_state_trie_witness, updated_trie) = TrieLogger::open_trie(
1617                self.storage
1618                    .state_trie(block.header.hash())
1619                    .map_err(|_| ChainError::ParentStateNotFound)?
1620                    .ok_or(ChainError::ParentStateNotFound)?,
1621            );
1622
1623            // Use the updated state trie for the next block
1624            trie = updated_trie;
1625
1626            for state_trie_witness in current_trie_witness
1627                .lock()
1628                .map_err(|_| {
1629                    ChainError::WitnessGeneration("Failed to lock state trie witness".to_string())
1630                })?
1631                .iter()
1632            {
1633                accumulated_state_trie_witness
1634                    .insert(*state_trie_witness.0, state_trie_witness.1.clone());
1635            }
1636
1637            current_trie_witness = new_state_trie_witness;
1638        }
1639
1640        used_trie_nodes.extend_from_slice(&Vec::from_iter(
1641            accumulated_state_trie_witness.into_values(),
1642        ));
1643
1644        // If the witness is empty at least try to store the root
1645        if used_trie_nodes.is_empty()
1646            && let Some(root) = root_node
1647        {
1648            used_trie_nodes.push((*root).clone());
1649        }
1650
1651        // - We now need necessary block headers, these go from the first block referenced (via BLOCKHASH or just the first block to execute) up to the parent of the last block to execute.
1652        let mut block_headers_bytes = Vec::new();
1653
1654        let first_blockhash_opcode_number = blockhash_opcode_references.keys().min();
1655        let first_needed_block_hash = first_blockhash_opcode_number
1656            .and_then(|n| {
1657                (*n < first_block_header.number.saturating_sub(1))
1658                    .then(|| blockhash_opcode_references.get(n))?
1659                    .copied()
1660            })
1661            .unwrap_or(first_block_header.parent_hash);
1662
1663        // At the beginning this is the header of the last block to execute.
1664        let mut current_header = blocks
1665            .last()
1666            .ok_or_else(|| ChainError::WitnessGeneration("Empty batch".to_string()))?
1667            .header
1668            .clone();
1669
1670        // Headers from latest - 1 until we reach first block header we need.
1671        // We do it this way because we want to fetch headers by hash, not by number
1672        while current_header.hash() != first_needed_block_hash {
1673            let parent_hash = current_header.parent_hash;
1674            let current_number = current_header.number - 1;
1675
1676            current_header = self
1677                .storage
1678                .get_block_header_by_hash(parent_hash)?
1679                .ok_or_else(|| {
1680                    ChainError::WitnessGeneration(format!(
1681                        "Failed to get block {current_number} header"
1682                    ))
1683                })?;
1684
1685            block_headers_bytes.push(current_header.encode_to_vec());
1686        }
1687
1688        // Get initial state trie root and embed the rest of the trie into it
1689        let nodes: BTreeMap<H256, Node> = used_trie_nodes
1690            .into_iter()
1691            .map(|node| {
1692                (
1693                    node.compute_hash(&NativeCrypto).finalize(&NativeCrypto),
1694                    node,
1695                )
1696            })
1697            .collect();
1698        let state_trie_root = if let NodeRef::Node(state_trie_root, _) =
1699            Trie::get_embedded_root(&nodes, initial_state_root)?
1700        {
1701            Some((*state_trie_root).clone())
1702        } else {
1703            None
1704        };
1705
1706        // Get all initial storage trie roots and embed the rest of the trie into it
1707        let state_trie = if let Some(state_trie_root) = &state_trie_root {
1708            Trie::new_temp_with_root(state_trie_root.clone().into())
1709        } else {
1710            Trie::new_temp()
1711        };
1712        let mut storage_trie_roots = BTreeMap::new();
1713        for address in touched_account_storage_slots.keys() {
1714            let hashed_address = hash_address(address);
1715            let hashed_address_h256 = H256::from_slice(&hashed_address);
1716            let Some(encoded_account) = state_trie.get(&hashed_address)? else {
1717                continue; // empty account, doesn't have a storage trie
1718            };
1719            let storage_root_hash = AccountState::decode(&encoded_account)?.storage_root;
1720            if storage_root_hash == *EMPTY_TRIE_HASH {
1721                continue; // empty storage trie
1722            }
1723            if !nodes.contains_key(&storage_root_hash) {
1724                continue; // storage trie isn't relevant to this execution
1725            }
1726            let node = Trie::get_embedded_root(&nodes, storage_root_hash)?;
1727            let NodeRef::Node(node, _) = node else {
1728                return Err(ChainError::Custom(
1729                    "execution witness does not contain non-empty storage trie".to_string(),
1730                ));
1731            };
1732            storage_trie_roots.insert(hashed_address_h256, (*node).clone());
1733        }
1734
1735        Ok(ExecutionWitness {
1736            codes,
1737            block_headers_bytes,
1738            first_block_number: first_block_header.number,
1739            chain_config: self.storage.get_chain_config(),
1740            state_trie_root,
1741            storage_trie_roots,
1742        })
1743    }
1744
1745    pub fn generate_witness_from_account_updates(
1746        &self,
1747        account_updates: Vec<AccountUpdate>,
1748        block: &Block,
1749        parent_header: BlockHeader,
1750        logger: &DatabaseLogger,
1751    ) -> Result<ExecutionWitness, ChainError> {
1752        // Get state at previous block
1753        let trie = self
1754            .storage
1755            .state_trie(parent_header.hash())
1756            .map_err(|_| ChainError::ParentStateNotFound)?
1757            .ok_or(ChainError::ParentStateNotFound)?;
1758        let initial_state_root = trie.hash_no_commit(&NativeCrypto);
1759
1760        let (trie_witness, trie) = TrieLogger::open_trie(trie);
1761
1762        let mut touched_account_storage_slots = BTreeMap::new();
1763        // This will become the state trie + storage trie
1764        let mut used_trie_nodes = Vec::new();
1765
1766        // Store the root node in case the block is empty and the witness does not record any nodes
1767        let root_node = trie.root_node().map_err(|_| {
1768            ChainError::WitnessGeneration("Failed to get root state node".to_string())
1769        })?;
1770
1771        let mut codes = Vec::new();
1772
1773        for account_update in &account_updates {
1774            touched_account_storage_slots.insert(
1775                account_update.address,
1776                account_update
1777                    .added_storage
1778                    .keys()
1779                    .cloned()
1780                    .collect::<Vec<H256>>(),
1781            );
1782        }
1783
1784        // Get the used block hashes from the logger
1785        let blockhash_opcode_references = logger
1786            .block_hashes_accessed
1787            .lock()
1788            .map_err(|_e| ChainError::WitnessGeneration("Failed to get block hashes".to_string()))?
1789            .clone();
1790
1791        // Access all the accounts needed for withdrawals
1792        if let Some(withdrawals) = block.body.withdrawals.as_ref() {
1793            for withdrawal in withdrawals {
1794                trie.get(&hash_address(&withdrawal.address)).map_err(|_e| {
1795                    ChainError::Custom("Failed to access account from trie".to_string())
1796                })?;
1797            }
1798        }
1799
1800        let mut used_storage_tries = HashMap::new();
1801
1802        // Access all the accounts from the initial trie
1803        // Record all the storage nodes for the initial state
1804        for (account, acc_keys) in logger
1805            .state_accessed
1806            .lock()
1807            .map_err(|_e| {
1808                ChainError::WitnessGeneration("Failed to execute with witness".to_string())
1809            })?
1810            .iter()
1811        {
1812            // Access the account from the state trie to record the nodes used to access it
1813            trie.get(&hash_address(account)).map_err(|_e| {
1814                ChainError::WitnessGeneration("Failed to access account from trie".to_string())
1815            })?;
1816            // Get storage trie at before updates
1817            if !acc_keys.is_empty()
1818                && let Ok(Some(storage_trie)) =
1819                    self.storage.storage_trie(parent_header.hash(), *account)
1820            {
1821                let (storage_trie_witness, storage_trie) = TrieLogger::open_trie(storage_trie);
1822                // Access all the keys
1823                for storage_key in acc_keys {
1824                    let hashed_key = hash_key(storage_key);
1825                    storage_trie.get(&hashed_key).map_err(|_e| {
1826                        ChainError::WitnessGeneration("Failed to access storage key".to_string())
1827                    })?;
1828                }
1829                // Store the tries to reuse when applying account updates
1830                used_storage_tries.insert(*account, (storage_trie_witness, storage_trie));
1831            }
1832        }
1833
1834        // Store all the accessed evm bytecodes
1835        for code_hash in logger
1836            .code_accessed
1837            .lock()
1838            .map_err(|_e| {
1839                ChainError::WitnessGeneration("Failed to gather used bytecodes".to_string())
1840            })?
1841            .iter()
1842        {
1843            let code = self
1844                .storage
1845                .get_account_code(*code_hash)
1846                .map_err(|_e| {
1847                    ChainError::WitnessGeneration("Failed to get account code".to_string())
1848                })?
1849                .ok_or(ChainError::WitnessGeneration(
1850                    "Failed to get account code".to_string(),
1851                ))?;
1852            codes.push(code.code().to_vec());
1853        }
1854
1855        // Apply account updates to the trie recording all the necessary nodes to do so
1856        let (storage_tries_after_update, _account_updates_list) =
1857            self.storage.apply_account_updates_from_trie_with_witness(
1858                trie,
1859                &account_updates,
1860                used_storage_tries,
1861            )?;
1862
1863        for (address, (witness, _storage_trie)) in storage_tries_after_update {
1864            let mut witness = witness.lock().map_err(|_| {
1865                ChainError::WitnessGeneration("Failed to lock storage trie witness".to_string())
1866            })?;
1867            let witness = std::mem::take(&mut *witness);
1868            let witness = witness.into_values().collect::<Vec<_>>();
1869            used_trie_nodes.extend_from_slice(&witness);
1870            touched_account_storage_slots.entry(address).or_default();
1871        }
1872
1873        used_trie_nodes.extend_from_slice(&Vec::from_iter(
1874            trie_witness
1875                .lock()
1876                .map_err(|_| {
1877                    ChainError::WitnessGeneration("Failed to lock state trie witness".to_string())
1878                })?
1879                .clone()
1880                .into_values(),
1881        ));
1882
1883        // If the witness is empty at least try to store the root
1884        if used_trie_nodes.is_empty()
1885            && let Some(root) = root_node
1886        {
1887            used_trie_nodes.push((*root).clone());
1888        }
1889
1890        // - We now need necessary block headers, these go from the first block referenced (via BLOCKHASH or just the first block to execute) up to the parent of the last block to execute.
1891        let mut block_headers_bytes = Vec::new();
1892
1893        let first_blockhash_opcode_number = blockhash_opcode_references.keys().min();
1894        let first_needed_block_hash = first_blockhash_opcode_number
1895            .and_then(|n| {
1896                (*n < block.header.number.saturating_sub(1))
1897                    .then(|| blockhash_opcode_references.get(n))?
1898                    .copied()
1899            })
1900            .unwrap_or(block.header.parent_hash);
1901
1902        let mut current_header = block.header.clone();
1903
1904        // Headers from latest - 1 until we reach first block header we need.
1905        // We do it this way because we want to fetch headers by hash, not by number
1906        while current_header.hash() != first_needed_block_hash {
1907            let parent_hash = current_header.parent_hash;
1908            let current_number = current_header.number - 1;
1909
1910            current_header = self
1911                .storage
1912                .get_block_header_by_hash(parent_hash)?
1913                .ok_or_else(|| {
1914                    ChainError::WitnessGeneration(format!(
1915                        "Failed to get block {current_number} header"
1916                    ))
1917                })?;
1918
1919            block_headers_bytes.push(current_header.encode_to_vec());
1920        }
1921
1922        // Get initial state trie root and embed the rest of the trie into it
1923        let nodes: BTreeMap<H256, Node> = used_trie_nodes
1924            .into_iter()
1925            .map(|node| {
1926                (
1927                    node.compute_hash(&NativeCrypto).finalize(&NativeCrypto),
1928                    node,
1929                )
1930            })
1931            .collect();
1932        let state_trie_root = if let NodeRef::Node(state_trie_root, _) =
1933            Trie::get_embedded_root(&nodes, initial_state_root)?
1934        {
1935            Some((*state_trie_root).clone())
1936        } else {
1937            None
1938        };
1939
1940        // Get all initial storage trie roots and embed the rest of the trie into it
1941        let state_trie = if let Some(state_trie_root) = &state_trie_root {
1942            Trie::new_temp_with_root(state_trie_root.clone().into())
1943        } else {
1944            Trie::new_temp()
1945        };
1946        let mut storage_trie_roots = BTreeMap::new();
1947        for address in touched_account_storage_slots.keys() {
1948            let hashed_address = hash_address(address);
1949            let hashed_address_h256 = H256::from_slice(&hashed_address);
1950            let Some(encoded_account) = state_trie.get(&hashed_address)? else {
1951                continue; // empty account, doesn't have a storage trie
1952            };
1953            let storage_root_hash = AccountState::decode(&encoded_account)?.storage_root;
1954            if storage_root_hash == *EMPTY_TRIE_HASH {
1955                continue; // empty storage trie
1956            }
1957            if !nodes.contains_key(&storage_root_hash) {
1958                continue; // storage trie isn't relevant to this execution
1959            }
1960            let node = Trie::get_embedded_root(&nodes, storage_root_hash)?;
1961            let NodeRef::Node(node, _) = node else {
1962                return Err(ChainError::Custom(
1963                    "execution witness does not contain non-empty storage trie".to_string(),
1964                ));
1965            };
1966            storage_trie_roots.insert(hashed_address_h256, (*node).clone());
1967        }
1968
1969        Ok(ExecutionWitness {
1970            codes,
1971            block_headers_bytes,
1972            first_block_number: parent_header.number,
1973            chain_config: self.storage.get_chain_config(),
1974            state_trie_root,
1975            storage_trie_roots,
1976        })
1977    }
1978
1979    #[instrument(
1980        level = "trace",
1981        name = "Block DB update",
1982        skip_all,
1983        fields(namespace = "block_execution")
1984    )]
1985    pub fn store_block(
1986        &self,
1987        block: Block,
1988        account_updates_list: AccountUpdatesList,
1989        execution_result: BlockExecutionResult,
1990    ) -> Result<(), ChainError> {
1991        // Check state root matches the one in block header
1992        validate_state_root(&block.header, account_updates_list.state_trie_hash)?;
1993
1994        let update_batch = UpdateBatch {
1995            account_updates: account_updates_list.state_updates,
1996            storage_updates: account_updates_list.storage_updates,
1997            receipts: vec![(block.hash(), execution_result.receipts)],
1998            blocks: vec![block],
1999            code_updates: account_updates_list.code_updates,
2000            batch_mode: false,
2001        };
2002
2003        self.storage
2004            .store_block_updates(update_batch)
2005            .map_err(|e| e.into())
2006    }
2007
2008    pub fn add_block(&self, block: Block) -> Result<(), ChainError> {
2009        let since = Instant::now();
2010        let (res, updates) = self.execute_block(&block)?;
2011        let executed = Instant::now();
2012
2013        // Apply the account updates over the last block's state and compute the new state root
2014        let account_updates_list = self
2015            .storage
2016            .apply_account_updates_batch(block.header.parent_hash, &updates)?
2017            .ok_or(ChainError::ParentStateNotFound)?;
2018
2019        let (gas_used, gas_limit, block_number, transactions_count) = (
2020            block.header.gas_used,
2021            block.header.gas_limit,
2022            block.header.number,
2023            block.body.transactions.len(),
2024        );
2025
2026        let merkleized = Instant::now();
2027        let result = self.store_block(block, account_updates_list, res);
2028        let stored = Instant::now();
2029
2030        if self.options.perf_logs_enabled {
2031            Self::print_add_block_logs(
2032                gas_used,
2033                gas_limit,
2034                block_number,
2035                transactions_count,
2036                since,
2037                executed,
2038                merkleized,
2039                stored,
2040            );
2041        }
2042        result
2043    }
2044
2045    pub fn add_block_pipeline(
2046        &self,
2047        block: Block,
2048        bal: Option<&BlockAccessList>,
2049    ) -> Result<(), ChainError> {
2050        let (_, _, result) = self.add_block_pipeline_inner(block, bal, false)?;
2051        result
2052    }
2053
2054    /// Same as [`add_block_pipeline`] but also returns the BAL produced during execution.
2055    /// A BAL only exists from Amsterdam onward. On the parallel validation path the BAL
2056    /// comes from the header and drives execution rather than being rebuilt, so the
2057    /// returned value is `None`; the sequential path (block production or
2058    /// `--no-bal-parallel-exec`) rebuilds it and returns `Some(bal)`. Pre-Amsterdam blocks
2059    /// never record a BAL, so the returned value is always `None`.
2060    pub fn add_block_pipeline_bal(
2061        &self,
2062        block: Block,
2063        bal: Option<&BlockAccessList>,
2064    ) -> Result<Option<BlockAccessList>, ChainError> {
2065        let (produced_bal, _, result) = self.add_block_pipeline_inner(block, bal, false)?;
2066        result?;
2067        Ok(produced_bal)
2068    }
2069
2070    /// Same as [`add_block_pipeline`] but returns the execution witness produced
2071    /// while importing the block.
2072    pub fn add_block_pipeline_with_witness(
2073        &self,
2074        block: Block,
2075        bal: Option<&BlockAccessList>,
2076    ) -> Result<ExecutionWitness, ChainError> {
2077        let (_, witness, result) = self.add_block_pipeline_inner(block, bal, true)?;
2078        result?;
2079        witness.ok_or_else(|| {
2080            ChainError::WitnessGeneration(
2081                "forced witness collection completed without producing a witness".to_string(),
2082            )
2083        })
2084    }
2085
2086    /// Runs the full block pipeline (execute + merkleize + store).
2087    ///
2088    /// Returns a two-level Result:
2089    /// - Outer `Err`: pipeline couldn't start (e.g. parent header not found).
2090    /// - Inner `Result`: block storage outcome. The produced BAL is returned
2091    ///   even when storage fails, so callers like `add_block_pipeline_bal` can
2092    ///   retrieve it. Note: if *execution* itself fails (outer `Result`), the
2093    ///   BAL is not available.
2094    fn add_block_pipeline_inner(
2095        &self,
2096        block: Block,
2097        bal: Option<&BlockAccessList>,
2098        force_witness: bool,
2099    ) -> Result<AddBlockPipelineInnerResult, ChainError> {
2100        // Validate if it can be the new head and find the parent
2101        let Ok(parent_header) = find_parent_header(&block.header, &self.storage) else {
2102            // If the parent is not present, we store it as pending.
2103            self.storage.add_pending_block(block)?;
2104            return Err(ChainError::ParentNotFound);
2105        };
2106
2107        let should_store_witness = self.options.precompute_witnesses && self.is_synced();
2108        let collect_witness = should_store_witness || force_witness;
2109
2110        let (mut vm, logger) = if collect_witness {
2111            // If witness pre-generation is enabled, we wrap the db with a logger
2112            // to track state access (block hashes, storage keys, codes) during execution
2113            // avoiding the need to re-execute the block later.
2114            let vm_db: DynVmDatabase = Box::new(StoreVmDatabase::new(
2115                self.storage.clone(),
2116                parent_header.clone(),
2117            )?);
2118
2119            let logger = Arc::new(DatabaseLogger::new(Arc::new(vm_db)));
2120
2121            let vm = match self.options.r#type.clone() {
2122                BlockchainType::L1 => {
2123                    Evm::new_from_db_for_l1(logger.clone(), Arc::new(NativeCrypto))
2124                }
2125                BlockchainType::L2(l2_config) => Evm::new_from_db_for_l2(
2126                    logger.clone(),
2127                    *l2_config.fee_config.read().map_err(|_| {
2128                        EvmError::Custom("Fee config lock was poisoned".to_string())
2129                    })?,
2130                    Arc::new(NativeCrypto),
2131                ),
2132            };
2133            (vm, Some(logger))
2134        } else {
2135            let vm_db = StoreVmDatabase::new(self.storage.clone(), parent_header.clone())?;
2136            let vm = self.new_evm(vm_db)?;
2137            (vm, None)
2138        };
2139
2140        let (
2141            res,
2142            account_updates_list,
2143            accumulated_updates,
2144            produced_bal,
2145            merkle_queue_length,
2146            instants,
2147            warmer_duration,
2148        ) = { self.execute_block_pipeline(&block, &parent_header, &mut vm, bal, collect_witness)? };
2149
2150        let (gas_used, gas_limit, block_number, transactions_count) = (
2151            block.header.gas_used,
2152            block.header.gas_limit,
2153            block.header.number,
2154            block.body.transactions.len(),
2155        );
2156        let block_hash = block.hash();
2157
2158        let mut witness = None;
2159        if let Some(logger) = logger
2160            && let Some(account_updates) = accumulated_updates
2161        {
2162            let block_hash = block.hash();
2163            let generated_witness = self.generate_witness_from_account_updates(
2164                account_updates,
2165                &block,
2166                parent_header,
2167                &logger,
2168            )?;
2169            match (should_store_witness, force_witness) {
2170                (true, true) => {
2171                    witness = Some(generated_witness.clone());
2172                    self.storage
2173                        .store_witness(block_hash, block_number, generated_witness)?;
2174                }
2175                (true, false) => {
2176                    self.storage
2177                        .store_witness(block_hash, block_number, generated_witness)?;
2178                }
2179                (false, true) => {
2180                    witness = Some(generated_witness);
2181                }
2182                (false, false) => {}
2183            }
2184        };
2185
2186        // Store the block's BAL so peers can request it later without re-execution.
2187        // On the parallel Amsterdam validation path the BAL is supplied via the header
2188        // and `produced_bal` is None, so fall back to the validated incoming `bal`.
2189        // Pre-Amsterdam blocks have no BAL on either source, so nothing is stored.
2190        if let Some(bal) = produced_bal.as_ref().or(bal)
2191            && let Err(err) = self.storage.store_block_access_list(block_hash, bal)
2192        {
2193            warn!("Failed to store block access list for block {block_hash}: {err}");
2194        }
2195
2196        let result = self.store_block(block, account_updates_list, res);
2197
2198        let stored = Instant::now();
2199
2200        let instants = std::array::from_fn(move |i| {
2201            if i < instants.len() {
2202                instants[i]
2203            } else {
2204                stored
2205            }
2206        });
2207
2208        if self.options.perf_logs_enabled {
2209            Self::print_add_block_pipeline_logs(
2210                gas_used,
2211                gas_limit,
2212                block_number,
2213                block_hash,
2214                transactions_count,
2215                merkle_queue_length,
2216                warmer_duration,
2217                instants,
2218            );
2219        }
2220
2221        metrics!(if let Some(bal_ref) = produced_bal.as_ref().or(bal) {
2222            let account_count = bal_ref.accounts().len() as u64;
2223            let slot_count = bal_ref.item_count().saturating_sub(account_count);
2224            let size_bytes = bal_ref.length() as f64;
2225            METRICS_BAL.blocks_total.inc();
2226            METRICS_BAL.size_bytes.set(size_bytes);
2227            METRICS_BAL.size_bytes_histogram.observe(size_bytes);
2228            METRICS_BAL.account_count.set(account_count as i64);
2229            METRICS_BAL.slot_count.set(slot_count as i64);
2230        });
2231
2232        Ok((produced_bal, witness, result))
2233    }
2234
2235    #[allow(clippy::too_many_arguments)]
2236    fn print_add_block_logs(
2237        gas_used: u64,
2238        gas_limit: u64,
2239        block_number: u64,
2240        transactions_count: usize,
2241        since: Instant,
2242        executed: Instant,
2243        merkleized: Instant,
2244        stored: Instant,
2245    ) {
2246        let interval = stored.duration_since(since).as_millis() as f64;
2247        if interval != 0f64 {
2248            let as_gigas = gas_used as f64 / 10_f64.powf(9_f64);
2249            let throughput = as_gigas / interval * 1000_f64;
2250
2251            metrics!(
2252                METRICS_BLOCKS.set_block_number(block_number);
2253                METRICS_BLOCKS.set_latest_gas_used(gas_used as f64);
2254                METRICS_BLOCKS.set_latest_block_gas_limit(gas_limit as f64);
2255                METRICS_BLOCKS.set_latest_gigagas(throughput);
2256                METRICS_BLOCKS.set_execution_ms(executed.duration_since(since).as_secs_f64() * 1000.0);
2257                METRICS_BLOCKS.set_merkle_ms(merkleized.duration_since(executed).as_secs_f64() * 1000.0);
2258                METRICS_BLOCKS.set_store_ms(stored.duration_since(merkleized).as_secs_f64() * 1000.0);
2259                METRICS_BLOCKS.set_transaction_count(transactions_count as i64);
2260            );
2261
2262            let base_log = format!(
2263                "[METRIC] BLOCK EXECUTION THROUGHPUT ({}): {:.3} Ggas/s TIME SPENT: {:.0} ms. Gas Used: {:.3} ({:.0}%), #Txs: {}.",
2264                block_number,
2265                throughput,
2266                interval,
2267                as_gigas,
2268                (gas_used as f64 / gas_limit as f64) * 100.0,
2269                transactions_count
2270            );
2271
2272            fn percentage(init: Instant, end: Instant, total: f64) -> f64 {
2273                (end.duration_since(init).as_millis() as f64 / total * 100.0).round()
2274            }
2275            let extra_log = if as_gigas > 0.0 {
2276                format!(
2277                    " exec: {}% merkle: {}% store: {}%",
2278                    percentage(since, executed, interval),
2279                    percentage(executed, merkleized, interval),
2280                    percentage(merkleized, stored, interval)
2281                )
2282            } else {
2283                "".to_string()
2284            };
2285            info!("{}{}", base_log, extra_log);
2286        }
2287    }
2288
2289    #[allow(clippy::too_many_arguments)]
2290    fn print_add_block_pipeline_logs(
2291        gas_used: u64,
2292        gas_limit: u64,
2293        block_number: u64,
2294        block_hash: H256,
2295        transactions_count: usize,
2296        merkle_queue_length: usize,
2297        warmer_duration: Duration,
2298        [
2299            start_instant,
2300            block_validated_instant,
2301            exec_merkle_start,
2302            merkle_start_instant,
2303            exec_end_instant,
2304            merkle_end_instant,
2305            exec_merkle_end_instant,
2306            stored_instant,
2307        ]: [Instant; 8],
2308    ) {
2309        let total_ms = stored_instant.duration_since(start_instant).as_secs_f64() * 1000.0;
2310        if total_ms == 0.0 {
2311            return;
2312        }
2313
2314        let as_mgas = gas_used as f64 / 1e6;
2315        let throughput = (gas_used as f64 / 1e9) / (total_ms / 1000.0);
2316
2317        // Calculate phase durations in ms
2318        let validate_ms = block_validated_instant
2319            .duration_since(start_instant)
2320            .as_secs_f64()
2321            * 1000.0;
2322        let exec_ms = exec_end_instant
2323            .duration_since(exec_merkle_start)
2324            .as_secs_f64()
2325            * 1000.0;
2326        let store_ms = stored_instant
2327            .duration_since(exec_merkle_end_instant)
2328            .as_secs_f64()
2329            * 1000.0;
2330        let warmer_ms = warmer_duration.as_secs_f64() * 1000.0;
2331
2332        // Calculate merkle breakdown
2333        // merkle_end_instant marks when merkle thread finished (may be before or after exec)
2334        // exec_merkle_end_instant marks when both exec and merkle are done
2335        let _merkle_total_ms = exec_merkle_end_instant
2336            .duration_since(exec_merkle_start)
2337            .as_secs_f64()
2338            * 1000.0;
2339
2340        // Concurrent merkle time: the portion of merkle that ran while exec was running
2341        let merkle_concurrent_ms = (merkle_end_instant
2342            .duration_since(exec_merkle_start)
2343            .as_secs_f64()
2344            * 1000.0)
2345            .min(exec_ms);
2346
2347        // Drain time: time spent finishing merkle after exec completed
2348        let merkle_drain_ms = exec_merkle_end_instant
2349            .saturating_duration_since(exec_end_instant)
2350            .as_secs_f64()
2351            * 1000.0;
2352
2353        // Overlap percentage: how much of merkle work was done concurrently
2354        let actual_merkle_ms = merkle_concurrent_ms + merkle_drain_ms;
2355        let overlap_pct = if actual_merkle_ms > 0.0 {
2356            (merkle_concurrent_ms / actual_merkle_ms) * 100.0
2357        } else {
2358            0.0
2359        };
2360
2361        // Calculate warmer effectiveness (positive = finished early)
2362        let warmer_early_ms = exec_ms - warmer_ms;
2363
2364        // Determine bottleneck (effective time for each phase)
2365        // For merkle, only count the drain time (concurrent time overlaps with exec)
2366        let phases = [
2367            ("validate", validate_ms),
2368            ("exec", exec_ms),
2369            ("merkle", merkle_drain_ms),
2370            ("store", store_ms),
2371        ];
2372        let bottleneck = phases
2373            .iter()
2374            .max_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
2375            .map(|(name, _)| *name)
2376            .unwrap_or("exec");
2377
2378        // Helper for percentage
2379        let pct = |ms: f64| (ms / total_ms * 100.0).round() as u64;
2380
2381        // Format output
2382        let header = format!(
2383            "[METRIC] BLOCK {} {:#x} | {:.3} Ggas/s | {:.2} ms | {} txs | {:.0} Mgas ({}%)",
2384            block_number,
2385            block_hash,
2386            throughput,
2387            total_ms,
2388            transactions_count,
2389            as_mgas,
2390            (gas_used as f64 / gas_limit as f64 * 100.0).round() as u64
2391        );
2392
2393        let bottleneck_marker = |name: &str| {
2394            if name == bottleneck {
2395                " << BOTTLENECK"
2396            } else {
2397                ""
2398            }
2399        };
2400
2401        let warmer_relation = if warmer_early_ms >= 0.0 {
2402            "before exec"
2403        } else {
2404            "after exec"
2405        };
2406
2407        let merkle_start_delay_ms = merkle_start_instant
2408            .duration_since(exec_merkle_start)
2409            .as_secs_f64()
2410            * 1000.0;
2411
2412        info!("{}", header);
2413        info!(
2414            "  |- validate: {:>7.2} ms  ({:>2}%){}",
2415            validate_ms,
2416            pct(validate_ms),
2417            bottleneck_marker("validate")
2418        );
2419        info!(
2420            "  |- exec:     {:>7.2} ms  ({:>2}%){}",
2421            exec_ms,
2422            pct(exec_ms),
2423            bottleneck_marker("exec")
2424        );
2425        info!(
2426            "  |- merkle:   {:>7.2} ms  ({:>2}%){}  [concurrent: {:.2} ms, drain: {:.2} ms, overlap: {:.0}%, queue: {}, start_delay: {:.2} ms]",
2427            merkle_drain_ms,
2428            pct(merkle_drain_ms),
2429            bottleneck_marker("merkle"),
2430            merkle_concurrent_ms,
2431            merkle_drain_ms,
2432            overlap_pct,
2433            merkle_queue_length,
2434            merkle_start_delay_ms,
2435        );
2436        info!(
2437            "  |- store:    {:>7.2} ms  ({:>2}%){}",
2438            store_ms,
2439            pct(store_ms),
2440            bottleneck_marker("store")
2441        );
2442        info!(
2443            "  `- warmer:   {:>7.2} ms         [finished: {:.2} ms {}]",
2444            warmer_ms,
2445            warmer_early_ms.abs(),
2446            warmer_relation,
2447        );
2448
2449        // Set prometheus metrics
2450        metrics!(
2451            METRICS_BLOCKS.set_block_number(block_number);
2452            METRICS_BLOCKS.set_latest_gas_used(gas_used as f64);
2453            METRICS_BLOCKS.set_latest_block_gas_limit(gas_limit as f64);
2454            METRICS_BLOCKS.set_latest_gigagas(throughput);
2455            METRICS_BLOCKS.set_transaction_count(transactions_count as i64);
2456            METRICS_BLOCKS.set_validate_ms(validate_ms);
2457            METRICS_BLOCKS.set_execution_ms(exec_ms);
2458            METRICS_BLOCKS.set_merkle_concurrent_ms(merkle_concurrent_ms);
2459            METRICS_BLOCKS.set_merkle_drain_ms(merkle_drain_ms);
2460            METRICS_BLOCKS.set_merkle_ms(_merkle_total_ms);
2461            METRICS_BLOCKS.set_merkle_overlap_pct(overlap_pct);
2462            METRICS_BLOCKS.set_store_ms(store_ms);
2463            METRICS_BLOCKS.set_warmer_ms(warmer_ms);
2464            METRICS_BLOCKS.set_warmer_early_ms(warmer_early_ms);
2465        );
2466    }
2467
2468    /// Adds multiple blocks in a batch.
2469    ///
2470    /// If an error occurs, returns a tuple containing:
2471    /// - The error type ([`ChainError`]).
2472    /// - [`BatchProcessingFailure`] (if the error was caused by block processing).
2473    ///
2474    /// Note: only the last block's state trie is stored in the db
2475    /// `bals` holds the per-block Block Access Lists fetched during sync, aligned
2476    /// by index with `blocks`. Pass an empty slice when no BALs are available
2477    /// (e.g. block import from RLP); the persistence step then stores none. Only
2478    /// BALs matching their block's header commitment are persisted.
2479    pub async fn add_blocks_in_batch(
2480        &self,
2481        blocks: Vec<Block>,
2482        bals: &[Option<BlockAccessList>],
2483        cancellation_token: CancellationToken,
2484    ) -> Result<(), (ChainError, Option<BatchBlockProcessingFailure>)> {
2485        let mut last_valid_hash = H256::default();
2486
2487        // `bals` is either empty (no BALs available) or index-aligned with `blocks`.
2488        // Guard the contract so a wrong-length slice can't silently drop/ignore BALs
2489        // via the `zip` in the persistence step below.
2490        debug_assert!(
2491            bals.is_empty() || bals.len() == blocks.len(),
2492            "bals must be empty or aligned with blocks (bals={}, blocks={})",
2493            bals.len(),
2494            blocks.len(),
2495        );
2496
2497        let Some(first_block_header) = blocks.first().map(|e| e.header.clone()) else {
2498            return Err((ChainError::Custom("First block not found".into()), None));
2499        };
2500
2501        let chain_config: ChainConfig = self.storage.get_chain_config();
2502
2503        // Cache block hashes for the full batch so we can access them during
2504        // execution without having to store the blocks beforehand.
2505        let mut block_hash_cache: BTreeMap<BlockNumber, BlockHash> =
2506            blocks.iter().map(|b| (b.header.number, b.hash())).collect();
2507
2508        let parent_header = self
2509            .storage
2510            .get_block_header_by_hash(first_block_header.parent_hash)
2511            .map_err(|e| (ChainError::StoreError(e), None))?
2512            .ok_or((ChainError::ParentNotFound, None))?;
2513
2514        // Walk the parent chain to cache the last 256 block hashes so that
2515        // BLOCKHASH can resolve references to blocks from previous batches
2516        // (they may not be canonical yet during import).
2517        block_hash_cache
2518            .entry(parent_header.number)
2519            .or_insert_with(|| parent_header.hash());
2520        let mut hash = parent_header.parent_hash;
2521        let mut number = parent_header.number.saturating_sub(1);
2522        let lookback = first_block_header.number.saturating_sub(256);
2523        while number > lookback {
2524            block_hash_cache.entry(number).or_insert(hash);
2525            match self.storage.get_block_header_by_hash(hash) {
2526                Ok(Some(header)) => {
2527                    hash = header.parent_hash;
2528                    number = number.saturating_sub(1);
2529                }
2530                Ok(None) => break,
2531                Err(e) => {
2532                    warn!("Failed to fetch block header by hash during BLOCKHASH cache walk: {e}");
2533                    break;
2534                }
2535            }
2536        }
2537        let vm_db = StoreVmDatabase::new_with_block_hash_cache(
2538            self.storage.clone(),
2539            parent_header,
2540            block_hash_cache,
2541        )
2542        .map_err(|e| (ChainError::EvmError(e), None))?;
2543        let mut vm = self.new_evm(vm_db).map_err(|e| (e.into(), None))?;
2544
2545        let blocks_len = blocks.len();
2546        let mut all_receipts: Vec<(BlockHash, Vec<Receipt>)> = Vec::with_capacity(blocks_len);
2547        let mut total_gas_used = 0;
2548        let mut transactions_count = 0;
2549
2550        let interval = Instant::now();
2551        for (i, block) in blocks.iter().enumerate() {
2552            if cancellation_token.is_cancelled() {
2553                info!("Received shutdown signal, aborting");
2554                return Err((ChainError::Custom(String::from("shutdown signal")), None));
2555            }
2556            // for the first block, we need to query the store
2557            let parent_header = if i == 0 {
2558                find_parent_header(&block.header, &self.storage).map_err(|err| {
2559                    (
2560                        err,
2561                        Some(BatchBlockProcessingFailure {
2562                            failed_block_hash: block.hash(),
2563                            last_valid_hash,
2564                        }),
2565                    )
2566                })?
2567            } else {
2568                // for the subsequent ones, the parent is the previous block
2569                blocks[i - 1].header.clone()
2570            };
2571
2572            let BlockExecutionResult { receipts, .. } = self
2573                .execute_block_from_state(&parent_header, block, &chain_config, &mut vm)
2574                .map_err(|err| {
2575                    (
2576                        err,
2577                        Some(BatchBlockProcessingFailure {
2578                            failed_block_hash: block.hash(),
2579                            last_valid_hash,
2580                        }),
2581                    )
2582                })?;
2583            debug!("Executed block with hash {}", block.hash());
2584            last_valid_hash = block.hash();
2585            total_gas_used += block.header.gas_used;
2586            transactions_count += block.body.transactions.len();
2587            all_receipts.push((block.hash(), receipts));
2588
2589            // Conversion is safe because EXECUTE_BATCH_SIZE=1024
2590            log_batch_progress(blocks_len as u32, i as u32);
2591            tokio::task::yield_now().await;
2592        }
2593
2594        let account_updates = vm
2595            .get_state_transitions()
2596            .map_err(|err| (ChainError::EvmError(err), None))?;
2597
2598        let last_block = blocks
2599            .last()
2600            .ok_or_else(|| (ChainError::Custom("Last block not found".into()), None))?;
2601
2602        let last_block_number = last_block.header.number;
2603        let last_block_gas_limit = last_block.header.gas_limit;
2604
2605        // Apply the account updates over all blocks and compute the new state root
2606        let account_updates_list = self
2607            .storage
2608            .apply_account_updates_batch(first_block_header.parent_hash, &account_updates)
2609            .map_err(|e| (e.into(), None))?
2610            .ok_or((ChainError::ParentStateNotFound, None))?;
2611
2612        let new_state_root = account_updates_list.state_trie_hash;
2613        let state_updates = account_updates_list.state_updates;
2614        let accounts_updates = account_updates_list.storage_updates;
2615        let code_updates = account_updates_list.code_updates;
2616
2617        // Check state root matches the one in block header
2618        validate_state_root(&last_block.header, new_state_root).map_err(|e| (e, None))?;
2619
2620        // EIP-8159: persist the per-block BAL fetched during sync so peers can
2621        // later request it over eth/71 without re-execution (the batch path
2622        // doesn't record BALs, so without this they'd fall back to regenerating
2623        // against possibly-pruned parent state). Only persist a BAL that matches
2624        // its header commitment; a wrong/empty peer BAL is dropped here, and the
2625        // serve path guards again. Captured before `blocks` is moved below.
2626        let bals_to_store: Vec<(BlockHash, BlockAccessList)> = blocks
2627            .iter()
2628            .zip(bals.iter())
2629            .filter_map(|(block, bal)| {
2630                let bal = bal.as_ref()?;
2631                bal.matches_commitment(block.header.block_access_list_hash)
2632                    .then(|| (block.hash(), bal.clone()))
2633            })
2634            .collect();
2635
2636        let update_batch = UpdateBatch {
2637            account_updates: state_updates,
2638            storage_updates: accounts_updates,
2639            blocks,
2640            receipts: all_receipts,
2641            code_updates,
2642            batch_mode: true,
2643        };
2644
2645        self.storage
2646            .store_block_updates(update_batch)
2647            .map_err(|e| (e.into(), None))?;
2648
2649        for (block_hash, bal) in &bals_to_store {
2650            if let Err(err) = self.storage.store_block_access_list(*block_hash, bal) {
2651                warn!(
2652                    "Failed to persist block access list for {block_hash} during batch sync: {err}"
2653                );
2654            }
2655        }
2656
2657        let elapsed_seconds = interval.elapsed().as_secs_f64();
2658        let throughput = if elapsed_seconds > 0.0 && total_gas_used != 0 {
2659            let as_gigas = (total_gas_used as f64) / 1e9;
2660            as_gigas / elapsed_seconds
2661        } else {
2662            0.0
2663        };
2664
2665        metrics!(
2666            METRICS_BLOCKS.set_block_number(last_block_number);
2667            METRICS_BLOCKS.set_latest_block_gas_limit(last_block_gas_limit as f64);
2668            // Set the latest gas used as the average gas used per block in the batch
2669            METRICS_BLOCKS.set_latest_gas_used(total_gas_used as f64 / blocks_len as f64);
2670            METRICS_BLOCKS.set_latest_gigagas(throughput);
2671        );
2672
2673        if self.options.perf_logs_enabled {
2674            info!(
2675                "[METRICS] Executed and stored: Range: {}, Last block num: {}, Last block gas limit: {}, Total transactions: {}, Total Gas: {}, Throughput: {} Gigagas/s",
2676                blocks_len,
2677                last_block_number,
2678                last_block_gas_limit,
2679                transactions_count,
2680                total_gas_used,
2681                throughput
2682            );
2683        }
2684
2685        Ok(())
2686    }
2687
2688    /// Add a blob transaction and its blobs bundle to the mempool checking that the transaction is valid
2689    #[cfg(feature = "c-kzg")]
2690    pub async fn add_blob_transaction_to_pool(
2691        &self,
2692        transaction: EIP4844Transaction,
2693        blobs_bundle: BlobsBundle,
2694    ) -> Result<H256, MempoolError> {
2695        let fork = self.current_fork().await?;
2696
2697        let transaction = Transaction::EIP4844Transaction(transaction);
2698        let hash = transaction.hash();
2699        if self.mempool.contains_tx(hash)? {
2700            return Ok(hash);
2701        }
2702
2703        // Wire-wrapper size cap for blob txs. Matches geth `txMaxSize = 1 MiB`
2704        // (blobpool) and nethermind `MaxBlobTxSize`, which both bound the
2705        // wire-wrapper form including the sidecar. ethrex stores the core tx
2706        // and the bundle in separate structs, so sum the two encoded sizes
2707        // (the ±few bytes of outer list framing are rounding error at this
2708        // scale).
2709        let wrapper_len = transaction.encode_canonical_len() + blobs_bundle.length();
2710        if wrapper_len > MAX_BLOB_TX_SIZE {
2711            return Err(MempoolError::TxSizeExceeded {
2712                actual: wrapper_len,
2713                limit: MAX_BLOB_TX_SIZE,
2714            });
2715        }
2716
2717        // Validate blobs bundle after checking if it's already added.
2718        if let Transaction::EIP4844Transaction(transaction) = &transaction {
2719            blobs_bundle.validate(transaction, fork)?;
2720        }
2721
2722        let sender = transaction.sender(&NativeCrypto)?;
2723
2724        // Validate transaction
2725        if let Some(tx_to_replace) = self.validate_transaction(&transaction, sender).await? {
2726            self.remove_transaction_from_pool(&tx_to_replace)?;
2727        }
2728
2729        // Add blobs bundle before the transaction so that when add_transaction
2730        // notifies payload builders the blob data is already available.
2731        self.mempool.add_blobs_bundle(hash, blobs_bundle)?;
2732        self.mempool
2733            .add_transaction(hash, sender, MempoolTransaction::new(transaction, sender))?;
2734        Ok(hash)
2735    }
2736
2737    /// Add a transaction to the mempool checking that the transaction is valid
2738    pub async fn add_transaction_to_pool(
2739        &self,
2740        transaction: Transaction,
2741    ) -> Result<H256, MempoolError> {
2742        // Blob transactions should be submitted via add_blob_transaction along with the corresponding blobs bundle
2743        if matches!(transaction, Transaction::EIP4844Transaction(_)) {
2744            return Err(MempoolError::BlobTxNoBlobsBundle);
2745        }
2746        // Wire size cap: run before sender recovery so oversized txs don't
2747        // force secp256k1 work. Matches geth's `txMaxSize` admission order
2748        // (size-checked at `ValidateTransaction` entry, well before any
2749        // crypto). The same check sits in `validate_transaction` so direct
2750        // callers (tests, L2 paths) keep the guarantee.
2751        let encoded_len = transaction.encode_canonical_len();
2752        if encoded_len > MAX_TX_SIZE {
2753            return Err(MempoolError::TxSizeExceeded {
2754                actual: encoded_len,
2755                limit: MAX_TX_SIZE,
2756            });
2757        }
2758        let hash = transaction.hash();
2759        if self.mempool.contains_tx(hash)? {
2760            return Ok(hash);
2761        }
2762        let sender = transaction.sender(&NativeCrypto)?;
2763        // Validate transaction
2764        if let Some(tx_to_replace) = self.validate_transaction(&transaction, sender).await? {
2765            self.remove_transaction_from_pool(&tx_to_replace)?;
2766        }
2767
2768        // Add transaction to storage
2769        self.mempool
2770            .add_transaction(hash, sender, MempoolTransaction::new(transaction, sender))?;
2771
2772        Ok(hash)
2773    }
2774
2775    /// Remove a transaction from the mempool
2776    pub fn remove_transaction_from_pool(&self, hash: &H256) -> Result<(), StoreError> {
2777        self.mempool.remove_transaction(hash)
2778    }
2779
2780    /// Remove all transactions in the executed block from the pool (if we have them)
2781    pub fn remove_block_transactions_from_pool(&self, block: &Block) -> Result<(), StoreError> {
2782        for tx in &block.body.transactions {
2783            self.mempool.remove_transaction(&tx.hash())?;
2784        }
2785        Ok(())
2786    }
2787
2788    /// Drop blob txs with nonce below the sender's on-chain nonce at `head_hash`.
2789    /// Per-block pruning only covers the head block, so stale blob txs from
2790    /// non-head canonical blocks leak in and are never evicted (value/nonce
2791    /// eviction pins low nonces). Resetting against on-chain nonces clears them.
2792    pub fn remove_stale_blob_txs(&self, head_hash: BlockHash) -> Result<(), StoreError> {
2793        let blob_txs = self.mempool.blob_txs()?;
2794        if blob_txs.is_empty() {
2795            return Ok(());
2796        }
2797        // Cache on-chain nonce per sender to avoid repeated state reads.
2798        let mut nonce_by_sender: HashMap<Address, u64> = HashMap::new();
2799        for (hash, sender, tx_nonce) in blob_txs {
2800            let state_nonce = match nonce_by_sender.entry(sender) {
2801                Entry::Occupied(e) => *e.get(),
2802                Entry::Vacant(e) => {
2803                    let nonce = self
2804                        .storage
2805                        .get_account_info_by_hash(head_hash, sender)?
2806                        .map(|info| info.nonce)
2807                        .unwrap_or(0);
2808                    *e.insert(nonce)
2809                }
2810            };
2811            if tx_nonce < state_nonce {
2812                self.mempool.remove_transaction(&hash)?;
2813            }
2814        }
2815        Ok(())
2816    }
2817
2818    /*
2819
2820    SOME VALIDATIONS THAT WE COULD INCLUDE
2821    Stateless validations
2822    1. This transaction is valid on current mempool
2823        -> Depends on mempool transaction filtering logic
2824    2. Ensure the maxPriorityFeePerGas is high enough to cover the requirement of the calling pool (the minimum to be included in)
2825        -> Depends on mempool transaction filtering logic
2826    3. Transaction's encoded size is smaller than maximum allowed
2827        -> I think that this is not in the spec, but it may be a good idea
2828    4. Make sure the transaction is signed properly
2829    5. Ensure a Blob Transaction comes with its sidecar (Done! - All blob validations have been moved to `common/types/blobs_bundle.rs`):
2830      1. Validate number of BlobHashes is positive (Done!)
2831      2. Validate number of BlobHashes is less than the maximum allowed per block,
2832         which may be computed as `maxBlobGasPerBlock / blobTxBlobGasPerBlob`
2833      3. Ensure number of BlobHashes is equal to:
2834        - The number of blobs (Done!)
2835        - The number of commitments (Done!)
2836        - The number of proofs (Done!)
      4. Validate that the hashes match the commitments, performing a `kzg4844` hash. (Done!)
2838      5. Verify the blob proofs with the `kzg4844` (Done!)
2839    Stateful validations
2840    1. Ensure transaction nonce is higher than the `from` address stored nonce
2841    2. Certain pools do not allow for nonce gaps. Ensure a gap is not produced (that is, the transaction nonce is exactly the following of the stored one)
2842    3. Ensure the transactor has enough funds to cover transaction cost:
2843        - Transaction cost is calculated as `(gas * gasPrice) + (blobGas * blobGasPrice) + value`
2844    4. In case of transaction reorg, ensure the transactor has enough funds to cover for transaction replacements without overdrafts.
    - This is done by comparing the total gas spent by the transactor across all pooled transactions, and accounting for the necessary gas expenditure if any of those transactions is replaced.
2846    5. Ensure the transactor is able to add a new transaction. The number of transactions sent by an account may be limited by a certain configured value
2847
2848    */
2849    /// Returns the hash of the transaction to replace in case the nonce already exists
2850    pub async fn validate_transaction(
2851        &self,
2852        tx: &Transaction,
2853        sender: Address,
2854    ) -> Result<Option<H256>, MempoolError> {
2855        let nonce = tx.nonce();
2856
2857        if matches!(tx, &Transaction::PrivilegedL2Transaction(_)) {
2858            return Ok(None);
2859        }
2860
2861        let header_no = self.storage.get_latest_block_number().await?;
2862        let header = self
2863            .storage
2864            .get_block_header(header_no)?
2865            .ok_or(MempoolError::NoBlockHeaderError)?;
2866        let config = self.storage.get_chain_config();
2867
2868        // Wire size cap for non-blob txs: peer-policy default, not consensus.
2869        // Matches geth `txMaxSize` (legacypool), reth `DEFAULT_MAX_TX_INPUT_BYTES`,
2870        // nethermind `MaxTxSize`. Blob txs are bounded by their own
2871        // wire-wrapper cap (`MAX_BLOB_TX_SIZE`) in `add_blob_transaction_to_pool`,
2872        // which sums the core tx and the sidecar to match geth/nethermind/erigon
2873        // scope.
2874        if !matches!(tx, Transaction::EIP4844Transaction(_)) {
2875            let encoded_len = tx.encode_canonical_len();
2876            if encoded_len > MAX_TX_SIZE {
2877                return Err(MempoolError::TxSizeExceeded {
2878                    actual: encoded_len,
2879                    limit: MAX_TX_SIZE,
2880                });
2881            }
2882        }
2883
2884        // Check init code size
2885        // [EIP-7954] - Amsterdam increases the limit
2886        let max_initcode_size = if config.is_amsterdam_activated(header.timestamp) {
2887            AMSTERDAM_MAX_INITCODE_SIZE
2888        } else {
2889            MAX_INITCODE_SIZE
2890        };
2891        if config.is_shanghai_activated(header.timestamp)
2892            && tx.is_contract_creation()
2893            && tx.data().len() > max_initcode_size as usize
2894        {
2895            return Err(MempoolError::TxMaxInitCodeSizeError);
2896        }
2897
2898        if config.is_osaka_activated(header.timestamp)
2899            && !config.is_amsterdam_activated(header.timestamp)
2900            && tx.gas_limit() > POST_OSAKA_GAS_LIMIT_CAP
2901        {
2902            // https://eips.ethereum.org/EIPS/eip-7825
2903            return Err(MempoolError::TxMaxGasLimitExceededError(
2904                tx.hash(),
2905                tx.gas_limit(),
2906            ));
2907        }
2908
2909        // Check gas limit is less than header's gas limit
2910        if header.gas_limit < tx.gas_limit() {
2911            return Err(MempoolError::TxGasLimitExceededError);
2912        }
2913
2914        // Check priority fee is less or equal than gas fee gap
2915        if tx.max_priority_fee().unwrap_or(0) > tx.max_fee_per_gas().unwrap_or(0) {
2916            return Err(MempoolError::TxTipAboveFeeCapError);
2917        }
2918
2919        // Check that the gas limit covers the gas needs for transaction metadata.
2920        if tx.gas_limit() < mempool::transaction_intrinsic_gas(tx, &header, &config)? {
2921            return Err(MempoolError::TxIntrinsicGasCostAboveLimitError);
2922        }
2923
2924        // Check that the specified blob gas fee is above the minimum value
2925        if let Some(fee) = tx.max_fee_per_blob_gas() {
2926            // Blob tx fee checks
2927            if fee < MIN_BASE_FEE_PER_BLOB_GAS.into() {
2928                return Err(MempoolError::TxBlobBaseFeeTooLowError);
2929            }
2930        };
2931
2932        let maybe_sender_acc_info = self.storage.get_account_info(header_no, sender).await?;
2933
2934        if let Some(sender_acc_info) = maybe_sender_acc_info {
2935            if nonce < sender_acc_info.nonce || nonce == u64::MAX {
2936                return Err(MempoolError::NonceTooLow);
2937            }
2938
2939            let tx_cost = tx
2940                .cost_without_base_fee()
2941                .ok_or(MempoolError::InvalidTxGasvalues)?;
2942
2943            if tx_cost > sender_acc_info.balance {
2944                return Err(MempoolError::NotEnoughBalance);
2945            }
2946        } else {
2947            // An account that is not in the database cannot possibly have enough balance to cover the transaction cost
2948            return Err(MempoolError::NotEnoughBalance);
2949        }
2950
2951        // Check the nonce of pendings TXs in the mempool from the same sender
2952        // If it exists check if the new tx has higher fees
2953        let tx_to_replace_hash = self.mempool.find_tx_to_replace(sender, nonce, tx)?;
2954
2955        if tx
2956            .chain_id()
2957            .is_some_and(|chain_id| chain_id != config.chain_id)
2958        {
2959            return Err(MempoolError::InvalidChainId(config.chain_id));
2960        }
2961
2962        Ok(tx_to_replace_hash)
2963    }
2964
2965    /// Marks the node's chain as up to date with the current chain
2966    /// Once the initial sync has taken place, the node will be considered as sync
2967    pub fn set_synced(&self) {
2968        self.is_synced.store(true, Ordering::Relaxed);
2969    }
2970
2971    /// Marks the node's chain as not up to date with the current chain.
2972    /// This will be used when the node is one batch or more behind the current chain.
2973    pub fn set_not_synced(&self) {
2974        self.is_synced.store(false, Ordering::Relaxed);
2975    }
2976
2977    /// Returns whether the node's chain is up to date with the current chain
2978    /// This will be true if the initial sync has already taken place and does not reflect whether there is an ongoing sync process
2979    /// The node should accept incoming p2p transactions if this method returns true
2980    pub fn is_synced(&self) -> bool {
2981        self.is_synced.load(Ordering::Relaxed)
2982    }
2983
2984    pub fn get_p2p_transaction_by_hash(&self, hash: &H256) -> Result<P2PTransaction, StoreError> {
2985        let Some(tx) = self.mempool.get_transaction_by_hash(*hash)? else {
2986            return Err(StoreError::Custom(format!(
2987                "Hash {hash} not found in the mempool",
2988            )));
2989        };
2990        let result = match tx {
2991            Transaction::LegacyTransaction(itx) => P2PTransaction::LegacyTransaction(itx),
2992            Transaction::EIP2930Transaction(itx) => P2PTransaction::EIP2930Transaction(itx),
2993            Transaction::EIP1559Transaction(itx) => P2PTransaction::EIP1559Transaction(itx),
2994            Transaction::EIP4844Transaction(itx) => {
2995                let Some(bundle) = self.mempool.get_blobs_bundle(*hash)? else {
2996                    return Err(StoreError::Custom(format!(
2997                        "Blob transaction present without its bundle: hash {hash}",
2998                    )));
2999                };
3000
3001                P2PTransaction::EIP4844TransactionWithBlobs(WrappedEIP4844Transaction {
3002                    tx: itx,
3003                    wrapper_version: (bundle.version != 0).then_some(bundle.version),
3004                    blobs_bundle: bundle,
3005                })
3006            }
3007            Transaction::EIP7702Transaction(itx) => P2PTransaction::EIP7702Transaction(itx),
3008            // Exclude privileged transactions as they are only created
3009            // by the lead sequencer. In the future, they might get gossiped
3010            // like the rest.
3011            Transaction::PrivilegedL2Transaction(_) => {
3012                return Err(StoreError::Custom(
3013                    "Privileged Transactions are not supported in P2P".to_string(),
3014                ));
3015            }
3016            Transaction::FeeTokenTransaction(itx) => P2PTransaction::FeeTokenTransaction(itx),
3017        };
3018
3019        Ok(result)
3020    }
3021
3022    pub fn new_evm(&self, vm_db: StoreVmDatabase) -> Result<Evm, EvmError> {
3023        new_evm(&self.options.r#type, vm_db)
3024    }
3025
3026    /// Get the current fork of the chain, based on the latest block's timestamp
3027    pub async fn current_fork(&self) -> Result<Fork, StoreError> {
3028        let chain_config = self.storage.get_chain_config();
3029        let latest_block_number = self.storage.get_latest_block_number().await?;
3030        let latest_block = self
3031            .storage
3032            .get_block_header(latest_block_number)?
3033            .ok_or(StoreError::Custom("Latest block not in DB".to_string()))?;
3034        Ok(chain_config.fork(latest_block.timestamp))
3035    }
3036}
3037
3038/// Open a state trie or storage trie depending on whether `prefix` is given.
3039fn load_trie(
3040    storage: &Store,
3041    parent_state_root: H256,
3042    prefix: Option<H256>,
3043) -> Result<Trie, StoreError> {
3044    Ok(match prefix {
3045        Some(account_hash) => {
3046            let state_trie = storage.open_state_trie(parent_state_root)?;
3047            let storage_root = match state_trie.get(account_hash.as_bytes())? {
3048                Some(rlp) => AccountState::decode(&rlp)?.storage_root,
3049                None => *EMPTY_TRIE_HASH,
3050            };
3051            storage.open_storage_trie(account_hash, parent_state_root, storage_root)?
3052        }
3053        None => storage.open_state_trie(parent_state_root)?,
3054    })
3055}
3056
3057/// Collapse a root branch node into an extension or leaf if it has only one valid child.
3058/// Returns `None` if there are no valid children.
3059fn collapse_root_node(
3060    storage: &Store,
3061    parent_state_root: H256,
3062    prefix: Option<H256>,
3063    root: BranchNode,
3064) -> Result<Option<Node>, StoreError> {
3065    let children: Vec<(usize, &NodeRef)> = root
3066        .choices
3067        .iter()
3068        .enumerate()
3069        .filter(|(_, choice)| choice.is_valid())
3070        .take(2)
3071        .collect();
3072    if children.len() > 1 {
3073        return Ok(Some(Node::Branch(Box::from(root))));
3074    }
3075    let Some((choice, only_child)) = children.first() else {
3076        return Ok(None);
3077    };
3078    let only_child = Arc::unwrap_or_clone(match only_child {
3079        NodeRef::Node(node, _) => node.clone(),
3080        noderef @ NodeRef::Hash(_) => {
3081            let trie = load_trie(storage, parent_state_root, prefix)?;
3082            let Some(node) = noderef.get_node(trie.db(), Nibbles::from_hex(vec![*choice as u8]))?
3083            else {
3084                return Ok(None);
3085            };
3086            node
3087        }
3088    });
3089    Ok(Some(match only_child {
3090        Node::Branch(_) => {
3091            ExtensionNode::new(Nibbles::from_hex(vec![*choice as u8]), only_child.into()).into()
3092        }
3093        Node::Extension(mut extension_node) => {
3094            extension_node.prefix.prepend(*choice as u8);
3095            extension_node.into()
3096        }
3097        Node::Leaf(mut leaf) => {
3098            leaf.partial.prepend(*choice as u8);
3099            leaf.into()
3100        }
3101    }))
3102}
3103
3104/// Collect the state trie shard, merge pre-collected nodes, and send results.
3105fn collect_and_send(
3106    index: u8,
3107    state_trie: &mut Trie,
3108    pre_collected_state: &mut Vec<TrieNode>,
3109    storage_nodes: &mut Vec<(H256, Vec<TrieNode>)>,
3110    tx: Sender<CollectedStateMsg>,
3111) -> Result<(), StoreError> {
3112    let (subroot, mut state_nodes) = collect_trie(index, std::mem::take(state_trie))?;
3113    if !pre_collected_state.is_empty() {
3114        let mut pre = std::mem::take(pre_collected_state);
3115        pre.extend(state_nodes);
3116        state_nodes = pre;
3117    }
3118    tx.send(CollectedStateMsg {
3119        index,
3120        subroot,
3121        state_nodes,
3122        storage_nodes: std::mem::take(storage_nodes),
3123    })
3124    .map_err(|e| StoreError::Custom(format!("send error: {e}")))?;
3125    Ok(())
3126}
3127
3128/// Open or get an existing storage trie for the given account prefix.
3129fn get_or_open_storage_trie<'a>(
3130    storage_tries: &'a mut FxHashMap<H256, Trie>,
3131    storage: &Store,
3132    parent_state_root: H256,
3133    prefix: H256,
3134    storage_root: H256,
3135) -> Result<&'a mut Trie, StoreError> {
3136    match storage_tries.entry(prefix) {
3137        Entry::Occupied(e) => Ok(e.into_mut()),
3138        Entry::Vacant(e) => {
3139            Ok(e.insert(storage.open_storage_trie(prefix, parent_state_root, storage_root)?))
3140        }
3141    }
3142}
3143
3144fn handle_subtrie(
3145    storage: Store,
3146    rx: cb::Receiver<WorkerRequest>,
3147    parent_state_root: H256,
3148    index: u8,
3149    worker_senders: Vec<cb::Sender<WorkerRequest>>,
3150    shutdown_rx: cb::Receiver<()>,
3151) -> Result<(), StoreError> {
3152    let mut state_trie = storage.open_state_trie(parent_state_root)?;
3153    let mut storage_nodes: Vec<(H256, Vec<TrieNode>)> = vec![];
3154    let mut accounts: FxHashMap<H256, AccountState> = Default::default();
3155    let mut expected_shards: FxHashMap<H256, u16> = Default::default();
3156    let mut storage_state: FxHashMap<H256, PreMerkelizedAccountState> = Default::default();
3157    let mut received_shards: FxHashMap<H256, u16> = Default::default();
3158    let mut pending_storage_accounts: usize = 0;
3159    let mut pending_collect_tx: Option<Sender<CollectedStateMsg>> = None;
3160    let mut pre_collected_state: Vec<TrieNode> = vec![];
3161    let mut storage_tries: FxHashMap<H256, Trie> = Default::default();
3162    let mut pre_collected_storage: FxHashMap<H256, Vec<TrieNode>> = Default::default();
3163
3164    // Held until collection finishes to keep cross-worker channels open.
3165    let mut worker_senders: Option<Vec<cb::Sender<WorkerRequest>>> = Some(worker_senders);
3166    let mut dirty = false;
3167    // When active, we finalize one storage trie per loop iteration,
3168    // interleaving with incoming StorageShard messages.
3169    let mut collecting_storages = false;
3170    let mut routing_complete = false;
3171    let mut routing_done_mask: u16 = 0;
3172    let mut storage_to_collect: Vec<(H256, Trie)> = vec![];
3173
3174    loop {
3175        // When collecting storages, finalize one trie per iteration so that
3176        // incoming StorageShard messages can be processed in between.
3177        if collecting_storages {
3178            if let Some((prefix, trie)) = storage_to_collect.pop() {
3179                let senders = worker_senders
3180                    .as_ref()
3181                    .expect("collecting after senders dropped");
3182                let (root, mut nodes) = collect_trie(index, trie)?;
3183                if let Some(mut pre_nodes) = pre_collected_storage.remove(&prefix) {
3184                    pre_nodes.extend(nodes);
3185                    nodes = pre_nodes;
3186                }
3187                let bucket = prefix.as_fixed_bytes()[0] >> 4;
3188                senders[bucket as usize]
3189                    .send(WorkerRequest::StorageShard {
3190                        prefix,
3191                        index,
3192                        subroot: root,
3193                        nodes,
3194                    })
3195                    .map_err(|e| StoreError::Custom(format!("send error: {e}")))?;
3196            } else {
3197                // All storage tries finalized
3198                worker_senders = None;
3199                collecting_storages = false;
3200                // Check if deferred collect can resolve now
3201                if pending_storage_accounts == 0
3202                    && let Some(tx) = pending_collect_tx.take()
3203                {
3204                    collect_and_send(
3205                        index,
3206                        &mut state_trie,
3207                        &mut pre_collected_state,
3208                        &mut storage_nodes,
3209                        tx,
3210                    )?;
3211                    break;
3212                }
3213            }
3214        }
3215
3216        // When collecting or dirty, poll non-blocking so we can interleave.
3217        // When clean and not collecting, just block.
3218        let msg = if collecting_storages || dirty {
3219            match rx.try_recv() {
3220                Ok(msg) => msg,
3221                Err(TryRecvError::Disconnected) => break,
3222                Err(TryRecvError::Empty) => {
3223                    // Check for shutdown signal from watcher
3224                    if matches!(shutdown_rx.try_recv(), Err(TryRecvError::Disconnected)) {
3225                        return Err(StoreError::Custom("shard worker shutdown".into()));
3226                    }
3227                    if dirty {
3228                        // Pre-collect state trie — safe during storage
3229                        // collection too, since StorageShard resolution only
3230                        // dirties specific paths that get re-committed later.
3231                        let mut nodes = state_trie.commit_without_storing(&NativeCrypto);
3232                        nodes.retain(|(nib, _)| nib.as_ref().first() == Some(&index));
3233                        pre_collected_state.extend(nodes);
3234                        if !collecting_storages {
3235                            // Pre-collect storage tries (only when not draining)
3236                            for (prefix, trie) in storage_tries.iter_mut() {
3237                                let mut nodes = trie.commit_without_storing(&NativeCrypto);
3238                                nodes.retain(|(nib, _)| nib.as_ref().first() == Some(&index));
3239                                if !nodes.is_empty() {
3240                                    pre_collected_storage
3241                                        .entry(*prefix)
3242                                        .or_default()
3243                                        .extend(nodes);
3244                                }
3245                            }
3246                        }
3247                        dirty = false;
3248                    }
3249                    continue;
3250                }
3251            }
3252        } else {
3253            select! {
3254                recv(rx) -> msg => match msg {
3255                    Ok(msg) => msg,
3256                    Err(_) => break,
3257                },
3258                recv(shutdown_rx) -> _ => {
3259                    return Err(StoreError::Custom("shard worker shutdown".into()));
3260                }
3261            }
3262        };
3263
3264        match msg {
3265            WorkerRequest::ProcessAccount {
3266                prefix,
3267                info,
3268                storage: account_storage,
3269                removed,
3270                removed_storage,
3271            } => {
3272                let senders = worker_senders
3273                    .as_ref()
3274                    .expect("ProcessAccount after collection started");
3275
3276                // Always load account to warm state trie during execution overlap
3277                match accounts.entry(prefix) {
3278                    Entry::Occupied(_) => {}
3279                    Entry::Vacant(vacant_entry) => {
3280                        let account_state = match state_trie.get(prefix.as_bytes())? {
3281                            Some(rlp) => {
3282                                let state = AccountState::decode(&rlp)?;
3283                                state_trie.insert(prefix.as_bytes().to_vec(), rlp)?;
3284                                state
3285                            }
3286                            None => AccountState::default(),
3287                        };
3288                        vacant_entry.insert(account_state);
3289                    }
3290                }
3291
3292                // Apply info immediately and insert into trie
3293                if let Some(info) = info {
3294                    let acct = accounts.get_mut(&prefix).expect("just loaded");
3295                    acct.nonce = info.nonce;
3296                    acct.balance = info.balance;
3297                    acct.code_hash = info.code_hash;
3298                    let path = prefix.as_bytes();
3299                    if *acct != AccountState::default() {
3300                        state_trie.insert(path.to_vec(), acct.encode_to_vec())?;
3301                    } else {
3302                        state_trie.remove(path)?;
3303                    }
3304                }
3305
3306                if removed || removed_storage {
3307                    // Delete locally + send DeleteStorage to other 15 workers
3308                    pre_collected_storage.remove(&prefix);
3309                    storage_tries.insert(prefix, Trie::new_temp());
3310                    for (i, tx) in senders.iter().enumerate() {
3311                        if i as u8 != index {
3312                            tx.send(WorkerRequest::DeleteStorage(prefix))
3313                                .map_err(|e| StoreError::Custom(format!("send error: {e}")))?;
3314                        }
3315                    }
3316                    accounts.get_mut(&prefix).expect("just loaded").storage_root = *EMPTY_TRIE_HASH;
3317                    if expected_shards.insert(prefix, 0xFFFF).is_none() {
3318                        pending_storage_accounts += 1;
3319                    }
3320                    if removed {
3321                        dirty = true;
3322                        continue;
3323                    }
3324                }
3325
3326                if !account_storage.is_empty() {
3327                    let storage_root = accounts
3328                        .get(&prefix)
3329                        .map(|a| a.storage_root)
3330                        .unwrap_or(*EMPTY_TRIE_HASH);
3331
3332                    let is_new = !expected_shards.contains_key(&prefix);
3333                    for (key, value) in account_storage {
3334                        let hashed_key = keccak(key);
3335                        let bucket = hashed_key.as_fixed_bytes()[0] >> 4;
3336                        *expected_shards.entry(prefix).or_insert(0u16) |= 1 << bucket;
3337                        if bucket == index {
3338                            // Local storage: insert directly
3339                            let trie = get_or_open_storage_trie(
3340                                &mut storage_tries,
3341                                &storage,
3342                                parent_state_root,
3343                                prefix,
3344                                storage_root,
3345                            )?;
3346                            if value.is_zero() {
3347                                trie.remove(hashed_key.as_bytes())?;
3348                            } else {
3349                                trie.insert(hashed_key.as_bytes().to_vec(), value.encode_to_vec())?;
3350                            }
3351                        } else {
3352                            senders[bucket as usize]
3353                                .send(WorkerRequest::MerklizeStorage {
3354                                    prefix,
3355                                    key: hashed_key,
3356                                    value,
3357                                    storage_root,
3358                                })
3359                                .map_err(|e| StoreError::Custom(format!("send error: {e}")))?;
3360                        }
3361                    }
3362                    if is_new {
3363                        pending_storage_accounts += 1;
3364                    }
3365                }
3366                dirty = true;
3367            }
3368            WorkerRequest::MerklizeStorage {
3369                prefix,
3370                key,
3371                value,
3372                storage_root,
3373            } => {
3374                let trie = get_or_open_storage_trie(
3375                    &mut storage_tries,
3376                    &storage,
3377                    parent_state_root,
3378                    prefix,
3379                    storage_root,
3380                )?;
3381                if value.is_zero() {
3382                    trie.remove(key.as_bytes())?;
3383                } else {
3384                    trie.insert(key.as_bytes().to_vec(), value.encode_to_vec())?;
3385                }
3386                dirty = true;
3387            }
3388            WorkerRequest::DeleteStorage(prefix) => {
3389                pre_collected_storage.remove(&prefix);
3390                storage_tries.insert(prefix, Trie::new_temp());
3391                dirty = true;
3392            }
3393            WorkerRequest::FinishRouting => {
3394                // Signal all workers that we're done routing MerklizeStorage.
3395                let senders = worker_senders
3396                    .as_ref()
3397                    .expect("FinishRouting after senders dropped");
3398                for i in 0..16u8 {
3399                    senders[i as usize]
3400                        .send(WorkerRequest::RoutingDone { from: index })
3401                        .map_err(|e| StoreError::Custom(format!("send error: {e}")))?;
3402                }
3403            }
3404            WorkerRequest::RoutingDone { from } => {
3405                routing_done_mask |= 1u16 << from;
3406                if routing_done_mask == 0xFFFF && !collecting_storages && !routing_complete {
3407                    collecting_storages = true;
3408                    routing_complete = true;
3409                    storage_to_collect = storage_tries.drain().collect();
3410                }
3411            }
3412            WorkerRequest::MerklizeAccounts { accounts: batch } => {
3413                // Info already applied in ProcessAccount — just record empty storage nodes
3414                for hashed_account in batch {
3415                    storage_nodes.push((hashed_account, vec![]));
3416                }
3417            }
3418            WorkerRequest::StorageShard {
3419                prefix,
3420                index: shard_index,
3421                mut subroot,
3422                nodes,
3423            } => {
3424                let state = storage_state.entry(prefix).or_default();
3425                match &mut state.storage_root {
3426                    Some(root) => {
3427                        root.choices[shard_index as usize] =
3428                            std::mem::take(&mut subroot.choices[shard_index as usize]);
3429                    }
3430                    rootptr => {
3431                        *rootptr = Some(subroot);
3432                    }
3433                }
3434                state.nodes.extend(nodes);
3435
3436                let received = received_shards.entry(prefix).or_insert(0u16);
3437                *received |= 1 << shard_index;
3438                if *received == expected_shards.get(&prefix).copied().unwrap_or(0) {
3439                    // All shards received — resolve storage root
3440                    let mut state = storage_state.remove(&prefix).expect("shard without state");
3441                    let new_storage_root = if let Some(mut root) = state.storage_root {
3442                        // Children from other shards need clear_hash to be re-committed.
3443                        root.choices.iter_mut().for_each(NodeRef::clear_hash);
3444                        let collapsed =
3445                            collapse_root_node(&storage, parent_state_root, Some(prefix), *root)?;
3446                        if let Some(root) = collapsed {
3447                            let mut root = NodeRef::from(root);
3448                            let hash =
3449                                root.commit(Nibbles::default(), &mut state.nodes, &NativeCrypto);
3450                            let _ = DROP_SENDER.send(Box::new(root));
3451                            hash.finalize(&NativeCrypto)
3452                        } else {
3453                            state.nodes.push((Nibbles::default(), vec![RLP_NULL]));
3454                            *EMPTY_TRIE_HASH
3455                        }
3456                    } else {
3457                        *EMPTY_TRIE_HASH
3458                    };
3459                    storage_nodes.push((prefix, state.nodes));
3460
3461                    // Update account's storage root and re-insert into state trie
3462                    let old_state = accounts.get_mut(&prefix).expect("loaded in ProcessAccount");
3463                    old_state.storage_root = new_storage_root;
3464                    let path = prefix.as_bytes();
3465                    if *old_state != AccountState::default() {
3466                        state_trie.insert(path.to_vec(), old_state.encode_to_vec())?;
3467                    } else {
3468                        state_trie.remove(path)?;
3469                    }
3470
3471                    dirty = true;
3472                    pending_storage_accounts -= 1;
3473                    if pending_storage_accounts == 0
3474                        && !collecting_storages
3475                        && routing_complete
3476                        && let Some(tx) = pending_collect_tx.take()
3477                    {
3478                        collect_and_send(
3479                            index,
3480                            &mut state_trie,
3481                            &mut pre_collected_state,
3482                            &mut storage_nodes,
3483                            tx,
3484                        )?;
3485                        break;
3486                    }
3487                }
3488            }
3489            WorkerRequest::CollectState { tx } => {
3490                if pending_storage_accounts == 0 && !collecting_storages && routing_complete {
3491                    collect_and_send(
3492                        index,
3493                        &mut state_trie,
3494                        &mut pre_collected_state,
3495                        &mut storage_nodes,
3496                        tx,
3497                    )?;
3498                    break;
3499                }
3500                // Defer until collection is done and all StorageShards resolved
3501                pending_collect_tx = Some(tx);
3502            }
3503        }
3504    }
3505    Ok(())
3506}
3507
3508pub fn new_evm(blockchain_type: &BlockchainType, vm_db: StoreVmDatabase) -> Result<Evm, EvmError> {
3509    let evm = match blockchain_type {
3510        BlockchainType::L1 => Evm::new_for_l1(vm_db, Arc::new(NativeCrypto)),
3511        BlockchainType::L2(l2_config) => {
3512            let fee_config = *l2_config
3513                .fee_config
3514                .read()
3515                .map_err(|_| EvmError::Custom("Fee config lock was poisoned".to_string()))?;
3516
3517            Evm::new_for_l2(vm_db, fee_config, Arc::new(NativeCrypto))?
3518        }
3519    };
3520    Ok(evm)
3521}
3522
3523/// Performs post-execution checks
3524pub fn validate_state_root(
3525    block_header: &BlockHeader,
3526    new_state_root: H256,
3527) -> Result<(), ChainError> {
3528    // Compare state root
3529    if new_state_root == block_header.state_root {
3530        Ok(())
3531    } else {
3532        Err(ChainError::InvalidBlock(
3533            InvalidBlockError::StateRootMismatch,
3534        ))
3535    }
3536}
3537
3538// Returns the hash of the head of the canonical chain (the latest valid hash).
3539pub async fn latest_canonical_block_hash(storage: &Store) -> Result<H256, ChainError> {
3540    let latest_block_number = storage.get_latest_block_number().await?;
3541    if let Some(latest_valid_header) = storage.get_block_header(latest_block_number)? {
3542        let latest_valid_hash = latest_valid_header.hash();
3543        return Ok(latest_valid_hash);
3544    }
3545    Err(ChainError::StoreError(StoreError::Custom(
3546        "Could not find latest valid hash".to_string(),
3547    )))
3548}
3549
3550/// Searchs the header of the parent block header. If the parent header is missing,
3551/// Returns a ChainError::ParentNotFound. If the storage has an error it propagates it
3552pub fn find_parent_header(
3553    block_header: &BlockHeader,
3554    storage: &Store,
3555) -> Result<BlockHeader, ChainError> {
3556    match storage.get_block_header_by_hash(block_header.parent_hash)? {
3557        Some(parent_header) => Ok(parent_header),
3558        None => Err(ChainError::ParentNotFound),
3559    }
3560}
3561
3562pub async fn is_canonical(
3563    store: &Store,
3564    block_number: BlockNumber,
3565    block_hash: BlockHash,
3566) -> Result<bool, StoreError> {
3567    match store.get_canonical_block_hash(block_number).await? {
3568        Some(hash) if hash == block_hash => Ok(true),
3569        _ => Ok(false),
3570    }
3571}
3572
3573fn branchify(node: Node) -> Box<BranchNode> {
3574    match node {
3575        Node::Branch(branch_node) => branch_node,
3576        Node::Extension(extension_node) => {
3577            let index = extension_node.prefix.as_ref()[0];
3578            let noderef = if extension_node.prefix.len() == 1 {
3579                extension_node.child
3580            } else {
3581                let prefix = extension_node.prefix.offset(1);
3582                let node = ExtensionNode::new(prefix, extension_node.child);
3583                NodeRef::from(Arc::new(node.into()))
3584            };
3585            let mut choices = BranchNode::EMPTY_CHOICES;
3586            choices[index as usize] = noderef;
3587            Box::new(BranchNode::new(choices))
3588        }
3589        Node::Leaf(leaf_node) => {
3590            let index = leaf_node.partial.as_ref()[0];
3591            let node = LeafNode::new(leaf_node.partial.offset(1), leaf_node.value);
3592            let mut choices = BranchNode::EMPTY_CHOICES;
3593            choices[index as usize] = NodeRef::from(Arc::new(node.into()));
3594            Box::new(BranchNode::new(choices))
3595        }
3596    }
3597}
3598
3599fn collect_trie(index: u8, mut trie: Trie) -> Result<(Box<BranchNode>, Vec<TrieNode>), TrieError> {
3600    let root = branchify(
3601        trie.root_node()?
3602            .map(Arc::unwrap_or_clone)
3603            .unwrap_or_else(|| Node::Branch(Box::default())),
3604    );
3605    trie.root = Node::Branch(root).into();
3606    let (_, mut nodes) = trie.collect_changes_since_last_hash(&NativeCrypto);
3607    nodes.retain(|(nib, _)| nib.as_ref().first() == Some(&index));
3608
3609    let Some(Node::Branch(root)) = trie.root_node()?.map(Arc::unwrap_or_clone) else {
3610        return Err(TrieError::InvalidInput);
3611    };
3612    Ok((root, nodes))
3613}