Skip to main content

forest/message_pool/msgpool/
mod.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4pub(in crate::message_pool) mod events;
5pub(in crate::message_pool) mod metrics;
6pub(in crate::message_pool) mod msg_pool;
7pub(in crate::message_pool) mod msg_set;
8pub(in crate::message_pool) mod pending_store;
9pub(in crate::message_pool) mod provider;
10pub mod selection;
11#[cfg(test)]
12pub mod test_provider;
13pub(in crate::message_pool) mod utils;
14// TODO: This will be used in https://github.com/ChainSafe/forest/pull/6941
15#[allow(unused_imports)]
16pub use events::MpoolUpdate;
17
18use std::{borrow::BorrowMut, cmp::Ordering};
19
20use crate::blocks::Tipset;
21use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic};
22use crate::message::{MessageRead as _, SignedMessage};
23use crate::networks::ChainConfig;
24use crate::shim::{address::Address, crypto::Signature};
25use crate::state_manager::IdToAddressCache;
26use crate::utils::ShallowClone as _;
27use crate::utils::cache::SizeTrackingLruCache;
28use crate::utils::get_size::CidWrapper;
29use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
30use cid::Cid;
31use fvm_ipld_encoding::to_vec;
32use parking_lot::RwLock as SyncRwLock;
33use tracing::error;
34use utils::{get_base_fee_lower_bound, recover_sig};
35
36use super::errors::Error;
37use crate::message_pool::{
38    msg_chain::{Chains, create_message_chains},
39    msg_pool::{StateNonceCacheKey, StrictnessPolicy, TrustPolicy, add_helper, resolve_to_key},
40    msgpool::pending_store::PendingStore,
41    provider::Provider,
42};
43
/// Replace-by-fee ratio from which [`RBF_NUM`] is derived.
/// NOTE(review): presumably the minimum fee bump required to replace a pending
/// message; the consumer is not visible in this part of the file.
const REPLACE_BY_FEE_RATIO: f32 = 1.25;
/// Numerator of `REPLACE_BY_FEE_RATIO - 1` expressed over [`RBF_DENOM`]
/// (`0.25 * 256 = 64`).
const RBF_NUM: u64 = ((REPLACE_BY_FEE_RATIO - 1.0_f32) * 256.0_f32) as u64;
/// Denominator paired with [`RBF_NUM`].
const RBF_DENOM: u64 = 256;
/// Conservative variant of [`BASE_FEE_LOWER_BOUND_FACTOR`].
/// NOTE(review): not used in the visible part of this file; confirm the caller.
const BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE: i64 = 100;
/// Factor handed to `get_base_fee_lower_bound` when selecting messages to
/// republish.
const BASE_FEE_LOWER_BOUND_FACTOR: i64 = 10;
/// Upper bound on the number of messages returned by a republish selection.
const REPUB_MSG_LIMIT: usize = 30;
/// Republish selection stops once the remaining block gas is at or below this
/// value.
const MIN_GAS: u64 = 1_298_450;
51
52#[allow(clippy::too_many_arguments)]
53async fn republish_pending_messages<T>(
54    api: &T,
55    network_sender: &flume::Sender<NetworkMessage>,
56    pending_store: &PendingStore,
57    cur_tipset: &SyncRwLock<Tipset>,
58    republished: &SyncRwLock<HashSet<Cid>>,
59    local_addrs: &SyncRwLock<Vec<Address>>,
60    key_cache: &IdToAddressCache,
61    chain_config: &ChainConfig,
62) -> Result<(), Error>
63where
64    T: Provider,
65{
66    let ts = cur_tipset.read().shallow_clone();
67    let mut pending_map: HashMap<Address, HashMap<u64, SignedMessage>> = HashMap::new();
68
69    republished.write().clear();
70
71    // Only republish messages from local addresses, ie. transactions which were
72    // sent to this node directly.
73    for actor in local_addrs.read().iter() {
74        let Ok(resolved) = resolve_to_key(api, key_cache, actor, &ts).inspect_err(|e| {
75            tracing::debug!(%actor, "republish: failed to resolve address: {e:#}");
76        }) else {
77            continue;
78        };
79        if let Some(mset) = pending_store.snapshot_for(&resolved) {
80            if mset.msgs.is_empty() {
81                continue;
82            }
83            pending_map.insert(resolved, mset.msgs);
84        }
85    }
86
87    let msgs = select_messages_for_block(api, chain_config, &ts, pending_map)?;
88
89    let network_name = chain_config.network.genesis_name();
90    for m in msgs.iter() {
91        let mb = to_vec(m)?;
92        network_sender
93            .send_async(NetworkMessage::PubsubMessage {
94                topic: Topic::new(format!("{PUBSUB_MSG_STR}/{network_name}")),
95                message: mb,
96            })
97            .await
98            .map_err(|_| Error::Other("Network receiver dropped".to_string()))?;
99    }
100
101    let mut republished_t = HashSet::new();
102    for m in msgs.iter() {
103        republished_t.insert(m.cid());
104    }
105    *republished.write() = republished_t;
106
107    Ok(())
108}
109
110/// Select messages from the mempool to be included in the next block that
111/// builds on a given base tipset. The messages should be eligible for inclusion
112/// based on their sequences and the overall number of them should observe block
113/// gas limits.
114fn select_messages_for_block<T>(
115    api: &T,
116    chain_config: &ChainConfig,
117    base: &Tipset,
118    pending: HashMap<Address, HashMap<u64, SignedMessage>>,
119) -> Result<Vec<SignedMessage>, Error>
120where
121    T: Provider,
122{
123    let mut msgs: Vec<SignedMessage> = vec![];
124
125    let base_fee = api.chain_compute_base_fee(base)?;
126    let base_fee_lower_bound = get_base_fee_lower_bound(&base_fee, BASE_FEE_LOWER_BOUND_FACTOR);
127
128    if pending.is_empty() {
129        return Ok(msgs);
130    }
131
132    let mut chains = Chains::new();
133    for (actor, mset) in pending.iter() {
134        create_message_chains(
135            api,
136            actor,
137            mset,
138            &base_fee_lower_bound,
139            base,
140            &mut chains,
141            chain_config,
142        )?;
143    }
144
145    if chains.is_empty() {
146        return Ok(msgs);
147    }
148
149    chains.sort(false);
150
151    let mut gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT;
152    let mut i = 0;
153    'l: while let Some(chain) = chains.get_mut_at(i) {
154        // we can exceed this if we have picked (some) longer chain already
155        if msgs.len() > REPUB_MSG_LIMIT {
156            break;
157        }
158
159        if gas_limit <= MIN_GAS {
160            break;
161        }
162
163        // check if chain has been invalidated
164        if !chain.valid {
165            i += 1;
166            continue;
167        }
168
169        // check if fits in block
170        if chain.gas_limit <= gas_limit {
171            // check the baseFee lower bound -- only republish messages that can be included
172            // in the chain within the next 20 blocks.
173            for m in chain.msgs.iter() {
174                if m.gas_fee_cap() < base_fee_lower_bound {
175                    let key = chains.get_key_at(i);
176                    chains.invalidate(key);
177                    continue 'l;
178                }
179                gas_limit = gas_limit.saturating_sub(m.gas_limit());
180                msgs.push(m.clone());
181            }
182
183            i += 1;
184            continue;
185        }
186
187        // we can't fit the current chain but there is gas to spare
188        // trim it and push it down
189        chains.trim_msgs_at(i, gas_limit, REPUB_MSG_LIMIT, &base_fee);
190        let mut j = i;
191        while j < chains.len() - 1 {
192            #[allow(clippy::indexing_slicing)]
193            if chains[j].compare(&chains[j + 1]) == Ordering::Less {
194                break;
195            }
196            chains.key_vec.swap(i, i + 1);
197            j += 1;
198        }
199    }
200
201    if msgs.len() > REPUB_MSG_LIMIT {
202        msgs.truncate(REPUB_MSG_LIMIT);
203    }
204
205    Ok(msgs)
206}
207
208/// Revert and/or apply tipsets to the message pool. This function should be
209/// called every time that there is a head change in the message pool.
210///
211/// - **Apply**: messages included in the new tipset are removed from the pending
212///   pool via [`MsgSet::rm`] with `applied=true`.
213/// - **Revert**: messages from the reverted tipset are re-added to the pool with
214///   [`StrictnessPolicy::Relaxed`] and [`TrustPolicy::Trusted`], allowing them back without
215///   nonce gap restrictions.
216///
217/// The state nonce cache is naturally invalidated when the tipset changes, since
218/// it is keyed by [`TipsetKey`](crate::blocks::TipsetKey).
219#[allow(clippy::too_many_arguments)]
220pub(in crate::message_pool) async fn head_change<T>(
221    api: &T,
222    bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
223    repub_trigger: flume::Sender<()>,
224    republished: &SyncRwLock<HashSet<Cid>>,
225    pending_store: &PendingStore,
226    cur_tipset: &SyncRwLock<Tipset>,
227    key_cache: &IdToAddressCache,
228    state_nonce_cache: &SizeTrackingLruCache<StateNonceCacheKey, u64>,
229    revert: Vec<Tipset>,
230    apply: Vec<Tipset>,
231) -> Result<(), Error>
232where
233    T: Provider + 'static,
234{
235    let mut repub = false;
236    let mut rmsgs: HashMap<Address, HashMap<u64, SignedMessage>> = HashMap::new();
237    for ts in revert {
238        let Ok(pts) = api.load_tipset(ts.parents()) else {
239            tracing::error!("error loading reverted tipset parent");
240            continue;
241        };
242        *cur_tipset.write() = pts;
243
244        let mut msgs: Vec<SignedMessage> = Vec::new();
245        for block in ts.block_headers() {
246            let Ok((umsg, smsgs)) = api.messages_for_block(block) else {
247                tracing::error!("error retrieving messages for reverted block");
248                continue;
249            };
250            msgs.extend(smsgs);
251            for msg in umsg {
252                let msg_cid = msg.cid();
253                let Ok(smsg) = recover_sig(bls_sig_cache, msg) else {
254                    tracing::debug!("could not recover signature for bls message {}", msg_cid);
255                    continue;
256                };
257                msgs.push(smsg)
258            }
259        }
260
261        for msg in msgs {
262            add_to_selected_msgs(msg, rmsgs.borrow_mut());
263        }
264    }
265
266    for ts in apply {
267        let mpool_ctx = MpoolCtx {
268            api,
269            key_cache,
270            pending_store,
271            ts: &ts,
272        };
273        for b in ts.block_headers() {
274            let Ok((msgs, smsgs)) = api.messages_for_block(b) else {
275                tracing::error!("error retrieving messages for block");
276                continue;
277            };
278
279            for msg in smsgs {
280                mpool_ctx.remove_from_selected_msgs(&msg.from(), msg.sequence(), &mut rmsgs)?;
281                if !repub && republished.write().insert(msg.cid()) {
282                    repub = true;
283                }
284            }
285            for msg in msgs {
286                mpool_ctx.remove_from_selected_msgs(&msg.from, msg.sequence, &mut rmsgs)?;
287                if !repub && republished.write().insert(msg.cid()) {
288                    repub = true;
289                }
290            }
291        }
292        *cur_tipset.write() = ts;
293    }
294    if repub {
295        repub_trigger
296            .send_async(())
297            .await
298            .map_err(|e| Error::Other(format!("Republish receiver dropped: {e}")))?;
299    }
300    let cur_ts = cur_tipset.read().shallow_clone();
301    let mpool_ctx = MpoolCtx {
302        api,
303        key_cache,
304        pending_store,
305        ts: &cur_ts,
306    };
307    for (_, hm) in rmsgs {
308        for (_, msg) in hm {
309            let sequence = match mpool_ctx.get_state_sequence(state_nonce_cache, &msg.from()) {
310                Ok(seq) => seq,
311                Err(e) => {
312                    tracing::debug!("Failed to get the state sequence: {}", e);
313                    continue;
314                }
315            };
316            if let Err(e) = add_helper(
317                api,
318                bls_sig_cache,
319                pending_store,
320                key_cache,
321                &cur_ts,
322                msg,
323                sequence,
324                TrustPolicy::Trusted,
325                StrictnessPolicy::Relaxed,
326            ) {
327                error!("Failed to read message from reorg to mpool: {}", e);
328            }
329        }
330    }
331    Ok(())
332}
333
334pub(in crate::message_pool) struct MpoolCtx<'a, T> {
335    pub api: &'a T,
336    pub key_cache: &'a IdToAddressCache,
337    pub pending_store: &'a PendingStore,
338    pub ts: &'a Tipset,
339}
340
341impl<T: Provider> MpoolCtx<'_, T> {
342    /// Remove a message from the selected messages map (`rmsgs`). If the
343    /// message is not there, fall back to removing it from the pending store.
344    pub(in crate::message_pool) fn remove_from_selected_msgs(
345        &self,
346        from: &Address,
347        sequence: u64,
348        rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
349    ) -> Result<(), Error> {
350        if rmsgs
351            .get_mut(from)
352            .and_then(|temp| temp.remove(&sequence))
353            .is_none()
354            && let Ok(resolved) = resolve_to_key(self.api, self.key_cache, from, self.ts)
355                .inspect_err(|e| tracing::debug!(%from, "remove: failed to resolve address: {e:#}"))
356        {
357            let _ = self.pending_store.remove(&resolved, sequence, true);
358        }
359        Ok(())
360    }
361
362    /// Get the state nonce for an address, accounting for messages already
363    /// included in the current tipset.
364    pub(in crate::message_pool) fn get_state_sequence(
365        &self,
366        state_nonce_cache: &SizeTrackingLruCache<StateNonceCacheKey, u64>,
367        addr: &Address,
368    ) -> Result<u64, Error> {
369        msg_pool::get_state_sequence(self.api, self.key_cache, state_nonce_cache, addr, self.ts)
370    }
371}
372
373/// This is a helper function for `head_change`. This method will add a signed
374/// message to the given messages selected by priority `HashMap`.
375pub(in crate::message_pool) fn add_to_selected_msgs(
376    m: SignedMessage,
377    rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
378) {
379    rmsgs.entry(m.from()).or_default().insert(m.sequence(), m);
380}
381
382#[cfg(test)]
383pub mod tests {
384    use std::{borrow::BorrowMut, time::Duration};
385
386    use crate::blocks::Tipset;
387    use crate::key_management::{KeyStore, KeyStoreConfig, Wallet};
388    use crate::message::SignedMessage;
389    use crate::networks::ChainConfig;
390    use crate::shim::{
391        address::Address,
392        crypto::SignatureType,
393        econ::TokenAmount,
394        message::{Message, Message_v3},
395    };
396    use num_traits::Zero;
397    use test_provider::*;
398    use tokio::task::JoinSet;
399
400    use super::*;
401    use crate::message_pool::{
402        msg_chain::{Chains, create_message_chains},
403        msg_pool::MessagePool,
404    };
405
406    struct TestMpool {
407        mpool: MessagePool<TestApi>,
408        wallet: Wallet,
409        sender: Address,
410        target: Address,
411        services: JoinSet<anyhow::Result<()>>,
412        network_rx: flume::Receiver<NetworkMessage>,
413    }
414
415    fn make_test_mpool(
416        tma: TestApi,
417    ) -> (
418        MessagePool<TestApi>,
419        JoinSet<anyhow::Result<()>>,
420        flume::Receiver<NetworkMessage>,
421    ) {
422        let (tx, rx) = flume::bounded(50);
423        let mut services = JoinSet::new();
424        let mpool = MessagePool::new(
425            tma,
426            tx,
427            Default::default(),
428            Default::default(),
429            &mut services,
430        )
431        .unwrap();
432        (mpool, services, rx)
433    }
434
435    fn make_test_setup() -> TestMpool {
436        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
437        let mut wallet = Wallet::new(keystore);
438        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
439        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
440        let tma = TestApi::default();
441        tma.set_state_sequence(&sender, 0);
442        let (mpool, services, network_rx) = make_test_mpool(tma);
443        TestMpool {
444            mpool,
445            wallet,
446            sender,
447            target,
448            services,
449            network_rx,
450        }
451    }
452
453    #[tokio::test]
454    async fn test_per_actor_limit() {
455        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
456        let mut wallet = Wallet::new(keystore);
457        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
458        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
459        let tma = TestApi::with_max_actor_pending_messages(200);
460        tma.set_state_sequence(&sender, 0);
461        let (mpool, _services, _rx) = make_test_mpool(tma);
462
463        let mut smsg_vec = Vec::new();
464        for i in 0..(mpool.api.max_actor_pending_messages() + 1) {
465            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
466            smsg_vec.push(msg);
467        }
468
469        let (last, body) = smsg_vec.split_last().unwrap();
470        for smsg in body {
471            mpool.add(smsg.clone()).unwrap();
472        }
473        assert_eq!(
474            mpool.add(last.clone()),
475            Err(Error::TooManyPendingMessages(sender.to_string(), true))
476        );
477    }
478
479    pub fn create_smsg(
480        to: &Address,
481        from: &Address,
482        wallet: &mut Wallet,
483        sequence: u64,
484        gas_limit: i64,
485        gas_price: u64,
486    ) -> SignedMessage {
487        let umsg: Message = Message_v3 {
488            to: to.into(),
489            from: from.into(),
490            sequence,
491            gas_limit: gas_limit as u64,
492            gas_fee_cap: TokenAmount::from_atto(gas_price + 100).into(),
493            gas_premium: TokenAmount::from_atto(gas_price).into(),
494            ..Message_v3::default()
495        }
496        .into();
497        let msg_signing_bytes = umsg.cid().to_bytes();
498        let sig = wallet.sign(from, msg_signing_bytes.as_slice()).unwrap();
499        SignedMessage::new_unchecked(umsg, sig)
500    }
501
502    // Create a fake signed message with a dummy signature. While the signature is
503    // not valid, it has been added to the validation cache and the message will
504    // appear authentic.
505    pub fn create_fake_smsg(
506        pool: &MessagePool<TestApi>,
507        to: &Address,
508        from: &Address,
509        sequence: u64,
510        gas_limit: i64,
511        gas_price: u64,
512    ) -> SignedMessage {
513        let umsg: Message = Message_v3 {
514            to: to.into(),
515            from: from.into(),
516            sequence,
517            gas_limit: gas_limit as u64,
518            gas_fee_cap: TokenAmount::from_atto(gas_price + 100).into(),
519            gas_premium: TokenAmount::from_atto(gas_price).into(),
520            ..Message_v3::default()
521        }
522        .into();
523        let sig = Signature::new_secp256k1(vec![]);
524        let signed = SignedMessage::new_unchecked(umsg, sig);
525        let cid = signed.cid();
526        pool.sig_val_cache.push(cid.into(), ());
527        signed
528    }
529
530    #[tokio::test]
531    async fn test_message_pool() {
532        let TestMpool {
533            mpool,
534            mut wallet,
535            sender,
536            target,
537            ..
538        } = make_test_setup();
539
540        let mut smsg_vec = Vec::new();
541        for i in 0..2 {
542            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
543            smsg_vec.push(msg);
544        }
545
546        mpool.api.inner.lock().set_state_sequence(&sender, 0);
547        assert_eq!(mpool.get_sequence(&sender).unwrap(), 0);
548        mpool.add(smsg_vec[0].clone()).unwrap();
549        assert_eq!(mpool.get_sequence(&sender).unwrap(), 1);
550        mpool.add(smsg_vec[1].clone()).unwrap();
551        assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
552
553        let a = mock_block(1, 1);
554
555        mpool.api.inner.lock().set_block_messages(&a, smsg_vec);
556        mpool
557            .apply_head_change(Vec::new(), vec![Tipset::from(a)])
558            .await
559            .unwrap();
560
561        assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
562    }
563
564    #[tokio::test]
565    async fn test_revert_messages() {
566        let tma = TestApi::default();
567        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
568        let mut wallet = Wallet::new(keystore);
569
570        let a = mock_block(1, 1);
571        let tipset = Tipset::from(&a);
572        let b = mock_block_with_parents(&tipset, 1, 1);
573
574        let sender = wallet.generate_addr(SignatureType::Bls).unwrap();
575        let target = Address::new_id(1001);
576
577        let mut smsg_vec = Vec::new();
578
579        for i in 0..4 {
580            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
581            smsg_vec.push(msg);
582        }
583        let (mpool, _services, _rx) = make_test_mpool(tma);
584
585        {
586            let mut api_temp = mpool.api.inner.lock();
587            api_temp.set_block_messages(&a, vec![smsg_vec[0].clone()]);
588            api_temp.set_block_messages(&b.clone(), smsg_vec[1..4].to_vec());
589            api_temp.set_state_sequence(&sender, 0);
590            drop(api_temp);
591        }
592
593        mpool.add(smsg_vec[0].clone()).unwrap();
594        mpool.add(smsg_vec[1].clone()).unwrap();
595        mpool.add(smsg_vec[2].clone()).unwrap();
596        mpool.add(smsg_vec[3].clone()).unwrap();
597
598        mpool.api.set_state_sequence(&sender, 0);
599
600        mpool
601            .apply_head_change(Vec::new(), vec![Tipset::from(a)])
602            .await
603            .unwrap();
604
605        assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
606
607        mpool.api.set_state_sequence(&sender, 1);
608
609        mpool
610            .apply_head_change(Vec::new(), vec![Tipset::from(&b)])
611            .await
612            .unwrap();
613
614        assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
615
616        mpool.api.set_state_sequence(&sender, 0);
617
618        mpool
619            .apply_head_change(vec![Tipset::from(b)], Vec::new())
620            .await
621            .unwrap();
622
623        assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
624
625        let (p, _) = mpool.pending();
626        assert_eq!(p.len(), 3);
627    }
628
629    #[tokio::test]
630    async fn test_get_sequence_resolves_id_address() {
631        let tma = TestApi::default();
632        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
633        let mut wallet = Wallet::new(keystore);
634        let key_addr = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
635        let id_addr = Address::new_id(999);
636
637        tma.set_key_address_mapping(&id_addr, &key_addr);
638        tma.set_state_sequence(&key_addr, 0);
639        let (mpool, _services, _rx) = make_test_mpool(tma);
640
641        let target = Address::new_id(1001);
642        for i in 0..3 {
643            let msg = create_smsg(&target, &key_addr, wallet.borrow_mut(), i, 1000000, 1);
644            mpool.add(msg).unwrap();
645        }
646
647        // get_sequence with ID address should see the same pending messages
648        assert_eq!(mpool.get_sequence(&id_addr).unwrap(), 3);
649        assert_eq!(mpool.get_sequence(&key_addr).unwrap(), 3);
650    }
651
652    #[tokio::test]
653    async fn test_pending_for_resolves_id_address() {
654        let tma = TestApi::default();
655        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
656        let mut wallet = Wallet::new(keystore);
657        let key_addr = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
658        let id_addr = Address::new_id(888);
659
660        tma.set_key_address_mapping(&id_addr, &key_addr);
661        tma.set_state_sequence(&key_addr, 0);
662        let (mpool, _services, _rx) = make_test_mpool(tma);
663
664        let target = Address::new_id(1001);
665        for i in 0..2 {
666            let msg = create_smsg(&target, &key_addr, wallet.borrow_mut(), i, 1000000, 1);
667            mpool.add(msg).unwrap();
668        }
669
670        // pending_for with ID address should find messages added under key address
671        let msgs = mpool
672            .pending_for(&id_addr)
673            .expect("should find pending messages");
674        assert_eq!(msgs.len(), 2);
675
676        // pending_for with key address should also work
677        let msgs2 = mpool
678            .pending_for(&key_addr)
679            .expect("should find pending messages");
680        assert_eq!(msgs2.len(), 2);
681    }
682
683    #[tokio::test]
684    async fn test_add_with_id_from_resolves_to_key_in_pending() {
685        let tma = TestApi::default();
686        let key_addr = Address::new_bls(&[11u8; 48]).unwrap();
687        let id_addr = Address::new_id(777);
688
689        tma.set_key_address_mapping(&id_addr, &key_addr);
690        tma.set_state_sequence(&key_addr, 0);
691        let (mpool, _services, _rx) = make_test_mpool(tma);
692
693        // Create a message with the ID address as sender and a fake signature
694        let msg = create_fake_smsg(&mpool, &Address::new_id(1001), &id_addr, 0, 1000000, 1);
695        mpool.add(msg).unwrap();
696
697        // Pending map should be keyed by key_addr, not id_addr.
698        assert!(
699            mpool.pending_store.snapshot_for(&key_addr).is_some(),
700            "pending should be keyed by resolved key address"
701        );
702        assert!(
703            mpool.pending_store.snapshot_for(&id_addr).is_none(),
704            "pending should NOT have entry under raw ID address"
705        );
706    }
707
708    #[tokio::test]
709    async fn test_head_change_removes_via_resolved_address() {
710        let tma = TestApi::default();
711        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
712        let mut wallet = Wallet::new(keystore);
713        let key_addr = wallet.generate_addr(SignatureType::Bls).unwrap();
714        let id_addr = Address::new_id(555);
715
716        tma.set_key_address_mapping(&id_addr, &key_addr);
717        tma.set_state_sequence(&key_addr, 0);
718
719        let a = mock_block(1, 1);
720        let (mpool, _services, _rx) = make_test_mpool(tma);
721
722        let target = Address::new_id(1001);
723        let msg0 = create_smsg(&target, &key_addr, wallet.borrow_mut(), 0, 1000000, 1);
724        let msg1 = create_smsg(&target, &key_addr, wallet.borrow_mut(), 1, 1000000, 1);
725        mpool.add(msg0.clone()).unwrap();
726        mpool.add(msg1).unwrap();
727        assert_eq!(mpool.get_sequence(&key_addr).unwrap(), 2);
728
729        // Block messages are stored under the key_addr (as would appear on chain).
730        // The head_change remove path resolves addresses before touching pending.
731        mpool.api.inner.lock().set_block_messages(&a, vec![msg0]);
732
733        mpool.api.set_state_sequence(&key_addr, 1);
734
735        mpool
736            .apply_head_change(Vec::new(), vec![Tipset::from(a)])
737            .await
738            .unwrap();
739
740        // msg0 was applied on chain, msg1 remains pending
741        assert_eq!(mpool.get_sequence(&id_addr).unwrap(), 2);
742        let msgs = mpool
743            .pending_for(&key_addr)
744            .expect("should have remaining msg");
745        assert_eq!(msgs.len(), 1);
746        assert_eq!(msgs[0].sequence(), 1);
747    }
748
749    #[tokio::test]
750    async fn test_async_message_pool() {
751        let TestMpool {
752            mpool,
753            mut wallet,
754            sender,
755            target,
756            services: _services,
757            network_rx: _network_rx,
758        } = make_test_setup();
759
760        let mut smsg_vec = Vec::new();
761        for i in 0..3 {
762            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
763            smsg_vec.push(msg);
764        }
765
766        assert_eq!(mpool.get_sequence(&sender).unwrap(), 0);
767        mpool.push(smsg_vec[0].clone()).await.unwrap();
768        assert_eq!(mpool.get_sequence(&sender).unwrap(), 1);
769        mpool.push(smsg_vec[1].clone()).await.unwrap();
770        assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
771        mpool.push(smsg_vec[2].clone()).await.unwrap();
772        assert_eq!(mpool.get_sequence(&sender).unwrap(), 3);
773
774        let header = mock_block(1, 1);
775        let tipset = Tipset::from(&header.clone());
776
777        mpool.api.set_heaviest_tipset(tipset.clone());
778
779        // sleep allows for async block to update mpool's cur_tipset
780        tokio::time::sleep(Duration::new(2, 0)).await;
781
782        let cur_ts = mpool.current_tipset();
783        assert_eq!(cur_ts, tipset);
784    }
785
786    #[tokio::test]
787    async fn test_msg_chains() {
788        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
789        let mut wallet = Wallet::new(keystore);
790        let a1 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
791        let a2 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
792        let tma = TestApi::default();
793        let gas_limit = 6955002;
794
795        let a = mock_block(1, 1);
796        let ts = Tipset::from(a);
797        let chain_config = ChainConfig::default();
798
799        // --- Test Chain Aggregations ---
800        // Test 1: 10 messages from a1 to a2, with increasing gasPerf; it should
801        // 	       make a single chain with 10 messages given enough balance
802        let mut mset = HashMap::new();
803        let mut smsg_vec = Vec::new();
804        for i in 0..10 {
805            let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i);
806            smsg_vec.push(msg.clone());
807            mset.insert(i, msg);
808        }
809
810        let mut chains = Chains::new();
811        create_message_chains(
812            &tma,
813            &a1,
814            &mset,
815            &TokenAmount::zero(),
816            &ts,
817            &mut chains,
818            &chain_config,
819        )
820        .unwrap();
821        assert_eq!(chains.len(), 1, "expected a single chain");
822        assert_eq!(
823            chains[0].msgs.len(),
824            10,
825            "expected 10 messages in single chain, got: {}",
826            chains[0].msgs.len()
827        );
828        for (i, m) in chains[0].msgs.iter().enumerate() {
829            assert_eq!(
830                m.sequence(),
831                i as u64,
832                "expected sequence {} but got {}",
833                i,
834                m.sequence()
835            );
836        }
837
838        // Test 2: 10 messages from a1 to a2, with decreasing gasPerf; it should
839        // 	         make 10 chains with 1 message each
840        let mut mset = HashMap::new();
841        let mut smsg_vec = Vec::new();
842        for i in 0..10 {
843            let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 10 - i);
844            smsg_vec.push(msg.clone());
845            mset.insert(i, msg);
846        }
847        let mut chains = Chains::new();
848        create_message_chains(
849            &tma,
850            &a1,
851            &mset,
852            &TokenAmount::zero(),
853            &ts,
854            &mut chains,
855            &chain_config,
856        )
857        .unwrap();
858        assert_eq!(chains.len(), 10, "expected 10 chains");
859
860        for i in 0..chains.len() {
861            assert_eq!(
862                chains[i].msgs.len(),
863                1,
864                "expected 1 message in chain {} but got {}",
865                i,
866                chains[i].msgs.len()
867            );
868        }
869
870        for i in 0..chains.len() {
871            let m = &chains[i].msgs[0];
872            assert_eq!(
873                m.sequence(),
874                i as u64,
875                "expected sequence {} but got {}",
876                i,
877                m.sequence()
878            );
879        }
880
881        // Test 3a: 10 messages from a1 to a2, with gasPerf increasing in groups of 3;
882        // it should merge them in two chains, one with 9 messages and
883        // one with the last message
884        let mut mset = HashMap::new();
885        let mut smsg_vec = Vec::new();
886        for i in 0..10 {
887            let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i % 3);
888            smsg_vec.push(msg.clone());
889            mset.insert(i, msg);
890        }
891        let mut chains = Chains::new();
892        create_message_chains(
893            &tma,
894            &a1,
895            &mset,
896            &TokenAmount::zero(),
897            &ts,
898            &mut chains,
899            &chain_config,
900        )
901        .unwrap();
902        assert_eq!(chains.len(), 2, "expected 2 chains");
903        assert_eq!(chains[0].msgs.len(), 9);
904        assert_eq!(chains[1].msgs.len(), 1);
905        let mut next_nonce = 0;
906        for i in 0..chains.len() {
907            for m in chains[i].msgs.iter() {
908                assert_eq!(
909                    next_nonce,
910                    m.sequence(),
911                    "expected nonce {} but got {}",
912                    next_nonce,
913                    m.sequence()
914                );
915                next_nonce += 1;
916            }
917        }
918
919        // Test 3b: 10 messages from a1 to a2, with gasPerf decreasing in groups of 3
920        // with a bias for the earlier chains; it should make 4 chains,
921        // the first 3 with 3 messages and the last with a single
922        // message
923        let mut mset = HashMap::new();
924        let mut smsg_vec = Vec::new();
925        for i in 0..10 {
926            let bias = (12 - i) / 3;
927            let msg = create_smsg(
928                &a2,
929                &a1,
930                wallet.borrow_mut(),
931                i,
932                gas_limit,
933                1 + i % 3 + bias,
934            );
935            smsg_vec.push(msg.clone());
936            mset.insert(i, msg);
937        }
938
939        let mut chains = Chains::new();
940        create_message_chains(
941            &tma,
942            &a1,
943            &mset,
944            &TokenAmount::zero(),
945            &ts,
946            &mut chains,
947            &chain_config,
948        )
949        .unwrap();
950
951        for i in 0..chains.len() {
952            let expected_len = if i > 2 { 1 } else { 3 };
953            assert_eq!(
954                chains[i].msgs.len(),
955                expected_len,
956                "expected {} message in chain {} but got {}",
957                expected_len,
958                i,
959                chains[i].msgs.len()
960            );
961        }
962
963        let mut next_nonce = 0;
964        for i in 0..chains.len() {
965            for m in chains[i].msgs.iter() {
966                assert_eq!(
967                    next_nonce,
968                    m.sequence(),
969                    "expected nonce {} but got {}",
970                    next_nonce,
971                    m.sequence()
972                );
973                next_nonce += 1;
974            }
975        }
976
977        // --- Test Chain Breaks ---
978        // Test 4: 10 messages with non-consecutive nonces; it should make a single
979        // chain with just the first message
980        let mut mset = HashMap::new();
981        let mut smsg_vec = Vec::new();
982        for i in 0..10 {
983            let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i * 2, gas_limit, 1 + i);
984            smsg_vec.push(msg.clone());
985            mset.insert(i, msg);
986        }
987
988        let mut chains = Chains::new();
989        create_message_chains(
990            &tma,
991            &a1,
992            &mset,
993            &TokenAmount::zero(),
994            &ts,
995            &mut chains,
996            &chain_config,
997        )
998        .unwrap();
999        assert_eq!(chains.len(), 1, "expected a single chain");
1000        for (i, m) in chains[0].msgs.iter().enumerate() {
1001            assert_eq!(
1002                m.sequence(),
1003                i as u64,
1004                "expected nonce {} but got {}",
1005                i,
1006                m.sequence()
1007            );
1008        }
1009
1010        // Test 5: 10 messages with increasing gasLimit, except for the 6th message
1011        // which has less than the epoch gasLimit; it should create a
1012        // single chain with the first 5 messages
1013        let mut mset = HashMap::new();
1014        let mut smsg_vec = Vec::new();
1015        tma.set_state_balance_raw(&a1, TokenAmount::from_atto(1_000_000_000_000_000_000_u64));
1016        for i in 0..10 {
1017            let msg = if i != 5 {
1018                create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i)
1019            } else {
1020                create_smsg(&a2, &a1, wallet.borrow_mut(), i, 1, 1 + i)
1021            };
1022            smsg_vec.push(msg.clone());
1023            mset.insert(i, msg);
1024        }
1025        let mut chains = Chains::new();
1026        create_message_chains(
1027            &tma,
1028            &a1,
1029            &mset,
1030            &TokenAmount::zero(),
1031            &ts,
1032            &mut chains,
1033            &chain_config,
1034        )
1035        .unwrap();
1036        assert_eq!(chains.len(), 1, "expected a single chain");
1037        assert_eq!(chains[0].msgs.len(), 5);
1038        for (i, m) in chains[0].msgs.iter().enumerate() {
1039            assert_eq!(
1040                m.sequence(),
1041                i as u64,
1042                "expected nonce {} but got {}",
1043                i,
1044                m.sequence()
1045            );
1046        }
1047
1048        // Test 6: one more message than what can fit in a block according to gas limit,
1049        // with increasing gasPerf; it should create a single chain with
1050        // the max messages
1051        let mut mset = HashMap::new();
1052        let mut smsg_vec = Vec::new();
1053        let max_messages = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / gas_limit;
1054        let n_messages = max_messages + 1;
1055        for i in 0..n_messages {
1056            let msg = create_smsg(
1057                &a2,
1058                &a1,
1059                wallet.borrow_mut(),
1060                i as u64,
1061                gas_limit,
1062                (1 + i) as u64,
1063            );
1064            smsg_vec.push(msg.clone());
1065            mset.insert(i as u64, msg);
1066        }
1067        let mut chains = Chains::new();
1068        create_message_chains(
1069            &tma,
1070            &a1,
1071            &mset,
1072            &TokenAmount::zero(),
1073            &ts,
1074            &mut chains,
1075            &chain_config,
1076        )
1077        .unwrap();
1078        assert_eq!(chains.len(), 1, "expected a single chain");
1079        assert_eq!(chains[0].msgs.len(), max_messages as usize);
1080        for (i, m) in chains[0].msgs.iter().enumerate() {
1081            assert_eq!(
1082                m.sequence(),
1083                i as u64,
1084                "expected nonce {} but got {}",
1085                i,
1086                m.sequence()
1087            );
1088        }
1089
1090        // Test 7: insufficient balance for all messages
1091        tma.set_state_balance_raw(&a1, TokenAmount::from_atto(300 * gas_limit + 1));
1092        let mut mset = HashMap::new();
1093        let mut smsg_vec = Vec::new();
1094        for i in 0..10 {
1095            let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i);
1096            smsg_vec.push(msg.clone());
1097            mset.insert(i, msg);
1098        }
1099        let mut chains = Chains::new();
1100        create_message_chains(
1101            &tma,
1102            &a1,
1103            &mset,
1104            &TokenAmount::zero(),
1105            &ts,
1106            &mut chains,
1107            &chain_config,
1108        )
1109        .unwrap();
1110        assert_eq!(chains.len(), 1, "expected a single chain");
1111        assert_eq!(chains[0].msgs.len(), 2);
1112        for (i, m) in chains[0].msgs.iter().enumerate() {
1113            assert_eq!(
1114                m.sequence(),
1115                i as u64,
1116                "expected nonce {} but got {}",
1117                i,
1118                m.sequence()
1119            );
1120        }
1121    }
1122}