Skip to main content

forest/message_pool/msgpool/
msg_pool.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4// Contains the implementation of Message Pool component.
5// The Message Pool is the component of forest that handles pending messages for
6// inclusion in the chain. Messages are added either directly for locally
7// published messages or through pubsub propagation.
8
9use std::{num::NonZeroUsize, sync::Arc, time::Duration};
10
11use crate::blocks::{CachingBlockHeader, Tipset};
12use crate::chain::{HeadChange, MINIMUM_BASE_FEE};
13#[cfg(test)]
14use crate::db::SettingsStore;
15use crate::eth::is_valid_eth_tx_for_sending;
16use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic};
17use crate::message::{ChainMessage, Message, SignedMessage, valid_for_block_inclusion};
18use crate::networks::{ChainConfig, NEWEST_NETWORK_VERSION};
19use crate::shim::{
20    address::Address,
21    crypto::{Signature, SignatureType},
22    econ::TokenAmount,
23    gas::{Gas, price_list_by_network_version},
24};
25use crate::state_manager::utils::is_valid_for_sending;
26use crate::utils::cache::SizeTrackingLruCache;
27use crate::utils::get_size::CidWrapper;
28use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
29use anyhow::Context as _;
30use cid::Cid;
31use futures::StreamExt;
32use fvm_ipld_encoding::to_vec;
33use itertools::Itertools;
34use nonzero_ext::nonzero;
35use parking_lot::RwLock as SyncRwLock;
36use tokio::{sync::broadcast::error::RecvError, task::JoinSet, time::interval};
37use tracing::warn;
38
39use crate::message_pool::{
40    config::MpoolConfig,
41    errors::Error,
42    head_change, metrics,
43    msgpool::{
44        BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, RBF_DENOM, RBF_NUM, recover_sig,
45        republish_pending_messages,
46    },
47    provider::Provider,
48    utils::get_base_fee_lower_bound,
49};
50
// LruCache sizes have been taken from the lotus implementation
/// Size handed to the `bls_sig` cache when the pool is created.
const BLS_SIG_CACHE_SIZE: NonZeroUsize = nonzero!(40000usize);
/// Size handed to the `sig_val` (verified signature) cache when the pool is created.
const SIG_VAL_CACHE_SIZE: NonZeroUsize = nonzero!(32000usize);

/// Pending-message limit per actor for trusted sources.
// NOTE(review): presumably the default behind `Provider::max_actor_pending_messages` — confirm.
pub const MAX_ACTOR_PENDING_MESSAGES: u64 = 1000;
/// Pending-message limit per actor for untrusted sources.
// NOTE(review): presumably the default behind `Provider::max_untrusted_actor_pending_messages` — confirm.
pub const MAX_UNTRUSTED_ACTOR_PENDING_MESSAGES: u64 = 10;
/// Maximum size of a serialized message in bytes. This is an anti-DOS measure to prevent
/// large messages from being added to the message pool.
const MAX_MESSAGE_SIZE: usize = 64 << 10; // 64 KiB
60
/// Trust policy for whether a message is from a trusted or untrusted source.
/// Untrusted sources are subject to stricter limits.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TrustPolicy {
    /// The message is added through `MsgSet::add_trusted`, which applies the
    /// provider's regular per-actor pending limit.
    Trusted,
    /// The message is added through `MsgSet::add_untrusted`, which applies the
    /// provider's untrusted per-actor pending limit.
    Untrusted,
}
68
/// The pending messages of a single sender. Messages are stored in a hash-map
/// where k: the message sequence, v: the signed message carrying that
/// sequence. The set also tracks the next sequence expected for the sender.
#[derive(Clone, Default, Debug)]
pub struct MsgSet {
    /// Pending messages indexed by their sequence.
    pub(in crate::message_pool) msgs: HashMap<u64, SignedMessage>,
    /// Next sequence for this sender as maintained by `add` and `rm`.
    next_sequence: u64,
}
76
77impl MsgSet {
78    /// Generate a new `MsgSet` with an empty hash-map and setting the sequence
79    /// specifically.
80    pub fn new(sequence: u64) -> Self {
81        MsgSet {
82            msgs: HashMap::new(),
83            next_sequence: sequence,
84        }
85    }
86
87    /// Add a signed message to the `MsgSet`. Increase `next_sequence` if the
88    /// message has a sequence greater than any existing message sequence.
89    /// Use this method when pushing a message coming from trusted sources.
90    pub fn add_trusted<T>(&mut self, api: &T, m: SignedMessage) -> Result<(), Error>
91    where
92        T: Provider,
93    {
94        self.add(api, m, true)
95    }
96
97    /// Add a signed message to the `MsgSet`. Increase `next_sequence` if the
98    /// message has a sequence greater than any existing message sequence.
99    /// Use this method when pushing a message coming from untrusted sources.
100    pub fn add_untrusted<T>(&mut self, api: &T, m: SignedMessage) -> Result<(), Error>
101    where
102        T: Provider,
103    {
104        self.add(api, m, false)
105    }
106
107    fn add<T>(&mut self, api: &T, m: SignedMessage, trusted: bool) -> Result<(), Error>
108    where
109        T: Provider,
110    {
111        let max_actor_pending_messages = if trusted {
112            api.max_actor_pending_messages()
113        } else {
114            api.max_untrusted_actor_pending_messages()
115        };
116
117        if self.msgs.is_empty() || m.sequence() >= self.next_sequence {
118            self.next_sequence = m.sequence() + 1;
119        }
120
121        let has_existing = if let Some(exms) = self.msgs.get(&m.sequence()) {
122            if m.cid() != exms.cid() {
123                let premium = &exms.message().gas_premium;
124                let min_price = premium.clone()
125                    + ((premium * RBF_NUM).div_floor(RBF_DENOM))
126                    + TokenAmount::from_atto(1u8);
127                if m.message().gas_premium <= min_price {
128                    return Err(Error::GasPriceTooLow);
129                }
130            } else {
131                return Err(Error::DuplicateSequence);
132            }
133            true
134        } else {
135            false
136        };
137
138        // Only check the limit when adding a new message, not when replacing an existing one (RBF)
139        if !has_existing && self.msgs.len() as u64 >= max_actor_pending_messages {
140            return Err(Error::TooManyPendingMessages(
141                m.message.from().to_string(),
142                trusted,
143            ));
144        }
145        if self.msgs.insert(m.sequence(), m).is_none() {
146            metrics::MPOOL_MESSAGE_TOTAL.inc();
147        }
148        Ok(())
149    }
150
151    /// Removes message with the given sequence. If applied, update the set's
152    /// next sequence.
153    pub fn rm(&mut self, sequence: u64, applied: bool) {
154        if self.msgs.remove(&sequence).is_none() {
155            if applied && sequence >= self.next_sequence {
156                self.next_sequence = sequence + 1;
157                while self.msgs.contains_key(&self.next_sequence) {
158                    self.next_sequence += 1;
159                }
160            }
161            return;
162        }
163        metrics::MPOOL_MESSAGE_TOTAL.dec();
164
165        // adjust next sequence
166        if applied {
167            // we removed a (known) message because it was applied in a tipset
168            // we can't possibly have filled a gap in this case
169            if sequence >= self.next_sequence {
170                self.next_sequence = sequence + 1;
171            }
172            return;
173        }
174        // we removed a message because it was pruned
175        // we have to adjust the sequence if it creates a gap or rewinds state
176        if sequence < self.next_sequence {
177            self.next_sequence = sequence;
178        }
179    }
180}
181
/// This contains all necessary information needed for the message pool.
/// Keeps track of messages to apply, as well as context needed for verifying
/// transactions.
pub struct MessagePool<T> {
    /// Sender addresses of the messages pushed through this pool
    /// (recorded by `add_local`)
    local_addrs: Arc<SyncRwLock<Vec<Address>>>,
    /// A map of pending messages where the key is the sender address
    pub pending: Arc<SyncRwLock<HashMap<Address, MsgSet>>>,
    /// The current tipset (a set of blocks)
    pub cur_tipset: Arc<SyncRwLock<Tipset>>,
    /// The underlying provider
    pub api: Arc<T>,
    /// Sender half to send messages to other components
    pub network_sender: flume::Sender<NetworkMessage>,
    /// A cache for BLS signature keyed by Cid
    pub bls_sig_cache: Arc<SizeTrackingLruCache<CidWrapper, Signature>>,
    /// A cache of message Cids whose signature has already been verified
    /// (see `verify_msg_sig`)
    pub sig_val_cache: Arc<SizeTrackingLruCache<CidWrapper, ()>>,
    /// A set of republished messages identified by their Cid
    pub republished: Arc<SyncRwLock<HashSet<Cid>>>,
    /// Acts as a signal to republish messages from the republished set of
    /// messages
    pub repub_trigger: flume::Sender<()>,
    /// Messages pushed through this pool (recorded by `add_local`)
    local_msgs: Arc<SyncRwLock<HashSet<SignedMessage>>>,
    /// Configurable parameters of the message pool
    pub config: MpoolConfig,
    /// Chain configuration
    pub chain_config: Arc<ChainConfig>,
}
211
212impl<T> MessagePool<T>
213where
214    T: Provider,
215{
216    /// Gets the current tipset
217    pub fn current_tipset(&self) -> Tipset {
218        self.cur_tipset.read().clone()
219    }
220
221    /// Add a signed message to the pool and its address.
222    fn add_local(&self, m: SignedMessage) -> Result<(), Error> {
223        self.local_addrs.write().push(m.from());
224        self.local_msgs.write().insert(m);
225        Ok(())
226    }
227
228    /// Push a signed message to the `MessagePool`. Additionally performs basic
229    /// checks on the validity of a message.
230    pub async fn push_internal(
231        &self,
232        msg: SignedMessage,
233        trust_policy: TrustPolicy,
234    ) -> Result<Cid, Error> {
235        self.check_message(&msg)?;
236        let cid = msg.cid();
237        let cur_ts = self.current_tipset();
238        let publish = self.add_tipset(msg.clone(), &cur_ts, true, trust_policy)?;
239        let msg_ser = to_vec(&msg)?;
240        let network_name = self.chain_config.network.genesis_name();
241        self.add_local(msg)?;
242        if publish {
243            self.network_sender
244                .send_async(NetworkMessage::PubsubMessage {
245                    topic: Topic::new(format!("{PUBSUB_MSG_STR}/{network_name}")),
246                    message: msg_ser,
247                })
248                .await
249                .map_err(|_| Error::Other("Network receiver dropped".to_string()))?;
250        }
251        Ok(cid)
252    }
253
254    /// Push a signed message to the `MessagePool` from an trusted source.
255    pub async fn push(&self, msg: SignedMessage) -> Result<Cid, Error> {
256        self.push_internal(msg, TrustPolicy::Trusted).await
257    }
258
259    /// Push a signed message to the `MessagePool` from an untrusted source.
260    pub async fn push_untrusted(&self, msg: SignedMessage) -> Result<Cid, Error> {
261        self.push_internal(msg, TrustPolicy::Untrusted).await
262    }
263
264    fn check_message(&self, msg: &SignedMessage) -> Result<(), Error> {
265        if to_vec(msg)?.len() > MAX_MESSAGE_SIZE {
266            return Err(Error::MessageTooBig);
267        }
268        valid_for_block_inclusion(msg.message(), Gas::new(0), NEWEST_NETWORK_VERSION)?;
269        if msg.value() > *crate::shim::econ::TOTAL_FILECOIN {
270            return Err(Error::MessageValueTooHigh);
271        }
272        if msg.gas_fee_cap().atto() < &MINIMUM_BASE_FEE.into() {
273            return Err(Error::GasFeeCapTooLow);
274        }
275        self.verify_msg_sig(msg)
276    }
277
278    /// This is a helper to push that will help to make sure that the message
279    /// fits the parameters to be pushed to the `MessagePool`.
280    pub fn add(&self, msg: SignedMessage) -> Result<(), Error> {
281        self.check_message(&msg)?;
282        let ts = self.current_tipset();
283        self.add_tipset(msg, &ts, false, TrustPolicy::Trusted)?;
284        Ok(())
285    }
286
287    /// Verify the message signature. first check if it has already been
288    /// verified and put into cache. If it has not, then manually verify it
289    /// then put it into cache for future use.
290    fn verify_msg_sig(&self, msg: &SignedMessage) -> Result<(), Error> {
291        let cid = msg.cid();
292
293        if let Some(()) = self.sig_val_cache.get_cloned(&(cid).into()) {
294            return Ok(());
295        }
296
297        msg.verify(self.chain_config.eth_chain_id)
298            .map_err(|e| Error::Other(e.to_string()))?;
299
300        self.sig_val_cache.push(cid.into(), ());
301
302        Ok(())
303    }
304
305    /// Verify the `state_sequence` and balance for the sender of the message
306    /// given then call `add_locked` to finish adding the `signed_message`
307    /// to pending.
308    fn add_tipset(
309        &self,
310        msg: SignedMessage,
311        cur_ts: &Tipset,
312        local: bool,
313        trust_policy: TrustPolicy,
314    ) -> Result<bool, Error> {
315        let sequence = self.get_state_sequence(&msg.from(), cur_ts)?;
316
317        if sequence > msg.message().sequence {
318            return Err(Error::SequenceTooLow);
319        }
320
321        let sender_actor = self.api.get_actor_after(&msg.message().from(), cur_ts)?;
322
323        // This message can only be included in the next epoch and beyond, hence the +1.
324        let nv = self.chain_config.network_version(cur_ts.epoch() + 1);
325        let eth_chain_id = self.chain_config.eth_chain_id;
326        if msg.signature().signature_type() == SignatureType::Delegated
327            && !is_valid_eth_tx_for_sending(eth_chain_id, nv, &msg)
328        {
329            return Err(Error::Other(
330                "Invalid Ethereum message for the current network version".to_owned(),
331            ));
332        }
333        if !is_valid_for_sending(nv, &sender_actor) {
334            return Err(Error::Other(
335                "Sender actor is not a valid top-level sender".to_owned(),
336            ));
337        }
338
339        let publish = verify_msg_before_add(&msg, cur_ts, local, &self.chain_config)?;
340
341        let balance = self.get_state_balance(&msg.from(), cur_ts)?;
342
343        let msg_balance = msg.required_funds();
344        if balance < msg_balance {
345            return Err(Error::NotEnoughFunds);
346        }
347        self.add_helper(msg, trust_policy)?;
348        Ok(publish)
349    }
350
351    /// Finish verifying signed message before adding it to the pending `mset`
352    /// hash-map. If an entry in the hash-map does not yet exist, create a
353    /// new `mset` that will correspond to the from message and push it to
354    /// the pending hash-map.
355    fn add_helper(&self, msg: SignedMessage, trust_policy: TrustPolicy) -> Result<(), Error> {
356        let from = msg.from();
357        let cur_ts = self.current_tipset();
358        add_helper(
359            self.api.as_ref(),
360            self.bls_sig_cache.as_ref(),
361            self.pending.as_ref(),
362            msg,
363            self.get_state_sequence(&from, &cur_ts)?,
364            trust_policy,
365        )
366    }
367
368    /// Get the sequence for a given address, return Error if there is a failure
369    /// to retrieve the respective sequence.
370    pub fn get_sequence(&self, addr: &Address) -> Result<u64, Error> {
371        let cur_ts = self.current_tipset();
372
373        let sequence = self.get_state_sequence(addr, &cur_ts)?;
374
375        let pending = self.pending.read();
376
377        let msgset = pending.get(addr);
378        match msgset {
379            Some(mset) => {
380                if sequence > mset.next_sequence {
381                    return Ok(sequence);
382                }
383                Ok(mset.next_sequence)
384            }
385            None => Ok(sequence),
386        }
387    }
388
389    /// Get the state of the sequence for a given address in `cur_ts`.
390    fn get_state_sequence(&self, addr: &Address, cur_ts: &Tipset) -> Result<u64, Error> {
391        let actor = self.api.get_actor_after(addr, cur_ts)?;
392        Ok(actor.sequence)
393    }
394
395    /// Get the state balance for the actor that corresponds to the supplied
396    /// address and tipset, if this actor does not exist, return an error.
397    fn get_state_balance(&self, addr: &Address, ts: &Tipset) -> Result<TokenAmount, Error> {
398        let actor = self.api.get_actor_after(addr, ts)?;
399        Ok(TokenAmount::from(&actor.balance))
400    }
401
402    /// Return a tuple that contains a vector of all signed messages and the
403    /// current tipset for self.
404    pub fn pending(&self) -> Result<(Vec<SignedMessage>, Tipset), Error> {
405        let mut out: Vec<SignedMessage> = Vec::new();
406        let pending = self.pending.read().clone();
407
408        for (addr, _) in pending {
409            out.append(
410                self.pending_for(&addr)
411                    .ok_or(Error::InvalidFromAddr)?
412                    .as_mut(),
413            )
414        }
415
416        let cur_ts = self.current_tipset();
417
418        Ok((out, cur_ts))
419    }
420
421    /// Return a Vector of signed messages for a given from address. This vector
422    /// will be sorted by each `message`'s sequence. If no corresponding
423    /// messages found, return None result type.
424    pub fn pending_for(&self, a: &Address) -> Option<Vec<SignedMessage>> {
425        let pending = self.pending.read();
426        let mset = pending.get(a)?;
427        if mset.msgs.is_empty() {
428            return None;
429        }
430
431        Some(
432            mset.msgs
433                .values()
434                .cloned()
435                .sorted_by_key(|v| v.message().sequence)
436                .collect(),
437        )
438    }
439
440    /// Return Vector of signed messages given a block header for self.
441    pub fn messages_for_blocks<'a>(
442        &self,
443        blks: impl Iterator<Item = &'a CachingBlockHeader>,
444    ) -> Result<Vec<SignedMessage>, Error> {
445        let mut msg_vec: Vec<SignedMessage> = Vec::new();
446
447        for block in blks {
448            let (umsg, mut smsgs) = self.api.messages_for_block(block)?;
449
450            msg_vec.append(smsgs.as_mut());
451            for msg in umsg {
452                let smsg = recover_sig(self.bls_sig_cache.as_ref(), msg)?;
453                msg_vec.push(smsg)
454            }
455        }
456        Ok(msg_vec)
457    }
458
459    /// Loads local messages to the message pool to be applied.
460    pub fn load_local(&mut self) -> Result<(), Error> {
461        let mut local_msgs = self.local_msgs.write();
462        for k in local_msgs.iter().cloned().collect_vec() {
463            self.add(k.clone()).unwrap_or_else(|err| {
464                if err == Error::SequenceTooLow {
465                    warn!("error adding message: {:?}", err);
466                    local_msgs.remove(&k);
467                }
468            })
469        }
470
471        Ok(())
472    }
473
474    #[cfg(test)]
475    pub fn get_config(&self) -> &MpoolConfig {
476        &self.config
477    }
478
479    #[cfg(test)]
480    pub fn set_config<DB: SettingsStore>(
481        &mut self,
482        db: &DB,
483        cfg: MpoolConfig,
484    ) -> Result<(), Error> {
485        cfg.save_config(db)
486            .map_err(|e| Error::Other(e.to_string()))?;
487        self.config = cfg;
488        Ok(())
489    }
490}
491
impl<T> MessagePool<T>
where
    T: Provider + Send + Sync + 'static,
{
    /// Creates a new `MessagePool` instance.
    ///
    /// The pool starts with empty pending, local and republished sets and with
    /// the provider's heaviest tipset as its current tipset. Two background
    /// tasks are spawned onto `services`:
    ///
    /// * one that reacts to head changes announced by the provider, and
    /// * one that republishes pending messages, either periodically or when
    ///   the republish trigger fires.
    pub fn new(
        api: T,
        network_sender: flume::Sender<NetworkMessage>,
        config: MpoolConfig,
        chain_config: Arc<ChainConfig>,
        services: &mut JoinSet<anyhow::Result<()>>,
    ) -> Result<MessagePool<T>, Error>
    where
        T: Provider,
    {
        let local_addrs = Arc::new(SyncRwLock::new(Vec::new()));
        let pending = Arc::new(SyncRwLock::new(HashMap::new()));
        let tipset = Arc::new(SyncRwLock::new(api.get_heaviest_tipset()));
        let bls_sig_cache = Arc::new(SizeTrackingLruCache::new_with_metrics(
            "bls_sig".into(),
            BLS_SIG_CACHE_SIZE,
        ));
        let sig_val_cache = Arc::new(SizeTrackingLruCache::new_with_metrics(
            "sig_val".into(),
            SIG_VAL_CACHE_SIZE,
        ));
        let local_msgs = Arc::new(SyncRwLock::new(HashSet::new()));
        let republished = Arc::new(SyncRwLock::new(HashSet::new()));
        let block_delay = chain_config.block_delay_secs;

        // Bounded channel used to request an out-of-schedule republish.
        let (repub_trigger, repub_trigger_rx) = flume::bounded::<()>(4);
        let mut mp = MessagePool {
            local_addrs,
            pending,
            cur_tipset: tipset,
            api: Arc::new(api),
            bls_sig_cache,
            sig_val_cache,
            local_msgs,
            republished,
            config,
            network_sender,
            repub_trigger,
            chain_config: Arc::clone(&chain_config),
        };

        mp.load_local()?;

        let mut subscriber = mp.api.subscribe_head_changes();

        // Handles shared with the head-change task.
        let api = mp.api.clone();
        let bls_sig_cache = mp.bls_sig_cache.clone();
        let pending = mp.pending.clone();
        let republished = mp.republished.clone();

        let cur_tipset = mp.cur_tipset.clone();
        let repub_trigger = mp.repub_trigger.clone();

        // Reacts to new HeadChanges
        services.spawn(async move {
            loop {
                match subscriber.recv().await {
                    Ok(ts) => {
                        let (cur, rev, app) = match ts {
                            HeadChange::Apply(tipset) => {
                                (cur_tipset.clone(), Vec::new(), vec![tipset])
                            }
                        };
                        // A failure here ends the task with an error.
                        head_change(
                            api.as_ref(),
                            bls_sig_cache.as_ref(),
                            repub_trigger.clone(),
                            republished.as_ref(),
                            pending.as_ref(),
                            cur.as_ref(),
                            rev,
                            app,
                        )
                        .await
                        .context("Error changing head")?;
                    }
                    // Missed events are skipped; the task keeps running.
                    Err(RecvError::Lagged(e)) => {
                        warn!("Head change subscriber lagged: skipping {} events", e);
                    }
                    // The sending side is gone: finish the task successfully.
                    Err(RecvError::Closed) => {
                        break Ok(());
                    }
                }
            }
        });

        // Handles shared with the republish task.
        let api = mp.api.clone();
        let pending = mp.pending.clone();
        let cur_tipset = mp.cur_tipset.clone();
        let republished = mp.republished.clone();
        let local_addrs = mp.local_addrs.clone();
        let network_sender = Arc::new(mp.network_sender.clone());
        // Republish period in seconds: ten block delays plus the propagation delay.
        let republish_interval = (10 * block_delay + chain_config.propagation_delay_secs) as u64;
        // Reacts to republishing requests
        services.spawn(async move {
            let mut repub_trigger_rx = repub_trigger_rx.stream();
            let mut interval = interval(Duration::from_secs(republish_interval));
            loop {
                // Wake up on whichever comes first: the timer or an explicit trigger.
                tokio::select! {
                    _ = interval.tick() => (),
                    _ = repub_trigger_rx.next() => (),
                }
                // Republish failures are logged and the loop continues.
                if let Err(e) = republish_pending_messages(
                    api.as_ref(),
                    network_sender.as_ref(),
                    pending.as_ref(),
                    cur_tipset.as_ref(),
                    republished.as_ref(),
                    local_addrs.as_ref(),
                    &chain_config,
                )
                .await
                {
                    warn!("Failed to republish pending messages: {}", e.to_string());
                }
            }
        });
        Ok(mp)
    }
}
617
618// Helpers for MessagePool
619
620/// Finish verifying signed message before adding it to the pending `mset`
621/// hash-map. If an entry in the hash-map does not yet exist, create a new
622/// `mset` that will correspond to the from message and push it to the pending
623/// hash-map.
624pub(in crate::message_pool) fn add_helper<T>(
625    api: &T,
626    bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
627    pending: &SyncRwLock<HashMap<Address, MsgSet>>,
628    msg: SignedMessage,
629    sequence: u64,
630    trust_policy: TrustPolicy,
631) -> Result<(), Error>
632where
633    T: Provider,
634{
635    if msg.signature().signature_type() == SignatureType::Bls {
636        bls_sig_cache.push(msg.cid().into(), msg.signature().clone());
637    }
638
639    api.put_message(&ChainMessage::Signed(msg.clone()))?;
640    api.put_message(&ChainMessage::Unsigned(msg.message().clone()))?;
641
642    let mut pending = pending.write();
643    let from = msg.from();
644    let mset = pending.entry(from).or_insert_with(|| MsgSet::new(sequence));
645    match trust_policy {
646        TrustPolicy::Untrusted => mset.add_untrusted(api, msg)?,
647        TrustPolicy::Trusted => mset.add_trusted(api, msg)?,
648    }
649
650    Ok(())
651}
652
653fn verify_msg_before_add(
654    m: &SignedMessage,
655    cur_ts: &Tipset,
656    local: bool,
657    chain_config: &ChainConfig,
658) -> Result<bool, Error> {
659    let epoch = cur_ts.epoch();
660    let min_gas = price_list_by_network_version(chain_config.network_version(epoch))
661        .on_chain_message(m.chain_length()?);
662    valid_for_block_inclusion(m.message(), min_gas.total(), NEWEST_NETWORK_VERSION)?;
663    if !cur_ts.block_headers().is_empty() {
664        let base_fee = &cur_ts.block_headers().first().parent_base_fee;
665        let base_fee_lower_bound =
666            get_base_fee_lower_bound(base_fee, BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE);
667        if m.gas_fee_cap() < base_fee_lower_bound {
668            if local {
669                warn!(
670                    "local message will not be immediately published because GasFeeCap doesn't meet the lower bound for inclusion in the next 20 blocks (GasFeeCap: {}, baseFeeLowerBound: {})",
671                    m.gas_fee_cap(),
672                    base_fee_lower_bound
673                );
674                return Ok(false);
675            }
676            return Err(Error::SoftValidationFailure(format!(
677                "GasFeeCap doesn't meet base fee lower bound for inclusion in the next 20 blocks (GasFeeCap: {}, baseFeeLowerBound:{})",
678                m.gas_fee_cap(),
679                base_fee_lower_bound
680            )));
681        }
682    }
683    Ok(local)
684}
685
686/// Remove a message from pending given the from address and sequence.
687pub fn remove(
688    from: &Address,
689    pending: &SyncRwLock<HashMap<Address, MsgSet>>,
690    sequence: u64,
691    applied: bool,
692) -> Result<(), Error> {
693    let mut pending = pending.write();
694    let mset = if let Some(mset) = pending.get_mut(from) {
695        mset
696    } else {
697        return Ok(());
698    };
699
700    mset.rm(sequence, applied);
701
702    if mset.msgs.is_empty() {
703        pending.remove(from);
704    }
705
706    Ok(())
707}
708
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message_pool::test_provider::TestApi;
    use crate::shim::econ::TokenAmount;
    use crate::shim::message::Message as ShimMessage;

    /// Builds a mock BLS-signed message with the given sequence and gas
    /// premium (in atto); every other field keeps its default value.
    fn mock_message(sequence: u64, premium_atto: u64) -> SignedMessage {
        SignedMessage::mock_bls_signed_message(ShimMessage {
            sequence,
            gas_premium: TokenAmount::from_atto(premium_atto),
            ..ShimMessage::default()
        })
    }

    // Regression test for https://github.com/ChainSafe/forest/pull/6118 which fixed a bogus 100M
    // gas limit. There are no limits on a single message.
    #[test]
    fn add_helper_message_gas_limit_test() {
        let api = TestApi::default();
        let bls_sig_cache = SizeTrackingLruCache::new_mocked();
        let pending = SyncRwLock::new(HashMap::new());
        let msg = SignedMessage::mock_bls_signed_message(ShimMessage {
            gas_limit: 666_666_666,
            ..ShimMessage::default()
        });
        let sequence = msg.message().sequence;

        let res = add_helper(
            &api,
            &bls_sig_cache,
            &pending,
            msg,
            sequence,
            TrustPolicy::Trusted,
        );

        assert!(res.is_ok());
    }

    // Test that RBF (Replace By Fee) is allowed even when at max_actor_pending_messages capacity
    // This matches Lotus behavior where the check is: https://github.com/filecoin-project/lotus/blob/5f32d00550ddd2f2d0f9abe97dbae07615f18547/chain/messagepool/messagepool.go#L296-L299
    #[test]
    fn test_rbf_at_capacity() {
        let api = TestApi::with_max_actor_pending_messages(10);
        let mut mset = MsgSet::new(0);

        // Fill up to capacity (10 messages)
        for i in 0..10 {
            let res = mset.add_trusted(&api, mock_message(i, 100));
            assert!(res.is_ok(), "Failed to add message {}: {:?}", i, res);
        }

        // Should reject adding a NEW message (sequence 10) when at capacity
        let res_new = mset.add_trusted(&api, mock_message(10, 100));
        assert!(matches!(res_new, Err(Error::TooManyPendingMessages(_, _))));

        // Should ALLOW replacing an existing message (RBF) even when at capacity
        // Replace message with sequence 5 with higher gas premium
        let res_rbf = mset.add_trusted(&api, mock_message(5, 200));
        assert!(
            res_rbf.is_ok(),
            "RBF should be allowed at capacity: {:?}",
            res_rbf
        );
    }
}