Skip to main content

ethrex_blockchain/
mempool.rs

1use std::{
2    cmp::Reverse,
3    collections::{BTreeMap, VecDeque, hash_map::Entry},
4    sync::RwLock,
5    sync::atomic::{AtomicU64, Ordering},
6    time::{Duration, Instant},
7};
8
9use rustc_hash::{FxHashMap, FxHashSet};
10
11use crate::{
12    constants::{
13        TX_ACCESS_LIST_ADDRESS_GAS, TX_ACCESS_LIST_STORAGE_KEY_GAS, TX_CREATE_GAS_COST,
14        TX_DATA_NON_ZERO_GAS, TX_DATA_NON_ZERO_GAS_EIP2028, TX_DATA_ZERO_GAS_COST, TX_GAS_COST,
15        TX_INIT_CODE_WORD_GAS_COST,
16    },
17    error::MempoolError,
18};
19use ethrex_common::{
20    Address, H160, H256, U256,
21    types::{
22        BlobTuple, BlobsBundle, BlockHeader, ChainConfig, MempoolTransaction, Transaction, TxType,
23        kzg_commitment_to_versioned_hash,
24    },
25};
26use ethrex_storage::error::StoreError;
27use ethrex_vm::{intrinsic_gas_dimensions, intrinsic_gas_floor};
28use tracing::warn;
29
30/// Maximum number of alternate announcers tracked per hash. Bounds the memory
31/// used by the alternates map and prevents pathological peers from filling it.
32///
33/// TODO(#6849): expose this through `BlockchainOptions` / CLI like the
34/// other mempool ceilings (`max_mempool_size`, RBF price-bumps). 8 is
35/// conservative; high-fan-in benchmarks and Hive adversarial-mempool scenarios
36/// might want to raise it. FIFO eviction keeps the cap safe regardless.
37pub const MAX_ALTERNATES_PER_HASH: usize = 8;
38
39/// Maximum number of blob (EIP-4844) transactions retained in the mempool,
40/// independent of `max_mempool_size`. Blob txs live in a dedicated sub-pool so a
41/// flood of regular transactions cannot evict them, and the sub-pool itself is
42/// evicted by value/nonce (see `remove_worst_blob_transaction`), never FIFO, so
43/// the node keeps the (scarce, high-value, includable) blob txs it needs to build
44/// full blocks.
45///
46/// Sized to comfortably hold several blocks' worth of includable blobs (Amsterdam
47/// allows up to 21 blobs/block) while bounding worst-case memory: blobs are held
48/// in RAM, so the bound is this count times the per-tx limit (`MAX_BLOB_TX_SIZE`,
49/// ~1 MiB) ⇒ ~0.5 GiB worst case.
50///
51/// TODO(#6849): expose through CLI and prefer a byte-based cap (like geth's
52/// blobpool `datacap`) so memory is bounded regardless of blobs-per-tx.
53pub const MAX_BLOB_MEMPOOL_SIZE: usize = 512;
54
55/// An alternate announcer for a known-in-flight transaction hash. Carries the
56/// announcer's own announced type and size so the eventual retry can validate
57/// the response against the alternate's metadata (which may differ from the
58/// primary announcer's, e.g. when one peer advertises a bare blob tx while
59/// another advertises the full sidecar).
60#[derive(Debug, Clone, Copy)]
61pub struct Alternate {
62    pub peer_id: H256,
63    pub tx_type: u8,
64    pub tx_size: usize,
65}
66
67#[derive(Debug, Default)]
68struct MempoolInner {
69    broadcast_pool: FxHashSet<H256>,
70    transaction_pool: FxHashMap<H256, MempoolTransaction>,
71    blobs_bundle_pool: FxHashMap<H256, BlobsBundle>,
72    /// Transaction hashes that have been requested via GetPooledTransactions
73    /// but whose responses haven't arrived yet. Used to avoid sending duplicate
74    /// requests when multiple peers announce the same transaction.
75    in_flight_txs: FxHashSet<H256>,
76    /// For each announced hash, the queue of *alternate* announcers that also
77    /// advertised it while the hash was already in-flight from someone else.
78    /// Each entry carries the announcer's own announced type and size so the
79    /// retry can validate the response against the alternate's metadata (which
80    /// may differ from the primary's). Used as a fallback list when an in-flight
81    /// request fails or the responding peer disconnects. The `Instant` records
82    /// the last time the entry was touched so a periodic pruner can drop stale
83    /// entries.
84    alternates: FxHashMap<H256, (VecDeque<Alternate>, Instant)>,
85    /// Maps blob versioned hashes to transaction hashes that include them and a position inside
86    /// blob bundle where blob and its adjacent data is available.
87    blobs_bundle_by_versioned_hash: FxHashMap<H256, FxHashMap<H256, usize>>,
88    txs_by_sender_nonce: BTreeMap<(H160, u64), H256>,
89    txs_order: VecDeque<H256>,
90    max_mempool_size: usize,
91    max_blob_mempool_size: usize,
92    // Max number of transactions to let the mempool order queue grow before pruning it
93    mempool_prune_threshold: usize,
94}
95
96impl MempoolInner {
97    fn new(max_mempool_size: usize) -> Self {
98        MempoolInner {
99            txs_order: VecDeque::with_capacity(max_mempool_size * 2),
100            transaction_pool: FxHashMap::with_capacity_and_hasher(
101                max_mempool_size,
102                Default::default(),
103            ),
104            max_mempool_size,
105            max_blob_mempool_size: MAX_BLOB_MEMPOOL_SIZE,
106            mempool_prune_threshold: max_mempool_size + max_mempool_size / 2,
107            ..Default::default()
108        }
109    }
110
111    /// Remove a transaction from the pool with the transaction pool lock already taken
112    fn remove_transaction_with_lock(&mut self, hash: &H256) -> Result<(), StoreError> {
113        let Some(tx) = self.transaction_pool.remove(hash) else {
114            return Ok(());
115        };
116        if matches!(tx.tx_type(), TxType::EIP4844) {
117            self.remove_blob_bundle(hash);
118        }
119
120        self.txs_by_sender_nonce.remove(&(tx.sender(), tx.nonce()));
121        self.broadcast_pool.remove(hash);
122
123        Ok(())
124    }
125
126    /// Remove a blobs bundle from the pool
127    pub fn remove_blob_bundle(&mut self, hash: &H256) {
128        let Some(h) = self.blobs_bundle_pool.remove(hash) else {
129            return;
130        };
131
132        for commitment in &h.commitments {
133            let versioned_hash = kzg_commitment_to_versioned_hash(commitment);
134            if let Entry::Occupied(mut entry) =
135                self.blobs_bundle_by_versioned_hash.entry(versioned_hash)
136            {
137                let txn_to_bundle = entry.get_mut();
138                txn_to_bundle.remove(hash);
139                if txn_to_bundle.is_empty() {
140                    entry.remove();
141                }
142            }
143        }
144    }
145
146    /// Number of blob (EIP-4844) txs currently in the pool. Each blob tx has
147    /// exactly one bundle entry, so the bundle pool size is the blob tx count.
148    fn blob_tx_count(&self) -> usize {
149        self.blobs_bundle_pool.len()
150    }
151
152    /// Number of non-blob txs currently in the pool.
153    fn regular_tx_count(&self) -> usize {
154        // `saturating_sub`: a blob bundle is inserted before its tx (see
155        // `add_blob_transaction_to_pool`), so in that window the bundle count can
156        // briefly exceed the tx entries. Treat the undercount as 0 regular txs
157        // rather than underflowing (which would wrongly trigger eviction).
158        self.transaction_pool
159            .len()
160            .saturating_sub(self.blob_tx_count())
161    }
162
163    /// Evict the oldest regular (non-blob) transactions until the regular pool is
164    /// back under its cap. Only drains `txs_order`, so blob txs are never evicted
165    /// by regular-tx pressure.
166    fn remove_oldest_regular_transaction(&mut self) -> Result<(), StoreError> {
167        while self.regular_tx_count() >= self.max_mempool_size {
168            if let Some(oldest_hash) = self.txs_order.pop_front() {
169                self.remove_transaction_with_lock(&oldest_hash)?;
170            } else {
171                warn!(
172                    "Regular mempool is full but there are no transactions to remove, this should not happen and will make the mempool grow indefinitely"
173                );
174                break;
175            }
176        }
177
178        Ok(())
179    }
180
181    /// Evict blob transactions until the blob sub-pool is back under its cap.
182    ///
183    /// Unlike a FIFO, this drops the *least includable* blob tx first. "Least
184    /// includable" is approximated by how deep a tx sits in its own sender's
185    /// queue: the nonce offset from that sender's lowest pooled blob nonce. A
186    /// large offset means the tx sits behind earlier same-sender blobs and
187    /// can't be included until those clear, so it is the safest to drop. Ties
188    /// are broken by lowest blob fee.
189    ///
190    /// The offset is measured per-sender on purpose. A raw cross-sender nonce
191    /// comparison would penalize long-lived high-throughput senders (e.g. a
192    /// rollup sequencer) whose on-wire nonces are large but whose txs are
193    /// perfectly includable. Measuring within a sender preserves the
194    /// low-offset, ready-to-include blobs the block builder actually needs
195    /// instead of FIFO-evicting them just because they arrived early.
196    fn remove_worst_blob_transaction(&mut self) -> Result<(), StoreError> {
197        while self.blob_tx_count() > self.max_blob_mempool_size {
198            // `blobs_bundle_pool` is keyed by blob-tx hash, so its keys are
199            // exactly the blob txs currently held. First pass: lowest pooled
200            // blob nonce per sender, the per-sender baseline for the offset.
201            let mut min_nonce_by_sender: FxHashMap<Address, u64> = FxHashMap::default();
202            for tx in self
203                .blobs_bundle_pool
204                .keys()
205                .filter_map(|hash| self.transaction_pool.get(hash))
206            {
207                min_nonce_by_sender
208                    .entry(tx.sender())
209                    .and_modify(|n| *n = (*n).min(tx.nonce()))
210                    .or_insert(tx.nonce());
211            }
212            // O(N) scan over the blob sub-pool (N <= max_blob_mempool_size, 512
213            // today). Fine at this cap; revisit (e.g. a priority index) before
214            // exposing a much larger cap via CLI.
215            let worst = self
216                .blobs_bundle_pool
217                .keys()
218                .filter_map(|hash| self.transaction_pool.get(hash).map(|tx| (*hash, tx)))
219                .max_by_key(|(_, tx)| {
220                    let baseline = min_nonce_by_sender.get(&tx.sender()).copied().unwrap_or(0);
221                    let offset = tx.nonce().saturating_sub(baseline);
222                    (offset, Reverse(tx.max_fee_per_blob_gas()))
223                })
224                .map(|(hash, _)| hash);
225            match worst {
226                Some(hash) => self.remove_transaction_with_lock(&hash)?,
227                None => {
228                    warn!(
229                        "Blob mempool is over cap but no evictable blob transaction is present, this should not happen"
230                    );
231                    break;
232                }
233            }
234        }
235
236        Ok(())
237    }
238}
239
240#[derive(Debug, Default)]
241pub struct Mempool {
242    inner: RwLock<MempoolInner>,
243    /// Signaled on transaction and blobs bundle insertions so payload
244    /// builders can await new work instead of busy-looping.
245    tx_added: tokio::sync::Notify,
246    /// Monotonic counter incremented on every transaction insertion. Used by
247    /// the payload builder to detect whether new txs landed since it last
248    /// snapshotted the mempool, so it can decide whether a stale build is safe
249    /// to return.
250    tx_seq: AtomicU64,
251}
252
253impl Mempool {
254    pub fn new(max_mempool_size: usize) -> Self {
255        Mempool {
256            inner: RwLock::new(MempoolInner::new(max_mempool_size)),
257            tx_added: tokio::sync::Notify::new(),
258            tx_seq: AtomicU64::new(0),
259        }
260    }
261
262    /// Override the blob sub-pool capacity (defaults to [`MAX_BLOB_MEMPOOL_SIZE`]).
263    /// Builder-style; intended for configuration and tests.
264    pub fn with_max_blob_mempool_size(self, max_blob_mempool_size: usize) -> Self {
265        if let Ok(mut inner) = self.inner.write() {
266            inner.max_blob_mempool_size = max_blob_mempool_size;
267        }
268        self
269    }
270
271    pub(crate) fn tx_added(&self) -> &tokio::sync::Notify {
272        &self.tx_added
273    }
274
275    pub(crate) fn tx_seq(&self) -> u64 {
276        self.tx_seq.load(Ordering::Acquire)
277    }
278
279    fn write(&self) -> Result<std::sync::RwLockWriteGuard<'_, MempoolInner>, StoreError> {
280        self.inner
281            .write()
282            .map_err(|error| StoreError::MempoolWriteLock(error.to_string()))
283    }
284
285    fn read(&self) -> Result<std::sync::RwLockReadGuard<'_, MempoolInner>, StoreError> {
286        self.inner
287            .read()
288            .map_err(|error| StoreError::MempoolReadLock(error.to_string()))
289    }
290
291    /// Add transaction to the pool without doing validity checks
292    pub fn add_transaction(
293        &self,
294        hash: H256,
295        sender: Address,
296        transaction: MempoolTransaction,
297    ) -> Result<(), StoreError> {
298        let mut inner = self.write()?;
299        let is_blob = matches!(transaction.tx_type(), TxType::EIP4844);
300        // Prune the regular order queue if it has grown too much
301        if inner.txs_order.len() > inner.mempool_prune_threshold {
302            // NOTE: we do this to avoid borrow checker errors
303            let txpool = core::mem::take(&mut inner.transaction_pool);
304            inner.txs_order.retain(|tx| txpool.contains_key(tx));
305            inner.transaction_pool = txpool;
306        }
307        // Blob txs are evicted against their own cap so a flood of regular txs
308        // can't push them out (and vice versa). Blob eviction is value/nonce
309        // ordered (see `remove_worst_blob_transaction`), not FIFO, so it never
310        // drops the next-includable blob tx; regular txs stay FIFO.
311        if is_blob {
312            // The bundle is inserted before the tx (see add_blob_transaction_to_pool),
313            // so the incoming blob is already counted by `blob_tx_count`.
314            if inner.blob_tx_count() > inner.max_blob_mempool_size {
315                inner.remove_worst_blob_transaction()?;
316            }
317        } else {
318            // The regular tx isn't in the pool yet (inserted below), so
319            // `regular_tx_count()` is the count *before* this tx: `>= max` means
320            // we're already at cap and must evict to make room. (Mirror of the
321            // blob branch, which uses `>` because the bundle is inserted first
322            // and is therefore already counted by `blob_tx_count`.)
323            if inner.regular_tx_count() >= inner.max_mempool_size {
324                inner.remove_oldest_regular_transaction()?;
325            }
326            inner.txs_order.push_back(hash);
327        }
328        inner
329            .txs_by_sender_nonce
330            .insert((sender, transaction.nonce()), hash);
331        inner.transaction_pool.insert(hash, transaction);
332        inner.broadcast_pool.insert(hash);
333        inner.alternates.remove(&hash);
334        // Drop the write lock before notifying to avoid holding it while waking waiters
335        drop(inner);
336        // Bump `tx_seq` *after* releasing the write lock. The payload builder
337        // snapshots `tx_seq` before reading the mempool; with this ordering,
338        // any reader that observes the new tx is guaranteed to also observe a
339        // bumped seq on its next load, so the builder never misses a tx it
340        // already incorporated as "new since last build".
341        self.tx_seq.fetch_add(1, Ordering::Release);
342        self.tx_added.notify_waiters();
343
344        Ok(())
345    }
346
347    pub fn get_txs_for_broadcast(&self) -> Result<Vec<MempoolTransaction>, StoreError> {
348        let inner = self.read()?;
349        let txs = inner
350            .transaction_pool
351            .iter()
352            .filter_map(|(hash, tx)| {
353                if !inner.broadcast_pool.contains(hash) {
354                    None
355                } else {
356                    Some(tx.clone())
357                }
358            })
359            .collect::<Vec<_>>();
360        Ok(txs)
361    }
362
363    pub fn remove_broadcasted_txs(&self, hashes: &[H256]) -> Result<(), StoreError> {
364        let mut inner = self.write()?;
365        for hash in hashes {
366            inner.broadcast_pool.remove(hash);
367        }
368        Ok(())
369    }
370
371    /// `(hash, sender, nonce)` for every blob tx in the pool. `blobs_bundle_pool`
372    /// is keyed by blob-tx hash, so its keys are exactly the held blob txs.
373    pub fn blob_txs(&self) -> Result<Vec<(H256, Address, u64)>, StoreError> {
374        let inner = self.read()?;
375        Ok(inner
376            .blobs_bundle_pool
377            .keys()
378            .filter_map(|hash| {
379                inner
380                    .transaction_pool
381                    .get(hash)
382                    .map(|tx| (*hash, tx.sender(), tx.nonce()))
383            })
384            .collect())
385    }
386
387    /// Add a blobs bundle to the pool by its blob transaction hash
388    pub fn add_blobs_bundle(
389        &self,
390        tx_hash: H256,
391        blobs_bundle: BlobsBundle,
392    ) -> Result<(), StoreError> {
393        let mut mempool = self.write()?;
394        for (i, c) in blobs_bundle.commitments.iter().enumerate() {
395            let versioned_hash = kzg_commitment_to_versioned_hash(c);
396            mempool
397                .blobs_bundle_by_versioned_hash
398                .entry(versioned_hash)
399                .or_default()
400                .insert(tx_hash, i);
401        }
402        mempool.blobs_bundle_pool.insert(tx_hash, blobs_bundle);
403        Ok(())
404    }
405
406    /// Get a blobs bundle to the pool given its blob transaction hash
407    pub fn get_blobs_bundle(&self, tx_hash: H256) -> Result<Option<BlobsBundle>, StoreError> {
408        Ok(self.read()?.blobs_bundle_pool.get(&tx_hash).cloned())
409    }
410
411    /// Remove a transaction from the pool
412    pub fn remove_transaction(&self, hash: &H256) -> Result<(), StoreError> {
413        let mut inner = self.write()?;
414        inner.remove_transaction_with_lock(hash)?;
415        Ok(())
416    }
417
418    /// Applies the filter and returns a set of suitable transactions from the mempool.
419    /// These transactions will be grouped by sender and sorted by nonce
420    pub fn filter_transactions(
421        &self,
422        filter: &PendingTxFilter,
423    ) -> Result<FxHashMap<Address, Vec<MempoolTransaction>>, StoreError> {
424        let filter_tx = |tx: &Transaction| -> bool {
425            // Filter by tx type
426            let is_blob_tx = matches!(tx, Transaction::EIP4844Transaction(_));
427            if filter.only_plain_txs && is_blob_tx || filter.only_blob_txs && !is_blob_tx {
428                return false;
429            }
430
431            // Filter by tip & base_fee
432            if let Some(min_tip) = filter.min_tip.map(U256::from) {
433                if tx
434                    .effective_gas_tip(filter.base_fee)
435                    .is_none_or(|tip| tip < min_tip)
436                {
437                    return false;
438                }
439            // This is a temporary fix to avoid invalid transactions to be included.
440            // This should be removed once https://github.com/lambdaclass/ethrex/issues/680
441            // is addressed.
442            } else if tx.effective_gas_tip(filter.base_fee).is_none() {
443                return false;
444            }
445
446            // Filter by blob gas fee
447            if is_blob_tx
448                && let Some(blob_fee) = filter.blob_fee
449                && tx
450                    .max_fee_per_blob_gas()
451                    .is_none_or(|fee| fee < blob_fee.into())
452            {
453                return false;
454            }
455            true
456        };
457        self.filter_transactions_with_filter_fn(&filter_tx)
458    }
459
460    /// Gets all the transactions in the mempool
461    pub fn get_all_txs_by_sender(
462        &self,
463    ) -> Result<FxHashMap<Address, Vec<MempoolTransaction>>, StoreError> {
464        let mut txs_by_sender: FxHashMap<Address, Vec<MempoolTransaction>> =
465            FxHashMap::with_capacity_and_hasher(128, Default::default());
466        let tx_pool = &self.read()?.transaction_pool;
467
468        for (_, tx) in tx_pool.iter() {
469            txs_by_sender
470                .entry(tx.sender())
471                .or_insert_with(|| Vec::with_capacity(128))
472                .push(tx.clone())
473        }
474
475        txs_by_sender.iter_mut().for_each(|(_, txs)| txs.sort());
476        Ok(txs_by_sender)
477    }
478
479    /// Applies the filter and returns a set of suitable transactions from the mempool.
480    /// These transactions will be grouped by sender and sorted by nonce
481    pub fn filter_transactions_with_filter_fn(
482        &self,
483        filter: &dyn Fn(&Transaction) -> bool,
484    ) -> Result<FxHashMap<Address, Vec<MempoolTransaction>>, StoreError> {
485        let mut txs_by_sender: FxHashMap<Address, Vec<MempoolTransaction>> =
486            FxHashMap::with_capacity_and_hasher(128, Default::default());
487        let tx_pool = &self.read()?.transaction_pool;
488
489        for (_, tx) in tx_pool.iter() {
490            if filter(tx) {
491                txs_by_sender
492                    .entry(tx.sender())
493                    .or_insert_with(|| Vec::with_capacity(128))
494                    .push(tx.clone())
495            }
496        }
497
498        txs_by_sender.iter_mut().for_each(|(_, txs)| txs.sort());
499        Ok(txs_by_sender)
500    }
501
502    /// Filters hashes to those not already in the mempool or in-flight, and
503    /// atomically marks the returned hashes as in-flight under a single write
504    /// lock so that concurrent peer handlers cannot request the same hashes.
505    ///
506    /// For hashes that get filtered out *because they're already in-flight
507    /// from another peer*, records `announcer` as a fallback so the request
508    /// can be retried against this peer if the original responder fails. New
509    /// hashes that the caller is about to request do not need an alternates
510    /// entry yet: the caller is the primary, and one will be created only if
511    /// some other peer later announces the same hash while it's in-flight.
512    /// Reserve hashes the caller wants to request, returning only those that are
513    /// neither already in-flight nor already in the pool. Any hash filtered out
514    /// because it's in-flight from another peer is registered with the caller's
515    /// own (type, size) metadata as an alternate, so a later retry can validate
516    /// the response against this announcer's announcement.
517    ///
518    /// `hashes`, `types`, and `sizes` must be the same length (one entry per
519    /// announced hash).
520    pub fn reserve_unknown_hashes(
521        &self,
522        hashes: &[H256],
523        types: &[u8],
524        sizes: &[usize],
525        announcer: H256,
526    ) -> Result<Vec<H256>, StoreError> {
527        debug_assert_eq!(hashes.len(), types.len());
528        debug_assert_eq!(hashes.len(), sizes.len());
529
530        let mut inner = self.write()?;
531
532        let unknown: Vec<H256> = hashes
533            .iter()
534            .filter(|hash| {
535                !inner.in_flight_txs.contains(hash) && !inner.transaction_pool.contains_key(hash)
536            })
537            .copied()
538            .collect();
539
540        inner.in_flight_txs.extend(unknown.iter().copied());
541
542        // Register alternates only for hashes the caller will *not* request
543        // (i.e. those already in-flight from someone else). Skip pool hits
544        // and skip hashes we just reserved for this peer.
545        if hashes.len() > unknown.len() {
546            let unknown_set: FxHashSet<H256> = unknown.iter().copied().collect();
547            let now = Instant::now();
548            for (i, hash) in hashes.iter().enumerate() {
549                if unknown_set.contains(hash) || inner.transaction_pool.contains_key(hash) {
550                    continue;
551                }
552                let alt = Alternate {
553                    peer_id: announcer,
554                    tx_type: types[i],
555                    tx_size: sizes[i],
556                };
557                let entry = inner
558                    .alternates
559                    .entry(*hash)
560                    .or_insert_with(|| (VecDeque::new(), now));
561                entry.1 = now;
562                if !entry.0.iter().any(|a| a.peer_id == announcer) {
563                    if entry.0.len() >= MAX_ALTERNATES_PER_HASH {
564                        entry.0.pop_front();
565                    }
566                    entry.0.push_back(alt);
567                }
568            }
569        }
570
571        Ok(unknown)
572    }
573
574    /// Removes transaction hashes from the in-flight set, typically called
575    /// when the GetPooledTransactions response arrives (or the connection drops).
576    pub fn clear_in_flight_txs(&self, hashes: &[H256]) -> Result<(), StoreError> {
577        let mut inner = self.write()?;
578        for hash in hashes {
579            inner.in_flight_txs.remove(hash);
580        }
581        Ok(())
582    }
583
584    /// Pops the next alternate announcer for the given hash, if any. Returns
585    /// `Ok(None)` when no alternates remain. The caller uses the popped
586    /// `Alternate` to look up the peer connection and build a retry request
587    /// against that peer's own announcement metadata.
588    pub fn pop_alternate(&self, hash: H256) -> Result<Option<Alternate>, StoreError> {
589        let mut inner = self.write()?;
590        let Some(entry) = inner.alternates.get_mut(&hash) else {
591            return Ok(None);
592        };
593        let popped = entry.0.pop_front();
594        if entry.0.is_empty() {
595            inner.alternates.remove(&hash);
596        }
597        Ok(popped)
598    }
599
600    /// Drop alternates entries that haven't been touched in the last `ttl`.
601    /// Called periodically to bound the size of the alternates map when
602    /// announced txs never make it into the pool.
603    pub fn prune_alternates(&self, ttl: Duration) -> Result<(), StoreError> {
604        let mut inner = self.write()?;
605        let now = Instant::now();
606        inner
607            .alternates
608            .retain(|_, (_, last_seen)| now.saturating_duration_since(*last_seen) < ttl);
609        Ok(())
610    }
611
612    pub fn get_transaction_by_hash(
613        &self,
614        transaction_hash: H256,
615    ) -> Result<Option<Transaction>, StoreError> {
616        let tx = self
617            .read()?
618            .transaction_pool
619            .get(&transaction_hash)
620            .map(|e| e.transaction().clone());
621
622        Ok(tx)
623    }
624
625    pub fn get_nonce(&self, address: &Address) -> Result<Option<u64>, MempoolError> {
626        Ok(self
627            .read()?
628            .txs_by_sender_nonce
629            .range((*address, 0)..=(*address, u64::MAX))
630            .last()
631            .map(|((_address, nonce), _hash)| nonce + 1))
632    }
633
634    pub fn get_mempool_size(&self) -> Result<(u64, u64), MempoolError> {
635        let txs_size = {
636            let pool_lock = &self.read()?.transaction_pool;
637            pool_lock.len()
638        };
639        let blobs_size = {
640            let pool_lock = &self.read()?.blobs_bundle_pool;
641            pool_lock.len()
642        };
643
644        Ok((txs_size as u64, blobs_size as u64))
645    }
646
647    /// Returns all transactions currently in the pool
648    pub fn content(&self) -> Result<Vec<Transaction>, MempoolError> {
649        let pooled_transactions = &self.read()?.transaction_pool;
650        Ok(pooled_transactions
651            .values()
652            .map(MempoolTransaction::transaction)
653            .cloned()
654            .collect())
655    }
656
657    /// Returns all blobs bundles currently in the pool
658    pub fn get_blobs_bundle_pool(&self) -> Result<Vec<BlobsBundle>, MempoolError> {
659        let blobs_bundle_pool = &self.read()?.blobs_bundle_pool;
660        Ok(blobs_bundle_pool.values().cloned().collect())
661    }
662
663    /// Returns blobs data (blob, commitment, proof) associated with the versioned hashes
664    pub fn get_blobs_data_by_versioned_hashes(
665        &self,
666        versioned_hashes: &[H256],
667    ) -> Result<Vec<Option<BlobTuple>>, MempoolError> {
668        let mempool = self.read()?;
669        let blobs_bundle_pool = &mempool.blobs_bundle_pool;
670        let blobs_bundle_by_versioned_hash = &mempool.blobs_bundle_by_versioned_hash;
671        let mut res = vec![None; versioned_hashes.len()];
672        for (idx, vh) in versioned_hashes.iter().enumerate() {
673            if let Some((found_hash, inner_pos)) = blobs_bundle_by_versioned_hash
674                .get(vh)
675                .and_then(|h| h.iter().next())
676            {
677                res[idx] = blobs_bundle_pool
678                    .get(found_hash)
679                    .and_then(|b| b.get_blob_tuple_by_index(*inner_pos))
680            }
681        }
682        Ok(res)
683    }
684
685    /// Returns the status of the mempool, which is the number of transactions currently in
686    /// the pool. Until we add "queue" transactions.
687    pub fn status(&self) -> Result<u64, MempoolError> {
688        let pool_lock = &self.read()?.transaction_pool;
689
690        Ok(pool_lock.len() as u64)
691    }
692
693    pub fn contains_sender_nonce(
694        &self,
695        sender: Address,
696        nonce: u64,
697        received_hash: H256,
698    ) -> Result<Option<MempoolTransaction>, MempoolError> {
699        let Some(hash) = self
700            .read()?
701            .txs_by_sender_nonce
702            .get(&(sender, nonce))
703            .cloned()
704        else {
705            return Ok(None);
706        };
707        if hash == received_hash {
708            return Ok(None);
709        }
710
711        let transaction_pool = &self.read()?.transaction_pool;
712        let tx = transaction_pool.get(&hash).cloned();
713        Ok(tx)
714    }
715
716    pub fn contains_tx(&self, tx_hash: H256) -> Result<bool, MempoolError> {
717        let contains = self.read()?.transaction_pool.contains_key(&tx_hash);
718        Ok(contains)
719    }
720
721    pub fn find_tx_to_replace(
722        &self,
723        sender: Address,
724        nonce: u64,
725        tx: &Transaction,
726    ) -> Result<Option<H256>, MempoolError> {
727        let Some(tx_in_pool) = self.contains_sender_nonce(sender, nonce, tx.hash())? else {
728            return Ok(None);
729        };
730        let is_a_replacement_tx = {
731            // EIP-1559 values
732            let old_tx_max_fee_per_gas = tx_in_pool.max_fee_per_gas().unwrap_or_default();
733            let old_tx_max_priority_fee_per_gas = tx_in_pool.max_priority_fee().unwrap_or_default();
734            let new_tx_max_fee_per_gas = tx.max_fee_per_gas().unwrap_or_default();
735            let new_tx_max_priority_fee_per_gas = tx.max_priority_fee().unwrap_or_default();
736
737            // Legacy tx values
738            let old_tx_gas_price = tx_in_pool.gas_price();
739            let new_tx_gas_price = tx.gas_price();
740
741            // EIP-4844 values
742            let old_tx_max_fee_per_blob = tx_in_pool.max_fee_per_blob_gas();
743            let new_tx_max_fee_per_blob = tx.max_fee_per_blob_gas();
744
745            let eip4844_higher_fees = if let (Some(old_blob_fee), Some(new_blob_fee)) =
746                (old_tx_max_fee_per_blob, new_tx_max_fee_per_blob)
747            {
748                new_blob_fee > old_blob_fee
749            } else {
750                true // We are marking it as always true if the tx is not eip-4844
751            };
752
753            let eip1559_higher_fees = new_tx_max_fee_per_gas > old_tx_max_fee_per_gas
754                && new_tx_max_priority_fee_per_gas > old_tx_max_priority_fee_per_gas;
755            let legacy_higher_fees = new_tx_gas_price > old_tx_gas_price;
756
757            eip4844_higher_fees && (eip1559_higher_fees || legacy_higher_fees)
758        };
759
760        if !is_a_replacement_tx {
761            return Err(MempoolError::UnderpricedReplacement);
762        }
763
764        Ok(Some(tx_in_pool.hash()))
765    }
766}
767
768/// Filter applied by the payload builder when querying pending transactions
769/// from the pool. NOT a mempool admission gate — all fields here are
770/// query-time filters used to pick block-includable transactions. Admission
771/// rules are enforced in `Blockchain::validate_transaction`.
772#[derive(Debug, Default)]
773pub struct PendingTxFilter {
774    /// Minimum effective priority fee for a transaction to be surfaced to
775    /// the payload builder. This is a block-building filter, not an
776    /// admission check — see `crates/common/types/constants.rs::MIN_GAS_TIP`.
777    pub min_tip: Option<u64>,
778    pub base_fee: Option<u64>,
779    pub blob_fee: Option<u64>,
780    pub only_plain_txs: bool,
781    pub only_blob_txs: bool,
782}
783
784pub fn transaction_intrinsic_gas(
785    tx: &Transaction,
786    header: &BlockHeader,
787    config: &ChainConfig,
788) -> Result<u64, MempoolError> {
789    // Amsterdam (EIP-8037): the VM splits intrinsic into (regular, state) and uses
790    // `REGULAR_GAS_CREATE = 9000` + `STATE_BYTES_PER_NEW_ACCOUNT * cpsb` for CREATE
791    // instead of the legacy `TX_CREATE_GAS_COST = 53000`. Mempool admission must
792    // match VM charge or we spuriously reject (or admit) transactions.
793    //
794    // The VM enforces `gas_limit >= max(intrinsic_regular + intrinsic_state,
795    // floor)` via two separate checks in `validate_gas_allowance` +
796    // `validate_min_gas_limit`. Apply the same max here so we don't admit
797    // txs whose calldata floor exceeds the weighted intrinsic — those would
798    // pass mempool and then fail at block inclusion, polluting the pool.
799    if config.is_amsterdam_activated(header.timestamp) {
800        let fork = config.fork(header.timestamp);
801        let (regular, state) = intrinsic_gas_dimensions(tx, fork, header.gas_limit)
802            .map_err(|_| MempoolError::TxGasOverflowError)?;
803        let intrinsic = regular
804            .checked_add(state)
805            .ok_or(MempoolError::TxGasOverflowError)?;
806        let floor = intrinsic_gas_floor(tx, fork).map_err(|_| MempoolError::TxGasOverflowError)?;
807        // Block-level gas = max(regular_dim, state_dim); regular_dim itself is
808        // `max(tx_regular, calldata_floor)` per EIP-7778. Use the same max so
809        // admission mirrors the VM's effective minimum.
810        return Ok(intrinsic.max(floor));
811    }
812
813    let is_contract_creation = tx.is_contract_creation();
814
815    let mut gas = if is_contract_creation {
816        TX_CREATE_GAS_COST
817    } else {
818        TX_GAS_COST
819    };
820
821    let data_len = tx.data().len() as u64;
822
823    if data_len > 0 {
824        let non_zero_gas_cost = if config.is_istanbul_activated(header.number) {
825            TX_DATA_NON_ZERO_GAS_EIP2028
826        } else {
827            TX_DATA_NON_ZERO_GAS
828        };
829
830        let non_zero_count = tx.data().iter().filter(|&&x| x != 0u8).count() as u64;
831
832        gas = gas
833            .checked_add(non_zero_count * non_zero_gas_cost)
834            .ok_or(MempoolError::TxGasOverflowError)?;
835
836        let zero_count = data_len - non_zero_count;
837
838        gas = gas
839            .checked_add(zero_count * TX_DATA_ZERO_GAS_COST)
840            .ok_or(MempoolError::TxGasOverflowError)?;
841
842        if is_contract_creation && config.is_shanghai_activated(header.timestamp) {
843            // Len in 32 bytes sized words
844            let len_in_words = data_len.saturating_add(31) / 32;
845
846            gas = gas
847                .checked_add(len_in_words * TX_INIT_CODE_WORD_GAS_COST)
848                .ok_or(MempoolError::TxGasOverflowError)?;
849        }
850    }
851
852    let storage_keys_count: u64 = tx
853        .access_list()
854        .iter()
855        .map(|(_, keys)| keys.len() as u64)
856        .sum();
857
858    gas = gas
859        .checked_add(tx.access_list().len() as u64 * TX_ACCESS_LIST_ADDRESS_GAS)
860        .ok_or(MempoolError::TxGasOverflowError)?;
861
862    gas = gas
863        .checked_add(storage_keys_count * TX_ACCESS_LIST_STORAGE_KEY_GAS)
864        .ok_or(MempoolError::TxGasOverflowError)?;
865
866    Ok(gas)
867}