Skip to main content

forest/message_pool/msgpool/
msg_pool.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4// Contains the implementation of Message Pool component.
5// The Message Pool is the component of forest that handles pending messages for
6// inclusion in the chain. Messages are added either directly for locally
7// published messages or through pubsub propagation.
8
9use std::{num::NonZeroUsize, sync::Arc, time::Duration};
10
11use crate::blocks::{CachingBlockHeader, Tipset};
12use crate::chain::{HeadChange, MINIMUM_BASE_FEE};
13#[cfg(test)]
14use crate::db::SettingsStore;
15use crate::eth::is_valid_eth_tx_for_sending;
16use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic};
17use crate::message::{ChainMessage, Message, SignedMessage, valid_for_block_inclusion};
18use crate::networks::{ChainConfig, NEWEST_NETWORK_VERSION};
19use crate::shim::{
20    address::Address,
21    crypto::{Signature, SignatureType},
22    econ::TokenAmount,
23    gas::{Gas, price_list_by_network_version},
24};
25use crate::state_manager::utils::is_valid_for_sending;
26use crate::utils::cache::SizeTrackingLruCache;
27use crate::utils::get_size::CidWrapper;
28use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
29use anyhow::Context as _;
30use cid::Cid;
31use futures::StreamExt;
32use fvm_ipld_encoding::to_vec;
33use itertools::Itertools;
34use nonzero_ext::nonzero;
35use parking_lot::RwLock as SyncRwLock;
36use tokio::{sync::broadcast::error::RecvError, task::JoinSet, time::interval};
37use tracing::warn;
38
39use crate::message_pool::{
40    config::MpoolConfig,
41    errors::Error,
42    head_change, metrics,
43    msgpool::{
44        BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, RBF_DENOM, RBF_NUM, recover_sig,
45        republish_pending_messages,
46    },
47    provider::Provider,
48    utils::get_base_fee_lower_bound,
49};
50
// LruCache sizes have been taken from the lotus implementation
/// Capacity of the `bls_sig` cache created in `MessagePool::new`.
const BLS_SIG_CACHE_SIZE: NonZeroUsize = nonzero!(40000usize);
/// Capacity of the `sig_val` cache created in `MessagePool::new`.
const SIG_VAL_CACHE_SIZE: NonZeroUsize = nonzero!(32000usize);

/// Limit on pending messages per actor for trusted sources.
/// NOTE(review): presumably the value a `Provider` returns from
/// `max_actor_pending_messages` — confirm against the provider implementation.
pub const MAX_ACTOR_PENDING_MESSAGES: u64 = 1000;
/// Limit on pending messages per actor for untrusted sources.
/// NOTE(review): presumably the value a `Provider` returns from
/// `max_untrusted_actor_pending_messages` — confirm against the provider implementation.
pub const MAX_UNTRUSTED_ACTOR_PENDING_MESSAGES: u64 = 10;
/// Maximum size of a serialized message in bytes. This is an anti-DOS measure to prevent
/// large messages from being added to the message pool.
const MAX_MESSAGE_SIZE: usize = 64 << 10; // 64 KiB
60
/// The pending messages of a single sender address, indexed by message
/// sequence, together with the next sequence expected from that sender.
#[derive(Clone, Default, Debug)]
pub struct MsgSet {
    /// Pending messages keyed by their sequence.
    pub(in crate::message_pool) msgs: HashMap<u64, SignedMessage>,
    /// Sequence the next new message from this sender is expected to carry;
    /// maintained by `add` and `rm`.
    next_sequence: u64,
}
68
69impl MsgSet {
70    /// Generate a new `MsgSet` with an empty hash-map and setting the sequence
71    /// specifically.
72    pub fn new(sequence: u64) -> Self {
73        MsgSet {
74            msgs: HashMap::new(),
75            next_sequence: sequence,
76        }
77    }
78
79    /// Add a signed message to the `MsgSet`. Increase `next_sequence` if the
80    /// message has a sequence greater than any existing message sequence.
81    /// Use this method when pushing a message coming from trusted sources.
82    pub fn add_trusted<T>(&mut self, api: &T, m: SignedMessage) -> Result<(), Error>
83    where
84        T: Provider,
85    {
86        self.add(api, m, true)
87    }
88
89    /// Add a signed message to the `MsgSet`. Increase `next_sequence` if the
90    /// message has a sequence greater than any existing message sequence.
91    /// Use this method when pushing a message coming from untrusted sources.
92    #[allow(dead_code)]
93    pub fn add_untrusted<T>(&mut self, api: &T, m: SignedMessage) -> Result<(), Error>
94    where
95        T: Provider,
96    {
97        self.add(api, m, false)
98    }
99
100    fn add<T>(&mut self, api: &T, m: SignedMessage, trusted: bool) -> Result<(), Error>
101    where
102        T: Provider,
103    {
104        let max_actor_pending_messages = if trusted {
105            api.max_actor_pending_messages()
106        } else {
107            api.max_untrusted_actor_pending_messages()
108        };
109
110        if self.msgs.is_empty() || m.sequence() >= self.next_sequence {
111            self.next_sequence = m.sequence() + 1;
112        }
113
114        let has_existing = if let Some(exms) = self.msgs.get(&m.sequence()) {
115            if m.cid() != exms.cid() {
116                let premium = &exms.message().gas_premium;
117                let min_price = premium.clone()
118                    + ((premium * RBF_NUM).div_floor(RBF_DENOM))
119                    + TokenAmount::from_atto(1u8);
120                if m.message().gas_premium <= min_price {
121                    return Err(Error::GasPriceTooLow);
122                }
123            } else {
124                return Err(Error::DuplicateSequence);
125            }
126            true
127        } else {
128            false
129        };
130
131        // Only check the limit when adding a new message, not when replacing an existing one (RBF)
132        if !has_existing && self.msgs.len() as u64 >= max_actor_pending_messages {
133            return Err(Error::TooManyPendingMessages(
134                m.message.from().to_string(),
135                trusted,
136            ));
137        }
138        if self.msgs.insert(m.sequence(), m).is_none() {
139            metrics::MPOOL_MESSAGE_TOTAL.inc();
140        }
141        Ok(())
142    }
143
144    /// Removes message with the given sequence. If applied, update the set's
145    /// next sequence.
146    pub fn rm(&mut self, sequence: u64, applied: bool) {
147        if self.msgs.remove(&sequence).is_none() {
148            if applied && sequence >= self.next_sequence {
149                self.next_sequence = sequence + 1;
150                while self.msgs.contains_key(&self.next_sequence) {
151                    self.next_sequence += 1;
152                }
153            }
154            return;
155        }
156        metrics::MPOOL_MESSAGE_TOTAL.dec();
157
158        // adjust next sequence
159        if applied {
160            // we removed a (known) message because it was applied in a tipset
161            // we can't possibly have filled a gap in this case
162            if sequence >= self.next_sequence {
163                self.next_sequence = sequence + 1;
164            }
165            return;
166        }
167        // we removed a message because it was pruned
168        // we have to adjust the sequence if it creates a gap or rewinds state
169        if sequence < self.next_sequence {
170            self.next_sequence = sequence;
171        }
172    }
173}
174
/// This contains all necessary information needed for the message pool.
/// Keeps track of messages to apply, as well as context needed for verifying
/// transactions.
pub struct MessagePool<T> {
    /// Sender addresses of the messages submitted through `push`
    local_addrs: Arc<SyncRwLock<Vec<Address>>>,
    /// A map of pending messages where the key is the sender address
    pub pending: Arc<SyncRwLock<HashMap<Address, MsgSet>>>,
    /// The current tipset (a set of blocks)
    pub cur_tipset: Arc<SyncRwLock<Tipset>>,
    /// The underlying provider
    pub api: Arc<T>,
    /// Sender half to send messages to other components
    pub network_sender: flume::Sender<NetworkMessage>,
    /// A cache for BLS signature keyed by Cid
    pub bls_sig_cache: Arc<SizeTrackingLruCache<CidWrapper, Signature>>,
    /// A cache of message Cids whose signature has already been verified
    pub sig_val_cache: Arc<SizeTrackingLruCache<CidWrapper, ()>>,
    /// A set of republished messages identified by their Cid
    pub republished: Arc<SyncRwLock<HashSet<Cid>>>,
    /// Acts as a signal to republish messages from the republished set of
    /// messages
    pub repub_trigger: flume::Sender<()>,
    /// Messages submitted through `push`; re-added to the pool by `load_local`
    local_msgs: Arc<SyncRwLock<HashSet<SignedMessage>>>,
    /// Configurable parameters of the message pool
    pub config: MpoolConfig,
    /// Chain configuration
    pub chain_config: Arc<ChainConfig>,
}
204
205impl<T> MessagePool<T>
206where
207    T: Provider,
208{
209    /// Gets the current tipset
210    pub fn current_tipset(&self) -> Tipset {
211        self.cur_tipset.read().clone()
212    }
213
214    /// Add a signed message to the pool and its address.
215    fn add_local(&self, m: SignedMessage) -> Result<(), Error> {
216        self.local_addrs.write().push(m.from());
217        self.local_msgs.write().insert(m);
218        Ok(())
219    }
220
221    /// Push a signed message to the `MessagePool`. Additionally performs basic
222    /// checks on the validity of a message.
223    pub async fn push(&self, msg: SignedMessage) -> Result<Cid, Error> {
224        self.check_message(&msg)?;
225        let cid = msg.cid();
226        let cur_ts = self.current_tipset();
227        let publish = self.add_tipset(msg.clone(), &cur_ts, true)?;
228        let msg_ser = to_vec(&msg)?;
229        let network_name = self.chain_config.network.genesis_name();
230        self.add_local(msg)?;
231        if publish {
232            self.network_sender
233                .send_async(NetworkMessage::PubsubMessage {
234                    topic: Topic::new(format!("{PUBSUB_MSG_STR}/{network_name}")),
235                    message: msg_ser,
236                })
237                .await
238                .map_err(|_| Error::Other("Network receiver dropped".to_string()))?;
239        }
240        Ok(cid)
241    }
242
243    fn check_message(&self, msg: &SignedMessage) -> Result<(), Error> {
244        if to_vec(msg)?.len() > MAX_MESSAGE_SIZE {
245            return Err(Error::MessageTooBig);
246        }
247        valid_for_block_inclusion(msg.message(), Gas::new(0), NEWEST_NETWORK_VERSION)?;
248        if msg.value() > *crate::shim::econ::TOTAL_FILECOIN {
249            return Err(Error::MessageValueTooHigh);
250        }
251        if msg.gas_fee_cap().atto() < &MINIMUM_BASE_FEE.into() {
252            return Err(Error::GasFeeCapTooLow);
253        }
254        self.verify_msg_sig(msg)
255    }
256
257    /// This is a helper to push that will help to make sure that the message
258    /// fits the parameters to be pushed to the `MessagePool`.
259    pub fn add(&self, msg: SignedMessage) -> Result<(), Error> {
260        self.check_message(&msg)?;
261        let ts = self.current_tipset();
262        self.add_tipset(msg, &ts, false)?;
263        Ok(())
264    }
265
266    /// Verify the message signature. first check if it has already been
267    /// verified and put into cache. If it has not, then manually verify it
268    /// then put it into cache for future use.
269    fn verify_msg_sig(&self, msg: &SignedMessage) -> Result<(), Error> {
270        let cid = msg.cid();
271
272        if let Some(()) = self.sig_val_cache.get_cloned(&(cid).into()) {
273            return Ok(());
274        }
275
276        msg.verify(self.chain_config.eth_chain_id)
277            .map_err(|e| Error::Other(e.to_string()))?;
278
279        self.sig_val_cache.push(cid.into(), ());
280
281        Ok(())
282    }
283
284    /// Verify the `state_sequence` and balance for the sender of the message
285    /// given then call `add_locked` to finish adding the `signed_message`
286    /// to pending.
287    fn add_tipset(&self, msg: SignedMessage, cur_ts: &Tipset, local: bool) -> Result<bool, Error> {
288        let sequence = self.get_state_sequence(&msg.from(), cur_ts)?;
289
290        if sequence > msg.message().sequence {
291            return Err(Error::SequenceTooLow);
292        }
293
294        let sender_actor = self.api.get_actor_after(&msg.message().from(), cur_ts)?;
295
296        // This message can only be included in the next epoch and beyond, hence the +1.
297        let nv = self.chain_config.network_version(cur_ts.epoch() + 1);
298        let eth_chain_id = self.chain_config.eth_chain_id;
299        if msg.signature().signature_type() == SignatureType::Delegated
300            && !is_valid_eth_tx_for_sending(eth_chain_id, nv, &msg)
301        {
302            return Err(Error::Other(
303                "Invalid Ethereum message for the current network version".to_owned(),
304            ));
305        }
306        if !is_valid_for_sending(nv, &sender_actor) {
307            return Err(Error::Other(
308                "Sender actor is not a valid top-level sender".to_owned(),
309            ));
310        }
311
312        let publish = verify_msg_before_add(&msg, cur_ts, local, &self.chain_config)?;
313
314        let balance = self.get_state_balance(&msg.from(), cur_ts)?;
315
316        let msg_balance = msg.required_funds();
317        if balance < msg_balance {
318            return Err(Error::NotEnoughFunds);
319        }
320        self.add_helper(msg)?;
321        Ok(publish)
322    }
323
324    /// Finish verifying signed message before adding it to the pending `mset`
325    /// hash-map. If an entry in the hash-map does not yet exist, create a
326    /// new `mset` that will correspond to the from message and push it to
327    /// the pending hash-map.
328    fn add_helper(&self, msg: SignedMessage) -> Result<(), Error> {
329        let from = msg.from();
330        let cur_ts = self.current_tipset();
331        add_helper(
332            self.api.as_ref(),
333            self.bls_sig_cache.as_ref(),
334            self.pending.as_ref(),
335            msg,
336            self.get_state_sequence(&from, &cur_ts)?,
337        )
338    }
339
340    /// Get the sequence for a given address, return Error if there is a failure
341    /// to retrieve the respective sequence.
342    pub fn get_sequence(&self, addr: &Address) -> Result<u64, Error> {
343        let cur_ts = self.current_tipset();
344
345        let sequence = self.get_state_sequence(addr, &cur_ts)?;
346
347        let pending = self.pending.read();
348
349        let msgset = pending.get(addr);
350        match msgset {
351            Some(mset) => {
352                if sequence > mset.next_sequence {
353                    return Ok(sequence);
354                }
355                Ok(mset.next_sequence)
356            }
357            None => Ok(sequence),
358        }
359    }
360
361    /// Get the state of the sequence for a given address in `cur_ts`.
362    fn get_state_sequence(&self, addr: &Address, cur_ts: &Tipset) -> Result<u64, Error> {
363        let actor = self.api.get_actor_after(addr, cur_ts)?;
364        Ok(actor.sequence)
365    }
366
367    /// Get the state balance for the actor that corresponds to the supplied
368    /// address and tipset, if this actor does not exist, return an error.
369    fn get_state_balance(&self, addr: &Address, ts: &Tipset) -> Result<TokenAmount, Error> {
370        let actor = self.api.get_actor_after(addr, ts)?;
371        Ok(TokenAmount::from(&actor.balance))
372    }
373
374    /// Return a tuple that contains a vector of all signed messages and the
375    /// current tipset for self.
376    pub fn pending(&self) -> Result<(Vec<SignedMessage>, Tipset), Error> {
377        let mut out: Vec<SignedMessage> = Vec::new();
378        let pending = self.pending.read().clone();
379
380        for (addr, _) in pending {
381            out.append(
382                self.pending_for(&addr)
383                    .ok_or(Error::InvalidFromAddr)?
384                    .as_mut(),
385            )
386        }
387
388        let cur_ts = self.current_tipset();
389
390        Ok((out, cur_ts))
391    }
392
393    /// Return a Vector of signed messages for a given from address. This vector
394    /// will be sorted by each `message`'s sequence. If no corresponding
395    /// messages found, return None result type.
396    pub fn pending_for(&self, a: &Address) -> Option<Vec<SignedMessage>> {
397        let pending = self.pending.read();
398        let mset = pending.get(a)?;
399        if mset.msgs.is_empty() {
400            return None;
401        }
402
403        Some(
404            mset.msgs
405                .values()
406                .cloned()
407                .sorted_by_key(|v| v.message().sequence)
408                .collect(),
409        )
410    }
411
412    /// Return Vector of signed messages given a block header for self.
413    pub fn messages_for_blocks<'a>(
414        &self,
415        blks: impl Iterator<Item = &'a CachingBlockHeader>,
416    ) -> Result<Vec<SignedMessage>, Error> {
417        let mut msg_vec: Vec<SignedMessage> = Vec::new();
418
419        for block in blks {
420            let (umsg, mut smsgs) = self.api.messages_for_block(block)?;
421
422            msg_vec.append(smsgs.as_mut());
423            for msg in umsg {
424                let smsg = recover_sig(self.bls_sig_cache.as_ref(), msg)?;
425                msg_vec.push(smsg)
426            }
427        }
428        Ok(msg_vec)
429    }
430
431    /// Loads local messages to the message pool to be applied.
432    pub fn load_local(&mut self) -> Result<(), Error> {
433        let mut local_msgs = self.local_msgs.write();
434        for k in local_msgs.iter().cloned().collect_vec() {
435            self.add(k.clone()).unwrap_or_else(|err| {
436                if err == Error::SequenceTooLow {
437                    warn!("error adding message: {:?}", err);
438                    local_msgs.remove(&k);
439                }
440            })
441        }
442
443        Ok(())
444    }
445
446    #[cfg(test)]
447    pub fn get_config(&self) -> &MpoolConfig {
448        &self.config
449    }
450
451    #[cfg(test)]
452    pub fn set_config<DB: SettingsStore>(
453        &mut self,
454        db: &DB,
455        cfg: MpoolConfig,
456    ) -> Result<(), Error> {
457        cfg.save_config(db)
458            .map_err(|e| Error::Other(e.to_string()))?;
459        self.config = cfg;
460        Ok(())
461    }
462}
463
464impl<T> MessagePool<T>
465where
466    T: Provider + Send + Sync + 'static,
467{
468    /// Creates a new `MessagePool` instance.
469    pub fn new(
470        api: T,
471        network_sender: flume::Sender<NetworkMessage>,
472        config: MpoolConfig,
473        chain_config: Arc<ChainConfig>,
474        services: &mut JoinSet<anyhow::Result<()>>,
475    ) -> Result<MessagePool<T>, Error>
476    where
477        T: Provider,
478    {
479        let local_addrs = Arc::new(SyncRwLock::new(Vec::new()));
480        let pending = Arc::new(SyncRwLock::new(HashMap::new()));
481        let tipset = Arc::new(SyncRwLock::new(api.get_heaviest_tipset()));
482        let bls_sig_cache = Arc::new(SizeTrackingLruCache::new_with_metrics(
483            "bls_sig".into(),
484            BLS_SIG_CACHE_SIZE,
485        ));
486        let sig_val_cache = Arc::new(SizeTrackingLruCache::new_with_metrics(
487            "sig_val".into(),
488            SIG_VAL_CACHE_SIZE,
489        ));
490        let local_msgs = Arc::new(SyncRwLock::new(HashSet::new()));
491        let republished = Arc::new(SyncRwLock::new(HashSet::new()));
492        let block_delay = chain_config.block_delay_secs;
493
494        let (repub_trigger, repub_trigger_rx) = flume::bounded::<()>(4);
495        let mut mp = MessagePool {
496            local_addrs,
497            pending,
498            cur_tipset: tipset,
499            api: Arc::new(api),
500            bls_sig_cache,
501            sig_val_cache,
502            local_msgs,
503            republished,
504            config,
505            network_sender,
506            repub_trigger,
507            chain_config: Arc::clone(&chain_config),
508        };
509
510        mp.load_local()?;
511
512        let mut subscriber = mp.api.subscribe_head_changes();
513
514        let api = mp.api.clone();
515        let bls_sig_cache = mp.bls_sig_cache.clone();
516        let pending = mp.pending.clone();
517        let republished = mp.republished.clone();
518
519        let cur_tipset = mp.cur_tipset.clone();
520        let repub_trigger = mp.repub_trigger.clone();
521
522        // Reacts to new HeadChanges
523        services.spawn(async move {
524            loop {
525                match subscriber.recv().await {
526                    Ok(ts) => {
527                        let (cur, rev, app) = match ts {
528                            HeadChange::Apply(tipset) => {
529                                (cur_tipset.clone(), Vec::new(), vec![tipset])
530                            }
531                        };
532                        head_change(
533                            api.as_ref(),
534                            bls_sig_cache.as_ref(),
535                            repub_trigger.clone(),
536                            republished.as_ref(),
537                            pending.as_ref(),
538                            cur.as_ref(),
539                            rev,
540                            app,
541                        )
542                        .await
543                        .context("Error changing head")?;
544                    }
545                    Err(RecvError::Lagged(e)) => {
546                        warn!("Head change subscriber lagged: skipping {} events", e);
547                    }
548                    Err(RecvError::Closed) => {
549                        break Ok(());
550                    }
551                }
552            }
553        });
554
555        let api = mp.api.clone();
556        let pending = mp.pending.clone();
557        let cur_tipset = mp.cur_tipset.clone();
558        let republished = mp.republished.clone();
559        let local_addrs = mp.local_addrs.clone();
560        let network_sender = Arc::new(mp.network_sender.clone());
561        let republish_interval = (10 * block_delay + chain_config.propagation_delay_secs) as u64;
562        // Reacts to republishing requests
563        services.spawn(async move {
564            let mut repub_trigger_rx = repub_trigger_rx.stream();
565            let mut interval = interval(Duration::from_secs(republish_interval));
566            loop {
567                tokio::select! {
568                    _ = interval.tick() => (),
569                    _ = repub_trigger_rx.next() => (),
570                }
571                if let Err(e) = republish_pending_messages(
572                    api.as_ref(),
573                    network_sender.as_ref(),
574                    pending.as_ref(),
575                    cur_tipset.as_ref(),
576                    republished.as_ref(),
577                    local_addrs.as_ref(),
578                    &chain_config,
579                )
580                .await
581                {
582                    warn!("Failed to republish pending messages: {}", e.to_string());
583                }
584            }
585        });
586        Ok(mp)
587    }
588}
589
590// Helpers for MessagePool
591
592/// Finish verifying signed message before adding it to the pending `mset`
593/// hash-map. If an entry in the hash-map does not yet exist, create a new
594/// `mset` that will correspond to the from message and push it to the pending
595/// hash-map.
596pub(in crate::message_pool) fn add_helper<T>(
597    api: &T,
598    bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
599    pending: &SyncRwLock<HashMap<Address, MsgSet>>,
600    msg: SignedMessage,
601    sequence: u64,
602) -> Result<(), Error>
603where
604    T: Provider,
605{
606    if msg.signature().signature_type() == SignatureType::Bls {
607        bls_sig_cache.push(msg.cid().into(), msg.signature().clone());
608    }
609
610    api.put_message(&ChainMessage::Signed(msg.clone()))?;
611    api.put_message(&ChainMessage::Unsigned(msg.message().clone()))?;
612
613    let mut pending = pending.write();
614    let msett = pending.get_mut(&msg.from());
615    match msett {
616        Some(mset) => mset.add_trusted(api, msg)?,
617        None => {
618            let mut mset = MsgSet::new(sequence);
619            let from = msg.from();
620            mset.add_trusted(api, msg)?;
621            pending.insert(from, mset);
622        }
623    }
624
625    Ok(())
626}
627
628fn verify_msg_before_add(
629    m: &SignedMessage,
630    cur_ts: &Tipset,
631    local: bool,
632    chain_config: &ChainConfig,
633) -> Result<bool, Error> {
634    let epoch = cur_ts.epoch();
635    let min_gas = price_list_by_network_version(chain_config.network_version(epoch))
636        .on_chain_message(m.chain_length()?);
637    valid_for_block_inclusion(m.message(), min_gas.total(), NEWEST_NETWORK_VERSION)?;
638    if !cur_ts.block_headers().is_empty() {
639        let base_fee = &cur_ts.block_headers().first().parent_base_fee;
640        let base_fee_lower_bound =
641            get_base_fee_lower_bound(base_fee, BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE);
642        if m.gas_fee_cap() < base_fee_lower_bound {
643            if local {
644                warn!(
645                    "local message will not be immediately published because GasFeeCap doesn't meet the lower bound for inclusion in the next 20 blocks (GasFeeCap: {}, baseFeeLowerBound: {})",
646                    m.gas_fee_cap(),
647                    base_fee_lower_bound
648                );
649                return Ok(false);
650            }
651            return Err(Error::SoftValidationFailure(format!(
652                "GasFeeCap doesn't meet base fee lower bound for inclusion in the next 20 blocks (GasFeeCap: {}, baseFeeLowerBound:{})",
653                m.gas_fee_cap(),
654                base_fee_lower_bound
655            )));
656        }
657    }
658    Ok(local)
659}
660
661/// Remove a message from pending given the from address and sequence.
662pub fn remove(
663    from: &Address,
664    pending: &SyncRwLock<HashMap<Address, MsgSet>>,
665    sequence: u64,
666    applied: bool,
667) -> Result<(), Error> {
668    let mut pending = pending.write();
669    let mset = if let Some(mset) = pending.get_mut(from) {
670        mset
671    } else {
672        return Ok(());
673    };
674
675    mset.rm(sequence, applied);
676
677    if mset.msgs.is_empty() {
678        pending.remove(from);
679    }
680
681    Ok(())
682}
683
#[cfg(test)]
mod tests {
    use crate::message_pool::test_provider::TestApi;

    use super::*;
    use crate::shim::message::Message as ShimMessage;

    /// Build a mock BLS-signed message with the given sequence and gas
    /// premium (in atto); every other field keeps its default.
    fn mock_message(sequence: u64, premium_atto: u64) -> SignedMessage {
        use crate::shim::econ::TokenAmount;

        SignedMessage::mock_bls_signed_message(ShimMessage {
            sequence,
            gas_premium: TokenAmount::from_atto(premium_atto),
            ..ShimMessage::default()
        })
    }

    // Regression test for https://github.com/ChainSafe/forest/pull/6118 which fixed a bogus 100M
    // gas limit. There are no limits on a single message.
    #[test]
    fn add_helper_message_gas_limit_test() {
        let api = TestApi::default();
        let bls_sig_cache = SizeTrackingLruCache::new_mocked();
        let pending = SyncRwLock::new(HashMap::new());
        let msg = SignedMessage::mock_bls_signed_message(ShimMessage {
            gas_limit: 666_666_666,
            ..ShimMessage::default()
        });
        let sequence = msg.message().sequence;

        let outcome = add_helper(&api, &bls_sig_cache, &pending, msg, sequence);

        assert!(outcome.is_ok());
    }

    // Test that RBF (Replace By Fee) is allowed even when at max_actor_pending_messages capacity
    // This matches Lotus behavior where the check is: https://github.com/filecoin-project/lotus/blob/5f32d00550ddd2f2d0f9abe97dbae07615f18547/chain/messagepool/messagepool.go#L296-L299
    #[test]
    fn test_rbf_at_capacity() {
        let api = TestApi::with_max_actor_pending_messages(10);
        let mut mset = MsgSet::new(0);

        // Fill up to capacity (10 messages)
        for i in 0..10 {
            let res = mset.add_trusted(&api, mock_message(i, 100));
            assert!(res.is_ok(), "Failed to add message {}: {:?}", i, res);
        }

        // Should reject adding a NEW message (sequence 10) when at capacity
        let res_new = mset.add_trusted(&api, mock_message(10, 100));
        assert!(matches!(res_new, Err(Error::TooManyPendingMessages(_, _))));

        // Should ALLOW replacing an existing message (RBF) even when at capacity
        // Replace message with sequence 5 with higher gas premium
        let res_rbf = mset.add_trusted(&api, mock_message(5, 200));
        assert!(
            res_rbf.is_ok(),
            "RBF should be allowed at capacity: {:?}",
            res_rbf
        );
    }
}