Skip to main content

forest/message_pool/msgpool/
msg_pool.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4// Contains the implementation of Message Pool component.
5// The Message Pool is the component of forest that handles pending messages for
6// inclusion in the chain. Messages are added either directly for locally
7// published messages or through pubsub propagation.
8
9use std::{num::NonZeroUsize, sync::Arc, time::Duration};
10
11use crate::blocks::{CachingBlockHeader, Tipset, TipsetKey};
12use crate::chain::{HeadChanges, MINIMUM_BASE_FEE};
13#[cfg(test)]
14use crate::db::SettingsStore;
15use crate::eth::is_valid_eth_tx_for_sending;
16use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic};
17use crate::message::{ChainMessage, MessageRead as _, SignedMessage, valid_for_block_inclusion};
18use crate::networks::{ChainConfig, NEWEST_NETWORK_VERSION};
19use crate::rpc::eth::types::EthAddress;
20use crate::shim::{
21    address::{Address, Protocol},
22    crypto::{Signature, SignatureType},
23    econ::TokenAmount,
24    gas::{Gas, price_list_by_network_version},
25};
26use crate::state_manager::IdToAddressCache;
27use crate::state_manager::utils::is_valid_for_sending;
28use crate::utils::ShallowClone as _;
29use crate::utils::cache::SizeTrackingLruCache;
30use crate::utils::get_size::CidWrapper;
31use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
32use anyhow::Context as _;
33use cid::Cid;
34use futures::StreamExt;
35use fvm_ipld_encoding::to_vec;
36use get_size2::GetSize;
37use itertools::Itertools;
38use nonzero_ext::nonzero;
39use parking_lot::RwLock as SyncRwLock;
40use tokio::{sync::broadcast::error::RecvError, task::JoinSet, time::interval};
41use tracing::warn;
42
43use crate::message_pool::{
44    config::MpoolConfig,
45    errors::Error,
46    head_change, metrics,
47    msgpool::{
48        BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, RBF_DENOM, RBF_NUM, recover_sig,
49        republish_pending_messages,
50    },
51    provider::Provider,
52    utils::get_base_fee_lower_bound,
53};
54
55// LruCache sizes have been taken from the lotus implementation
56const BLS_SIG_CACHE_SIZE: NonZeroUsize = nonzero!(40000usize);
57const SIG_VAL_CACHE_SIZE: NonZeroUsize = nonzero!(32000usize);
58const KEY_CACHE_SIZE: NonZeroUsize = nonzero!(1_048_576usize);
59const STATE_NONCE_CACHE_SIZE: NonZeroUsize = nonzero!(32768usize);
60
61#[derive(Clone, Debug, Hash, PartialEq, Eq, GetSize)]
62pub(crate) struct StateNonceCacheKey {
63    tipset_key: TipsetKey,
64    addr: Address,
65}
66
/// Per-actor cap on pending messages for trusted sources.
// NOTE(review): `MsgSet::add` reads the effective limit from the `Provider`;
// presumably the provider returns this constant by default — confirm.
pub const MAX_ACTOR_PENDING_MESSAGES: u64 = 1_000;
/// Per-actor cap on pending messages for untrusted sources (see the note above
/// `MAX_ACTOR_PENDING_MESSAGES` about where the effective limit comes from).
pub const MAX_UNTRUSTED_ACTOR_PENDING_MESSAGES: u64 = 10;
/// Largest distance between a message nonce and the next expected nonce that
/// is tolerated for trusted sources when strict nonce checks are enabled.
const MAX_NONCE_GAP: u64 = 4;
/// Maximum size of a serialized message in bytes. This is an anti-DOS measure
/// that keeps oversized messages out of the message pool.
const MAX_MESSAGE_SIZE: usize = 64 * 1024; // 64 KiB
73
/// Describes whether a message originates from a trusted or an untrusted
/// source. Untrusted sources are subject to stricter limits.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TrustPolicy {
    /// Trusted source: `MsgSet::add` allows a nonce gap of up to
    /// `MAX_NONCE_GAP` and applies the regular per-actor pending limit.
    Trusted,
    /// Untrusted source: `MsgSet::add` allows no nonce gap and applies the
    /// untrusted per-actor pending limit.
    Untrusted,
}
81
/// Controls whether inserting into the pending set enforces the nonce-gap and
/// replace-by-fee-during-gap rules.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StrictnessPolicy {
    /// Nonce-gap rules are enforced; `add_tipset` selects this for messages
    /// that are not local.
    Strict,
    /// Nonce-gap rules are skipped; `add_tipset` selects this for local
    /// messages.
    Relaxed,
}
88
89/// Simple structure that contains a hash-map of messages where k: a message
90/// from address, v: a message which corresponds to that address.
91#[derive(Clone, Default, Debug)]
92pub struct MsgSet {
93    pub(in crate::message_pool) msgs: HashMap<u64, SignedMessage>,
94    next_sequence: u64,
95}
96
97impl MsgSet {
98    /// Generate a new `MsgSet` with an empty hash-map and setting the sequence
99    /// specifically.
100    pub fn new(sequence: u64) -> Self {
101        MsgSet {
102            msgs: HashMap::new(),
103            next_sequence: sequence,
104        }
105    }
106
107    /// Add a signed message to the `MsgSet`. Increase `next_sequence` if the
108    /// message has a sequence greater than any existing message sequence.
109    /// Use this method when pushing a message coming from trusted sources.
110    pub fn add_trusted<T>(
111        &mut self,
112        api: &T,
113        m: SignedMessage,
114        strictness: StrictnessPolicy,
115    ) -> Result<(), Error>
116    where
117        T: Provider,
118    {
119        self.add(api, m, strictness, true)
120    }
121
122    /// Add a signed message to the `MsgSet`. Increase `next_sequence` if the
123    /// message has a sequence greater than any existing message sequence.
124    /// Use this method when pushing a message coming from untrusted sources.
125    pub fn add_untrusted<T>(
126        &mut self,
127        api: &T,
128        m: SignedMessage,
129        strictness: StrictnessPolicy,
130    ) -> Result<(), Error>
131    where
132        T: Provider,
133    {
134        self.add(api, m, strictness, false)
135    }
136
137    /// Insert a message into this set, maintaining `next_sequence`.
138    ///
139    /// - If the message nonce equals `next_sequence`, advance past any
140    ///   consecutive existing messages (gap-filling loop).
141    /// - If the nonce exceeds `next_sequence + max_nonce_gap` and [`StrictnessPolicy::Strict`],
142    ///   reject with [`Error::NonceGap`].
143    /// - Replace-by-fee for an existing nonce is rejected when strict and
144    ///   a nonce gap is present.
145    ///
146    /// [`StrictnessPolicy`] and `trusted` are independent: strictness controls whether
147    /// nonce gap checks run, while `trusted` sets `max_nonce_gap` [`MAX_NONCE_GAP`]
148    /// and the per-actor pending message limit.
149    pub(in crate::message_pool) fn add<T>(
150        &mut self,
151        api: &T,
152        m: SignedMessage,
153        strictness: StrictnessPolicy,
154        trusted: bool,
155    ) -> Result<(), Error>
156    where
157        T: Provider,
158    {
159        let strict = matches!(strictness, StrictnessPolicy::Strict);
160        let max_nonce_gap: u64 = if trusted { MAX_NONCE_GAP } else { 0 };
161        let max_actor_pending_messages = if trusted {
162            api.max_actor_pending_messages()
163        } else {
164            api.max_untrusted_actor_pending_messages()
165        };
166
167        let mut next_nonce = self.next_sequence;
168        let nonce_gap = if m.sequence() == next_nonce {
169            next_nonce += 1;
170            while self.msgs.contains_key(&next_nonce) {
171                next_nonce += 1;
172            }
173            false
174        } else if strict && m.sequence() > next_nonce + max_nonce_gap {
175            tracing::debug!(
176                nonce = m.sequence(),
177                next_nonce,
178                "message nonce has too big a gap from expected nonce"
179            );
180            return Err(Error::NonceGap);
181        } else {
182            m.sequence() > next_nonce
183        };
184
185        let has_existing = if let Some(exms) = self.msgs.get(&m.sequence()) {
186            if strict && nonce_gap {
187                tracing::debug!(
188                    nonce = m.sequence(),
189                    next_nonce,
190                    "rejecting replace by fee because of nonce gap"
191                );
192                return Err(Error::NonceGap);
193            }
194            if m.cid() != exms.cid() {
195                let premium = &exms.message().gas_premium;
196                let min_price = premium.clone()
197                    + ((premium * RBF_NUM).div_floor(RBF_DENOM))
198                    + TokenAmount::from_atto(1u8);
199                if m.message().gas_premium <= min_price {
200                    return Err(Error::GasPriceTooLow);
201                }
202            } else {
203                return Err(Error::DuplicateSequence);
204            }
205            true
206        } else {
207            false
208        };
209
210        // Only check the limit when adding a new message, not when replacing an existing one (RBF)
211        if !has_existing && self.msgs.len() as u64 >= max_actor_pending_messages {
212            return Err(Error::TooManyPendingMessages(
213                m.message.from().to_string(),
214                trusted,
215            ));
216        }
217
218        if strict && nonce_gap {
219            tracing::debug!(
220                from = %m.from(),
221                nonce = m.sequence(),
222                next_nonce,
223                "adding nonce-gapped message"
224            );
225        }
226
227        self.next_sequence = next_nonce;
228        if self.msgs.insert(m.sequence(), m).is_none() {
229            metrics::MPOOL_MESSAGE_TOTAL.inc();
230        }
231        Ok(())
232    }
233
234    /// Remove the message at `sequence` and adjust `next_sequence`.
235    ///
236    /// - **Applied** (included on-chain): advance `next_sequence` to
237    ///   `sequence + 1` if needed. For messages not in our pool, also run
238    ///   the gap-filling loop to advance past consecutive known messages.
239    /// - **Pruned** (evicted): rewind `next_sequence` to `sequence` if the
240    ///   removal creates a gap.
241    pub fn rm(&mut self, sequence: u64, applied: bool) {
242        if self.msgs.remove(&sequence).is_none() {
243            if applied && sequence >= self.next_sequence {
244                self.next_sequence = sequence + 1;
245                while self.msgs.contains_key(&self.next_sequence) {
246                    self.next_sequence += 1;
247                }
248            }
249            return;
250        }
251        metrics::MPOOL_MESSAGE_TOTAL.dec();
252
253        // adjust next sequence
254        if applied {
255            // we removed a (known) message because it was applied in a tipset
256            // we can't possibly have filled a gap in this case
257            if sequence >= self.next_sequence {
258                self.next_sequence = sequence + 1;
259            }
260            return;
261        }
262        // we removed a message because it was pruned
263        // we have to adjust the sequence if it creates a gap or rewinds state
264        if sequence < self.next_sequence {
265            self.next_sequence = sequence;
266        }
267    }
268}
269
270/// This contains all necessary information needed for the message pool.
271/// Keeps track of messages to apply, as well as context needed for verifying
272/// transactions.
273pub struct MessagePool<T> {
274    /// The local address of the client
275    local_addrs: Arc<SyncRwLock<Vec<Address>>>,
276    /// A map of pending messages where the key is the resolved key address
277    pub pending: Arc<SyncRwLock<HashMap<Address, MsgSet>>>,
278    /// The current tipset (a set of blocks)
279    pub cur_tipset: Arc<SyncRwLock<Tipset>>,
280    /// The underlying provider
281    pub api: Arc<T>,
282    /// Sender half to send messages to other components
283    pub network_sender: flume::Sender<NetworkMessage>,
284    /// A cache for BLS signature keyed by Cid
285    pub bls_sig_cache: SizeTrackingLruCache<CidWrapper, Signature>,
286    /// A cache for BLS signature keyed by Cid
287    pub sig_val_cache: SizeTrackingLruCache<CidWrapper, ()>,
288    /// Cache for ID address ID to key address resolution.
289    pub key_cache: IdToAddressCache,
290    /// Cache for state nonce lookups keyed by (`TipsetKey`, `Address`).
291    pub state_nonce_cache: SizeTrackingLruCache<StateNonceCacheKey, u64>,
292    /// A set of republished messages identified by their Cid
293    pub republished: Arc<SyncRwLock<HashSet<Cid>>>,
294    /// Acts as a signal to republish messages from the republished set of
295    /// messages
296    pub repub_trigger: flume::Sender<()>,
297    local_msgs: Arc<SyncRwLock<HashSet<SignedMessage>>>,
298    /// Configurable parameters of the message pool
299    pub config: MpoolConfig,
300    /// Chain configuration
301    pub chain_config: Arc<ChainConfig>,
302}
303
304/// Resolve an address to its key form, checking the cache first.
305/// Non-ID addresses are returned unchanged.
306pub(in crate::message_pool) fn resolve_to_key<T: Provider>(
307    api: &T,
308    key_cache: &IdToAddressCache,
309    addr: &Address,
310    cur_ts: &Tipset,
311) -> Result<Address, Error> {
312    let id = addr.id().ok();
313    if let Some(id) = &id
314        && let Some(resolved) = key_cache.get_cloned(id)
315    {
316        return Ok(resolved);
317    }
318    let resolved = api.resolve_to_deterministic_address_at_finality(addr, cur_ts)?;
319    if let Some(id) = id {
320        key_cache.push(id, resolved);
321    }
322    Ok(resolved)
323}
324
325/// Get the state nonce for an address, accounting for messages already included in `cur_ts`.
326pub(in crate::message_pool) fn get_state_sequence<T: Provider>(
327    api: &T,
328    key_cache: &IdToAddressCache,
329    state_nonce_cache: &SizeTrackingLruCache<StateNonceCacheKey, u64>,
330    addr: &Address,
331    cur_ts: &Tipset,
332) -> Result<u64, Error> {
333    let nk = StateNonceCacheKey {
334        tipset_key: cur_ts.key().clone(),
335        addr: *addr,
336    };
337
338    if let Some(cached) = state_nonce_cache.get_cloned(&nk) {
339        return Ok(cached);
340    }
341
342    let actor = api.get_actor_after(addr, cur_ts)?;
343    let mut next_nonce = actor.sequence;
344
345    if let (Ok(resolved), Ok(messages)) = (
346        resolve_to_key(api, key_cache, addr, cur_ts)
347            .inspect_err(|e| tracing::warn!(%addr, "failed to resolve address to key: {e:#}")),
348        api.messages_for_tipset(cur_ts)
349            .inspect_err(|e| tracing::warn!("failed to get messages for tipset: {e:#}")),
350    ) {
351        for msg in messages.iter() {
352            if let Ok(from) = resolve_to_key(api, key_cache, &msg.from(), cur_ts).inspect_err(
353                |e| tracing::warn!(from = %msg.from(), "failed to resolve message sender: {e:#}"),
354            ) && from == resolved
355            {
356                let n = msg.sequence() + 1;
357                if n > next_nonce {
358                    next_nonce = n;
359                }
360            }
361        }
362    }
363
364    state_nonce_cache.push(nk, next_nonce);
365    Ok(next_nonce)
366}
367
368impl<T> MessagePool<T>
369where
370    T: Provider,
371{
372    /// Gets the current tipset
373    pub fn current_tipset(&self) -> Tipset {
374        self.cur_tipset.read().clone()
375    }
376
377    pub fn resolve_to_key(&self, addr: &Address, cur_ts: &Tipset) -> Result<Address, Error> {
378        resolve_to_key(self.api.as_ref(), &self.key_cache, addr, cur_ts)
379    }
380
381    /// Add a signed message to the pool and its address.
382    fn add_local(&self, m: SignedMessage) -> Result<(), Error> {
383        let cur_ts = self.current_tipset();
384        let resolved = self.resolve_to_key(&m.from(), &cur_ts)?;
385        self.local_addrs.write().push(resolved);
386        self.local_msgs.write().insert(m);
387        Ok(())
388    }
389
390    /// Push a signed message to the `MessagePool`. Additionally performs basic
391    /// checks on the validity of a message.
392    pub async fn push_internal(
393        &self,
394        msg: SignedMessage,
395        trust_policy: TrustPolicy,
396    ) -> Result<Cid, Error> {
397        self.check_message(&msg)?;
398        let cid = msg.cid();
399        let cur_ts = self.current_tipset();
400        let publish = self.add_tipset(msg.clone(), &cur_ts, true, trust_policy)?;
401        let msg_ser = to_vec(&msg)?;
402        let network_name = self.chain_config.network.genesis_name();
403        self.add_local(msg)?;
404        if publish {
405            self.network_sender
406                .send_async(NetworkMessage::PubsubMessage {
407                    topic: Topic::new(format!("{PUBSUB_MSG_STR}/{network_name}")),
408                    message: msg_ser,
409                })
410                .await
411                .map_err(|_| Error::Other("Network receiver dropped".to_string()))?;
412        }
413        Ok(cid)
414    }
415
416    /// Push a signed message to the `MessagePool` from an trusted source.
417    pub async fn push(&self, msg: SignedMessage) -> Result<Cid, Error> {
418        self.push_internal(msg, TrustPolicy::Trusted).await
419    }
420
421    /// Push a signed message to the `MessagePool` from an untrusted source.
422    pub async fn push_untrusted(&self, msg: SignedMessage) -> Result<Cid, Error> {
423        self.push_internal(msg, TrustPolicy::Untrusted).await
424    }
425
426    fn check_message(&self, msg: &SignedMessage) -> Result<(), Error> {
427        if to_vec(msg)?.len() > MAX_MESSAGE_SIZE {
428            return Err(Error::MessageTooBig);
429        }
430        let to = msg.message().to();
431        if to.protocol() == Protocol::Delegated {
432            EthAddress::from_filecoin_address(&to).context(format!(
433                "message recipient {to} is a delegated address but not a valid Eth Address"
434            ))?;
435        }
436        valid_for_block_inclusion(msg.message(), Gas::new(0), NEWEST_NETWORK_VERSION)?;
437        if msg.value() > *crate::shim::econ::TOTAL_FILECOIN {
438            return Err(Error::MessageValueTooHigh);
439        }
440        if msg.gas_fee_cap().atto() < &MINIMUM_BASE_FEE.into() {
441            return Err(Error::GasFeeCapTooLow);
442        }
443        self.verify_msg_sig(msg)
444    }
445
446    /// This is a helper to push that will help to make sure that the message
447    /// fits the parameters to be pushed to the `MessagePool`.
448    pub fn add(&self, msg: SignedMessage) -> Result<(), Error> {
449        self.check_message(&msg)?;
450        let ts = self.current_tipset();
451        self.add_tipset(msg, &ts, false, TrustPolicy::Trusted)?;
452        Ok(())
453    }
454
455    /// Verify the message signature. first check if it has already been
456    /// verified and put into cache. If it has not, then manually verify it
457    /// then put it into cache for future use.
458    fn verify_msg_sig(&self, msg: &SignedMessage) -> Result<(), Error> {
459        let cid = msg.cid();
460
461        if let Some(()) = self.sig_val_cache.get_cloned(&(cid).into()) {
462            return Ok(());
463        }
464
465        msg.verify(self.chain_config.eth_chain_id)
466            .map_err(|e| Error::Other(e.to_string()))?;
467
468        self.sig_val_cache.push(cid.into(), ());
469
470        Ok(())
471    }
472
473    /// Verify the `state_sequence` and balance for the sender of the message
474    /// given then call `add_locked` to finish adding the `signed_message`
475    /// to pending.
476    fn add_tipset(
477        &self,
478        msg: SignedMessage,
479        cur_ts: &Tipset,
480        local: bool,
481        trust_policy: TrustPolicy,
482    ) -> Result<bool, Error> {
483        let sequence = self.get_state_sequence(&msg.from(), cur_ts)?;
484
485        if sequence > msg.message().sequence {
486            return Err(Error::SequenceTooLow);
487        }
488
489        let sender_actor = self.api.get_actor_after(&msg.message().from(), cur_ts)?;
490
491        // This message can only be included in the next epoch and beyond, hence the +1.
492        let nv = self.chain_config.network_version(cur_ts.epoch() + 1);
493        let eth_chain_id = self.chain_config.eth_chain_id;
494        if msg.signature().signature_type() == SignatureType::Delegated
495            && !is_valid_eth_tx_for_sending(eth_chain_id, nv, &msg)
496        {
497            return Err(Error::Other(
498                "Invalid Ethereum message for the current network version".to_owned(),
499            ));
500        }
501        if !is_valid_for_sending(nv, &sender_actor) {
502            return Err(Error::Other(
503                "Sender actor is not a valid top-level sender".to_owned(),
504            ));
505        }
506
507        let publish = verify_msg_before_add(&msg, cur_ts, local, &self.chain_config)?;
508
509        let balance = self.get_state_balance(&msg.from(), cur_ts)?;
510
511        let msg_balance = msg.required_funds();
512        if balance < msg_balance {
513            return Err(Error::NotEnoughFunds);
514        }
515        let strictness = if local {
516            StrictnessPolicy::Relaxed
517        } else {
518            StrictnessPolicy::Strict
519        };
520        self.add_helper(msg, trust_policy, strictness)?;
521        Ok(publish)
522    }
523
524    /// Finish verifying signed message before adding it to the pending `mset`
525    /// hash-map. If an entry in the hash-map does not yet exist, create a
526    /// new `mset` that will correspond to the from message and push it to
527    /// the pending hash-map.
528    fn add_helper(
529        &self,
530        msg: SignedMessage,
531        trust_policy: TrustPolicy,
532        strictness: StrictnessPolicy,
533    ) -> Result<(), Error> {
534        let from = msg.from();
535        let cur_ts = self.current_tipset();
536        add_helper(
537            self.api.as_ref(),
538            &self.bls_sig_cache,
539            self.pending.as_ref(),
540            &self.key_cache,
541            &cur_ts,
542            msg,
543            self.get_state_sequence(&from, &cur_ts)?,
544            trust_policy,
545            strictness,
546        )
547    }
548
549    /// Get the sequence for a given address, return Error if there is a failure
550    /// to retrieve the respective sequence.
551    pub fn get_sequence(&self, addr: &Address) -> Result<u64, Error> {
552        let cur_ts = self.current_tipset();
553
554        let sequence = self.get_state_sequence(addr, &cur_ts)?;
555
556        let pending = self.pending.read();
557        let msgset = self
558            .resolve_to_key(addr, &cur_ts)
559            .ok()
560            .and_then(|resolved| pending.get(&resolved))
561            .or_else(|| pending.get(addr));
562        match msgset {
563            Some(mset) => {
564                if sequence > mset.next_sequence {
565                    return Ok(sequence);
566                }
567                Ok(mset.next_sequence)
568            }
569            None => Ok(sequence),
570        }
571    }
572
573    /// Get the state of the sequence for a given address in `cur_ts`.
574    fn get_state_sequence(&self, addr: &Address, cur_ts: &Tipset) -> Result<u64, Error> {
575        get_state_sequence(
576            self.api.as_ref(),
577            &self.key_cache,
578            &self.state_nonce_cache,
579            addr,
580            cur_ts,
581        )
582    }
583
584    /// Get the state balance for the actor that corresponds to the supplied
585    /// address and tipset, if this actor does not exist, return an error.
586    fn get_state_balance(&self, addr: &Address, ts: &Tipset) -> Result<TokenAmount, Error> {
587        let actor = self.api.get_actor_after(addr, ts)?;
588        Ok(TokenAmount::from(&actor.balance))
589    }
590
591    /// Return a tuple that contains a vector of all signed messages and the
592    /// current tipset for self.
593    pub fn pending(&self) -> (Vec<SignedMessage>, Tipset) {
594        let pending = self.pending.read().clone();
595        let len = pending.values().map(|mset| mset.msgs.len()).sum();
596        let mut out = Vec::with_capacity(len);
597
598        for mset in pending.into_values() {
599            out.extend(
600                mset.msgs
601                    .into_values()
602                    .sorted_unstable_by_key(|m| m.message().sequence),
603            );
604        }
605
606        let cur_ts = self.current_tipset();
607
608        (out, cur_ts)
609    }
610
611    /// Return a Vector of signed messages for a given from address. This vector
612    /// will be sorted by each `message`'s sequence. If no corresponding
613    /// messages found, return None result type.
614    pub fn pending_for(&self, a: &Address) -> Option<Vec<SignedMessage>> {
615        let cur_ts = self.current_tipset();
616        let resolved = self
617            .resolve_to_key(a, &cur_ts)
618            .inspect_err(|e| tracing::debug!(%a, "pending_for: failed to resolve address: {e:#}"))
619            .ok()?;
620        let pending = self.pending.read();
621        let mset = pending.get(&resolved)?;
622        if mset.msgs.is_empty() {
623            return None;
624        }
625
626        Some(
627            mset.msgs
628                .values()
629                .cloned()
630                .sorted_by_key(|v| v.message().sequence)
631                .collect(),
632        )
633    }
634
635    /// Return Vector of signed messages given a block header for self.
636    pub fn messages_for_blocks<'a>(
637        &self,
638        blks: impl Iterator<Item = &'a CachingBlockHeader>,
639    ) -> Result<Vec<SignedMessage>, Error> {
640        let mut msg_vec: Vec<SignedMessage> = Vec::new();
641
642        for block in blks {
643            let (umsg, mut smsgs) = self.api.messages_for_block(block)?;
644
645            msg_vec.append(smsgs.as_mut());
646            for msg in umsg {
647                let smsg = recover_sig(&self.bls_sig_cache, msg)?;
648                msg_vec.push(smsg)
649            }
650        }
651        Ok(msg_vec)
652    }
653
654    /// Loads local messages to the message pool to be applied.
655    pub fn load_local(&mut self) -> Result<(), Error> {
656        let mut local_msgs = self.local_msgs.write();
657        for k in local_msgs.iter().cloned().collect_vec() {
658            self.add(k.clone()).unwrap_or_else(|err| {
659                if err == Error::SequenceTooLow {
660                    warn!("error adding message: {:?}", err);
661                    local_msgs.remove(&k);
662                }
663            })
664        }
665
666        Ok(())
667    }
668
669    #[cfg(test)]
670    pub fn get_config(&self) -> &MpoolConfig {
671        &self.config
672    }
673
674    #[cfg(test)]
675    pub fn set_config<DB: SettingsStore>(
676        &mut self,
677        db: &DB,
678        cfg: MpoolConfig,
679    ) -> Result<(), Error> {
680        cfg.save_config(db)
681            .map_err(|e| Error::Other(e.to_string()))?;
682        self.config = cfg;
683        Ok(())
684    }
685
686    #[cfg(test)]
687    pub async fn apply_head_change(
688        &self,
689        revert: Vec<crate::blocks::Tipset>,
690        apply: Vec<crate::blocks::Tipset>,
691    ) -> Result<(), Error>
692    where
693        T: 'static,
694    {
695        head_change(
696            self.api.as_ref(),
697            &self.bls_sig_cache,
698            self.repub_trigger.clone(),
699            self.republished.as_ref(),
700            self.pending.as_ref(),
701            self.cur_tipset.as_ref(),
702            &self.key_cache,
703            &self.state_nonce_cache,
704            revert,
705            apply,
706        )
707        .await
708    }
709}
710
711impl<T> MessagePool<T>
712where
713    T: Provider + Send + Sync + 'static,
714{
715    /// Creates a new `MessagePool` instance.
716    pub fn new(
717        api: T,
718        network_sender: flume::Sender<NetworkMessage>,
719        config: MpoolConfig,
720        chain_config: Arc<ChainConfig>,
721        services: &mut JoinSet<anyhow::Result<()>>,
722    ) -> Result<MessagePool<T>, Error>
723    where
724        T: Provider,
725    {
726        let local_addrs = Arc::new(SyncRwLock::new(Vec::new()));
727        let pending = Arc::new(SyncRwLock::new(HashMap::new()));
728        let tipset = Arc::new(SyncRwLock::new(api.get_heaviest_tipset()));
729        let bls_sig_cache =
730            SizeTrackingLruCache::new_with_metrics("bls_sig".into(), BLS_SIG_CACHE_SIZE);
731        let sig_val_cache =
732            SizeTrackingLruCache::new_with_metrics("sig_val".into(), SIG_VAL_CACHE_SIZE);
733        let key_cache = SizeTrackingLruCache::new_with_metrics("mpool_key".into(), KEY_CACHE_SIZE);
734        let state_nonce_cache =
735            SizeTrackingLruCache::new_with_metrics("state_nonce".into(), STATE_NONCE_CACHE_SIZE);
736        let local_msgs = Arc::new(SyncRwLock::new(HashSet::new()));
737        let republished = Arc::new(SyncRwLock::new(HashSet::new()));
738        let block_delay = chain_config.block_delay_secs;
739
740        let (repub_trigger, repub_trigger_rx) = flume::bounded::<()>(4);
741        let mut mp = MessagePool {
742            local_addrs,
743            pending,
744            cur_tipset: tipset,
745            api: Arc::new(api),
746            bls_sig_cache,
747            sig_val_cache,
748            key_cache,
749            state_nonce_cache,
750            local_msgs,
751            republished,
752            config,
753            network_sender,
754            repub_trigger,
755            chain_config: Arc::clone(&chain_config),
756        };
757
758        mp.load_local()?;
759
760        let mut head_changes_rx = mp.api.subscribe_head_changes();
761
762        let api = mp.api.clone();
763        let bls_sig_cache = mp.bls_sig_cache.shallow_clone();
764        let pending = mp.pending.clone();
765        let republished = mp.republished.clone();
766        let key_cache = mp.key_cache.shallow_clone();
767        let state_nonce_cache = mp.state_nonce_cache.shallow_clone();
768
769        let current_ts = mp.cur_tipset.clone();
770        let repub_trigger = mp.repub_trigger.clone();
771
772        // Reacts to new HeadChanges
773        services.spawn(async move {
774            loop {
775                match head_changes_rx.recv().await {
776                    Ok(HeadChanges { reverts, applies }) => {
777                        if let Err(e) = head_change(
778                            api.as_ref(),
779                            &bls_sig_cache,
780                            repub_trigger.clone(),
781                            republished.as_ref(),
782                            pending.as_ref(),
783                            &current_ts,
784                            &key_cache,
785                            &state_nonce_cache,
786                            reverts,
787                            applies,
788                        )
789                        .await
790                        {
791                            tracing::warn!("Error changing head: {e}");
792                        }
793                    }
794                    Err(RecvError::Lagged(e)) => {
795                        warn!("Head change subscriber lagged: skipping {e} events");
796                    }
797                    Err(RecvError::Closed) => {
798                        break Ok(());
799                    }
800                }
801            }
802        });
803
804        let api = mp.api.clone();
805        let pending = mp.pending.clone();
806        let cur_tipset = mp.cur_tipset.clone();
807        let republished = mp.republished.clone();
808        let local_addrs = mp.local_addrs.clone();
809        let key_cache = mp.key_cache.shallow_clone();
810        let network_sender = Arc::new(mp.network_sender.clone());
811        let republish_interval = u64::from(10 * block_delay + chain_config.propagation_delay_secs);
812        // Reacts to republishing requests
813        services.spawn(async move {
814            let mut repub_trigger_rx = repub_trigger_rx.stream();
815            let mut interval = interval(Duration::from_secs(republish_interval));
816            loop {
817                tokio::select! {
818                    _ = interval.tick() => (),
819                    _ = repub_trigger_rx.next() => (),
820                }
821                if let Err(e) = republish_pending_messages(
822                    api.as_ref(),
823                    network_sender.as_ref(),
824                    pending.as_ref(),
825                    cur_tipset.as_ref(),
826                    republished.as_ref(),
827                    local_addrs.as_ref(),
828                    &key_cache,
829                    &chain_config,
830                )
831                .await
832                {
833                    warn!("Failed to republish pending messages: {}", e.to_string());
834                }
835            }
836        });
837        Ok(mp)
838    }
839}
840
841// Helpers for MessagePool
842
843/// Finish verifying signed message before adding it to the pending `mset`
844/// hash-map. If an entry in the hash-map does not yet exist, create a new
845/// `mset` that will correspond to the from message and push it to the pending
846/// hash-map.
847#[allow(clippy::too_many_arguments)]
848pub(in crate::message_pool) fn add_helper<T>(
849    api: &T,
850    bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
851    pending: &SyncRwLock<HashMap<Address, MsgSet>>,
852    key_cache: &IdToAddressCache,
853    cur_ts: &Tipset,
854    msg: SignedMessage,
855    sequence: u64,
856    trust_policy: TrustPolicy,
857    strictness: StrictnessPolicy,
858) -> Result<(), Error>
859where
860    T: Provider,
861{
862    if msg.signature().signature_type() == SignatureType::Bls {
863        bls_sig_cache.push(msg.cid().into(), msg.signature().clone());
864    }
865
866    api.put_message(&ChainMessage::Signed(msg.clone().into()))?;
867    api.put_message(&ChainMessage::Unsigned(msg.message().clone().into()))?;
868
869    let resolved_from = resolve_to_key(api, key_cache, &msg.from(), cur_ts)?;
870    let mut pending = pending.write();
871    let mset = pending
872        .entry(resolved_from)
873        .or_insert_with(|| MsgSet::new(sequence));
874    match trust_policy {
875        TrustPolicy::Trusted => mset.add_trusted(api, msg, strictness)?,
876        TrustPolicy::Untrusted => mset.add_untrusted(api, msg, strictness)?,
877    }
878
879    Ok(())
880}
881
882fn verify_msg_before_add(
883    m: &SignedMessage,
884    cur_ts: &Tipset,
885    local: bool,
886    chain_config: &ChainConfig,
887) -> Result<bool, Error> {
888    let epoch = cur_ts.epoch();
889    let min_gas = price_list_by_network_version(chain_config.network_version(epoch))
890        .on_chain_message(m.chain_length()?);
891    valid_for_block_inclusion(m.message(), min_gas.total(), NEWEST_NETWORK_VERSION)?;
892    if !cur_ts.block_headers().is_empty() {
893        let base_fee = &cur_ts.block_headers().first().parent_base_fee;
894        let base_fee_lower_bound =
895            get_base_fee_lower_bound(base_fee, BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE);
896        if m.gas_fee_cap() < base_fee_lower_bound {
897            if local {
898                warn!(
899                    "local message will not be immediately published because GasFeeCap doesn't meet the lower bound for inclusion in the next 20 blocks (GasFeeCap: {}, baseFeeLowerBound: {})",
900                    m.gas_fee_cap(),
901                    base_fee_lower_bound
902                );
903                return Ok(false);
904            }
905            return Err(Error::SoftValidationFailure(format!(
906                "GasFeeCap doesn't meet base fee lower bound for inclusion in the next 20 blocks (GasFeeCap: {}, baseFeeLowerBound:{})",
907                m.gas_fee_cap(),
908                base_fee_lower_bound
909            )));
910        }
911    }
912    Ok(local)
913}
914
915/// Remove a message from pending given the from address and sequence.
916/// The `from` address should already be resolved to its key form.
917pub fn remove(
918    from: &Address,
919    pending: &SyncRwLock<HashMap<Address, MsgSet>>,
920    sequence: u64,
921    applied: bool,
922) -> Result<(), Error> {
923    let mut pending = pending.write();
924    let mset = if let Some(mset) = pending.get_mut(from) {
925        mset
926    } else {
927        return Ok(());
928    };
929
930    mset.rm(sequence, applied);
931
932    if mset.msgs.is_empty() {
933        pending.remove(from);
934    }
935
936    Ok(())
937}
938
939#[cfg(test)]
940mod tests {
941    use crate::blocks::RawBlockHeader;
942    use crate::chain::ChainStore;
943    use crate::db::MemoryDB;
944    use crate::message_pool::provider::Provider;
945    use crate::message_pool::test_provider::TestApi;
946    use crate::networks::ChainConfig;
947    use crate::shim::econ::TokenAmount;
948    use crate::shim::state_tree::{ActorState, StateTree, StateTreeVersion};
949    use crate::utils::db::CborStoreExt as _;
950
951    use super::*;
952    use crate::shim::message::Message as ShimMessage;
953
954    fn make_smsg(from: Address, seq: u64, premium: u64) -> SignedMessage {
955        SignedMessage::mock_bls_signed_message(ShimMessage {
956            from,
957            sequence: seq,
958            gas_premium: TokenAmount::from_atto(premium),
959            gas_limit: 1_000_000,
960            ..ShimMessage::default()
961        })
962    }
963
964    // Regression test for https://github.com/ChainSafe/forest/pull/6118 which fixed a bogus 100M
965    // gas limit. There are no limits on a single message.
966    #[test]
967    fn add_helper_message_gas_limit_test() {
968        let api = TestApi::default();
969        let bls_sig_cache = SizeTrackingLruCache::new_mocked();
970        let key_cache = SizeTrackingLruCache::new_mocked();
971        let pending = SyncRwLock::new(HashMap::new());
972        let cur_ts = api.get_heaviest_tipset();
973        let message = ShimMessage {
974            gas_limit: 666_666_666,
975            ..ShimMessage::default()
976        };
977        let msg = SignedMessage::mock_bls_signed_message(message);
978        let sequence = msg.message().sequence;
979        let res = add_helper(
980            &api,
981            &bls_sig_cache,
982            &pending,
983            &key_cache,
984            &cur_ts,
985            msg,
986            sequence,
987            TrustPolicy::Trusted,
988            StrictnessPolicy::Relaxed,
989        );
990        assert!(res.is_ok());
991    }
992
993    // Test that RBF (Replace By Fee) is allowed even when at max_actor_pending_messages capacity
994    // This matches Lotus behavior where the check is: https://github.com/filecoin-project/lotus/blob/5f32d00550ddd2f2d0f9abe97dbae07615f18547/chain/messagepool/messagepool.go#L296-L299
995    #[test]
996    fn test_rbf_at_capacity() {
997        let api = TestApi::with_max_actor_pending_messages(10);
998        let mut mset = MsgSet::new(0);
999
1000        // Fill up to capacity (10 messages)
1001        for i in 0..10 {
1002            let res = mset.add_trusted(
1003                &api,
1004                make_smsg(Address::default(), i, 100),
1005                StrictnessPolicy::Relaxed,
1006            );
1007            assert!(res.is_ok(), "Failed to add message {i}");
1008        }
1009
1010        // Should reject adding a NEW message (sequence 10) when at capacity
1011        let res = mset.add_trusted(
1012            &api,
1013            make_smsg(Address::default(), 10, 100),
1014            StrictnessPolicy::Relaxed,
1015        );
1016        assert!(matches!(res, Err(Error::TooManyPendingMessages(_, _))));
1017
1018        // Should ALLOW replacing an existing message (RBF) even when at capacity
1019        // Replace message with sequence 5 with higher gas premium
1020        let res = mset.add_trusted(
1021            &api,
1022            make_smsg(Address::default(), 5, 200),
1023            StrictnessPolicy::Relaxed,
1024        );
1025        assert!(res.is_ok(), "RBF should be allowed at capacity");
1026    }
1027
1028    #[test]
1029    fn test_resolve_to_key_returns_non_id_unchanged() {
1030        let api = TestApi::default();
1031        let key_cache = SizeTrackingLruCache::new_mocked();
1032        let ts = api.get_heaviest_tipset();
1033
1034        let bls_addr = Address::new_bls(&[1u8; 48]).unwrap();
1035        let result = resolve_to_key(&api, &key_cache, &bls_addr, &ts).unwrap();
1036        assert_eq!(result, bls_addr);
1037        assert_eq!(
1038            key_cache.len(),
1039            0,
1040            "cache should not be populated for non-ID addresses"
1041        );
1042    }
1043
1044    #[test]
1045    fn test_resolve_to_key_resolves_id_and_caches() {
1046        let api = TestApi::default();
1047        let key_cache = SizeTrackingLruCache::new_mocked();
1048        let ts = api.get_heaviest_tipset();
1049
1050        let id_addr = Address::new_id(100);
1051        let key_addr = Address::new_bls(&[5u8; 48]).unwrap();
1052        api.set_key_address_mapping(&id_addr, &key_addr);
1053
1054        let result = resolve_to_key(&api, &key_cache, &id_addr, &ts).unwrap();
1055        assert_eq!(result, key_addr);
1056        assert_eq!(
1057            key_cache.len(),
1058            1,
1059            "cache should have one entry after resolution"
1060        );
1061
1062        // Second call should hit the cache (no API call needed)
1063        let result2 = resolve_to_key(&api, &key_cache, &id_addr, &ts).unwrap();
1064        assert_eq!(result2, key_addr);
1065    }
1066
1067    #[test]
1068    fn test_add_helper_keys_pending_by_resolved_address() {
1069        let api = TestApi::default();
1070        let bls_sig_cache = SizeTrackingLruCache::new_mocked();
1071        let key_cache = SizeTrackingLruCache::new_mocked();
1072        let pending = SyncRwLock::new(HashMap::new());
1073        let cur_ts = api.get_heaviest_tipset();
1074
1075        let id_addr = Address::new_id(200);
1076        let key_addr = Address::new_bls(&[7u8; 48]).unwrap();
1077        api.set_key_address_mapping(&id_addr, &key_addr);
1078        api.set_state_sequence(&key_addr, 0);
1079
1080        let message = ShimMessage {
1081            from: id_addr,
1082            gas_limit: 1_000_000,
1083            ..ShimMessage::default()
1084        };
1085        let msg = SignedMessage::mock_bls_signed_message(message);
1086
1087        add_helper(
1088            &api,
1089            &bls_sig_cache,
1090            &pending,
1091            &key_cache,
1092            &cur_ts,
1093            msg,
1094            0,
1095            TrustPolicy::Trusted,
1096            StrictnessPolicy::Relaxed,
1097        )
1098        .unwrap();
1099
1100        let pending_read = pending.read();
1101        assert!(
1102            pending_read.get(&key_addr).is_some(),
1103            "pending should be keyed by the resolved key address"
1104        );
1105        assert!(
1106            pending_read.get(&id_addr).is_none(),
1107            "pending should NOT have an entry under the raw ID address"
1108        );
1109    }
1110
1111    #[test]
1112    fn test_get_sequence_works_with_both_address_forms() {
1113        let api = TestApi::default();
1114        let bls_sig_cache = SizeTrackingLruCache::new_mocked();
1115        let key_cache = SizeTrackingLruCache::new_mocked();
1116        let pending = SyncRwLock::new(HashMap::new());
1117        let cur_ts = api.get_heaviest_tipset();
1118
1119        let id_addr = Address::new_id(300);
1120        let key_addr = Address::new_bls(&[9u8; 48]).unwrap();
1121        api.set_key_address_mapping(&id_addr, &key_addr);
1122        api.set_state_sequence(&key_addr, 0);
1123
1124        // Add two messages from the ID address
1125        for seq in 0..2 {
1126            let message = ShimMessage {
1127                from: id_addr,
1128                sequence: seq,
1129                gas_limit: 1_000_000,
1130                ..ShimMessage::default()
1131            };
1132            let msg = SignedMessage::mock_bls_signed_message(message);
1133            add_helper(
1134                &api,
1135                &bls_sig_cache,
1136                &pending,
1137                &key_cache,
1138                &cur_ts,
1139                msg,
1140                0,
1141                TrustPolicy::Trusted,
1142                StrictnessPolicy::Relaxed,
1143            )
1144            .unwrap();
1145        }
1146
1147        let state_seq = api.get_actor_after(&id_addr, &cur_ts).unwrap().sequence;
1148        let resolved_for_id = resolve_to_key(&api, &key_cache, &id_addr, &cur_ts).unwrap();
1149        let resolved_for_key = resolve_to_key(&api, &key_cache, &key_addr, &cur_ts).unwrap();
1150        assert_eq!(resolved_for_id, resolved_for_key);
1151
1152        let mset = pending.read();
1153        let next_seq = mset.get(&resolved_for_id).unwrap().next_sequence;
1154        let expected = std::cmp::max(state_seq, next_seq);
1155        assert_eq!(expected, 2, "should reflect both pending messages");
1156    }
1157
1158    #[test]
1159    fn test_gap_filling_advances_next_sequence() {
1160        let api = TestApi::default();
1161        let mut mset = MsgSet::new(0);
1162
1163        mset.add_trusted(
1164            &api,
1165            make_smsg(Address::default(), 0, 100),
1166            StrictnessPolicy::Relaxed,
1167        )
1168        .unwrap();
1169        assert_eq!(mset.next_sequence, 1);
1170
1171        mset.add_trusted(
1172            &api,
1173            make_smsg(Address::default(), 2, 100),
1174            StrictnessPolicy::Relaxed,
1175        )
1176        .unwrap();
1177        assert_eq!(mset.next_sequence, 1, "gap at 1, so next_sequence stays");
1178
1179        mset.add_trusted(
1180            &api,
1181            make_smsg(Address::default(), 1, 100),
1182            StrictnessPolicy::Relaxed,
1183        )
1184        .unwrap();
1185        assert_eq!(
1186            mset.next_sequence, 3,
1187            "filling the gap should advance past all consecutive messages"
1188        );
1189    }
1190
1191    #[test]
1192    fn test_trusted_allows_any_nonce_gap() {
1193        let api = TestApi::default();
1194        let mut mset = MsgSet::new(0);
1195
1196        mset.add_trusted(
1197            &api,
1198            make_smsg(Address::default(), 0, 100),
1199            StrictnessPolicy::Relaxed,
1200        )
1201        .unwrap();
1202        let res = mset.add_trusted(
1203            &api,
1204            make_smsg(Address::default(), 10, 100),
1205            StrictnessPolicy::Relaxed,
1206        );
1207        assert!(
1208            res.is_ok(),
1209            "trusted adds skip nonce gap enforcement (StrictnessPolicy::Relaxed)"
1210        );
1211    }
1212
1213    #[test]
1214    fn test_strict_allows_small_nonce_gap() {
1215        let api = TestApi::default();
1216        let mut mset = MsgSet::new(0);
1217
1218        // Strict + trusted -> max_nonce_gap=4 (non-local add path)
1219        mset.add(
1220            &api,
1221            make_smsg(Address::default(), 0, 100),
1222            StrictnessPolicy::Strict,
1223            true,
1224        )
1225        .unwrap();
1226        let res = mset.add(
1227            &api,
1228            make_smsg(Address::default(), 3, 100),
1229            StrictnessPolicy::Strict,
1230            true,
1231        );
1232        assert!(
1233            res.is_ok(),
1234            "strict+trusted: gap of 2 (within MAX_NONCE_GAP=4) should succeed"
1235        );
1236    }
1237
1238    #[test]
1239    fn test_strict_rejects_large_nonce_gap() {
1240        let api = TestApi::default();
1241        let mut mset = MsgSet::new(0);
1242
1243        // Strict + trusted -> max_nonce_gap=4
1244        mset.add(
1245            &api,
1246            make_smsg(Address::default(), 0, 100),
1247            StrictnessPolicy::Strict,
1248            true,
1249        )
1250        .unwrap();
1251        let res = mset.add(
1252            &api,
1253            make_smsg(Address::default(), 6, 100),
1254            StrictnessPolicy::Strict,
1255            true,
1256        );
1257        assert_eq!(
1258            res,
1259            Err(Error::NonceGap),
1260            "strict+trusted: gap of 5 (exceeds MAX_NONCE_GAP=4) should be rejected"
1261        );
1262    }
1263
1264    #[test]
1265    fn test_strict_untrusted_rejects_any_gap() {
1266        let api = TestApi::default();
1267        let mut mset = MsgSet::new(0);
1268
1269        // Strict + untrusted -> max_nonce_gap=0
1270        mset.add(
1271            &api,
1272            make_smsg(Address::default(), 0, 100),
1273            StrictnessPolicy::Strict,
1274            false,
1275        )
1276        .unwrap();
1277        let res = mset.add(
1278            &api,
1279            make_smsg(Address::default(), 2, 100),
1280            StrictnessPolicy::Strict,
1281            false,
1282        );
1283        assert_eq!(
1284            res,
1285            Err(Error::NonceGap),
1286            "strict+untrusted: any gap (maxNonceGap=0) is rejected"
1287        );
1288    }
1289
1290    #[test]
1291    fn test_non_strict_untrusted_skips_gap_check() {
1292        let api = TestApi::default();
1293        let mut mset = MsgSet::new(0);
1294
1295        // Relaxed + untrusted -> gap check skipped (PushUntrusted path)
1296        mset.add_untrusted(
1297            &api,
1298            make_smsg(Address::default(), 0, 100),
1299            StrictnessPolicy::Relaxed,
1300        )
1301        .unwrap();
1302        let res = mset.add_untrusted(
1303            &api,
1304            make_smsg(Address::default(), 5, 100),
1305            StrictnessPolicy::Relaxed,
1306        );
1307        assert!(
1308            res.is_ok(),
1309            "non-strict untrusted (PushUntrusted) skips gap enforcement"
1310        );
1311    }
1312
1313    #[test]
1314    fn test_strict_rbf_during_gap_rejected() {
1315        let api = TestApi::default();
1316        let mut mset = MsgSet::new(0);
1317
1318        // Set up a gap using relaxed trusted (local push path)
1319        mset.add_trusted(
1320            &api,
1321            make_smsg(Address::default(), 0, 100),
1322            StrictnessPolicy::Relaxed,
1323        )
1324        .unwrap();
1325        mset.add_trusted(
1326            &api,
1327            make_smsg(Address::default(), 2, 100),
1328            StrictnessPolicy::Relaxed,
1329        )
1330        .unwrap();
1331
1332        // Strict RBF at nonce 2 should be rejected due to gap at nonce 1
1333        let res = mset.add(
1334            &api,
1335            make_smsg(Address::default(), 2, 200),
1336            StrictnessPolicy::Strict,
1337            true,
1338        );
1339        assert_eq!(
1340            res,
1341            Err(Error::NonceGap),
1342            "strict RBF should be rejected when nonce gap exists"
1343        );
1344    }
1345
1346    #[test]
1347    fn test_rbf_without_gap_still_works() {
1348        let api = TestApi::default();
1349        let mut mset = MsgSet::new(0);
1350
1351        mset.add_trusted(
1352            &api,
1353            make_smsg(Address::default(), 0, 100),
1354            StrictnessPolicy::Relaxed,
1355        )
1356        .unwrap();
1357        mset.add_trusted(
1358            &api,
1359            make_smsg(Address::default(), 1, 100),
1360            StrictnessPolicy::Relaxed,
1361        )
1362        .unwrap();
1363        mset.add_trusted(
1364            &api,
1365            make_smsg(Address::default(), 2, 100),
1366            StrictnessPolicy::Relaxed,
1367        )
1368        .unwrap();
1369
1370        let res = mset.add_trusted(
1371            &api,
1372            make_smsg(Address::default(), 1, 200),
1373            StrictnessPolicy::Relaxed,
1374        );
1375        assert!(res.is_ok(), "RBF without a nonce gap should succeed");
1376    }
1377
1378    #[test]
1379    fn test_get_state_sequence_accounts_for_tipset_messages() {
1380        use crate::message_pool::test_provider::mock_block;
1381
1382        let api = TestApi::default();
1383        let key_cache = SizeTrackingLruCache::new_mocked();
1384        let state_nonce_cache = SizeTrackingLruCache::new_mocked();
1385
1386        let sender = Address::new_bls(&[3u8; 48]).unwrap();
1387        api.set_state_sequence(&sender, 5);
1388
1389        let block = mock_block(1, 1);
1390        api.inner.lock().set_block_messages(
1391            &block,
1392            vec![make_smsg(sender, 5, 100), make_smsg(sender, 7, 100)],
1393        );
1394        let ts = Tipset::from(block);
1395
1396        let nonce = get_state_sequence(&api, &key_cache, &state_nonce_cache, &sender, &ts).unwrap();
1397        assert_eq!(
1398            nonce, 8,
1399            "should account for non-consecutive tipset message at nonce 7"
1400        );
1401    }
1402
1403    #[test]
1404    fn test_get_state_sequence_ignores_other_addresses() {
1405        use crate::message_pool::test_provider::mock_block;
1406
1407        let api = TestApi::default();
1408        let key_cache = SizeTrackingLruCache::new_mocked();
1409        let state_nonce_cache = SizeTrackingLruCache::new_mocked();
1410
1411        let addr_a = Address::new_bls(&[4u8; 48]).unwrap();
1412        let addr_b = Address::new_bls(&[5u8; 48]).unwrap();
1413        api.set_state_sequence(&addr_a, 0);
1414        api.set_state_sequence(&addr_b, 0);
1415
1416        let block = mock_block(1, 1);
1417        api.inner.lock().set_block_messages(
1418            &block,
1419            vec![
1420                make_smsg(addr_b, 0, 100),
1421                make_smsg(addr_b, 1, 100),
1422                make_smsg(addr_b, 2, 100),
1423            ],
1424        );
1425        let ts = Tipset::from(block);
1426
1427        let nonce_a =
1428            get_state_sequence(&api, &key_cache, &state_nonce_cache, &addr_a, &ts).unwrap();
1429        assert_eq!(
1430            nonce_a, 0,
1431            "addr_a nonce should be unaffected by addr_b's messages"
1432        );
1433
1434        let nonce_b =
1435            get_state_sequence(&api, &key_cache, &state_nonce_cache, &addr_b, &ts).unwrap();
1436        assert_eq!(
1437            nonce_b, 3,
1438            "addr_b nonce should reflect its tipset messages"
1439        );
1440    }
1441
1442    #[test]
1443    fn test_get_state_sequence_cache_hit() {
1444        use crate::message_pool::test_provider::mock_block;
1445
1446        let api = TestApi::default();
1447        let key_cache = SizeTrackingLruCache::new_mocked();
1448        let state_nonce_cache: SizeTrackingLruCache<StateNonceCacheKey, u64> =
1449            SizeTrackingLruCache::new_mocked();
1450
1451        let sender = Address::new_bls(&[6u8; 48]).unwrap();
1452        api.set_state_sequence(&sender, 5);
1453
1454        let block = mock_block(1, 1);
1455        api.inner
1456            .lock()
1457            .set_block_messages(&block, vec![make_smsg(sender, 5, 100)]);
1458        let ts = Tipset::from(block);
1459
1460        let nonce1 =
1461            get_state_sequence(&api, &key_cache, &state_nonce_cache, &sender, &ts).unwrap();
1462        assert_eq!(nonce1, 6);
1463
1464        // Mutate the underlying state; the cache should still return the old value.
1465        api.set_state_sequence(&sender, 99);
1466        let nonce2 =
1467            get_state_sequence(&api, &key_cache, &state_nonce_cache, &sender, &ts).unwrap();
1468        assert_eq!(
1469            nonce2, 6,
1470            "second call should return the cached value, not re-read state"
1471        );
1472    }
1473
1474    #[test]
1475    fn test_get_state_sequence_cache_miss_on_different_tipset() {
1476        use crate::message_pool::test_provider::mock_block;
1477
1478        let api = TestApi::default();
1479        let key_cache = SizeTrackingLruCache::new_mocked();
1480        let state_nonce_cache: SizeTrackingLruCache<StateNonceCacheKey, u64> =
1481            SizeTrackingLruCache::new_mocked();
1482
1483        let sender = Address::new_bls(&[7u8; 48]).unwrap();
1484        api.set_state_sequence(&sender, 10);
1485
1486        let block_a = mock_block(1, 1);
1487        let ts_a = Tipset::from(&block_a);
1488
1489        let nonce_a =
1490            get_state_sequence(&api, &key_cache, &state_nonce_cache, &sender, &ts_a).unwrap();
1491        assert_eq!(nonce_a, 10);
1492
1493        // Different tipset should be a cache miss and re-read state.
1494        api.set_state_sequence(&sender, 20);
1495        let block_b = mock_block(2, 2);
1496        let ts_b = Tipset::from(&block_b);
1497
1498        let nonce_b =
1499            get_state_sequence(&api, &key_cache, &state_nonce_cache, &sender, &ts_b).unwrap();
1500        assert_eq!(
1501            nonce_b, 20,
1502            "different tipset should miss the cache and read fresh state"
1503        );
1504    }
1505
1506    #[test]
1507    fn resolve_to_key_uses_finality_lookback() {
1508        let db = Arc::new(MemoryDB::default());
1509
1510        let mut cfg = ChainConfig::default();
1511        cfg.policy.chain_finality = 1;
1512        let cfg = Arc::new(cfg);
1513
1514        let bls_a = Address::new_bls(&[8u8; 48]).unwrap();
1515        let bls_b = Address::new_bls(&[9u8; 48]).unwrap();
1516
1517        // root_a: only contains f0300
1518        let mut st_a = StateTree::new(db.clone(), StateTreeVersion::V5).unwrap();
1519        st_a.set_actor(
1520            &Address::new_id(300),
1521            ActorState::new_empty(Cid::default(), Some(bls_a)),
1522        )
1523        .unwrap();
1524        let root_a = st_a.flush().unwrap();
1525
1526        // root_b: only contains f0400
1527        let mut st_b = StateTree::new(db.clone(), StateTreeVersion::V5).unwrap();
1528        st_b.set_actor(
1529            &Address::new_id(400),
1530            ActorState::new_empty(Cid::default(), Some(bls_b)),
1531        )
1532        .unwrap();
1533        let root_b = st_b.flush().unwrap();
1534
1535        let genesis = Tipset::from(CachingBlockHeader::new(RawBlockHeader {
1536            state_root: root_a,
1537            ..Default::default()
1538        }));
1539        db.put_cbor_default(genesis.block_headers().first())
1540            .unwrap();
1541
1542        let ts1 = Tipset::from(CachingBlockHeader::new(RawBlockHeader {
1543            parents: genesis.key().clone(),
1544            epoch: 1,
1545            state_root: root_a,
1546            timestamp: 1,
1547            ..Default::default()
1548        }));
1549        db.put_cbor_default(ts1.block_headers().first()).unwrap();
1550
1551        let head = Tipset::from(CachingBlockHeader::new(RawBlockHeader {
1552            parents: ts1.key().clone(),
1553            epoch: 2,
1554            state_root: root_b,
1555            timestamp: 2,
1556            ..Default::default()
1557        }));
1558        db.put_cbor_default(head.block_headers().first()).unwrap();
1559
1560        let cs = ChainStore::new(
1561            db.clone(),
1562            db.clone(),
1563            db,
1564            cfg,
1565            genesis.block_headers().first().clone(),
1566        )
1567        .unwrap();
1568
1569        // f0300 exists in lookback state (root_a) → resolves successfully.
1570        let result = Provider::resolve_to_deterministic_address_at_finality(
1571            &cs,
1572            &Address::new_id(300),
1573            &head,
1574        )
1575        .unwrap();
1576        assert_eq!(result, bls_a);
1577
1578        // f0400 exists only in head state (root_b), not in lookback → fails.
1579        Provider::resolve_to_deterministic_address_at_finality(&cs, &Address::new_id(400), &head)
1580            .expect_err("actor only in head state must not resolve via finality lookback");
1581    }
1582}