1use std::{
2 cmp::{Ordering, max},
3 ops::Div,
4 sync::Arc,
5 time::{Duration, Instant},
6};
7
8use rustc_hash::FxHashMap;
9
10use ethrex_common::{
11 Address, Bloom, Bytes, H256, U256,
12 constants::{
13 DEFAULT_OMMERS_HASH, DEFAULT_REQUESTS_HASH, GAS_PER_BLOB, MAX_RLP_BLOCK_SIZE,
14 TX_MAX_GAS_LIMIT_AMSTERDAM,
15 },
16 types::{
17 AccountUpdate, BlobsBundle, Block, BlockBody, BlockHash, BlockHeader, BlockNumber,
18 ChainConfig, Fork, MempoolTransaction, Receipt, Transaction, TxKind, TxType, Withdrawal,
19 block_access_list::BlockAccessList,
20 bloom_from_logs, calc_excess_blob_gas, calculate_base_fee_per_blob_gas,
21 calculate_base_fee_per_gas, compute_receipts_root, compute_transactions_root,
22 compute_withdrawals_root,
23 requests::{EncodedRequests, compute_requests_hash},
24 },
25};
26
27use ethrex_crypto::NativeCrypto;
28use ethrex_crypto::keccak::Keccak256;
29use ethrex_vm::{Evm, EvmError, check_2d_gas_allowance};
30
31use ethrex_rlp::encode::RLPEncode;
32use ethrex_storage::{Store, error::StoreError};
33
34use ethrex_metrics::metrics;
35
36#[cfg(feature = "metrics")]
37use ethrex_metrics::blocks::METRICS_BLOCKS;
38#[cfg(feature = "metrics")]
39use ethrex_metrics::transactions::{METRICS_TX, MetricsTxType};
40use tokio_util::sync::CancellationToken;
41
42use crate::{
43 Blockchain, BlockchainType, MAX_PAYLOADS,
44 constants::{GAS_LIMIT_BOUND_DIVISOR, MIN_GAS_LIMIT, TX_GAS_COST},
45 error::{ChainError, InvalidBlockError},
46 mempool::PendingTxFilter,
47 new_evm,
48 vm::StoreVmDatabase,
49};
50
51use thiserror::Error;
52use tracing::{debug, warn};
53
54#[derive(Debug)]
55pub struct PayloadBuildTask {
56 task: tokio::task::JoinHandle<Result<PayloadBuildResult, ChainError>>,
57 cancel: CancellationToken,
58}
59
60#[derive(Debug)]
61pub enum PayloadOrTask {
62 Payload(Box<PayloadBuildResult>),
63 Task(PayloadBuildTask),
64}
65
66impl PayloadBuildTask {
67 pub async fn finish(self) -> Result<PayloadBuildResult, ChainError> {
69 self.cancel.cancel();
70 self.task
71 .await
72 .map_err(|_| ChainError::Custom("Failed to join task".to_string()))?
73 }
74}
75
76impl PayloadOrTask {
77 pub async fn to_payload(self) -> Result<Self, ChainError> {
80 Ok(match self {
81 PayloadOrTask::Payload(_) => self,
82 PayloadOrTask::Task(task) => PayloadOrTask::Payload(Box::new(task.finish().await?)),
83 })
84 }
85}
86
87pub struct BuildPayloadArgs {
88 pub parent: BlockHash,
89 pub timestamp: u64,
90 pub fee_recipient: Address,
91 pub random: H256,
92 pub withdrawals: Option<Vec<Withdrawal>>,
93 pub beacon_root: Option<H256>,
94 pub slot_number: Option<u64>,
95 pub version: u8,
96 pub elasticity_multiplier: u64,
97 pub gas_ceil: u64,
98}
99
100#[derive(Debug, Error)]
101pub enum BuildPayloadArgsError {
102 #[error("Payload hashed has wrong size")]
103 FailedToConvertPayload,
104}
105
106impl BuildPayloadArgs {
107 pub fn id(&self) -> Result<u64, BuildPayloadArgsError> {
109 let mut hasher = Keccak256::new();
110 hasher.update(self.parent);
111 hasher.update(self.timestamp.to_be_bytes());
112 hasher.update(self.random);
113 hasher.update(self.fee_recipient);
114 if let Some(withdrawals) = &self.withdrawals {
115 hasher.update(withdrawals.encode_to_vec());
116 }
117 if let Some(beacon_root) = self.beacon_root {
118 hasher.update(beacon_root);
119 }
120 let res = &mut hasher.finalize()[..8];
121 res[0] = self.version;
122 Ok(u64::from_be_bytes(res.try_into().map_err(|_| {
123 BuildPayloadArgsError::FailedToConvertPayload
124 })?))
125 }
126}
127
128pub fn create_payload(
131 args: &BuildPayloadArgs,
132 storage: &Store,
133 extra_data: Bytes,
134) -> Result<Block, ChainError> {
135 let parent_block = storage
136 .get_block_header_by_hash(args.parent)?
137 .ok_or_else(|| ChainError::ParentNotFound)?;
138 let chain_config = storage.get_chain_config();
139 let fork = chain_config.fork(args.timestamp);
140 let gas_limit = calc_gas_limit(parent_block.gas_limit, args.gas_ceil);
141 let excess_blob_gas = chain_config
142 .get_fork_blob_schedule(args.timestamp)
143 .map(|schedule| calc_excess_blob_gas(&parent_block, schedule, fork));
144
145 let header = BlockHeader {
146 parent_hash: args.parent,
147 ommers_hash: *DEFAULT_OMMERS_HASH,
148 coinbase: args.fee_recipient,
149 state_root: parent_block.state_root,
150 transactions_root: compute_transactions_root(&[], &NativeCrypto),
151 receipts_root: compute_receipts_root(&[], &NativeCrypto),
152 logs_bloom: Bloom::default(),
153 difficulty: U256::zero(),
154 number: parent_block.number.saturating_add(1),
155 gas_limit,
156 gas_used: 0,
157 timestamp: args.timestamp,
158 extra_data,
159 prev_randao: args.random,
160 nonce: 0,
161 base_fee_per_gas: calculate_base_fee_per_gas(
162 gas_limit,
163 parent_block.gas_limit,
164 parent_block.gas_used,
165 parent_block.base_fee_per_gas.unwrap_or_default(),
166 args.elasticity_multiplier,
167 ),
168 withdrawals_root: chain_config
169 .is_shanghai_activated(args.timestamp)
170 .then_some(compute_withdrawals_root(
171 args.withdrawals.as_ref().unwrap_or(&Vec::new()),
172 &NativeCrypto,
173 )),
174 blob_gas_used: chain_config
175 .is_cancun_activated(args.timestamp)
176 .then_some(0),
177 excess_blob_gas,
178 parent_beacon_block_root: args.beacon_root,
179 requests_hash: chain_config
180 .is_prague_activated(args.timestamp)
181 .then_some(*DEFAULT_REQUESTS_HASH),
182 slot_number: args.slot_number,
183 ..Default::default()
184 };
185
186 let body = BlockBody {
187 transactions: Vec::new(),
188 ommers: Vec::new(),
189 withdrawals: args.withdrawals.clone(),
190 };
191
192 Ok(Block::new(header, body))
194}
195
196pub fn calc_gas_limit(parent_gas_limit: u64, builder_gas_ceil: u64) -> u64 {
197 let delta = parent_gas_limit / GAS_LIMIT_BOUND_DIVISOR - 1;
199 let mut limit = parent_gas_limit;
200 let desired_limit = max(builder_gas_ceil, MIN_GAS_LIMIT);
201 if limit < desired_limit {
202 limit = parent_gas_limit + delta;
203 if limit > desired_limit {
204 limit = desired_limit
205 }
206 return limit;
207 }
208 if limit > desired_limit {
209 limit = parent_gas_limit - delta;
210 if limit < desired_limit {
211 limit = desired_limit
212 }
213 }
214 limit
215}
216
217#[derive(Clone)]
218pub struct PayloadBuildContext {
219 pub payload: Block,
220 pub remaining_gas: u64,
221 pub cumulative_gas_spent: u64,
224 pub block_regular_gas_used: u64,
226 pub block_state_gas_used: u64,
228 pub is_amsterdam: bool,
230 pub receipts: Vec<Receipt>,
231 pub requests: Option<Vec<EncodedRequests>>,
232 pub block_value: U256,
233 base_fee_per_blob_gas: U256,
234 pub blobs_bundle: BlobsBundle,
235 pub store: Store,
236 pub vm: Evm,
237 pub account_updates: Vec<AccountUpdate>,
238 pub payload_size: u64,
239 pub block_access_list: Option<BlockAccessList>,
241}
242
243impl PayloadBuildContext {
244 pub fn new(
245 payload: Block,
246 storage: &Store,
247 blockchain_type: &BlockchainType,
248 ) -> Result<Self, EvmError> {
249 let config = storage.get_chain_config();
250 let base_fee_per_blob_gas = calculate_base_fee_per_blob_gas(
251 payload.header.excess_blob_gas.unwrap_or_default(),
252 config
253 .get_fork_blob_schedule(payload.header.timestamp)
254 .map(|schedule| schedule.base_fee_update_fraction)
255 .unwrap_or_default(),
256 );
257
258 let parent_header = storage
259 .get_block_header_by_hash(payload.header.parent_hash)
260 .map_err(|e| EvmError::DB(e.to_string()))?
261 .ok_or_else(|| EvmError::DB("parent header not found".to_string()))?;
262 let vm_db = StoreVmDatabase::new(storage.clone(), parent_header)?;
263 let mut vm = new_evm(blockchain_type, vm_db)?;
264
265 if config.is_amsterdam_activated(payload.header.timestamp) {
267 vm.enable_bal_recording();
268 vm.set_bal_index(0);
270 }
271
272 let is_amsterdam = config.is_amsterdam_activated(payload.header.timestamp);
273 let payload_size = payload.length() as u64;
274 Ok(PayloadBuildContext {
275 remaining_gas: payload.header.gas_limit,
276 cumulative_gas_spent: 0,
277 block_regular_gas_used: 0,
278 block_state_gas_used: 0,
279 is_amsterdam,
280 receipts: vec![],
281 requests: config
282 .is_prague_activated(payload.header.timestamp)
283 .then_some(Vec::new()),
284 block_value: U256::zero(),
285 base_fee_per_blob_gas,
286 payload,
287 blobs_bundle: BlobsBundle::default(),
288 store: storage.clone(),
289 vm,
290 account_updates: Vec::new(),
291 payload_size,
292 block_access_list: None,
293 })
294 }
295
296 pub fn gas_used(&self) -> u64 {
297 if self.is_amsterdam {
298 self.block_regular_gas_used.max(self.block_state_gas_used)
300 } else {
301 self.payload.header.gas_limit - self.remaining_gas
302 }
303 }
304}
305
306impl PayloadBuildContext {
307 fn parent_hash(&self) -> BlockHash {
308 self.payload.header.parent_hash
309 }
310
311 pub fn block_number(&self) -> BlockNumber {
312 self.payload.header.number
313 }
314
315 fn chain_config(&self) -> ChainConfig {
316 self.store.get_chain_config()
317 }
318
319 fn base_fee_per_gas(&self) -> Option<u64> {
320 self.payload.header.base_fee_per_gas
321 }
322}
323
324#[derive(Debug, Clone)]
325pub struct PayloadBuildResult {
326 pub blobs_bundle: BlobsBundle,
327 pub block_value: U256,
328 pub receipts: Vec<Receipt>,
329 pub requests: Vec<EncodedRequests>,
330 pub account_updates: Vec<AccountUpdate>,
331 pub payload: Block,
332 pub block_access_list: Option<BlockAccessList>,
334}
335
336impl From<PayloadBuildContext> for PayloadBuildResult {
337 fn from(value: PayloadBuildContext) -> Self {
338 let PayloadBuildContext {
339 blobs_bundle,
340 block_value,
341 requests,
342 receipts,
343 account_updates,
344 payload,
345 block_access_list,
346 ..
347 } = value;
348
349 Self {
350 blobs_bundle,
351 block_value,
352 requests: requests.unwrap_or_default(),
353 receipts,
354 account_updates,
355 payload,
356 block_access_list,
357 }
358 }
359}
360
361impl Blockchain {
362 pub async fn get_payload(&self, payload_id: u64) -> Result<PayloadBuildResult, ChainError> {
365 let mut payloads = self.payloads.lock().await;
366 let idx = payloads
368 .iter()
369 .position(|(id, _)| id == &payload_id)
370 .ok_or(ChainError::UnknownPayload)?;
371 let finished_payload = (payload_id, payloads.remove(idx).1.to_payload().await?);
372 payloads.insert(idx, finished_payload);
373 match &payloads[idx].1 {
375 PayloadOrTask::Payload(payload) => Ok(*payload.clone()),
376 _ => unreachable!("we already converted the payload into a finished version"),
377 }
378 }
379
380 pub async fn initiate_payload_build(self: Arc<Blockchain>, payload: Block, payload_id: u64) {
383 let self_clone = self.clone();
384 let cancel_token = CancellationToken::new();
385 let cancel_token_clone = cancel_token.clone();
386 let payload_build_task = tokio::task::spawn(async move {
387 self_clone
388 .build_payload_loop(payload, cancel_token_clone)
389 .await
390 });
391 let mut payloads = self.payloads.lock().await;
392 if payloads.len() >= MAX_PAYLOADS {
393 payloads.remove(0);
395 }
396 payloads.push((
397 payload_id,
398 PayloadOrTask::Task(PayloadBuildTask {
399 task: payload_build_task,
400 cancel: cancel_token,
401 }),
402 ));
403 }
404
405 pub async fn build_payload_loop(
408 self: Arc<Blockchain>,
409 payload: Block,
410 cancel_token: CancellationToken,
411 ) -> Result<PayloadBuildResult, ChainError> {
412 let start = Instant::now();
413 const SECONDS_PER_SLOT: Duration = Duration::from_secs(12);
414 let mut last_built_seq = self.mempool.tx_seq();
419 let mut res = self.build_payload(payload.clone())?;
420 while start.elapsed() < SECONDS_PER_SLOT && !cancel_token.is_cancelled() {
421 let remaining = SECONDS_PER_SLOT.saturating_sub(start.elapsed());
423 let notified = self.mempool.tx_added().notified();
424 tokio::select! {
425 _ = notified => {}
426 _ = cancel_token.cancelled() => break,
427 _ = tokio::time::sleep(remaining) => break,
428 }
429 let payload = payload.clone();
430 let self_clone = self.clone();
431 let seq_before = self.mempool.tx_seq();
432 let building_task =
433 tokio::task::spawn_blocking(move || self_clone.build_payload(payload));
434 match cancel_token.run_until_cancelled(building_task).await {
438 Some(Ok(current_res)) => {
439 res = current_res?;
440 last_built_seq = seq_before;
441 }
442 Some(Err(err)) => {
443 warn!(%err, "Payload-building task panicked");
444 }
445 None => {}
446 }
447 }
448
449 if self.mempool.tx_seq() > last_built_seq {
455 let blockchain = self.clone();
456 match tokio::task::spawn_blocking(move || blockchain.build_payload(payload)).await {
457 Ok(Ok(final_res)) => res = final_res,
458 Ok(Err(err)) => {
459 warn!(%err, "Final payload rebuild failed; returning previous result")
460 }
461 Err(err) => warn!(%err, "Final payload rebuild task panicked"),
462 }
463 }
464
465 Ok(res)
466 }
467
468 pub fn build_payload(&self, payload: Block) -> Result<PayloadBuildResult, ChainError> {
470 let since = Instant::now();
471
472 debug!("Building payload");
473 let base_fee = payload.header.base_fee_per_gas.unwrap_or_default();
474 let mut context = PayloadBuildContext::new(payload, &self.storage, &self.options.r#type)?;
475
476 if let BlockchainType::L1 = self.options.r#type {
477 self.apply_system_operations(&mut context)?;
478 }
479 self.fill_transactions(&mut context)?;
480 if context
483 .chain_config()
484 .is_amsterdam_activated(context.payload.header.timestamp)
485 {
486 let post_tx_index =
487 u32::try_from(context.payload.body.transactions.len() + 1).unwrap_or(u32::MAX);
488 context.vm.set_bal_index(post_tx_index);
489 if let Some(recorder) = context.vm.db.bal_recorder_mut()
491 && let Some(withdrawals) = &context.payload.body.withdrawals
492 {
493 recorder.extend_touched_addresses(withdrawals.iter().map(|w| w.address));
494 }
495 }
496 self.extract_requests(&mut context)?;
497 self.apply_withdrawals(&mut context)?;
498 self.finalize_payload(&mut context)?;
499
500 let interval = Instant::now().duration_since(since).as_millis();
501
502 tracing::debug!(
503 "[METRIC] BUILDING PAYLOAD TOOK: {interval} ms, base fee {}",
504 base_fee
505 );
506 metrics!(METRICS_BLOCKS.set_block_building_ms(interval as i64));
507 metrics!(METRICS_BLOCKS.set_block_building_base_fee(base_fee as i64));
508 let gas_used = context.gas_used();
509 if gas_used > 0 {
510 let as_gigas = (gas_used as f64).div(10_f64.powf(9_f64));
511
512 if interval != 0 {
513 let throughput = (as_gigas) / (interval as f64) * 1000_f64;
514 metrics!(METRICS_BLOCKS.set_latest_gigagas_block_building(throughput));
515
516 tracing::debug!(
517 "[METRIC] BLOCK BUILDING THROUGHPUT: {throughput} Gigagas/s TIME SPENT: {interval} msecs"
518 );
519 }
520 }
521
522 Ok(context.into())
523 }
524
525 pub fn apply_withdrawals(&self, context: &mut PayloadBuildContext) -> Result<(), EvmError> {
526 let binding = Vec::new();
527 let withdrawals = context
528 .payload
529 .body
530 .withdrawals
531 .as_ref()
532 .unwrap_or(&binding);
533 context.vm.process_withdrawals(withdrawals)
534 }
535
536 pub fn apply_system_operations(
540 &self,
541 context: &mut PayloadBuildContext,
542 ) -> Result<(), EvmError> {
543 context.vm.apply_system_calls(&context.payload.header)
544 }
545
546 pub fn fetch_mempool_transactions(
549 &self,
550 context: &mut PayloadBuildContext,
551 ) -> Result<(TransactionQueue, TransactionQueue), ChainError> {
552 let blob_fee: u64 = context.base_fee_per_blob_gas.try_into().map_err(|_| {
553 ChainError::Custom("base_fee_per_blob_gas does not fit in u64".to_owned())
554 })?;
555 let tx_filter = PendingTxFilter {
556 base_fee: context.base_fee_per_gas(),
558 blob_fee: Some(blob_fee),
559 ..Default::default()
560 };
561 let plain_tx_filter = PendingTxFilter {
562 only_plain_txs: true,
563 ..tx_filter
564 };
565 let blob_tx_filter = PendingTxFilter {
566 only_blob_txs: true,
567 ..tx_filter
568 };
569 Ok((
570 TransactionQueue::new(
572 self.mempool.filter_transactions(&plain_tx_filter)?,
573 context.base_fee_per_gas(),
574 )?,
575 TransactionQueue::new(
577 self.mempool.filter_transactions(&blob_tx_filter)?,
578 context.base_fee_per_gas(),
579 )?,
580 ))
581 }
582
583 fn effective_max_blobs(&self, context: &PayloadBuildContext) -> usize {
586 let protocol_max = context
587 .chain_config()
588 .get_fork_blob_schedule(context.payload.header.timestamp)
589 .map(|schedule| schedule.max)
590 .unwrap_or_default();
591 match self.options.max_blobs_per_block {
592 Some(user_max) => protocol_max.min(user_max) as usize,
593 None => protocol_max as usize,
594 }
595 }
596
597 pub fn fill_transactions(&self, context: &mut PayloadBuildContext) -> Result<(), ChainError> {
600 let chain_config = context.chain_config();
601 let max_blob_number_per_block = self.effective_max_blobs(context);
602
603 debug!("Fetching transactions from mempool");
604 let (mut plain_txs, mut blob_txs) = self.fetch_mempool_transactions(context)?;
606 loop {
608 if context.remaining_gas < TX_GAS_COST {
610 debug!("No more gas to run transactions");
611 break;
612 };
613 if !blob_txs.is_empty() && context.blobs_bundle.blobs.len() >= max_blob_number_per_block
614 {
615 debug!("No more blob gas to run blob transactions");
616 blob_txs.clear();
617 }
618 let (head_tx, is_blob) = match (plain_txs.peek(), blob_txs.peek()) {
620 (None, None) => break,
621 (None, Some(tx)) => (tx, true),
622 (Some(tx), None) => (tx, false),
623 (Some(a), Some(b)) if b < a => (b, true),
624 (Some(tx), _) => (tx, false),
625 };
626
627 let txs = if is_blob {
628 &mut blob_txs
629 } else {
630 &mut plain_txs
631 };
632
633 let tx_gas_reservation = if context.is_amsterdam {
637 head_tx.tx.gas_limit().min(TX_MAX_GAS_LIMIT_AMSTERDAM)
638 } else {
639 head_tx.tx.gas_limit()
640 };
641 if context.remaining_gas < tx_gas_reservation {
642 debug!("Skipping transaction: {}, no gas left", head_tx.tx.hash());
643 txs.pop();
645 continue;
646 }
647
648 let potential_rlp_block_size =
652 context.payload_size + head_tx.encode_canonical_to_vec().len() as u64;
653 if context
654 .chain_config()
655 .is_osaka_activated(context.payload.header.timestamp)
656 && potential_rlp_block_size > MAX_RLP_BLOCK_SIZE
657 {
658 break;
659 }
660 context.payload_size = potential_rlp_block_size;
661
662 let tx_hash = head_tx.tx.hash();
664
665 if head_tx.tx.protected() && !chain_config.is_eip155_activated(context.block_number()) {
667 debug!("Ignoring replay-protected transaction: {}", tx_hash);
670 txs.pop();
671 self.remove_transaction_from_pool(&tx_hash)?;
672 continue;
673 }
674
675 match self.apply_tx_to_payload(head_tx, context) {
676 Ok(()) => txs.shift()?,
677 Err(_) => txs.pop(),
678 }
679 }
680 Ok(())
681 }
682
683 pub fn apply_tx_to_payload(
694 &self,
695 head: HeadTransaction,
696 context: &mut PayloadBuildContext,
697 ) -> Result<(), ChainError> {
698 let tx_hash = head.tx.hash();
699
700 if context.is_amsterdam
704 && let Err(e) = check_2d_gas_allowance(
705 &head.tx,
706 Fork::Amsterdam,
707 context.block_regular_gas_used,
708 context.block_state_gas_used,
709 context.payload.header.gas_limit,
710 )
711 {
712 debug!("Skipping tx {tx_hash:x}: fails 2D inclusion check: {e}");
713 return Err(e.into());
714 }
715
716 let tx_index =
720 u32::try_from(context.payload.body.transactions.len() + 1).unwrap_or(u32::MAX);
721 context.vm.set_bal_index(tx_index);
722
723 let bal_checkpoint = context
728 .vm
729 .db
730 .bal_recorder
731 .as_ref()
732 .map(|r| r.tx_checkpoint());
733
734 if let Some(recorder) = context.vm.db.bal_recorder_mut() {
735 recorder.record_touched_address(head.tx.sender());
736 if let TxKind::Call(to) = head.to() {
737 recorder.record_touched_address(to);
738 }
739 }
740
741 let receipt = match self.apply_transaction(&head, context) {
742 Ok(receipt) => {
743 metrics!(METRICS_TX.inc_tx_with_type(MetricsTxType(head.tx_type())));
744 receipt
745 }
746 Err(e) => {
747 debug!("Failed to execute transaction: {tx_hash:x}, {e}");
748 metrics!(METRICS_TX.inc_tx_errors(e.to_metric()));
749 if let (Some(recorder), Some(checkpoint)) =
750 (context.vm.db.bal_recorder_mut(), bal_checkpoint)
751 {
752 recorder.tx_restore(checkpoint);
753 }
754 return Err(e);
755 }
756 };
757
758 debug!("Adding transaction: {} to payload", tx_hash);
759 context.payload.body.transactions.push(head.into());
760 context.receipts.push(receipt);
761 Ok(())
762 }
763
764 fn apply_transaction(
767 &self,
768 head: &HeadTransaction,
769 context: &mut PayloadBuildContext,
770 ) -> Result<Receipt, ChainError> {
771 match **head {
772 Transaction::EIP4844Transaction(_) => self.apply_blob_transaction(head, context),
773 _ => apply_plain_transaction(head, context),
774 }
775 }
776
777 fn apply_blob_transaction(
779 &self,
780 head: &HeadTransaction,
781 context: &mut PayloadBuildContext,
782 ) -> Result<Receipt, ChainError> {
783 let tx_hash = head.tx.hash();
785 let max_blob_number_per_block = self.effective_max_blobs(context);
786 let Some(blobs_bundle) = self.mempool.get_blobs_bundle(tx_hash)? else {
787 return Err(
789 StoreError::Custom(format!("No blobs bundle found for blob tx {tx_hash}")).into(),
790 );
791 };
792 if context.blobs_bundle.blobs.len() + blobs_bundle.blobs.len() > max_blob_number_per_block {
793 return Err(EvmError::Custom("max data blobs reached".to_string()).into());
795 };
796 let receipt = apply_plain_transaction(head, context)?;
798 let prev_blob_gas = context.payload.header.blob_gas_used.unwrap_or_default();
800 context.payload.header.blob_gas_used =
801 Some(prev_blob_gas + (blobs_bundle.blobs.len() * GAS_PER_BLOB as usize) as u64);
802 context.blobs_bundle += blobs_bundle;
803 Ok(receipt)
804 }
805
806 pub fn extract_requests(&self, context: &mut PayloadBuildContext) -> Result<(), EvmError> {
807 if !context
808 .chain_config()
809 .is_prague_activated(context.payload.header.timestamp)
810 {
811 return Ok(());
812 };
813
814 let requests = context
815 .vm
816 .extract_requests(&context.receipts, &context.payload.header)?;
817
818 context.requests = Some(requests.iter().map(|r| r.encode()).collect());
819
820 Ok(())
821 }
822
823 pub fn finalize_payload(&self, context: &mut PayloadBuildContext) -> Result<(), ChainError> {
824 let block_access_list = context.vm.take_bal();
826
827 let account_updates = context.vm.get_state_transitions()?;
828
829 let ret_acount_updates_list = self
830 .storage
831 .apply_account_updates_batch(context.parent_hash(), &account_updates)?
832 .ok_or(ChainError::ParentStateNotFound)?;
833
834 let state_root = ret_acount_updates_list.state_trie_hash;
835
836 context.payload.header.state_root = state_root;
837 context.payload.header.transactions_root =
838 compute_transactions_root(&context.payload.body.transactions, &NativeCrypto);
839 context.payload.header.receipts_root =
840 compute_receipts_root(&context.receipts, &NativeCrypto);
841 context.payload.header.requests_hash = context
842 .requests
843 .as_ref()
844 .map(|requests| compute_requests_hash(requests));
845 let gas_used = context.gas_used();
846 if context.is_amsterdam {
847 debug!(
848 "EIP-8037 block finalize: gas_used={gas_used} regular={} state={} txs={}",
849 context.block_regular_gas_used,
850 context.block_state_gas_used,
851 context.payload.body.transactions.len(),
852 );
853 }
854 context.payload.header.gas_used = gas_used;
855 context.account_updates = account_updates;
856
857 context.payload.header.block_access_list_hash =
859 block_access_list.as_ref().map(|bal| bal.compute_hash());
860 context.block_access_list = block_access_list;
861
862 let mut logs = vec![];
863 for receipt in context.receipts.iter().cloned() {
864 for log in receipt.logs {
865 logs.push(log);
866 }
867 }
868
869 context.payload.header.logs_bloom = bloom_from_logs(&logs, &NativeCrypto);
870 Ok(())
871 }
872}
873
874pub fn apply_plain_transaction(
876 head: &HeadTransaction,
877 context: &mut PayloadBuildContext,
878) -> Result<Receipt, ChainError> {
879 let (receipt, report) = context.vm.execute_tx(
880 &head.tx,
881 &context.payload.header,
882 &mut context.cumulative_gas_spent,
883 head.tx.sender(),
884 )?;
885
886 let tx_state_gas = report.state_gas_used;
888 let tx_regular_gas = report.gas_used.saturating_sub(tx_state_gas);
889
890 let new_regular = context
892 .block_regular_gas_used
893 .saturating_add(tx_regular_gas);
894 let new_state = context.block_state_gas_used.saturating_add(tx_state_gas);
895
896 if context.is_amsterdam && new_regular.max(new_state) > context.payload.header.gas_limit {
899 context.vm.undo_last_tx()?;
904 debug_assert!(
910 context.cumulative_gas_spent >= report.gas_spent,
911 "cumulative_gas_spent underflow on tx rollback"
912 );
913 context.cumulative_gas_spent = context
914 .cumulative_gas_spent
915 .saturating_sub(report.gas_spent);
916
917 return Err(EvmError::Custom(format!(
918 "block gas limit exceeded (state gas overflow): \
919 max({new_regular}, {new_state}) = {} > gas_limit {}",
920 new_regular.max(new_state),
921 context.payload.header.gas_limit
922 ))
923 .into());
924 }
925
926 context.block_regular_gas_used = new_regular;
928 context.block_state_gas_used = new_state;
929
930 if context.is_amsterdam {
931 debug!(
932 "EIP-8037 tx gas: regular={tx_regular_gas} state={tx_state_gas} gas_used={} gas_spent={} block_regular={} block_state={} block_max={}",
933 report.gas_used,
934 report.gas_spent,
935 context.block_regular_gas_used,
936 context.block_state_gas_used,
937 context
938 .block_regular_gas_used
939 .max(context.block_state_gas_used),
940 );
941 }
942
943 if context.is_amsterdam {
947 context.remaining_gas = context
948 .payload
949 .header
950 .gas_limit
951 .saturating_sub(new_regular.max(new_state));
952 } else {
953 context.remaining_gas = context.remaining_gas.saturating_sub(report.gas_used);
954 }
955
956 context.block_value += U256::from(report.gas_spent) * head.tip;
958 Ok(receipt)
959}
960
961pub struct TransactionQueue {
964 heads: Vec<HeadTransaction>,
966 txs: FxHashMap<Address, Vec<MempoolTransaction>>,
968 base_fee: Option<u64>,
970}
971
972#[derive(Clone, Debug, Eq, PartialEq)]
973pub struct HeadTransaction {
974 pub tx: MempoolTransaction,
975 pub tip: U256,
976}
977
978impl std::ops::Deref for HeadTransaction {
979 type Target = Transaction;
980
981 fn deref(&self) -> &Self::Target {
982 &self.tx
983 }
984}
985
986impl From<HeadTransaction> for Transaction {
987 fn from(val: HeadTransaction) -> Self {
988 val.tx.transaction().clone()
989 }
990}
991
992impl TransactionQueue {
993 fn new(
995 mut txs: FxHashMap<Address, Vec<MempoolTransaction>>,
996 base_fee: Option<u64>,
997 ) -> Result<Self, ChainError> {
998 let mut heads = Vec::with_capacity(100);
999 for (_, txs) in txs.iter_mut() {
1000 let head_tx = txs.remove(0);
1003 heads.push(HeadTransaction {
1004 tip: head_tx
1006 .effective_gas_tip(base_fee)
1007 .ok_or(ChainError::InvalidBlock(
1008 InvalidBlockError::InvalidTransaction("Attempted to add an invalid transaction to the block. The transaction filter must have failed.".to_owned()),
1009 ))?,
1010 tx: head_tx,
1011 });
1012 }
1013 heads.sort();
1015 Ok(TransactionQueue {
1016 heads,
1017 txs,
1018 base_fee,
1019 })
1020 }
1021
1022 pub fn clear(&mut self) {
1024 self.heads.clear();
1025 self.txs.clear();
1026 }
1027
1028 pub fn is_empty(&self) -> bool {
1030 self.heads.is_empty()
1031 }
1032
1033 pub fn peek(&self) -> Option<HeadTransaction> {
1036 self.heads.first().cloned()
1037 }
1038
1039 pub fn pop(&mut self) {
1041 if !self.is_empty() {
1042 let sender = self.heads.remove(0).tx.sender();
1043 self.txs.remove(&sender);
1044 }
1045 }
1046
1047 pub fn shift(&mut self) -> Result<(), ChainError> {
1050 let tx = self.heads.remove(0);
1051 if let Some(txs) = self.txs.get_mut(&tx.tx.sender()) {
1052 if !txs.is_empty() {
1054 let head_tx = txs.remove(0);
1055 let head = HeadTransaction {
1056 tip: head_tx.effective_gas_tip(self.base_fee).ok_or(
1058 ChainError::InvalidBlock(
1059 InvalidBlockError::InvalidTransaction("Attempted to add an invalid transaction to the block. The transaction filter must have failed.".to_owned()),
1060 ),
1061 )?,
1062 tx: head_tx,
1063 };
1064 let index = match self.heads.binary_search(&head) {
1066 Ok(index) => index, Err(index) => index,
1068 };
1069 self.heads.insert(index, head);
1070 }
1071 }
1072 Ok(())
1073 }
1074}
1075
1076impl Ord for HeadTransaction {
1078 fn cmp(&self, other: &Self) -> Ordering {
1079 match (self.tx_type(), other.tx_type()) {
1080 (TxType::Privileged, TxType::Privileged) => return self.nonce().cmp(&other.nonce()),
1081 (TxType::Privileged, _) => return Ordering::Less,
1082 (_, TxType::Privileged) => return Ordering::Greater,
1083 _ => (),
1084 };
1085 match other.tip.cmp(&self.tip) {
1086 Ordering::Equal => self.tx.time().cmp(&other.tx.time()),
1087 ordering => ordering,
1088 }
1089 }
1090}
1091
1092impl PartialOrd for HeadTransaction {
1093 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1094 Some(self.cmp(other))
1095 }
1096}