Skip to main content

forest/message_pool/msgpool/
msg_pool.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4// Contains the implementation of Message Pool component.
5// The Message Pool is the component of forest that handles pending messages for
6// inclusion in the chain. Messages are added either directly for locally
7// published messages or through pubsub propagation.
8
9use std::{num::NonZeroUsize, sync::Arc, time::Duration};
10
11use crate::blocks::{CachingBlockHeader, Tipset, TipsetKey};
12use crate::chain::{HeadChanges, MINIMUM_BASE_FEE};
13#[cfg(test)]
14use crate::db::SettingsStore;
15use crate::eth::is_valid_eth_tx_for_sending;
16use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic};
17use crate::message::{ChainMessage, MessageRead as _, SignedMessage, valid_for_block_inclusion};
18use crate::networks::{ChainConfig, NEWEST_NETWORK_VERSION};
19use crate::rpc::eth::types::EthAddress;
20use crate::shim::{
21    address::{Address, Protocol},
22    crypto::{Signature, SignatureType},
23    econ::TokenAmount,
24    gas::{Gas, price_list_by_network_version},
25};
26use crate::state_manager::IdToAddressCache;
27use crate::state_manager::utils::is_valid_for_sending;
28use crate::utils::ShallowClone as _;
29use crate::utils::cache::SizeTrackingLruCache;
30use crate::utils::get_size::CidWrapper;
31use ahash::{HashSet, HashSetExt};
32use anyhow::Context as _;
33use cid::Cid;
34use futures::StreamExt;
35use fvm_ipld_encoding::to_vec;
36use get_size2::GetSize;
37use itertools::Itertools;
38use nonzero_ext::nonzero;
39use parking_lot::RwLock as SyncRwLock;
40use tokio::{
41    sync::broadcast::{self, error::RecvError},
42    task::JoinSet,
43    time::interval,
44};
45use tracing::warn;
46
47use crate::message_pool::{
48    config::MpoolConfig,
49    errors::Error,
50    head_change,
51    msgpool::{
52        BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, events::MpoolUpdate, pending_store::PendingStore,
53        recover_sig, republish_pending_messages,
54    },
55    provider::Provider,
56    utils::get_base_fee_lower_bound,
57};
58
// LruCache sizes have been taken from the lotus implementation
/// Capacity of `bls_sig_cache` (message Cid -> BLS signature).
const BLS_SIG_CACHE_SIZE: NonZeroUsize = nonzero!(40000usize);
/// Capacity of `sig_val_cache` (Cids of messages whose signature already verified).
const SIG_VAL_CACHE_SIZE: NonZeroUsize = nonzero!(32000usize);
/// Capacity of `key_cache` (actor ID -> resolved key address).
const KEY_CACHE_SIZE: NonZeroUsize = nonzero!(1_048_576usize);
/// Capacity of `state_nonce_cache` (`(tipset key, address)` -> nonce).
const STATE_NONCE_CACHE_SIZE: NonZeroUsize = nonzero!(32768usize);
64
/// Cache key for `state_nonce_cache`: a computed state nonce is only valid
/// for a particular address at a particular tipset, so both are part of the key.
#[derive(Clone, Debug, Hash, PartialEq, Eq, GetSize)]
pub(crate) struct StateNonceCacheKey {
    // Tipset at which the nonce was computed.
    tipset_key: TipsetKey,
    // Sender address the nonce belongs to.
    addr: Address,
}
70
/// Maximum number of pending messages a single actor may have in the pool
/// (trusted sources). See also [`TrustPolicy`].
pub const MAX_ACTOR_PENDING_MESSAGES: u64 = 1000;
/// Stricter per-actor pending-message cap applied to untrusted sources.
pub const MAX_UNTRUSTED_ACTOR_PENDING_MESSAGES: u64 = 10;
/// Maximum size of a serialized message in bytes. This is an anti-DOS measure to prevent
/// large messages from being added to the message pool.
const MAX_MESSAGE_SIZE: usize = 64 << 10; // 64 KiB
76
/// Trust policy for whether a message is from a trusted or untrusted source.
/// Untrusted sources are subject to stricter limits.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TrustPolicy {
    /// Message from a trusted source (e.g. submitted via [`MessagePool::push`]).
    Trusted,
    /// Message from an untrusted source (e.g. via [`MessagePool::push_untrusted`]).
    Untrusted,
}
84
85pub use super::msg_set::{MsgSetLimits, StrictnessPolicy};
86
/// This contains all necessary information needed for the message pool.
/// Keeps track of messages to apply, as well as context needed for verifying
/// transactions.
pub struct MessagePool<T> {
    /// The local address of the client
    local_addrs: Arc<SyncRwLock<Vec<Address>>>,
    /// Pending messages, keyed by resolved-key address, together with the
    /// broadcast channel for [`MpoolUpdate`] events. See [`PendingStore`].
    pub(in crate::message_pool) pending_store: PendingStore,
    /// The current tipset (a set of blocks)
    pub cur_tipset: Arc<SyncRwLock<Tipset>>,
    /// The underlying provider
    pub api: Arc<T>,
    /// Sender half to send messages to other components
    pub network_sender: flume::Sender<NetworkMessage>,
    /// A cache for BLS signature keyed by Cid
    pub bls_sig_cache: SizeTrackingLruCache<CidWrapper, Signature>,
    /// A cache of message Cids whose signatures have already been verified,
    /// so `verify_msg_sig` can skip re-verification.
    pub sig_val_cache: SizeTrackingLruCache<CidWrapper, ()>,
    /// Cache for ID address ID to key address resolution.
    pub key_cache: IdToAddressCache,
    /// Cache for state nonce lookups keyed by (`TipsetKey`, `Address`).
    pub state_nonce_cache: SizeTrackingLruCache<StateNonceCacheKey, u64>,
    /// A set of republished messages identified by their Cid
    pub republished: Arc<SyncRwLock<HashSet<Cid>>>,
    /// Acts as a signal to republish messages from the republished set of
    /// messages
    pub repub_trigger: flume::Sender<()>,
    /// Messages submitted locally, retried by `load_local`.
    local_msgs: Arc<SyncRwLock<HashSet<SignedMessage>>>,
    /// Configurable parameters of the message pool
    pub config: MpoolConfig,
    /// Chain configuration
    pub chain_config: Arc<ChainConfig>,
}
121
122/// Resolve an address to its key form, checking the cache first.
123/// Non-ID addresses are returned unchanged.
124pub(in crate::message_pool) fn resolve_to_key<T: Provider>(
125    api: &T,
126    key_cache: &IdToAddressCache,
127    addr: &Address,
128    cur_ts: &Tipset,
129) -> Result<Address, Error> {
130    let id = addr.id().ok();
131    if let Some(id) = &id
132        && let Some(resolved) = key_cache.get_cloned(id)
133    {
134        return Ok(resolved);
135    }
136    let resolved = api.resolve_to_deterministic_address_at_finality(addr, cur_ts)?;
137    if let Some(id) = id {
138        key_cache.push(id, resolved);
139    }
140    Ok(resolved)
141}
142
/// Get the state nonce for an address, accounting for messages already included in `cur_ts`.
///
/// The result is memoized in `state_nonce_cache` keyed by `(tipset key, address)`.
/// Scanning the tipset's messages is best-effort: if the address cannot be
/// resolved or the tipset's messages cannot be loaded, the failure is logged
/// and the actor's bare state sequence is used (and cached) instead.
pub(in crate::message_pool) fn get_state_sequence<T: Provider>(
    api: &T,
    key_cache: &IdToAddressCache,
    state_nonce_cache: &SizeTrackingLruCache<StateNonceCacheKey, u64>,
    addr: &Address,
    cur_ts: &Tipset,
) -> Result<u64, Error> {
    let nk = StateNonceCacheKey {
        tipset_key: cur_ts.key().clone(),
        addr: *addr,
    };

    if let Some(cached) = state_nonce_cache.get_cloned(&nk) {
        return Ok(cached);
    }

    // Start from the actor's on-chain sequence at `cur_ts`.
    let actor = api.get_actor_after(addr, cur_ts)?;
    let mut next_nonce = actor.sequence;

    if let (Ok(resolved), Ok(messages)) = (
        resolve_to_key(api, key_cache, addr, cur_ts)
            .inspect_err(|e| tracing::warn!(%addr, "failed to resolve address to key: {e:#}")),
        api.messages_for_tipset(cur_ts)
            .inspect_err(|e| tracing::warn!("failed to get messages for tipset: {e:#}")),
    ) {
        // Bump past any message in `cur_ts` whose (resolved) sender matches.
        for msg in messages.iter() {
            if let Ok(from) = resolve_to_key(api, key_cache, &msg.from(), cur_ts).inspect_err(
                |e| tracing::warn!(from = %msg.from(), "failed to resolve message sender: {e:#}"),
            ) && from == resolved
            {
                let n = msg.sequence() + 1;
                if n > next_nonce {
                    next_nonce = n;
                }
            }
        }
    }

    state_nonce_cache.push(nk, next_nonce);
    Ok(next_nonce)
}
185
impl<T> MessagePool<T>
where
    T: Provider,
{
    /// Gets the current tipset (cloned out of the shared lock).
    pub fn current_tipset(&self) -> Tipset {
        self.cur_tipset.read().clone()
    }

    /// Resolve `addr` to its key form using the pool's shared key cache.
    /// See the free function [`resolve_to_key`].
    pub fn resolve_to_key(&self, addr: &Address, cur_ts: &Tipset) -> Result<Address, Error> {
        resolve_to_key(self.api.as_ref(), &self.key_cache, addr, cur_ts)
    }

    /// Record a message as locally published: remembers the resolved sender
    /// address in `local_addrs` and the message itself in `local_msgs`.
    fn add_local(&self, m: SignedMessage) -> Result<(), Error> {
        let cur_ts = self.current_tipset();
        let resolved = self.resolve_to_key(&m.from(), &cur_ts)?;
        self.local_addrs.write().push(resolved);
        self.local_msgs.write().insert(m);
        Ok(())
    }

    /// Push a signed message to the `MessagePool`. Additionally performs basic
    /// checks on the validity of a message.
    ///
    /// Returns the message's Cid. When `add_tipset` reports the message as
    /// publishable, it is also broadcast on the network's pubsub topic.
    pub async fn push_internal(
        &self,
        msg: SignedMessage,
        trust_policy: TrustPolicy,
    ) -> Result<Cid, Error> {
        self.check_message(&msg)?;
        let cid = msg.cid();
        let cur_ts = self.current_tipset();
        let publish = self.add_tipset(msg.clone(), &cur_ts, true, trust_policy)?;
        // Serialize before `msg` is moved into `add_local` below.
        let msg_ser = to_vec(&msg)?;
        let network_name = self.chain_config.network.genesis_name();
        self.add_local(msg)?;
        if publish {
            self.network_sender
                .send_async(NetworkMessage::PubsubMessage {
                    topic: Topic::new(format!("{PUBSUB_MSG_STR}/{network_name}")),
                    message: msg_ser,
                })
                .await
                .map_err(|_| Error::Other("Network receiver dropped".to_string()))?;
        }
        Ok(cid)
    }

    /// Push a signed message to the `MessagePool` from a trusted source.
    pub async fn push(&self, msg: SignedMessage) -> Result<Cid, Error> {
        self.push_internal(msg, TrustPolicy::Trusted).await
    }

    /// Push a signed message to the `MessagePool` from an untrusted source.
    pub async fn push_untrusted(&self, msg: SignedMessage) -> Result<Cid, Error> {
        self.push_internal(msg, TrustPolicy::Untrusted).await
    }

    /// Stateless sanity checks on a message: serialized size, delegated
    /// recipient validity, block-inclusion validity, value and fee-cap
    /// bounds, and finally the signature itself.
    fn check_message(&self, msg: &SignedMessage) -> Result<(), Error> {
        if to_vec(msg)?.len() > MAX_MESSAGE_SIZE {
            return Err(Error::MessageTooBig);
        }
        let to = msg.message().to();
        if to.protocol() == Protocol::Delegated {
            // A delegated recipient must be representable as an Eth address.
            EthAddress::from_filecoin_address(&to).context(format!(
                "message recipient {to} is a delegated address but not a valid Eth Address"
            ))?;
        }
        valid_for_block_inclusion(msg.message(), Gas::new(0), NEWEST_NETWORK_VERSION)?;
        if msg.value() > *crate::shim::econ::TOTAL_FILECOIN {
            return Err(Error::MessageValueTooHigh);
        }
        if msg.gas_fee_cap().atto() < &MINIMUM_BASE_FEE.into() {
            return Err(Error::GasFeeCapTooLow);
        }
        self.verify_msg_sig(msg)
    }

    /// This is a helper to push that will help to make sure that the message
    /// fits the parameters to be pushed to the `MessagePool`.
    /// Unlike `push_internal`, it neither records the message as local nor
    /// publishes it over pubsub.
    pub fn add(&self, msg: SignedMessage) -> Result<(), Error> {
        self.check_message(&msg)?;
        let ts = self.current_tipset();
        self.add_tipset(msg, &ts, false, TrustPolicy::Trusted)?;
        Ok(())
    }

    /// Verify the message signature. first check if it has already been
    /// verified and put into cache. If it has not, then manually verify it
    /// then put it into cache for future use.
    fn verify_msg_sig(&self, msg: &SignedMessage) -> Result<(), Error> {
        let cid = msg.cid();

        // Cache hit means this exact message Cid was verified before.
        if let Some(()) = self.sig_val_cache.get_cloned(&(cid).into()) {
            return Ok(());
        }

        msg.verify(self.chain_config.eth_chain_id)
            .map_err(|e| Error::Other(e.to_string()))?;

        self.sig_val_cache.push(cid.into(), ());

        Ok(())
    }

    /// Verify the `state_sequence` and balance for the sender of the message
    /// given then call `add_locked` to finish adding the `signed_message`
    /// to pending.
    ///
    /// Returns whether the caller should publish the message over pubsub
    /// (see [`verify_msg_before_add`] for the exact semantics).
    fn add_tipset(
        &self,
        msg: SignedMessage,
        cur_ts: &Tipset,
        local: bool,
        trust_policy: TrustPolicy,
    ) -> Result<bool, Error> {
        let sequence = self.get_state_sequence(&msg.from(), cur_ts)?;

        if sequence > msg.message().sequence {
            return Err(Error::SequenceTooLow);
        }

        let sender_actor = self.api.get_actor_after(&msg.message().from(), cur_ts)?;

        // This message can only be included in the next epoch and beyond, hence the +1.
        let nv = self.chain_config.network_version(cur_ts.epoch() + 1);
        let eth_chain_id = self.chain_config.eth_chain_id;
        if msg.signature().signature_type() == SignatureType::Delegated
            && !is_valid_eth_tx_for_sending(eth_chain_id, nv, &msg)
        {
            return Err(Error::Other(
                "Invalid Ethereum message for the current network version".to_owned(),
            ));
        }
        if !is_valid_for_sending(nv, &sender_actor) {
            return Err(Error::Other(
                "Sender actor is not a valid top-level sender".to_owned(),
            ));
        }

        let publish = verify_msg_before_add(&msg, cur_ts, local, &self.chain_config)?;

        // The sender must be able to cover the message's required funds.
        let balance = self.get_state_balance(&msg.from(), cur_ts)?;

        let msg_balance = msg.required_funds();
        if balance < msg_balance {
            return Err(Error::NotEnoughFunds);
        }
        // Locally submitted messages get the relaxed per-actor limit policy.
        let strictness = if local {
            StrictnessPolicy::Relaxed
        } else {
            StrictnessPolicy::Strict
        };
        self.add_helper(msg, trust_policy, strictness)?;
        Ok(publish)
    }

    /// Finish verifying signed message before adding it to the pending `mset`
    /// hash-map. If an entry in the hash-map does not yet exist, create a
    /// new `mset` that will correspond to the from message and push it to
    /// the pending hash-map. Delegates to the free function [`add_helper`].
    fn add_helper(
        &self,
        msg: SignedMessage,
        trust_policy: TrustPolicy,
        strictness: StrictnessPolicy,
    ) -> Result<(), Error> {
        let from = msg.from();
        let cur_ts = self.current_tipset();
        add_helper(
            self.api.as_ref(),
            &self.bls_sig_cache,
            &self.pending_store,
            &self.key_cache,
            &cur_ts,
            msg,
            self.get_state_sequence(&from, &cur_ts)?,
            trust_policy,
            strictness,
        )
    }

    /// Get the sequence for a given address, return Error if there is a failure
    /// to retrieve the respective sequence.
    ///
    /// The result is the larger of the on-chain state sequence and the next
    /// sequence tracked by the pending set for that address.
    pub fn get_sequence(&self, addr: &Address) -> Result<u64, Error> {
        let cur_ts = self.current_tipset();

        let sequence = self.get_state_sequence(addr, &cur_ts)?;

        // Pending entries are keyed by resolved key address; fall back to the
        // raw address when resolution fails or yields no entry.
        let resolved = self.resolve_to_key(addr, &cur_ts).ok();
        let mset = resolved
            .and_then(|r| self.pending_store.snapshot_for(&r))
            .or_else(|| self.pending_store.snapshot_for(addr));
        match mset {
            Some(mset) => {
                if sequence > mset.next_sequence {
                    return Ok(sequence);
                }
                Ok(mset.next_sequence)
            }
            None => Ok(sequence),
        }
    }

    /// Get the state of the sequence for a given address in `cur_ts`.
    fn get_state_sequence(&self, addr: &Address, cur_ts: &Tipset) -> Result<u64, Error> {
        get_state_sequence(
            self.api.as_ref(),
            &self.key_cache,
            &self.state_nonce_cache,
            addr,
            cur_ts,
        )
    }

    /// Get the state balance for the actor that corresponds to the supplied
    /// address and tipset, if this actor does not exist, return an error.
    fn get_state_balance(&self, addr: &Address, ts: &Tipset) -> Result<TokenAmount, Error> {
        let actor = self.api.get_actor_after(addr, ts)?;
        Ok(TokenAmount::from(&actor.balance))
    }

    /// Return a tuple that contains a vector of all signed messages and the
    /// current tipset for self.
    ///
    /// Messages are sorted by sequence within each sender's group; the groups
    /// themselves come out in the snapshot map's iteration order.
    pub fn pending(&self) -> (Vec<SignedMessage>, Tipset) {
        let pending = self.pending_store.snapshot();
        // Pre-size the output with the total message count across all senders.
        let len = pending.values().map(|mset| mset.msgs.len()).sum();
        let mut out = Vec::with_capacity(len);

        for mset in pending.into_values() {
            out.extend(
                mset.msgs
                    .into_values()
                    .sorted_unstable_by_key(|m| m.message().sequence),
            );
        }

        let cur_ts = self.current_tipset();

        (out, cur_ts)
    }

    /// Return a Vector of signed messages for a given from address. This vector
    /// will be sorted by each `message`'s sequence. If no corresponding
    /// messages found, return None result type.
    pub fn pending_for(&self, a: &Address) -> Option<Vec<SignedMessage>> {
        let cur_ts = self.current_tipset();
        // Pending entries are keyed by resolved key address, so resolve first;
        // an unresolvable address simply has no pending messages.
        let resolved = self
            .resolve_to_key(a, &cur_ts)
            .inspect_err(|e| tracing::debug!(%a, "pending_for: failed to resolve address: {e:#}"))
            .ok()?;
        let mset = self.pending_store.snapshot_for(&resolved)?;
        if mset.msgs.is_empty() {
            return None;
        }

        Some(
            mset.msgs
                .into_values()
                .sorted_by_key(|v| v.message().sequence)
                .collect(),
        )
    }

    /// Subscribe to [`MpoolUpdate`] events for every insertion into and
    /// removal from the pending pool.
    #[allow(dead_code)] // surfaces the MpoolUpdate API for external subscribers.
    pub fn subscribe_to_updates(&self) -> broadcast::Receiver<MpoolUpdate> {
        self.pending_store.subscribe()
    }

    /// Return Vector of signed messages given a block header for self.
    ///
    /// Signatures for unsigned (BLS) messages are recovered from the
    /// `bls_sig_cache`; a missing signature is an error.
    pub fn messages_for_blocks<'a>(
        &self,
        blks: impl Iterator<Item = &'a CachingBlockHeader>,
    ) -> Result<Vec<SignedMessage>, Error> {
        let mut msg_vec: Vec<SignedMessage> = Vec::new();

        for block in blks {
            let (umsg, mut smsgs) = self.api.messages_for_block(block)?;

            msg_vec.append(smsgs.as_mut());
            for msg in umsg {
                let smsg = recover_sig(&self.bls_sig_cache, msg)?;
                msg_vec.push(smsg)
            }
        }
        Ok(msg_vec)
    }

    /// Loads local messages to the message pool to be applied.
    ///
    /// A message rejected with `SequenceTooLow` is stale: it is warned about
    /// and dropped from the local set. Other add errors are ignored so one
    /// bad message cannot block loading the rest.
    pub fn load_local(&mut self) -> Result<(), Error> {
        let mut local_msgs = self.local_msgs.write();
        // Collect first so we can mutate `local_msgs` while iterating.
        for k in local_msgs.iter().cloned().collect_vec() {
            self.add(k.clone()).unwrap_or_else(|err| {
                if err == Error::SequenceTooLow {
                    warn!("error adding message: {:?}", err);
                    local_msgs.remove(&k);
                }
            })
        }

        Ok(())
    }

    #[cfg(test)]
    pub fn get_config(&self) -> &MpoolConfig {
        &self.config
    }

    /// Persist `cfg` through the settings store, then adopt it as the active
    /// configuration.
    #[cfg(test)]
    pub fn set_config<DB: SettingsStore>(
        &mut self,
        db: &DB,
        cfg: MpoolConfig,
    ) -> Result<(), Error> {
        cfg.save_config(db)
            .map_err(|e| Error::Other(e.to_string()))?;
        self.config = cfg;
        Ok(())
    }

    /// Test-only wrapper applying the given tipset reverts/applies to this
    /// pool's state via [`head_change`].
    #[cfg(test)]
    pub async fn apply_head_change(
        &self,
        revert: Vec<crate::blocks::Tipset>,
        apply: Vec<crate::blocks::Tipset>,
    ) -> Result<(), Error>
    where
        T: 'static,
    {
        head_change(
            self.api.as_ref(),
            &self.bls_sig_cache,
            self.repub_trigger.clone(),
            self.republished.as_ref(),
            &self.pending_store,
            self.cur_tipset.as_ref(),
            &self.key_cache,
            &self.state_nonce_cache,
            revert,
            apply,
        )
        .await
    }
}
531
impl<T> MessagePool<T>
where
    T: Provider + Send + Sync + 'static,
{
    /// Creates a new `MessagePool` instance.
    ///
    /// Besides constructing the pool, this loads previously stored local
    /// messages and spawns two background services onto `services`:
    /// one reacting to chain head changes, and one republishing pending
    /// messages periodically or on demand via `repub_trigger`.
    pub fn new(
        api: T,
        network_sender: flume::Sender<NetworkMessage>,
        config: MpoolConfig,
        chain_config: Arc<ChainConfig>,
        services: &mut JoinSet<anyhow::Result<()>>,
    ) -> Result<MessagePool<T>, Error>
    where
        T: Provider,
    {
        let local_addrs = Arc::new(SyncRwLock::new(Vec::new()));
        // Per-actor limits are constant for the lifetime of this pool; capture
        // them once here rather than re-reading on every insert.
        let pending_store = PendingStore::new(MsgSetLimits::new(
            api.max_actor_pending_messages(),
            api.max_untrusted_actor_pending_messages(),
        ));
        let tipset = Arc::new(SyncRwLock::new(api.get_heaviest_tipset()));
        let bls_sig_cache =
            SizeTrackingLruCache::new_with_metrics("bls_sig".into(), BLS_SIG_CACHE_SIZE);
        let sig_val_cache =
            SizeTrackingLruCache::new_with_metrics("sig_val".into(), SIG_VAL_CACHE_SIZE);
        let key_cache = SizeTrackingLruCache::new_with_metrics("mpool_key".into(), KEY_CACHE_SIZE);
        let state_nonce_cache =
            SizeTrackingLruCache::new_with_metrics("state_nonce".into(), STATE_NONCE_CACHE_SIZE);
        let local_msgs = Arc::new(SyncRwLock::new(HashSet::new()));
        let republished = Arc::new(SyncRwLock::new(HashSet::new()));
        let block_delay = chain_config.block_delay_secs;

        let (repub_trigger, repub_trigger_rx) = flume::bounded::<()>(4);
        let mut mp = MessagePool {
            local_addrs,
            pending_store,
            cur_tipset: tipset,
            api: Arc::new(api),
            bls_sig_cache,
            sig_val_cache,
            key_cache,
            state_nonce_cache,
            local_msgs,
            republished,
            config,
            network_sender,
            repub_trigger,
            chain_config: Arc::clone(&chain_config),
        };

        mp.load_local()?;

        let mut head_changes_rx = mp.api.subscribe_head_changes();

        // Handles shared with the head-change service below.
        let api = mp.api.clone();
        let bls_sig_cache = mp.bls_sig_cache.shallow_clone();
        let pending_store = mp.pending_store.shallow_clone();
        let republished = mp.republished.clone();
        let key_cache = mp.key_cache.shallow_clone();
        let state_nonce_cache = mp.state_nonce_cache.shallow_clone();

        let current_ts = mp.cur_tipset.clone();
        let repub_trigger = mp.repub_trigger.clone();

        // Reacts to new HeadChanges
        services.spawn(async move {
            loop {
                match head_changes_rx.recv().await {
                    Ok(HeadChanges { reverts, applies }) => {
                        if let Err(e) = head_change(
                            api.as_ref(),
                            &bls_sig_cache,
                            repub_trigger.clone(),
                            republished.as_ref(),
                            &pending_store,
                            &current_ts,
                            &key_cache,
                            &state_nonce_cache,
                            reverts,
                            applies,
                        )
                        .await
                        {
                            tracing::warn!("Error changing head: {e}");
                        }
                    }
                    Err(RecvError::Lagged(n)) => {
                        // The broadcast buffer overflowed; events were dropped.
                        warn!("Head change subscriber lagged: skipping {n} events");
                    }
                    Err(RecvError::Closed) => {
                        // Sender side is gone; end the service cleanly.
                        break Ok(());
                    }
                }
            }
        });

        // Fresh set of handles for the republishing service.
        let api = mp.api.clone();
        let pending_store = mp.pending_store.shallow_clone();
        let cur_tipset = mp.cur_tipset.clone();
        let republished = mp.republished.clone();
        let local_addrs = mp.local_addrs.clone();
        let key_cache = mp.key_cache.shallow_clone();
        let network_sender = Arc::new(mp.network_sender.clone());
        let republish_interval = u64::from(10 * block_delay + chain_config.propagation_delay_secs);
        // Reacts to republishing requests
        services.spawn(async move {
            let mut repub_trigger_rx = repub_trigger_rx.stream();
            let mut interval = interval(Duration::from_secs(republish_interval));
            loop {
                // Run on either the periodic tick or an explicit trigger,
                // whichever fires first.
                tokio::select! {
                    _ = interval.tick() => (),
                    _ = repub_trigger_rx.next() => (),
                }
                if let Err(e) = republish_pending_messages(
                    api.as_ref(),
                    network_sender.as_ref(),
                    &pending_store,
                    cur_tipset.as_ref(),
                    republished.as_ref(),
                    local_addrs.as_ref(),
                    &key_cache,
                    &chain_config,
                )
                .await
                {
                    warn!("Failed to republish pending messages: {}", e.to_string());
                }
            }
        });
        Ok(mp)
    }
}
666
667// Helpers for MessagePool
668
/// Finish verifying the signed message before adding it to the pending `mset`
/// hash-map. If an entry in the hash-map does not yet exist, create a new
/// `mset` that will correspond to the from message and push it to the pending
/// hash-map.
#[allow(clippy::too_many_arguments)]
pub(in crate::message_pool) fn add_helper<T>(
    api: &T,
    bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
    pending_store: &PendingStore,
    key_cache: &IdToAddressCache,
    cur_ts: &Tipset,
    msg: SignedMessage,
    sequence: u64,
    trust_policy: TrustPolicy,
    strictness: StrictnessPolicy,
) -> Result<(), Error>
where
    T: Provider,
{
    // BLS signatures are cached separately so they can be re-attached later
    // when unsigned messages are read back out of blocks (see `recover_sig`).
    if msg.signature().signature_type() == SignatureType::Bls {
        bls_sig_cache.push(msg.cid().into(), msg.signature().clone());
    }

    // Persist both the signed and unsigned forms of the message.
    api.put_message(&ChainMessage::Signed(msg.clone().into()))?;
    api.put_message(&ChainMessage::Unsigned(msg.message().clone().into()))?;

    // Pending entries are keyed by the sender's resolved key address.
    let resolved_from = resolve_to_key(api, key_cache, &msg.from(), cur_ts)?;
    pending_store.insert(resolved_from, msg, sequence, trust_policy, strictness)
}
698
/// Final gas/fee checks on a message before it is added to the pool.
///
/// Returns `Ok(local)` when all checks pass — i.e. only locally submitted
/// messages are flagged for immediate publication by the caller. A local
/// message whose fee cap is below the conservative base-fee lower bound is
/// kept but not published (`Ok(false)`); a non-local one is rejected.
fn verify_msg_before_add(
    m: &SignedMessage,
    cur_ts: &Tipset,
    local: bool,
    chain_config: &ChainConfig,
) -> Result<bool, Error> {
    let epoch = cur_ts.epoch();
    // Minimum gas required just to put a message of this size on chain.
    let min_gas = price_list_by_network_version(chain_config.network_version(epoch))
        .on_chain_message(m.chain_length()?);
    valid_for_block_inclusion(m.message(), min_gas.total(), NEWEST_NETWORK_VERSION)?;
    if !cur_ts.block_headers().is_empty() {
        let base_fee = &cur_ts.block_headers().first().parent_base_fee;
        let base_fee_lower_bound =
            get_base_fee_lower_bound(base_fee, BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE);
        if m.gas_fee_cap() < base_fee_lower_bound {
            if local {
                // Keep the local message in the pool, but defer publication
                // until the base fee drops enough for it to be includable.
                warn!(
                    "local message will not be immediately published because GasFeeCap doesn't meet the lower bound for inclusion in the next 20 blocks (GasFeeCap: {}, baseFeeLowerBound: {})",
                    m.gas_fee_cap(),
                    base_fee_lower_bound
                );
                return Ok(false);
            }
            // Non-local messages below the bound are rejected outright.
            return Err(Error::SoftValidationFailure(format!(
                "GasFeeCap doesn't meet base fee lower bound for inclusion in the next 20 blocks (GasFeeCap: {}, baseFeeLowerBound:{})",
                m.gas_fee_cap(),
                base_fee_lower_bound
            )));
        }
    }
    Ok(local)
}
731
732#[cfg(test)]
733mod tests {
734    use crate::blocks::RawBlockHeader;
735    use crate::chain::ChainStore;
736    use crate::db::MemoryDB;
737    use crate::message_pool::provider::Provider;
738    use crate::message_pool::test_provider::TestApi;
739    use crate::networks::ChainConfig;
740    use crate::shim::econ::TokenAmount;
741    use crate::shim::state_tree::{ActorState, StateTree, StateTreeVersion};
742    use crate::utils::db::CborStoreExt as _;
743
744    use super::*;
745    use crate::shim::message::Message as ShimMessage;
746
    /// Build a mock BLS-signed test message with the given sender, sequence
    /// and gas premium; every other field is defaulted.
    fn make_smsg(from: Address, seq: u64, premium: u64) -> SignedMessage {
        SignedMessage::mock_bls_signed_message(ShimMessage {
            from,
            sequence: seq,
            gas_premium: TokenAmount::from_atto(premium),
            gas_limit: 1_000_000,
            ..ShimMessage::default()
        })
    }
756
    /// Build a `PendingStore` sized from the [`TestApi`] provider's limits.
    fn test_pending_store(api: &TestApi) -> PendingStore {
        // Mirrors how `MessagePool::new` sizes its own pending store.
        PendingStore::new(MsgSetLimits::new(
            api.max_actor_pending_messages(),
            api.max_untrusted_actor_pending_messages(),
        ))
    }
764
    // Regression test for https://github.com/ChainSafe/forest/pull/6118 which fixed a bogus 100M
    // gas limit. There are no limits on a single message.
    #[test]
    fn add_helper_message_gas_limit_test() {
        let api = TestApi::default();
        let bls_sig_cache = SizeTrackingLruCache::new_mocked();
        let key_cache = SizeTrackingLruCache::new_mocked();
        let pending_store = test_pending_store(&api);
        let cur_ts = api.get_heaviest_tipset();
        // Deliberately larger than the old 100M cap to prove it is gone.
        let message = ShimMessage {
            gas_limit: 666_666_666,
            ..ShimMessage::default()
        };
        let msg = SignedMessage::mock_bls_signed_message(message);
        let sequence = msg.message().sequence;
        let res = add_helper(
            &api,
            &bls_sig_cache,
            &pending_store,
            &key_cache,
            &cur_ts,
            msg,
            sequence,
            TrustPolicy::Trusted,
            StrictnessPolicy::Relaxed,
        );
        assert!(res.is_ok());
    }
793
    /// A non-ID (here BLS) address must pass through `resolve_to_key`
    /// unchanged and must not populate the key cache.
    #[test]
    fn test_resolve_to_key_returns_non_id_unchanged() {
        let api = TestApi::default();
        let key_cache = SizeTrackingLruCache::new_mocked();
        let ts = api.get_heaviest_tipset();

        let bls_addr = Address::new_bls(&[1u8; 48]).unwrap();
        let result = resolve_to_key(&api, &key_cache, &bls_addr, &ts).unwrap();
        assert_eq!(result, bls_addr);
        assert_eq!(
            key_cache.len(),
            0,
            "cache should not be populated for non-ID addresses"
        );
    }
809
    /// An ID address must resolve to its mapped key address and the result
    /// must land in the key cache so a second lookup is served from cache.
    #[test]
    fn test_resolve_to_key_resolves_id_and_caches() {
        let api = TestApi::default();
        let key_cache = SizeTrackingLruCache::new_mocked();
        let ts = api.get_heaviest_tipset();

        let id_addr = Address::new_id(100);
        let key_addr = Address::new_bls(&[5u8; 48]).unwrap();
        api.set_key_address_mapping(&id_addr, &key_addr);

        let result = resolve_to_key(&api, &key_cache, &id_addr, &ts).unwrap();
        assert_eq!(result, key_addr);
        assert_eq!(
            key_cache.len(),
            1,
            "cache should have one entry after resolution"
        );

        // Second call should hit the cache (no API call needed)
        let result2 = resolve_to_key(&api, &key_cache, &id_addr, &ts).unwrap();
        assert_eq!(result2, key_addr);
    }
832
833    #[test]
834    fn test_add_helper_keys_pending_by_resolved_address() {
835        let api = TestApi::default();
836        let bls_sig_cache = SizeTrackingLruCache::new_mocked();
837        let key_cache = SizeTrackingLruCache::new_mocked();
838        let pending_store = test_pending_store(&api);
839        let cur_ts = api.get_heaviest_tipset();
840
841        let id_addr = Address::new_id(200);
842        let key_addr = Address::new_bls(&[7u8; 48]).unwrap();
843        api.set_key_address_mapping(&id_addr, &key_addr);
844        api.set_state_sequence(&key_addr, 0);
845
846        let message = ShimMessage {
847            from: id_addr,
848            gas_limit: 1_000_000,
849            ..ShimMessage::default()
850        };
851        let msg = SignedMessage::mock_bls_signed_message(message);
852
853        add_helper(
854            &api,
855            &bls_sig_cache,
856            &pending_store,
857            &key_cache,
858            &cur_ts,
859            msg,
860            0,
861            TrustPolicy::Trusted,
862            StrictnessPolicy::Relaxed,
863        )
864        .unwrap();
865
866        assert!(
867            pending_store.snapshot_for(&key_addr).is_some(),
868            "pending should be keyed by the resolved key address"
869        );
870        assert!(
871            pending_store.snapshot_for(&id_addr).is_none(),
872            "pending should NOT have an entry under the raw ID address"
873        );
874    }
875
876    #[test]
877    fn test_get_sequence_works_with_both_address_forms() {
878        let api = TestApi::default();
879        let bls_sig_cache = SizeTrackingLruCache::new_mocked();
880        let key_cache = SizeTrackingLruCache::new_mocked();
881        let pending_store = test_pending_store(&api);
882        let cur_ts = api.get_heaviest_tipset();
883
884        let id_addr = Address::new_id(300);
885        let key_addr = Address::new_bls(&[9u8; 48]).unwrap();
886        api.set_key_address_mapping(&id_addr, &key_addr);
887        api.set_state_sequence(&key_addr, 0);
888
889        // Add two messages from the ID address
890        for seq in 0..2 {
891            let message = ShimMessage {
892                from: id_addr,
893                sequence: seq,
894                gas_limit: 1_000_000,
895                ..ShimMessage::default()
896            };
897            let msg = SignedMessage::mock_bls_signed_message(message);
898            add_helper(
899                &api,
900                &bls_sig_cache,
901                &pending_store,
902                &key_cache,
903                &cur_ts,
904                msg,
905                0,
906                TrustPolicy::Trusted,
907                StrictnessPolicy::Relaxed,
908            )
909            .unwrap();
910        }
911
912        let state_seq = api.get_actor_after(&id_addr, &cur_ts).unwrap().sequence;
913        let resolved_for_id = resolve_to_key(&api, &key_cache, &id_addr, &cur_ts).unwrap();
914        let resolved_for_key = resolve_to_key(&api, &key_cache, &key_addr, &cur_ts).unwrap();
915        assert_eq!(resolved_for_id, resolved_for_key);
916
917        let next_seq = pending_store
918            .snapshot_for(&resolved_for_id)
919            .unwrap()
920            .next_sequence;
921        let expected = std::cmp::max(state_seq, next_seq);
922        assert_eq!(expected, 2, "should reflect both pending messages");
923    }
924
925    #[test]
926    fn test_get_state_sequence_accounts_for_tipset_messages() {
927        use crate::message_pool::test_provider::mock_block;
928
929        let api = TestApi::default();
930        let key_cache = SizeTrackingLruCache::new_mocked();
931        let state_nonce_cache = SizeTrackingLruCache::new_mocked();
932
933        let sender = Address::new_bls(&[3u8; 48]).unwrap();
934        api.set_state_sequence(&sender, 5);
935
936        let block = mock_block(1, 1);
937        api.inner.lock().set_block_messages(
938            &block,
939            vec![make_smsg(sender, 5, 100), make_smsg(sender, 7, 100)],
940        );
941        let ts = Tipset::from(block);
942
943        let nonce = get_state_sequence(&api, &key_cache, &state_nonce_cache, &sender, &ts).unwrap();
944        assert_eq!(
945            nonce, 8,
946            "should account for non-consecutive tipset message at nonce 7"
947        );
948    }
949
950    #[test]
951    fn test_get_state_sequence_ignores_other_addresses() {
952        use crate::message_pool::test_provider::mock_block;
953
954        let api = TestApi::default();
955        let key_cache = SizeTrackingLruCache::new_mocked();
956        let state_nonce_cache = SizeTrackingLruCache::new_mocked();
957
958        let addr_a = Address::new_bls(&[4u8; 48]).unwrap();
959        let addr_b = Address::new_bls(&[5u8; 48]).unwrap();
960        api.set_state_sequence(&addr_a, 0);
961        api.set_state_sequence(&addr_b, 0);
962
963        let block = mock_block(1, 1);
964        api.inner.lock().set_block_messages(
965            &block,
966            vec![
967                make_smsg(addr_b, 0, 100),
968                make_smsg(addr_b, 1, 100),
969                make_smsg(addr_b, 2, 100),
970            ],
971        );
972        let ts = Tipset::from(block);
973
974        let nonce_a =
975            get_state_sequence(&api, &key_cache, &state_nonce_cache, &addr_a, &ts).unwrap();
976        assert_eq!(
977            nonce_a, 0,
978            "addr_a nonce should be unaffected by addr_b's messages"
979        );
980
981        let nonce_b =
982            get_state_sequence(&api, &key_cache, &state_nonce_cache, &addr_b, &ts).unwrap();
983        assert_eq!(
984            nonce_b, 3,
985            "addr_b nonce should reflect its tipset messages"
986        );
987    }
988
989    #[test]
990    fn test_get_state_sequence_cache_hit() {
991        use crate::message_pool::test_provider::mock_block;
992
993        let api = TestApi::default();
994        let key_cache = SizeTrackingLruCache::new_mocked();
995        let state_nonce_cache: SizeTrackingLruCache<StateNonceCacheKey, u64> =
996            SizeTrackingLruCache::new_mocked();
997
998        let sender = Address::new_bls(&[6u8; 48]).unwrap();
999        api.set_state_sequence(&sender, 5);
1000
1001        let block = mock_block(1, 1);
1002        api.inner
1003            .lock()
1004            .set_block_messages(&block, vec![make_smsg(sender, 5, 100)]);
1005        let ts = Tipset::from(block);
1006
1007        let nonce1 =
1008            get_state_sequence(&api, &key_cache, &state_nonce_cache, &sender, &ts).unwrap();
1009        assert_eq!(nonce1, 6);
1010
1011        // Mutate the underlying state; the cache should still return the old value.
1012        api.set_state_sequence(&sender, 99);
1013        let nonce2 =
1014            get_state_sequence(&api, &key_cache, &state_nonce_cache, &sender, &ts).unwrap();
1015        assert_eq!(
1016            nonce2, 6,
1017            "second call should return the cached value, not re-read state"
1018        );
1019    }
1020
1021    #[test]
1022    fn test_get_state_sequence_cache_miss_on_different_tipset() {
1023        use crate::message_pool::test_provider::mock_block;
1024
1025        let api = TestApi::default();
1026        let key_cache = SizeTrackingLruCache::new_mocked();
1027        let state_nonce_cache: SizeTrackingLruCache<StateNonceCacheKey, u64> =
1028            SizeTrackingLruCache::new_mocked();
1029
1030        let sender = Address::new_bls(&[7u8; 48]).unwrap();
1031        api.set_state_sequence(&sender, 10);
1032
1033        let block_a = mock_block(1, 1);
1034        let ts_a = Tipset::from(&block_a);
1035
1036        let nonce_a =
1037            get_state_sequence(&api, &key_cache, &state_nonce_cache, &sender, &ts_a).unwrap();
1038        assert_eq!(nonce_a, 10);
1039
1040        // Different tipset should be a cache miss and re-read state.
1041        api.set_state_sequence(&sender, 20);
1042        let block_b = mock_block(2, 2);
1043        let ts_b = Tipset::from(&block_b);
1044
1045        let nonce_b =
1046            get_state_sequence(&api, &key_cache, &state_nonce_cache, &sender, &ts_b).unwrap();
1047        assert_eq!(
1048            nonce_b, 20,
1049            "different tipset should miss the cache and read fresh state"
1050        );
1051    }
1052
    #[test]
    fn resolve_to_key_uses_finality_lookback() {
        // Builds a three-tipset chain (genesis <- ts1 <- head) where the
        // earlier tipsets carry state root_a and the head carries root_b,
        // then checks that resolve_to_deterministic_address_at_finality
        // resolves against the lookback state (root_a) rather than the head
        // state (root_b).
        let db = Arc::new(MemoryDB::default());

        // Shrink finality to a single epoch so a 3-tipset chain is enough to
        // make the lookback tipset differ from the head.
        let mut cfg = ChainConfig::default();
        cfg.policy.chain_finality = 1;
        let cfg = Arc::new(cfg);

        let bls_a = Address::new_bls(&[8u8; 48]).unwrap();
        let bls_b = Address::new_bls(&[9u8; 48]).unwrap();

        // root_a: only contains f0300
        let mut st_a = StateTree::new(db.clone(), StateTreeVersion::V5).unwrap();
        st_a.set_actor(
            &Address::new_id(300),
            ActorState::new_empty(Cid::default(), Some(bls_a)),
        )
        .unwrap();
        let root_a = st_a.flush().unwrap();

        // root_b: only contains f0400
        let mut st_b = StateTree::new(db.clone(), StateTreeVersion::V5).unwrap();
        st_b.set_actor(
            &Address::new_id(400),
            ActorState::new_empty(Cid::default(), Some(bls_b)),
        )
        .unwrap();
        let root_b = st_b.flush().unwrap();

        // Each header is persisted to the blockstore so the chain walk from
        // head back to the lookback tipset can load its ancestors.
        let genesis = Tipset::from(CachingBlockHeader::new(RawBlockHeader {
            state_root: root_a,
            ..Default::default()
        }));
        db.put_cbor_default(genesis.block_headers().first())
            .unwrap();

        let ts1 = Tipset::from(CachingBlockHeader::new(RawBlockHeader {
            parents: genesis.key().clone(),
            epoch: 1,
            state_root: root_a,
            timestamp: 1,
            ..Default::default()
        }));
        db.put_cbor_default(ts1.block_headers().first()).unwrap();

        // Head at epoch 2 carries root_b — the state that must NOT be used
        // when resolving at finality.
        let head = Tipset::from(CachingBlockHeader::new(RawBlockHeader {
            parents: ts1.key().clone(),
            epoch: 2,
            state_root: root_b,
            timestamp: 2,
            ..Default::default()
        }));
        db.put_cbor_default(head.block_headers().first()).unwrap();

        let cs = ChainStore::new(
            db.clone(),
            db.clone(),
            db,
            cfg,
            genesis.block_headers().first().clone(),
        )
        .unwrap();

        // f0300 exists in lookback state (root_a) → resolves successfully.
        let result = Provider::resolve_to_deterministic_address_at_finality(
            &cs,
            &Address::new_id(300),
            &head,
        )
        .unwrap();
        assert_eq!(result, bls_a);

        // f0400 exists only in head state (root_b), not in lookback → fails.
        Provider::resolve_to_deterministic_address_at_finality(&cs, &Address::new_id(400), &head)
            .expect_err("actor only in head state must not resolve via finality lookback");
    }
1129}