Skip to main content

ethrex_blockchain/
payload.rs

1use std::{
2    cmp::{Ordering, max},
3    ops::Div,
4    sync::Arc,
5    time::{Duration, Instant},
6};
7
8use rustc_hash::FxHashMap;
9
10use ethrex_common::{
11    Address, Bloom, Bytes, H256, U256,
12    constants::{
13        DEFAULT_OMMERS_HASH, DEFAULT_REQUESTS_HASH, GAS_PER_BLOB, MAX_RLP_BLOCK_SIZE,
14        TX_MAX_GAS_LIMIT_AMSTERDAM,
15    },
16    types::{
17        AccountUpdate, BlobsBundle, Block, BlockBody, BlockHash, BlockHeader, BlockNumber,
18        ChainConfig, Fork, MempoolTransaction, Receipt, Transaction, TxKind, TxType, Withdrawal,
19        block_access_list::BlockAccessList,
20        bloom_from_logs, calc_excess_blob_gas, calculate_base_fee_per_blob_gas,
21        calculate_base_fee_per_gas, compute_receipts_root, compute_transactions_root,
22        compute_withdrawals_root,
23        requests::{EncodedRequests, compute_requests_hash},
24    },
25};
26
27use ethrex_crypto::NativeCrypto;
28use ethrex_crypto::keccak::Keccak256;
29use ethrex_vm::{Evm, EvmError, check_2d_gas_allowance};
30
31use ethrex_rlp::encode::RLPEncode;
32use ethrex_storage::{Store, error::StoreError};
33
34use ethrex_metrics::metrics;
35
36#[cfg(feature = "metrics")]
37use ethrex_metrics::blocks::METRICS_BLOCKS;
38#[cfg(feature = "metrics")]
39use ethrex_metrics::transactions::{METRICS_TX, MetricsTxType};
40use tokio_util::sync::CancellationToken;
41
42use crate::{
43    Blockchain, BlockchainType, MAX_PAYLOADS,
44    constants::{GAS_LIMIT_BOUND_DIVISOR, MIN_GAS_LIMIT, TX_GAS_COST},
45    error::{ChainError, InvalidBlockError},
46    mempool::PendingTxFilter,
47    new_evm,
48    vm::StoreVmDatabase,
49};
50
51use thiserror::Error;
52use tracing::{debug, warn};
53
54#[derive(Debug)]
55pub struct PayloadBuildTask {
56    task: tokio::task::JoinHandle<Result<PayloadBuildResult, ChainError>>,
57    cancel: CancellationToken,
58}
59
60#[derive(Debug)]
61pub enum PayloadOrTask {
62    Payload(Box<PayloadBuildResult>),
63    Task(PayloadBuildTask),
64}
65
66impl PayloadBuildTask {
67    /// Finishes the current payload build process and returns its result
68    pub async fn finish(self) -> Result<PayloadBuildResult, ChainError> {
69        self.cancel.cancel();
70        self.task
71            .await
72            .map_err(|_| ChainError::Custom("Failed to join task".to_string()))?
73    }
74}
75
76impl PayloadOrTask {
77    /// Converts self into a `PayloadOrTask::Payload` by finishing the current build task
78    /// If self is already a `PayloadOrTask::Payload` this is a NoOp
79    pub async fn to_payload(self) -> Result<Self, ChainError> {
80        Ok(match self {
81            PayloadOrTask::Payload(_) => self,
82            PayloadOrTask::Task(task) => PayloadOrTask::Payload(Box::new(task.finish().await?)),
83        })
84    }
85}
86
87pub struct BuildPayloadArgs {
88    pub parent: BlockHash,
89    pub timestamp: u64,
90    pub fee_recipient: Address,
91    pub random: H256,
92    pub withdrawals: Option<Vec<Withdrawal>>,
93    pub beacon_root: Option<H256>,
94    pub slot_number: Option<u64>,
95    pub version: u8,
96    pub elasticity_multiplier: u64,
97    pub gas_ceil: u64,
98}
99
100#[derive(Debug, Error)]
101pub enum BuildPayloadArgsError {
102    #[error("Payload hashed has wrong size")]
103    FailedToConvertPayload,
104}
105
106impl BuildPayloadArgs {
107    /// Computes an 8-byte identifier by hashing the components of the payload arguments.
108    pub fn id(&self) -> Result<u64, BuildPayloadArgsError> {
109        let mut hasher = Keccak256::new();
110        hasher.update(self.parent);
111        hasher.update(self.timestamp.to_be_bytes());
112        hasher.update(self.random);
113        hasher.update(self.fee_recipient);
114        if let Some(withdrawals) = &self.withdrawals {
115            hasher.update(withdrawals.encode_to_vec());
116        }
117        if let Some(beacon_root) = self.beacon_root {
118            hasher.update(beacon_root);
119        }
120        let res = &mut hasher.finalize()[..8];
121        res[0] = self.version;
122        Ok(u64::from_be_bytes(res.try_into().map_err(|_| {
123            BuildPayloadArgsError::FailedToConvertPayload
124        })?))
125    }
126}
127
128/// Creates a new payload based on the payload arguments
129// Basic payload block building, can and should be improved
130pub fn create_payload(
131    args: &BuildPayloadArgs,
132    storage: &Store,
133    extra_data: Bytes,
134) -> Result<Block, ChainError> {
135    let parent_block = storage
136        .get_block_header_by_hash(args.parent)?
137        .ok_or_else(|| ChainError::ParentNotFound)?;
138    let chain_config = storage.get_chain_config();
139    let fork = chain_config.fork(args.timestamp);
140    let gas_limit = calc_gas_limit(parent_block.gas_limit, args.gas_ceil);
141    let excess_blob_gas = chain_config
142        .get_fork_blob_schedule(args.timestamp)
143        .map(|schedule| calc_excess_blob_gas(&parent_block, schedule, fork));
144
145    let header = BlockHeader {
146        parent_hash: args.parent,
147        ommers_hash: *DEFAULT_OMMERS_HASH,
148        coinbase: args.fee_recipient,
149        state_root: parent_block.state_root,
150        transactions_root: compute_transactions_root(&[], &NativeCrypto),
151        receipts_root: compute_receipts_root(&[], &NativeCrypto),
152        logs_bloom: Bloom::default(),
153        difficulty: U256::zero(),
154        number: parent_block.number.saturating_add(1),
155        gas_limit,
156        gas_used: 0,
157        timestamp: args.timestamp,
158        extra_data,
159        prev_randao: args.random,
160        nonce: 0,
161        base_fee_per_gas: calculate_base_fee_per_gas(
162            gas_limit,
163            parent_block.gas_limit,
164            parent_block.gas_used,
165            parent_block.base_fee_per_gas.unwrap_or_default(),
166            args.elasticity_multiplier,
167        ),
168        withdrawals_root: chain_config
169            .is_shanghai_activated(args.timestamp)
170            .then_some(compute_withdrawals_root(
171                args.withdrawals.as_ref().unwrap_or(&Vec::new()),
172                &NativeCrypto,
173            )),
174        blob_gas_used: chain_config
175            .is_cancun_activated(args.timestamp)
176            .then_some(0),
177        excess_blob_gas,
178        parent_beacon_block_root: args.beacon_root,
179        requests_hash: chain_config
180            .is_prague_activated(args.timestamp)
181            .then_some(*DEFAULT_REQUESTS_HASH),
182        slot_number: args.slot_number,
183        ..Default::default()
184    };
185
186    let body = BlockBody {
187        transactions: Vec::new(),
188        ommers: Vec::new(),
189        withdrawals: args.withdrawals.clone(),
190    };
191
192    // Delay applying withdrawals until the payload is requested and built
193    Ok(Block::new(header, body))
194}
195
196pub fn calc_gas_limit(parent_gas_limit: u64, builder_gas_ceil: u64) -> u64 {
197    // TODO: check where we should get builder values from
198    let delta = parent_gas_limit / GAS_LIMIT_BOUND_DIVISOR - 1;
199    let mut limit = parent_gas_limit;
200    let desired_limit = max(builder_gas_ceil, MIN_GAS_LIMIT);
201    if limit < desired_limit {
202        limit = parent_gas_limit + delta;
203        if limit > desired_limit {
204            limit = desired_limit
205        }
206        return limit;
207    }
208    if limit > desired_limit {
209        limit = parent_gas_limit - delta;
210        if limit < desired_limit {
211            limit = desired_limit
212        }
213    }
214    limit
215}
216
217#[derive(Clone)]
218pub struct PayloadBuildContext {
219    pub payload: Block,
220    pub remaining_gas: u64,
221    /// Cumulative gas spent (post-refund) for receipt tracking.
222    /// Per EIP-7778 this differs from `remaining_gas` which tracks pre-refund gas.
223    pub cumulative_gas_spent: u64,
224    /// EIP-8037 (Amsterdam+): cumulative regular (non-state) gas used.
225    pub block_regular_gas_used: u64,
226    /// EIP-8037 (Amsterdam+): cumulative state gas used.
227    pub block_state_gas_used: u64,
228    /// Whether Amsterdam fork is active for this block.
229    pub is_amsterdam: bool,
230    pub receipts: Vec<Receipt>,
231    pub requests: Option<Vec<EncodedRequests>>,
232    pub block_value: U256,
233    base_fee_per_blob_gas: U256,
234    pub blobs_bundle: BlobsBundle,
235    pub store: Store,
236    pub vm: Evm,
237    pub account_updates: Vec<AccountUpdate>,
238    pub payload_size: u64,
239    /// Block Access List for EIP-7928
240    pub block_access_list: Option<BlockAccessList>,
241}
242
243impl PayloadBuildContext {
244    pub fn new(
245        payload: Block,
246        storage: &Store,
247        blockchain_type: &BlockchainType,
248    ) -> Result<Self, EvmError> {
249        let config = storage.get_chain_config();
250        let base_fee_per_blob_gas = calculate_base_fee_per_blob_gas(
251            payload.header.excess_blob_gas.unwrap_or_default(),
252            config
253                .get_fork_blob_schedule(payload.header.timestamp)
254                .map(|schedule| schedule.base_fee_update_fraction)
255                .unwrap_or_default(),
256        );
257
258        let parent_header = storage
259            .get_block_header_by_hash(payload.header.parent_hash)
260            .map_err(|e| EvmError::DB(e.to_string()))?
261            .ok_or_else(|| EvmError::DB("parent header not found".to_string()))?;
262        let vm_db = StoreVmDatabase::new(storage.clone(), parent_header)?;
263        let mut vm = new_evm(blockchain_type, vm_db)?;
264
265        // Enable BAL recording for Amsterdam and later forks (EIP-7928)
266        if config.is_amsterdam_activated(payload.header.timestamp) {
267            vm.enable_bal_recording();
268            // Set index 0 for pre-execution phase (system contracts)
269            vm.set_bal_index(0);
270        }
271
272        let is_amsterdam = config.is_amsterdam_activated(payload.header.timestamp);
273        let payload_size = payload.length() as u64;
274        Ok(PayloadBuildContext {
275            remaining_gas: payload.header.gas_limit,
276            cumulative_gas_spent: 0,
277            block_regular_gas_used: 0,
278            block_state_gas_used: 0,
279            is_amsterdam,
280            receipts: vec![],
281            requests: config
282                .is_prague_activated(payload.header.timestamp)
283                .then_some(Vec::new()),
284            block_value: U256::zero(),
285            base_fee_per_blob_gas,
286            payload,
287            blobs_bundle: BlobsBundle::default(),
288            store: storage.clone(),
289            vm,
290            account_updates: Vec::new(),
291            payload_size,
292            block_access_list: None,
293        })
294    }
295
296    pub fn gas_used(&self) -> u64 {
297        if self.is_amsterdam {
298            // EIP-8037: block gas = max(sum_regular, sum_state)
299            self.block_regular_gas_used.max(self.block_state_gas_used)
300        } else {
301            self.payload.header.gas_limit - self.remaining_gas
302        }
303    }
304}
305
306impl PayloadBuildContext {
307    fn parent_hash(&self) -> BlockHash {
308        self.payload.header.parent_hash
309    }
310
311    pub fn block_number(&self) -> BlockNumber {
312        self.payload.header.number
313    }
314
315    fn chain_config(&self) -> ChainConfig {
316        self.store.get_chain_config()
317    }
318
319    fn base_fee_per_gas(&self) -> Option<u64> {
320        self.payload.header.base_fee_per_gas
321    }
322}
323
324#[derive(Debug, Clone)]
325pub struct PayloadBuildResult {
326    pub blobs_bundle: BlobsBundle,
327    pub block_value: U256,
328    pub receipts: Vec<Receipt>,
329    pub requests: Vec<EncodedRequests>,
330    pub account_updates: Vec<AccountUpdate>,
331    pub payload: Block,
332    /// Block Access List for EIP-7928
333    pub block_access_list: Option<BlockAccessList>,
334}
335
336impl From<PayloadBuildContext> for PayloadBuildResult {
337    fn from(value: PayloadBuildContext) -> Self {
338        let PayloadBuildContext {
339            blobs_bundle,
340            block_value,
341            requests,
342            receipts,
343            account_updates,
344            payload,
345            block_access_list,
346            ..
347        } = value;
348
349        Self {
350            blobs_bundle,
351            block_value,
352            requests: requests.unwrap_or_default(),
353            receipts,
354            account_updates,
355            payload,
356            block_access_list,
357        }
358    }
359}
360
361impl Blockchain {
362    /// Attempts to fetch a payload given it's id. If the payload is still being built, it will be finished.
363    /// Fails if there is no payload or active payload build task for the given id.
364    pub async fn get_payload(&self, payload_id: u64) -> Result<PayloadBuildResult, ChainError> {
365        let mut payloads = self.payloads.lock().await;
366        // Find the given payload and finish the active build process if needed
367        let idx = payloads
368            .iter()
369            .position(|(id, _)| id == &payload_id)
370            .ok_or(ChainError::UnknownPayload)?;
371        let finished_payload = (payload_id, payloads.remove(idx).1.to_payload().await?);
372        payloads.insert(idx, finished_payload);
373        // Return the held payload
374        match &payloads[idx].1 {
375            PayloadOrTask::Payload(payload) => Ok(*payload.clone()),
376            _ => unreachable!("we already converted the payload into a finished version"),
377        }
378    }
379
380    /// Starts a payload build process. The built payload can be retrieved by calling `get_payload`.
381    /// The build process will run for the full block building timeslot or until `get_payload` is called
382    pub async fn initiate_payload_build(self: Arc<Blockchain>, payload: Block, payload_id: u64) {
383        let self_clone = self.clone();
384        let cancel_token = CancellationToken::new();
385        let cancel_token_clone = cancel_token.clone();
386        let payload_build_task = tokio::task::spawn(async move {
387            self_clone
388                .build_payload_loop(payload, cancel_token_clone)
389                .await
390        });
391        let mut payloads = self.payloads.lock().await;
392        if payloads.len() >= MAX_PAYLOADS {
393            // Remove oldest unclaimed payload
394            payloads.remove(0);
395        }
396        payloads.push((
397            payload_id,
398            PayloadOrTask::Task(PayloadBuildTask {
399                task: payload_build_task,
400                cancel: cancel_token,
401            }),
402        ));
403    }
404
405    /// Build the given payload and keep on rebuilding it until either the time slot
406    /// given by `SECONDS_PER_SLOT` is up or the `cancel_token` is cancelled
407    pub async fn build_payload_loop(
408        self: Arc<Blockchain>,
409        payload: Block,
410        cancel_token: CancellationToken,
411    ) -> Result<PayloadBuildResult, ChainError> {
412        let start = Instant::now();
413        const SECONDS_PER_SLOT: Duration = Duration::from_secs(12);
414        // Attempt to rebuild the payload as many times within the given timeframe to maximize fee revenue
415        // TODO(#4997): start with an empty block
416        // Snapshot the mempool sequence *before* the build so any tx that lands
417        // during the build is seen as newer than the current `res`.
418        let mut last_built_seq = self.mempool.tx_seq();
419        let mut res = self.build_payload(payload.clone())?;
420        while start.elapsed() < SECONDS_PER_SLOT && !cancel_token.is_cancelled() {
421            // Wait for new transactions, cancellation, or slot deadline before rebuilding
422            let remaining = SECONDS_PER_SLOT.saturating_sub(start.elapsed());
423            let notified = self.mempool.tx_added().notified();
424            tokio::select! {
425                _ = notified => {}
426                _ = cancel_token.cancelled() => break,
427                _ = tokio::time::sleep(remaining) => break,
428            }
429            let payload = payload.clone();
430            let self_clone = self.clone();
431            let seq_before = self.mempool.tx_seq();
432            let building_task =
433                tokio::task::spawn_blocking(move || self_clone.build_payload(payload));
434            // Cancel the current build process and return the previous payload if it is requested earlier
435            // TODO(#5011): this doesn't stop the building task, but only keeps it running in the background,
436            //   which wastes CPU resources.
437            match cancel_token.run_until_cancelled(building_task).await {
438                Some(Ok(current_res)) => {
439                    res = current_res?;
440                    last_built_seq = seq_before;
441                }
442                Some(Err(err)) => {
443                    warn!(%err, "Payload-building task panicked");
444                }
445                None => {}
446            }
447        }
448
449        // If a tx landed after the snapshot that produced `res`, do one final
450        // build before returning. Covers both races: (a) cancellation dropping
451        // an in-progress rebuild via `run_until_cancelled`, and (b) the slot-
452        // timeout `select!` arm winning over a simultaneous `tx_added`
453        // notification near the slot boundary.
454        if self.mempool.tx_seq() > last_built_seq {
455            let blockchain = self.clone();
456            match tokio::task::spawn_blocking(move || blockchain.build_payload(payload)).await {
457                Ok(Ok(final_res)) => res = final_res,
458                Ok(Err(err)) => {
459                    warn!(%err, "Final payload rebuild failed; returning previous result")
460                }
461                Err(err) => warn!(%err, "Final payload rebuild task panicked"),
462            }
463        }
464
465        Ok(res)
466    }
467
468    /// Completes the payload building process, return the block value
469    pub fn build_payload(&self, payload: Block) -> Result<PayloadBuildResult, ChainError> {
470        let since = Instant::now();
471
472        debug!("Building payload");
473        let base_fee = payload.header.base_fee_per_gas.unwrap_or_default();
474        let mut context = PayloadBuildContext::new(payload, &self.storage, &self.options.r#type)?;
475
476        if let BlockchainType::L1 = self.options.r#type {
477            self.apply_system_operations(&mut context)?;
478        }
479        self.fill_transactions(&mut context)?;
480        // EIP-7928: Post-tx phase uses index n+1 for both requests and withdrawals.
481        // Order must match geth: requests (system calls) BEFORE withdrawals.
482        if context
483            .chain_config()
484            .is_amsterdam_activated(context.payload.header.timestamp)
485        {
486            let post_tx_index =
487                u32::try_from(context.payload.body.transactions.len() + 1).unwrap_or(u32::MAX);
488            context.vm.set_bal_index(post_tx_index);
489            // Record withdrawal recipients as touched addresses per EIP-7928
490            if let Some(recorder) = context.vm.db.bal_recorder_mut()
491                && let Some(withdrawals) = &context.payload.body.withdrawals
492            {
493                recorder.extend_touched_addresses(withdrawals.iter().map(|w| w.address));
494            }
495        }
496        self.extract_requests(&mut context)?;
497        self.apply_withdrawals(&mut context)?;
498        self.finalize_payload(&mut context)?;
499
500        let interval = Instant::now().duration_since(since).as_millis();
501
502        tracing::debug!(
503            "[METRIC] BUILDING PAYLOAD TOOK: {interval} ms, base fee {}",
504            base_fee
505        );
506        metrics!(METRICS_BLOCKS.set_block_building_ms(interval as i64));
507        metrics!(METRICS_BLOCKS.set_block_building_base_fee(base_fee as i64));
508        let gas_used = context.gas_used();
509        if gas_used > 0 {
510            let as_gigas = (gas_used as f64).div(10_f64.powf(9_f64));
511
512            if interval != 0 {
513                let throughput = (as_gigas) / (interval as f64) * 1000_f64;
514                metrics!(METRICS_BLOCKS.set_latest_gigagas_block_building(throughput));
515
516                tracing::debug!(
517                    "[METRIC] BLOCK BUILDING THROUGHPUT: {throughput} Gigagas/s TIME SPENT: {interval} msecs"
518                );
519            }
520        }
521
522        Ok(context.into())
523    }
524
525    pub fn apply_withdrawals(&self, context: &mut PayloadBuildContext) -> Result<(), EvmError> {
526        let binding = Vec::new();
527        let withdrawals = context
528            .payload
529            .body
530            .withdrawals
531            .as_ref()
532            .unwrap_or(&binding);
533        context.vm.process_withdrawals(withdrawals)
534    }
535
536    // This function applies system level operations:
537    // - Call beacon root contract, and obtain the new state root
538    // - Call block hash process contract, and store parent block hash
539    pub fn apply_system_operations(
540        &self,
541        context: &mut PayloadBuildContext,
542    ) -> Result<(), EvmError> {
543        context.vm.apply_system_calls(&context.payload.header)
544    }
545
546    /// Fetches suitable transactions from the mempool
547    /// Returns two transaction queues, one for plain and one for blob txs
548    pub fn fetch_mempool_transactions(
549        &self,
550        context: &mut PayloadBuildContext,
551    ) -> Result<(TransactionQueue, TransactionQueue), ChainError> {
552        let blob_fee: u64 = context.base_fee_per_blob_gas.try_into().map_err(|_| {
553            ChainError::Custom("base_fee_per_blob_gas does not fit in u64".to_owned())
554        })?;
555        let tx_filter = PendingTxFilter {
556            /*TODO(https://github.com/lambdaclass/ethrex/issues/680): add tip filter */
557            base_fee: context.base_fee_per_gas(),
558            blob_fee: Some(blob_fee),
559            ..Default::default()
560        };
561        let plain_tx_filter = PendingTxFilter {
562            only_plain_txs: true,
563            ..tx_filter
564        };
565        let blob_tx_filter = PendingTxFilter {
566            only_blob_txs: true,
567            ..tx_filter
568        };
569        Ok((
570            // Plain txs
571            TransactionQueue::new(
572                self.mempool.filter_transactions(&plain_tx_filter)?,
573                context.base_fee_per_gas(),
574            )?,
575            // Blob txs
576            TransactionQueue::new(
577                self.mempool.filter_transactions(&blob_tx_filter)?,
578                context.base_fee_per_gas(),
579            )?,
580        ))
581    }
582
583    /// EIP-7872: Computes effective max blobs per block.
584    /// Returns min(protocol_max, user_configured_max).
585    fn effective_max_blobs(&self, context: &PayloadBuildContext) -> usize {
586        let protocol_max = context
587            .chain_config()
588            .get_fork_blob_schedule(context.payload.header.timestamp)
589            .map(|schedule| schedule.max)
590            .unwrap_or_default();
591        match self.options.max_blobs_per_block {
592            Some(user_max) => protocol_max.min(user_max) as usize,
593            None => protocol_max as usize,
594        }
595    }
596
597    /// Fills the payload with transactions taken from the mempool
598    /// Returns the block value
599    pub fn fill_transactions(&self, context: &mut PayloadBuildContext) -> Result<(), ChainError> {
600        let chain_config = context.chain_config();
601        let max_blob_number_per_block = self.effective_max_blobs(context);
602
603        debug!("Fetching transactions from mempool");
604        // Fetch mempool transactions
605        let (mut plain_txs, mut blob_txs) = self.fetch_mempool_transactions(context)?;
606        // Execute and add transactions to payload (if suitable)
607        loop {
608            // Check if we have enough gas to run more transactions
609            if context.remaining_gas < TX_GAS_COST {
610                debug!("No more gas to run transactions");
611                break;
612            };
613            if !blob_txs.is_empty() && context.blobs_bundle.blobs.len() >= max_blob_number_per_block
614            {
615                debug!("No more blob gas to run blob transactions");
616                blob_txs.clear();
617            }
618            // Fetch the next transactions
619            let (head_tx, is_blob) = match (plain_txs.peek(), blob_txs.peek()) {
620                (None, None) => break,
621                (None, Some(tx)) => (tx, true),
622                (Some(tx), None) => (tx, false),
623                (Some(a), Some(b)) if b < a => (b, true),
624                (Some(tx), _) => (tx, false),
625            };
626
627            let txs = if is_blob {
628                &mut blob_txs
629            } else {
630                &mut plain_txs
631            };
632
633            // Check if we have enough gas to run the transaction.
634            // EIP-7825/EIP-8037: for Amsterdam, cap at TX_MAX_GAS_LIMIT since
635            // remaining_gas tracks regular gas only.
636            let tx_gas_reservation = if context.is_amsterdam {
637                head_tx.tx.gas_limit().min(TX_MAX_GAS_LIMIT_AMSTERDAM)
638            } else {
639                head_tx.tx.gas_limit()
640            };
641            if context.remaining_gas < tx_gas_reservation {
642                debug!("Skipping transaction: {}, no gas left", head_tx.tx.hash());
643                // We don't have enough gas left for the transaction, so we skip all txs from this account
644                txs.pop();
645                continue;
646            }
647
648            // Check adding a transaction wouldn't exceed the Osaka block size limit of 10 MiB
649            // if inclusion of the transaction puts the block size over the size limit
650            // we don't add any more txs to the payload.
651            let potential_rlp_block_size =
652                context.payload_size + head_tx.encode_canonical_to_vec().len() as u64;
653            if context
654                .chain_config()
655                .is_osaka_activated(context.payload.header.timestamp)
656                && potential_rlp_block_size > MAX_RLP_BLOCK_SIZE
657            {
658                break;
659            }
660            context.payload_size = potential_rlp_block_size;
661
662            // TODO: maybe fetch hash too when filtering mempool so we don't have to compute it here (we can do this in the same refactor as adding timestamp)
663            let tx_hash = head_tx.tx.hash();
664
665            // Check whether the tx is replay-protected
666            if head_tx.tx.protected() && !chain_config.is_eip155_activated(context.block_number()) {
667                // Ignore replay protected tx & all txs from the sender
668                // Pull transaction from the mempool
669                debug!("Ignoring replay-protected transaction: {}", tx_hash);
670                txs.pop();
671                self.remove_transaction_from_pool(&tx_hash)?;
672                continue;
673            }
674
675            match self.apply_tx_to_payload(head_tx, context) {
676                Ok(()) => txs.shift()?,
677                Err(_) => txs.pop(),
678            }
679        }
680        Ok(())
681    }
682
683    /// Apply a single transaction to the in-progress payload.
684    ///
685    /// Runs the full per-tx pipeline: EIP-8037 2D inclusion check, EIP-7928
686    /// BAL index/checkpoint setup, sender/recipient recording, dispatch to
687    /// blob/plain execution, and on failure rolls the BAL recorder back so
688    /// rejected txs leave no trace. On success the tx is appended to the
689    /// payload body and the receipt to `context.receipts`.
690    ///
691    /// Caller is responsible for mempool bookkeeping (advancing or dropping
692    /// the sender's queue) — this function only mutates the payload context.
693    pub fn apply_tx_to_payload(
694        &self,
695        head: HeadTransaction,
696        context: &mut PayloadBuildContext,
697    ) -> Result<(), ChainError> {
698        let tx_hash = head.tx.hash();
699
700        // EIP-8037 (Amsterdam+, PR #2703): per-tx 2D inclusion check against
701        // running block totals. Run BEFORE we touch the BAL recorder so a
702        // rejected tx doesn't even produce a sender/recipient touch.
703        if context.is_amsterdam
704            && let Err(e) = check_2d_gas_allowance(
705                &head.tx,
706                Fork::Amsterdam,
707                context.block_regular_gas_used,
708                context.block_state_gas_used,
709                context.payload.header.gas_limit,
710            )
711        {
712            debug!("Skipping tx {tx_hash:x}: fails 2D inclusion check: {e}");
713            return Err(e.into());
714        }
715
716        // Set BAL index for this transaction (1-indexed per EIP-7928).
717        // Must happen BEFORE tx_checkpoint: set_bal_index flushes net-zero
718        // filters for the previous (committed) tx, which may insert reads.
719        let tx_index =
720            u32::try_from(context.payload.body.transactions.len() + 1).unwrap_or(u32::MAX);
721        context.vm.set_bal_index(tx_index);
722
723        // EIP-7928: lightweight tx-level checkpoint before trying the tx.
724        // If the tx is rejected, restore so only included txs affect the BAL.
725        // Taken after set_bal_index (which flushes previous tx) but before
726        // this tx's touches, so rejected txs leave no trace.
727        let bal_checkpoint = context
728            .vm
729            .db
730            .bal_recorder
731            .as_ref()
732            .map(|r| r.tx_checkpoint());
733
734        if let Some(recorder) = context.vm.db.bal_recorder_mut() {
735            recorder.record_touched_address(head.tx.sender());
736            if let TxKind::Call(to) = head.to() {
737                recorder.record_touched_address(to);
738            }
739        }
740
741        let receipt = match self.apply_transaction(&head, context) {
742            Ok(receipt) => {
743                metrics!(METRICS_TX.inc_tx_with_type(MetricsTxType(head.tx_type())));
744                receipt
745            }
746            Err(e) => {
747                debug!("Failed to execute transaction: {tx_hash:x}, {e}");
748                metrics!(METRICS_TX.inc_tx_errors(e.to_metric()));
749                if let (Some(recorder), Some(checkpoint)) =
750                    (context.vm.db.bal_recorder_mut(), bal_checkpoint)
751                {
752                    recorder.tx_restore(checkpoint);
753                }
754                return Err(e);
755            }
756        };
757
758        debug!("Adding transaction: {} to payload", tx_hash);
759        context.payload.body.transactions.push(head.into());
760        context.receipts.push(receipt);
761        Ok(())
762    }
763
764    /// Executes the transaction, updates gas-related context values & return the receipt
765    /// The payload build context should have enough remaining gas to cover the transaction's gas_limit
766    fn apply_transaction(
767        &self,
768        head: &HeadTransaction,
769        context: &mut PayloadBuildContext,
770    ) -> Result<Receipt, ChainError> {
771        match **head {
772            Transaction::EIP4844Transaction(_) => self.apply_blob_transaction(head, context),
773            _ => apply_plain_transaction(head, context),
774        }
775    }
776
777    /// Runs a blob transaction, updates the gas count & blob data and returns the receipt
778    fn apply_blob_transaction(
779        &self,
780        head: &HeadTransaction,
781        context: &mut PayloadBuildContext,
782    ) -> Result<Receipt, ChainError> {
783        // Fetch blobs bundle
784        let tx_hash = head.tx.hash();
785        let max_blob_number_per_block = self.effective_max_blobs(context);
786        let Some(blobs_bundle) = self.mempool.get_blobs_bundle(tx_hash)? else {
787            // No blob tx should enter the mempool without its blobs bundle so this is an internal error
788            return Err(
789                StoreError::Custom(format!("No blobs bundle found for blob tx {tx_hash}")).into(),
790            );
791        };
792        if context.blobs_bundle.blobs.len() + blobs_bundle.blobs.len() > max_blob_number_per_block {
793            // This error will only be used for debug tracing
794            return Err(EvmError::Custom("max data blobs reached".to_string()).into());
795        };
796        // Apply transaction
797        let receipt = apply_plain_transaction(head, context)?;
798        // Update context with blob data
799        let prev_blob_gas = context.payload.header.blob_gas_used.unwrap_or_default();
800        context.payload.header.blob_gas_used =
801            Some(prev_blob_gas + (blobs_bundle.blobs.len() * GAS_PER_BLOB as usize) as u64);
802        context.blobs_bundle += blobs_bundle;
803        Ok(receipt)
804    }
805
806    pub fn extract_requests(&self, context: &mut PayloadBuildContext) -> Result<(), EvmError> {
807        if !context
808            .chain_config()
809            .is_prague_activated(context.payload.header.timestamp)
810        {
811            return Ok(());
812        };
813
814        let requests = context
815            .vm
816            .extract_requests(&context.receipts, &context.payload.header)?;
817
818        context.requests = Some(requests.iter().map(|r| r.encode()).collect());
819
820        Ok(())
821    }
822
823    pub fn finalize_payload(&self, context: &mut PayloadBuildContext) -> Result<(), ChainError> {
824        // Take BAL from VM before getting state transitions (which clears state)
825        let block_access_list = context.vm.take_bal();
826
827        let account_updates = context.vm.get_state_transitions()?;
828
829        let ret_acount_updates_list = self
830            .storage
831            .apply_account_updates_batch(context.parent_hash(), &account_updates)?
832            .ok_or(ChainError::ParentStateNotFound)?;
833
834        let state_root = ret_acount_updates_list.state_trie_hash;
835
836        context.payload.header.state_root = state_root;
837        context.payload.header.transactions_root =
838            compute_transactions_root(&context.payload.body.transactions, &NativeCrypto);
839        context.payload.header.receipts_root =
840            compute_receipts_root(&context.receipts, &NativeCrypto);
841        context.payload.header.requests_hash = context
842            .requests
843            .as_ref()
844            .map(|requests| compute_requests_hash(requests));
845        let gas_used = context.gas_used();
846        if context.is_amsterdam {
847            debug!(
848                "EIP-8037 block finalize: gas_used={gas_used} regular={} state={} txs={}",
849                context.block_regular_gas_used,
850                context.block_state_gas_used,
851                context.payload.body.transactions.len(),
852            );
853        }
854        context.payload.header.gas_used = gas_used;
855        context.account_updates = account_updates;
856
857        // Set BAL hash in block header (EIP-7928)
858        context.payload.header.block_access_list_hash =
859            block_access_list.as_ref().map(|bal| bal.compute_hash());
860        context.block_access_list = block_access_list;
861
862        let mut logs = vec![];
863        for receipt in context.receipts.iter().cloned() {
864            for log in receipt.logs {
865                logs.push(log);
866            }
867        }
868
869        context.payload.header.logs_bloom = bloom_from_logs(&logs, &NativeCrypto);
870        Ok(())
871    }
872}
873
874/// Runs a plain (non blob) transaction, updates the gas count and returns the receipt
875pub fn apply_plain_transaction(
876    head: &HeadTransaction,
877    context: &mut PayloadBuildContext,
878) -> Result<Receipt, ChainError> {
879    let (receipt, report) = context.vm.execute_tx(
880        &head.tx,
881        &context.payload.header,
882        &mut context.cumulative_gas_spent,
883        head.tx.sender(),
884    )?;
885
886    // EIP-8037 (Amsterdam+): track regular and state gas separately
887    let tx_state_gas = report.state_gas_used;
888    let tx_regular_gas = report.gas_used.saturating_sub(tx_state_gas);
889
890    // Compute new totals before committing them
891    let new_regular = context
892        .block_regular_gas_used
893        .saturating_add(tx_regular_gas);
894    let new_state = context.block_state_gas_used.saturating_add(tx_state_gas);
895
896    // EIP-8037 (Amsterdam+): post-execution block gas overflow check
897    // Reject the transaction if adding it would cause max(regular, state) to exceed the gas limit
898    if context.is_amsterdam && new_regular.max(new_state) > context.payload.header.gas_limit {
899        // Rollback transaction state before returning error:
900        // 1. Undo DB mutations (nonce, balance, storage, etc.)
901        // 2. Revert cumulative gas counter inflation
902        // This ensures the next transaction executes against clean state.
903        context.vm.undo_last_tx()?;
904        // `cumulative_gas_spent` was bumped inside `execute_tx` above; revert it
905        // now that the tx is being rejected. Use `saturating_sub` as a defensive
906        // guard — cumulative must always dominate this tx's contribution unless
907        // some upstream bug leaks a stale value, in which case we'd rather clamp
908        // to 0 than underflow the counter.
909        debug_assert!(
910            context.cumulative_gas_spent >= report.gas_spent,
911            "cumulative_gas_spent underflow on tx rollback"
912        );
913        context.cumulative_gas_spent = context
914            .cumulative_gas_spent
915            .saturating_sub(report.gas_spent);
916
917        return Err(EvmError::Custom(format!(
918            "block gas limit exceeded (state gas overflow): \
919             max({new_regular}, {new_state}) = {} > gas_limit {}",
920            new_regular.max(new_state),
921            context.payload.header.gas_limit
922        ))
923        .into());
924    }
925
926    // Commit the new totals
927    context.block_regular_gas_used = new_regular;
928    context.block_state_gas_used = new_state;
929
930    if context.is_amsterdam {
931        debug!(
932            "EIP-8037 tx gas: regular={tx_regular_gas} state={tx_state_gas} gas_used={} gas_spent={} block_regular={} block_state={} block_max={}",
933            report.gas_used,
934            report.gas_spent,
935            context.block_regular_gas_used,
936            context.block_state_gas_used,
937            context
938                .block_regular_gas_used
939                .max(context.block_state_gas_used),
940        );
941    }
942
943    // Update remaining_gas for block gas limit checks.
944    // EIP-8037 (Amsterdam+): remaining_gas reflects both regular and state gas dimensions.
945    // For pre-tx heuristic checks, this ensures we reject txs when either dimension is full.
946    if context.is_amsterdam {
947        context.remaining_gas = context
948            .payload
949            .header
950            .gas_limit
951            .saturating_sub(new_regular.max(new_state));
952    } else {
953        context.remaining_gas = context.remaining_gas.saturating_sub(report.gas_used);
954    }
955
956    // Block value uses gas_spent (what the user actually pays) for tip calculation
957    context.block_value += U256::from(report.gas_spent) * head.tip;
958    Ok(receipt)
959}
960
961/// A struct representing suitable mempool transactions waiting to be included in a block
962// TODO: Consider using VecDequeue instead of Vec
963pub struct TransactionQueue {
964    // The first transaction for each account along with its tip, sorted by highest tip
965    heads: Vec<HeadTransaction>,
966    // The remaining txs grouped by account and sorted by nonce
967    txs: FxHashMap<Address, Vec<MempoolTransaction>>,
968    // Base Fee stored for tip calculations
969    base_fee: Option<u64>,
970}
971
972#[derive(Clone, Debug, Eq, PartialEq)]
973pub struct HeadTransaction {
974    pub tx: MempoolTransaction,
975    pub tip: U256,
976}
977
978impl std::ops::Deref for HeadTransaction {
979    type Target = Transaction;
980
981    fn deref(&self) -> &Self::Target {
982        &self.tx
983    }
984}
985
986impl From<HeadTransaction> for Transaction {
987    fn from(val: HeadTransaction) -> Self {
988        val.tx.transaction().clone()
989    }
990}
991
992impl TransactionQueue {
993    /// Creates a new TransactionQueue from a set of transactions grouped by sender and sorted by nonce
994    fn new(
995        mut txs: FxHashMap<Address, Vec<MempoolTransaction>>,
996        base_fee: Option<u64>,
997    ) -> Result<Self, ChainError> {
998        let mut heads = Vec::with_capacity(100);
999        for (_, txs) in txs.iter_mut() {
1000            // Pull the first tx from each list and add it to the heads list
1001            // This should be a newly filtered tx list so we are guaranteed to have a first element
1002            let head_tx = txs.remove(0);
1003            heads.push(HeadTransaction {
1004                // We already ran this method when filtering the transactions from the mempool so it shouldn't fail
1005                tip: head_tx
1006                    .effective_gas_tip(base_fee)
1007                    .ok_or(ChainError::InvalidBlock(
1008                        InvalidBlockError::InvalidTransaction("Attempted to add an invalid transaction to the block. The transaction filter must have failed.".to_owned()),
1009                    ))?,
1010                tx: head_tx,
1011            });
1012        }
1013        // Sort heads by higest tip (and lowest timestamp if tip is equal)
1014        heads.sort();
1015        Ok(TransactionQueue {
1016            heads,
1017            txs,
1018            base_fee,
1019        })
1020    }
1021
1022    /// Remove all transactions from the queue
1023    pub fn clear(&mut self) {
1024        self.heads.clear();
1025        self.txs.clear();
1026    }
1027
1028    /// Returns true if there are no more transactions in the queue
1029    pub fn is_empty(&self) -> bool {
1030        self.heads.is_empty()
1031    }
1032
1033    /// Returns the head transaction with the highest tip
1034    /// If there is more than one transaction with the highest tip, return the one with the lowest timestamp
1035    pub fn peek(&self) -> Option<HeadTransaction> {
1036        self.heads.first().cloned()
1037    }
1038
1039    /// Removes current head transaction and all transactions from the given sender
1040    pub fn pop(&mut self) {
1041        if !self.is_empty() {
1042            let sender = self.heads.remove(0).tx.sender();
1043            self.txs.remove(&sender);
1044        }
1045    }
1046
1047    /// Remove the top transaction
1048    /// Add a tx from the same sender to the head transactions
1049    pub fn shift(&mut self) -> Result<(), ChainError> {
1050        let tx = self.heads.remove(0);
1051        if let Some(txs) = self.txs.get_mut(&tx.tx.sender()) {
1052            // Fetch next head
1053            if !txs.is_empty() {
1054                let head_tx = txs.remove(0);
1055                let head = HeadTransaction {
1056                    // We already ran this method when filtering the transactions from the mempool so it shouldn't fail
1057                    tip: head_tx.effective_gas_tip(self.base_fee).ok_or(
1058                        ChainError::InvalidBlock(
1059                            InvalidBlockError::InvalidTransaction("Attempted to add an invalid transaction to the block. The transaction filter must have failed.".to_owned()),
1060                        ),
1061                    )?,
1062                    tx: head_tx,
1063                };
1064                // Insert head into heads list while maintaing order
1065                let index = match self.heads.binary_search(&head) {
1066                    Ok(index) => index, // Same ordering shouldn't be possible when adding timestamps
1067                    Err(index) => index,
1068                };
1069                self.heads.insert(index, head);
1070            }
1071        }
1072        Ok(())
1073    }
1074}
1075
1076// Orders transactions by highest tip, if tip is equal, orders by lowest timestamp
1077impl Ord for HeadTransaction {
1078    fn cmp(&self, other: &Self) -> Ordering {
1079        match (self.tx_type(), other.tx_type()) {
1080            (TxType::Privileged, TxType::Privileged) => return self.nonce().cmp(&other.nonce()),
1081            (TxType::Privileged, _) => return Ordering::Less,
1082            (_, TxType::Privileged) => return Ordering::Greater,
1083            _ => (),
1084        };
1085        match other.tip.cmp(&self.tip) {
1086            Ordering::Equal => self.tx.time().cmp(&other.tx.time()),
1087            ordering => ordering,
1088        }
1089    }
1090}
1091
1092impl PartialOrd for HeadTransaction {
1093    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1094        Some(self.cmp(other))
1095    }
1096}