Skip to main content

forest/message_pool/msgpool/
mod.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4pub(in crate::message_pool) mod metrics;
5pub(in crate::message_pool) mod msg_pool;
6pub(in crate::message_pool) mod provider;
7pub mod selection;
8#[cfg(test)]
9pub mod test_provider;
10pub(in crate::message_pool) mod utils;
11
12use std::{borrow::BorrowMut, cmp::Ordering};
13
14use crate::blocks::Tipset;
15use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic};
16use crate::message::{MessageRead as _, SignedMessage};
17use crate::networks::ChainConfig;
18use crate::shim::{address::Address, crypto::Signature};
19use crate::state_manager::IdToAddressCache;
20use crate::utils::ShallowClone as _;
21use crate::utils::cache::SizeTrackingLruCache;
22use crate::utils::get_size::CidWrapper;
23use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
24use cid::Cid;
25use fvm_ipld_encoding::to_vec;
26use parking_lot::RwLock as SyncRwLock;
27use tracing::error;
28use utils::{get_base_fee_lower_bound, recover_sig};
29
30use super::errors::Error;
31use crate::message_pool::{
32    msg_chain::{Chains, create_message_chains},
33    msg_pool::{
34        MsgSet, StateNonceCacheKey, StrictnessPolicy, TrustPolicy, add_helper, remove,
35        resolve_to_key,
36    },
37    provider::Provider,
38};
39
/// Replace-by-fee ratio as a floating point factor.
const REPLACE_BY_FEE_RATIO: f32 = 1.25;
/// Integer numerator of the fractional part of [`REPLACE_BY_FEE_RATIO`],
/// expressed over [`RBF_DENOM`]: `(1.25 - 1) * 256 = 64`.
const RBF_NUM: u64 = ((REPLACE_BY_FEE_RATIO - 1.0) * RBF_DENOM as f32) as u64;
/// Denominator that [`RBF_NUM`] is expressed over.
const RBF_DENOM: u64 = 256;
/// Conservative base fee lower bound factor.
// NOTE(review): not referenced in this part of the file; presumably used by
// the stricter admission paths elsewhere in the message pool -- confirm.
const BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE: i64 = 100;
/// Factor handed to `get_base_fee_lower_bound` when selecting messages to
/// republish.
const BASE_FEE_LOWER_BOUND_FACTOR: i64 = 10;
/// Maximum number of messages returned by a single republish selection.
const REPUB_MSG_LIMIT: usize = 30;
/// Republish selection stops once the remaining gas budget is at or below
/// this amount.
const MIN_GAS: u64 = 1_298_450;
47
48#[allow(clippy::too_many_arguments)]
49async fn republish_pending_messages<T>(
50    api: &T,
51    network_sender: &flume::Sender<NetworkMessage>,
52    pending: &SyncRwLock<HashMap<Address, MsgSet>>,
53    cur_tipset: &SyncRwLock<Tipset>,
54    republished: &SyncRwLock<HashSet<Cid>>,
55    local_addrs: &SyncRwLock<Vec<Address>>,
56    key_cache: &IdToAddressCache,
57    chain_config: &ChainConfig,
58) -> Result<(), Error>
59where
60    T: Provider,
61{
62    let ts = cur_tipset.read().shallow_clone();
63    let mut pending_map: HashMap<Address, HashMap<u64, SignedMessage>> = HashMap::new();
64
65    republished.write().clear();
66
67    // Only republish messages from local addresses, ie. transactions which were
68    // sent to this node directly.
69    for actor in local_addrs.read().iter() {
70        let Ok(resolved) = resolve_to_key(api, key_cache, actor, &ts).inspect_err(|e| {
71            tracing::debug!(%actor, "republish: failed to resolve address: {e:#}");
72        }) else {
73            continue;
74        };
75        if let Some(mset) = pending.read().get(&resolved) {
76            if mset.msgs.is_empty() {
77                continue;
78            }
79            let mut pend: HashMap<u64, SignedMessage> = HashMap::with_capacity(mset.msgs.len());
80            for (nonce, m) in mset.msgs.clone().into_iter() {
81                pend.insert(nonce, m);
82            }
83            pending_map.insert(resolved, pend);
84        }
85    }
86
87    let msgs = select_messages_for_block(api, chain_config, &ts, pending_map)?;
88
89    let network_name = chain_config.network.genesis_name();
90    for m in msgs.iter() {
91        let mb = to_vec(m)?;
92        network_sender
93            .send_async(NetworkMessage::PubsubMessage {
94                topic: Topic::new(format!("{PUBSUB_MSG_STR}/{network_name}")),
95                message: mb,
96            })
97            .await
98            .map_err(|_| Error::Other("Network receiver dropped".to_string()))?;
99    }
100
101    let mut republished_t = HashSet::new();
102    for m in msgs.iter() {
103        republished_t.insert(m.cid());
104    }
105    *republished.write() = republished_t;
106
107    Ok(())
108}
109
110/// Select messages from the mempool to be included in the next block that
111/// builds on a given base tipset. The messages should be eligible for inclusion
112/// based on their sequences and the overall number of them should observe block
113/// gas limits.
114fn select_messages_for_block<T>(
115    api: &T,
116    chain_config: &ChainConfig,
117    base: &Tipset,
118    pending: HashMap<Address, HashMap<u64, SignedMessage>>,
119) -> Result<Vec<SignedMessage>, Error>
120where
121    T: Provider,
122{
123    let mut msgs: Vec<SignedMessage> = vec![];
124
125    let base_fee = api.chain_compute_base_fee(base)?;
126    let base_fee_lower_bound = get_base_fee_lower_bound(&base_fee, BASE_FEE_LOWER_BOUND_FACTOR);
127
128    if pending.is_empty() {
129        return Ok(msgs);
130    }
131
132    let mut chains = Chains::new();
133    for (actor, mset) in pending.iter() {
134        create_message_chains(
135            api,
136            actor,
137            mset,
138            &base_fee_lower_bound,
139            base,
140            &mut chains,
141            chain_config,
142        )?;
143    }
144
145    if chains.is_empty() {
146        return Ok(msgs);
147    }
148
149    chains.sort(false);
150
151    let mut gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT;
152    let mut i = 0;
153    'l: while let Some(chain) = chains.get_mut_at(i) {
154        // we can exceed this if we have picked (some) longer chain already
155        if msgs.len() > REPUB_MSG_LIMIT {
156            break;
157        }
158
159        if gas_limit <= MIN_GAS {
160            break;
161        }
162
163        // check if chain has been invalidated
164        if !chain.valid {
165            i += 1;
166            continue;
167        }
168
169        // check if fits in block
170        if chain.gas_limit <= gas_limit {
171            // check the baseFee lower bound -- only republish messages that can be included
172            // in the chain within the next 20 blocks.
173            for m in chain.msgs.iter() {
174                if m.gas_fee_cap() < base_fee_lower_bound {
175                    let key = chains.get_key_at(i);
176                    chains.invalidate(key);
177                    continue 'l;
178                }
179                gas_limit = gas_limit.saturating_sub(m.gas_limit());
180                msgs.push(m.clone());
181            }
182
183            i += 1;
184            continue;
185        }
186
187        // we can't fit the current chain but there is gas to spare
188        // trim it and push it down
189        chains.trim_msgs_at(i, gas_limit, REPUB_MSG_LIMIT, &base_fee);
190        let mut j = i;
191        while j < chains.len() - 1 {
192            #[allow(clippy::indexing_slicing)]
193            if chains[j].compare(&chains[j + 1]) == Ordering::Less {
194                break;
195            }
196            chains.key_vec.swap(i, i + 1);
197            j += 1;
198        }
199    }
200
201    if msgs.len() > REPUB_MSG_LIMIT {
202        msgs.truncate(REPUB_MSG_LIMIT);
203    }
204
205    Ok(msgs)
206}
207
208/// Revert and/or apply tipsets to the message pool. This function should be
209/// called every time that there is a head change in the message pool.
210///
211/// - **Apply**: messages included in the new tipset are removed from the pending
212///   pool via [`MsgSet::rm`] with `applied=true`.
213/// - **Revert**: messages from the reverted tipset are re-added to the pool with
214///   [`StrictnessPolicy::Relaxed`] and [`TrustPolicy::Trusted`], allowing them back without
215///   nonce gap restrictions.
216///
217/// The state nonce cache is naturally invalidated when the tipset changes, since
218/// it is keyed by [`TipsetKey`](crate::blocks::TipsetKey).
219#[allow(clippy::too_many_arguments)]
220pub async fn head_change<T>(
221    api: &T,
222    bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
223    repub_trigger: flume::Sender<()>,
224    republished: &SyncRwLock<HashSet<Cid>>,
225    pending: &SyncRwLock<HashMap<Address, MsgSet>>,
226    cur_tipset: &SyncRwLock<Tipset>,
227    key_cache: &IdToAddressCache,
228    state_nonce_cache: &SizeTrackingLruCache<StateNonceCacheKey, u64>,
229    revert: Vec<Tipset>,
230    apply: Vec<Tipset>,
231) -> Result<(), Error>
232where
233    T: Provider + 'static,
234{
235    let mut repub = false;
236    let mut rmsgs: HashMap<Address, HashMap<u64, SignedMessage>> = HashMap::new();
237    for ts in revert {
238        let Ok(pts) = api.load_tipset(ts.parents()) else {
239            tracing::error!("error loading reverted tipset parent");
240            continue;
241        };
242        *cur_tipset.write() = pts;
243
244        let mut msgs: Vec<SignedMessage> = Vec::new();
245        for block in ts.block_headers() {
246            let Ok((umsg, smsgs)) = api.messages_for_block(block) else {
247                tracing::error!("error retrieving messages for reverted block");
248                continue;
249            };
250            msgs.extend(smsgs);
251            for msg in umsg {
252                let msg_cid = msg.cid();
253                let Ok(smsg) = recover_sig(bls_sig_cache, msg) else {
254                    tracing::debug!("could not recover signature for bls message {}", msg_cid);
255                    continue;
256                };
257                msgs.push(smsg)
258            }
259        }
260
261        for msg in msgs {
262            add_to_selected_msgs(msg, rmsgs.borrow_mut());
263        }
264    }
265
266    for ts in apply {
267        let mpool_ctx = MpoolCtx {
268            api,
269            key_cache,
270            pending,
271            ts: &ts,
272        };
273        for b in ts.block_headers() {
274            let Ok((msgs, smsgs)) = api.messages_for_block(b) else {
275                tracing::error!("error retrieving messages for block");
276                continue;
277            };
278
279            for msg in smsgs {
280                mpool_ctx.remove_from_selected_msgs(&msg.from(), msg.sequence(), &mut rmsgs)?;
281                if !repub && republished.write().insert(msg.cid()) {
282                    repub = true;
283                }
284            }
285            for msg in msgs {
286                mpool_ctx.remove_from_selected_msgs(&msg.from, msg.sequence, &mut rmsgs)?;
287                if !repub && republished.write().insert(msg.cid()) {
288                    repub = true;
289                }
290            }
291        }
292        *cur_tipset.write() = ts;
293    }
294    if repub {
295        repub_trigger
296            .send_async(())
297            .await
298            .map_err(|e| Error::Other(format!("Republish receiver dropped: {e}")))?;
299    }
300    let cur_ts = cur_tipset.read().shallow_clone();
301    let mpool_ctx = MpoolCtx {
302        api,
303        key_cache,
304        pending,
305        ts: &cur_ts,
306    };
307    for (_, hm) in rmsgs {
308        for (_, msg) in hm {
309            let sequence = mpool_ctx.get_state_sequence(state_nonce_cache, &msg.from())?;
310            if let Err(e) = add_helper(
311                api,
312                bls_sig_cache,
313                pending,
314                key_cache,
315                &cur_ts,
316                msg,
317                sequence,
318                TrustPolicy::Trusted,
319                StrictnessPolicy::Relaxed,
320            ) {
321                error!("Failed to read message from reorg to mpool: {}", e);
322            }
323        }
324    }
325    Ok(())
326}
327
328pub(in crate::message_pool) struct MpoolCtx<'a, T> {
329    pub api: &'a T,
330    pub key_cache: &'a IdToAddressCache,
331    pub pending: &'a SyncRwLock<HashMap<Address, MsgSet>>,
332    pub ts: &'a Tipset,
333}
334
335impl<T: Provider> MpoolCtx<'_, T> {
336    /// Removes a message from the selected messages map (`rmsgs`). If the
337    /// message is not found there, falls back to removing it from the pending set.
338    pub(in crate::message_pool) fn remove_from_selected_msgs(
339        &self,
340        from: &Address,
341        sequence: u64,
342        rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
343    ) -> Result<(), Error> {
344        if rmsgs
345            .get_mut(from)
346            .and_then(|temp| temp.remove(&sequence))
347            .is_none()
348            && let Ok(resolved) = resolve_to_key(self.api, self.key_cache, from, self.ts)
349                .inspect_err(|e| tracing::debug!(%from, "remove: failed to resolve address: {e:#}"))
350        {
351            remove(&resolved, self.pending, sequence, true)?;
352        }
353        Ok(())
354    }
355
356    /// Get the state nonce for an address, accounting for messages already
357    /// included in the current tipset.
358    pub(in crate::message_pool) fn get_state_sequence(
359        &self,
360        state_nonce_cache: &SizeTrackingLruCache<StateNonceCacheKey, u64>,
361        addr: &Address,
362    ) -> Result<u64, Error> {
363        msg_pool::get_state_sequence(self.api, self.key_cache, state_nonce_cache, addr, self.ts)
364    }
365}
366
367/// This is a helper function for `head_change`. This method will add a signed
368/// message to the given messages selected by priority `HashMap`.
369pub(in crate::message_pool) fn add_to_selected_msgs(
370    m: SignedMessage,
371    rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
372) {
373    rmsgs.entry(m.from()).or_default().insert(m.sequence(), m);
374}
375
376#[cfg(test)]
377pub mod tests {
378    use std::{borrow::BorrowMut, time::Duration};
379
380    use crate::blocks::Tipset;
381    use crate::key_management::{KeyStore, KeyStoreConfig, Wallet};
382    use crate::message::SignedMessage;
383    use crate::networks::ChainConfig;
384    use crate::shim::{
385        address::Address,
386        crypto::SignatureType,
387        econ::TokenAmount,
388        message::{Message, Message_v3},
389    };
390    use num_traits::Zero;
391    use test_provider::*;
392    use tokio::task::JoinSet;
393
394    use super::*;
395    use crate::message_pool::{
396        msg_chain::{Chains, create_message_chains},
397        msg_pool::MessagePool,
398    };
399
    /// Everything a test needs to drive a message pool, as assembled by
    /// `make_test_setup`.
    struct TestMpool {
        /// Pool under test, backed by the in-memory `TestApi`.
        mpool: MessagePool<TestApi>,
        /// Wallet holding the keys for `sender` and `target`.
        wallet: Wallet,
        /// Address generated for sending messages; its state sequence is
        /// initialised to 0 by `make_test_setup`.
        sender: Address,
        /// Address generated for receiving messages.
        target: Address,
        /// Task set that was handed to `MessagePool::new`.
        // NOTE(review): presumably dropping this stops the pool's background
        // tasks, which is why tests keep it bound -- confirm.
        services: JoinSet<anyhow::Result<()>>,
        /// Receiving end of the channel whose sender was given to the pool.
        network_rx: flume::Receiver<NetworkMessage>,
    }
408
409    fn make_test_mpool(
410        tma: TestApi,
411    ) -> (
412        MessagePool<TestApi>,
413        JoinSet<anyhow::Result<()>>,
414        flume::Receiver<NetworkMessage>,
415    ) {
416        let (tx, rx) = flume::bounded(50);
417        let mut services = JoinSet::new();
418        let mpool = MessagePool::new(
419            tma,
420            tx,
421            Default::default(),
422            Default::default(),
423            &mut services,
424        )
425        .unwrap();
426        (mpool, services, rx)
427    }
428
429    fn make_test_setup() -> TestMpool {
430        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
431        let mut wallet = Wallet::new(keystore);
432        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
433        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
434        let tma = TestApi::default();
435        tma.set_state_sequence(&sender, 0);
436        let (mpool, services, network_rx) = make_test_mpool(tma);
437        TestMpool {
438            mpool,
439            wallet,
440            sender,
441            target,
442            services,
443            network_rx,
444        }
445    }
446
447    #[tokio::test]
448    async fn test_per_actor_limit() {
449        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
450        let mut wallet = Wallet::new(keystore);
451        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
452        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
453        let tma = TestApi::with_max_actor_pending_messages(200);
454        tma.set_state_sequence(&sender, 0);
455        let (mpool, _services, _rx) = make_test_mpool(tma);
456
457        let mut smsg_vec = Vec::new();
458        for i in 0..(mpool.api.max_actor_pending_messages() + 1) {
459            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
460            smsg_vec.push(msg);
461        }
462
463        let (last, body) = smsg_vec.split_last().unwrap();
464        for smsg in body {
465            mpool.add(smsg.clone()).unwrap();
466        }
467        assert_eq!(
468            mpool.add(last.clone()),
469            Err(Error::TooManyPendingMessages(sender.to_string(), true))
470        );
471    }
472
473    pub fn create_smsg(
474        to: &Address,
475        from: &Address,
476        wallet: &mut Wallet,
477        sequence: u64,
478        gas_limit: i64,
479        gas_price: u64,
480    ) -> SignedMessage {
481        let umsg: Message = Message_v3 {
482            to: to.into(),
483            from: from.into(),
484            sequence,
485            gas_limit: gas_limit as u64,
486            gas_fee_cap: TokenAmount::from_atto(gas_price + 100).into(),
487            gas_premium: TokenAmount::from_atto(gas_price).into(),
488            ..Message_v3::default()
489        }
490        .into();
491        let msg_signing_bytes = umsg.cid().to_bytes();
492        let sig = wallet.sign(from, msg_signing_bytes.as_slice()).unwrap();
493        SignedMessage::new_unchecked(umsg, sig)
494    }
495
496    // Create a fake signed message with a dummy signature. While the signature is
497    // not valid, it has been added to the validation cache and the message will
498    // appear authentic.
499    pub fn create_fake_smsg(
500        pool: &MessagePool<TestApi>,
501        to: &Address,
502        from: &Address,
503        sequence: u64,
504        gas_limit: i64,
505        gas_price: u64,
506    ) -> SignedMessage {
507        let umsg: Message = Message_v3 {
508            to: to.into(),
509            from: from.into(),
510            sequence,
511            gas_limit: gas_limit as u64,
512            gas_fee_cap: TokenAmount::from_atto(gas_price + 100).into(),
513            gas_premium: TokenAmount::from_atto(gas_price).into(),
514            ..Message_v3::default()
515        }
516        .into();
517        let sig = Signature::new_secp256k1(vec![]);
518        let signed = SignedMessage::new_unchecked(umsg, sig);
519        let cid = signed.cid();
520        pool.sig_val_cache.push(cid.into(), ());
521        signed
522    }
523
524    #[tokio::test]
525    async fn test_message_pool() {
526        let TestMpool {
527            mpool,
528            mut wallet,
529            sender,
530            target,
531            ..
532        } = make_test_setup();
533
534        let mut smsg_vec = Vec::new();
535        for i in 0..2 {
536            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
537            smsg_vec.push(msg);
538        }
539
540        mpool.api.inner.lock().set_state_sequence(&sender, 0);
541        assert_eq!(mpool.get_sequence(&sender).unwrap(), 0);
542        mpool.add(smsg_vec[0].clone()).unwrap();
543        assert_eq!(mpool.get_sequence(&sender).unwrap(), 1);
544        mpool.add(smsg_vec[1].clone()).unwrap();
545        assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
546
547        let a = mock_block(1, 1);
548
549        mpool.api.inner.lock().set_block_messages(&a, smsg_vec);
550        mpool
551            .apply_head_change(Vec::new(), vec![Tipset::from(a)])
552            .await
553            .unwrap();
554
555        assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
556    }
557
558    #[tokio::test]
559    async fn test_revert_messages() {
560        let tma = TestApi::default();
561        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
562        let mut wallet = Wallet::new(keystore);
563
564        let a = mock_block(1, 1);
565        let tipset = Tipset::from(&a);
566        let b = mock_block_with_parents(&tipset, 1, 1);
567
568        let sender = wallet.generate_addr(SignatureType::Bls).unwrap();
569        let target = Address::new_id(1001);
570
571        let mut smsg_vec = Vec::new();
572
573        for i in 0..4 {
574            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
575            smsg_vec.push(msg);
576        }
577        let (mpool, _services, _rx) = make_test_mpool(tma);
578
579        {
580            let mut api_temp = mpool.api.inner.lock();
581            api_temp.set_block_messages(&a, vec![smsg_vec[0].clone()]);
582            api_temp.set_block_messages(&b.clone(), smsg_vec[1..4].to_vec());
583            api_temp.set_state_sequence(&sender, 0);
584            drop(api_temp);
585        }
586
587        mpool.add(smsg_vec[0].clone()).unwrap();
588        mpool.add(smsg_vec[1].clone()).unwrap();
589        mpool.add(smsg_vec[2].clone()).unwrap();
590        mpool.add(smsg_vec[3].clone()).unwrap();
591
592        mpool.api.set_state_sequence(&sender, 0);
593
594        mpool
595            .apply_head_change(Vec::new(), vec![Tipset::from(a)])
596            .await
597            .unwrap();
598
599        assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
600
601        mpool.api.set_state_sequence(&sender, 1);
602
603        mpool
604            .apply_head_change(Vec::new(), vec![Tipset::from(&b)])
605            .await
606            .unwrap();
607
608        assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
609
610        mpool.api.set_state_sequence(&sender, 0);
611
612        mpool
613            .apply_head_change(vec![Tipset::from(b)], Vec::new())
614            .await
615            .unwrap();
616
617        assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
618
619        let (p, _) = mpool.pending();
620        assert_eq!(p.len(), 3);
621    }
622
623    #[tokio::test]
624    async fn test_get_sequence_resolves_id_address() {
625        let tma = TestApi::default();
626        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
627        let mut wallet = Wallet::new(keystore);
628        let key_addr = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
629        let id_addr = Address::new_id(999);
630
631        tma.set_key_address_mapping(&id_addr, &key_addr);
632        tma.set_state_sequence(&key_addr, 0);
633        let (mpool, _services, _rx) = make_test_mpool(tma);
634
635        let target = Address::new_id(1001);
636        for i in 0..3 {
637            let msg = create_smsg(&target, &key_addr, wallet.borrow_mut(), i, 1000000, 1);
638            mpool.add(msg).unwrap();
639        }
640
641        // get_sequence with ID address should see the same pending messages
642        assert_eq!(mpool.get_sequence(&id_addr).unwrap(), 3);
643        assert_eq!(mpool.get_sequence(&key_addr).unwrap(), 3);
644    }
645
646    #[tokio::test]
647    async fn test_pending_for_resolves_id_address() {
648        let tma = TestApi::default();
649        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
650        let mut wallet = Wallet::new(keystore);
651        let key_addr = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
652        let id_addr = Address::new_id(888);
653
654        tma.set_key_address_mapping(&id_addr, &key_addr);
655        tma.set_state_sequence(&key_addr, 0);
656        let (mpool, _services, _rx) = make_test_mpool(tma);
657
658        let target = Address::new_id(1001);
659        for i in 0..2 {
660            let msg = create_smsg(&target, &key_addr, wallet.borrow_mut(), i, 1000000, 1);
661            mpool.add(msg).unwrap();
662        }
663
664        // pending_for with ID address should find messages added under key address
665        let msgs = mpool
666            .pending_for(&id_addr)
667            .expect("should find pending messages");
668        assert_eq!(msgs.len(), 2);
669
670        // pending_for with key address should also work
671        let msgs2 = mpool
672            .pending_for(&key_addr)
673            .expect("should find pending messages");
674        assert_eq!(msgs2.len(), 2);
675    }
676
677    #[tokio::test]
678    async fn test_add_with_id_from_resolves_to_key_in_pending() {
679        let tma = TestApi::default();
680        let key_addr = Address::new_bls(&[11u8; 48]).unwrap();
681        let id_addr = Address::new_id(777);
682
683        tma.set_key_address_mapping(&id_addr, &key_addr);
684        tma.set_state_sequence(&key_addr, 0);
685        let (mpool, _services, _rx) = make_test_mpool(tma);
686
687        // Create a message with the ID address as sender and a fake signature
688        let msg = create_fake_smsg(&mpool, &Address::new_id(1001), &id_addr, 0, 1000000, 1);
689        mpool.add(msg).unwrap();
690
691        // Pending map should be keyed by key_addr, not id_addr
692        let pending = mpool.pending.read();
693        assert!(
694            pending.get(&key_addr).is_some(),
695            "pending should be keyed by resolved key address"
696        );
697        assert!(
698            pending.get(&id_addr).is_none(),
699            "pending should NOT have entry under raw ID address"
700        );
701    }
702
703    #[tokio::test]
704    async fn test_head_change_removes_via_resolved_address() {
705        let tma = TestApi::default();
706        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
707        let mut wallet = Wallet::new(keystore);
708        let key_addr = wallet.generate_addr(SignatureType::Bls).unwrap();
709        let id_addr = Address::new_id(555);
710
711        tma.set_key_address_mapping(&id_addr, &key_addr);
712        tma.set_state_sequence(&key_addr, 0);
713
714        let a = mock_block(1, 1);
715        let (mpool, _services, _rx) = make_test_mpool(tma);
716
717        let target = Address::new_id(1001);
718        let msg0 = create_smsg(&target, &key_addr, wallet.borrow_mut(), 0, 1000000, 1);
719        let msg1 = create_smsg(&target, &key_addr, wallet.borrow_mut(), 1, 1000000, 1);
720        mpool.add(msg0.clone()).unwrap();
721        mpool.add(msg1).unwrap();
722        assert_eq!(mpool.get_sequence(&key_addr).unwrap(), 2);
723
724        // Block messages are stored under the key_addr (as would appear on chain).
725        // The head_change remove path resolves addresses before touching pending.
726        mpool.api.inner.lock().set_block_messages(&a, vec![msg0]);
727
728        mpool.api.set_state_sequence(&key_addr, 1);
729
730        mpool
731            .apply_head_change(Vec::new(), vec![Tipset::from(a)])
732            .await
733            .unwrap();
734
735        // msg0 was applied on chain, msg1 remains pending
736        assert_eq!(mpool.get_sequence(&id_addr).unwrap(), 2);
737        let msgs = mpool
738            .pending_for(&key_addr)
739            .expect("should have remaining msg");
740        assert_eq!(msgs.len(), 1);
741        assert_eq!(msgs[0].sequence(), 1);
742    }
743
744    #[tokio::test]
745    async fn test_async_message_pool() {
746        let TestMpool {
747            mpool,
748            mut wallet,
749            sender,
750            target,
751            services: _services,
752            network_rx: _network_rx,
753        } = make_test_setup();
754
755        let mut smsg_vec = Vec::new();
756        for i in 0..3 {
757            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
758            smsg_vec.push(msg);
759        }
760
761        assert_eq!(mpool.get_sequence(&sender).unwrap(), 0);
762        mpool.push(smsg_vec[0].clone()).await.unwrap();
763        assert_eq!(mpool.get_sequence(&sender).unwrap(), 1);
764        mpool.push(smsg_vec[1].clone()).await.unwrap();
765        assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
766        mpool.push(smsg_vec[2].clone()).await.unwrap();
767        assert_eq!(mpool.get_sequence(&sender).unwrap(), 3);
768
769        let header = mock_block(1, 1);
770        let tipset = Tipset::from(&header.clone());
771
772        mpool.api.set_heaviest_tipset(tipset.clone());
773
774        // sleep allows for async block to update mpool's cur_tipset
775        tokio::time::sleep(Duration::new(2, 0)).await;
776
777        let cur_ts = mpool.current_tipset();
778        assert_eq!(cur_ts, tipset);
779    }
780
781    #[tokio::test]
782    async fn test_msg_chains() {
783        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
784        let mut wallet = Wallet::new(keystore);
785        let a1 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
786        let a2 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
787        let tma = TestApi::default();
788        let gas_limit = 6955002;
789
790        let a = mock_block(1, 1);
791        let ts = Tipset::from(a);
792        let chain_config = ChainConfig::default();
793
794        // --- Test Chain Aggregations ---
795        // Test 1: 10 messages from a1 to a2, with increasing gasPerf; it should
796        // 	       make a single chain with 10 messages given enough balance
797        let mut mset = HashMap::new();
798        let mut smsg_vec = Vec::new();
799        for i in 0..10 {
800            let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i);
801            smsg_vec.push(msg.clone());
802            mset.insert(i, msg);
803        }
804
805        let mut chains = Chains::new();
806        create_message_chains(
807            &tma,
808            &a1,
809            &mset,
810            &TokenAmount::zero(),
811            &ts,
812            &mut chains,
813            &chain_config,
814        )
815        .unwrap();
816        assert_eq!(chains.len(), 1, "expected a single chain");
817        assert_eq!(
818            chains[0].msgs.len(),
819            10,
820            "expected 10 messages in single chain, got: {}",
821            chains[0].msgs.len()
822        );
823        for (i, m) in chains[0].msgs.iter().enumerate() {
824            assert_eq!(
825                m.sequence(),
826                i as u64,
827                "expected sequence {} but got {}",
828                i,
829                m.sequence()
830            );
831        }
832
833        // Test 2: 10 messages from a1 to a2, with decreasing gasPerf; it should
834        // 	         make 10 chains with 1 message each
835        let mut mset = HashMap::new();
836        let mut smsg_vec = Vec::new();
837        for i in 0..10 {
838            let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 10 - i);
839            smsg_vec.push(msg.clone());
840            mset.insert(i, msg);
841        }
842        let mut chains = Chains::new();
843        create_message_chains(
844            &tma,
845            &a1,
846            &mset,
847            &TokenAmount::zero(),
848            &ts,
849            &mut chains,
850            &chain_config,
851        )
852        .unwrap();
853        assert_eq!(chains.len(), 10, "expected 10 chains");
854
855        for i in 0..chains.len() {
856            assert_eq!(
857                chains[i].msgs.len(),
858                1,
859                "expected 1 message in chain {} but got {}",
860                i,
861                chains[i].msgs.len()
862            );
863        }
864
865        for i in 0..chains.len() {
866            let m = &chains[i].msgs[0];
867            assert_eq!(
868                m.sequence(),
869                i as u64,
870                "expected sequence {} but got {}",
871                i,
872                m.sequence()
873            );
874        }
875
876        // Test 3a: 10 messages from a1 to a2, with gasPerf increasing in groups of 3;
877        // it should          merge them in two chains, one with 9 messages and
878        // one with the last message
879        let mut mset = HashMap::new();
880        let mut smsg_vec = Vec::new();
881        for i in 0..10 {
882            let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i % 3);
883            smsg_vec.push(msg.clone());
884            mset.insert(i, msg);
885        }
886        let mut chains = Chains::new();
887        create_message_chains(
888            &tma,
889            &a1,
890            &mset,
891            &TokenAmount::zero(),
892            &ts,
893            &mut chains,
894            &chain_config,
895        )
896        .unwrap();
897        assert_eq!(chains.len(), 2, "expected 2 chains");
898        assert_eq!(chains[0].msgs.len(), 9);
899        assert_eq!(chains[1].msgs.len(), 1);
900        let mut next_nonce = 0;
901        for i in 0..chains.len() {
902            for m in chains[i].msgs.iter() {
903                assert_eq!(
904                    next_nonce,
905                    m.sequence(),
906                    "expected nonce {} but got {}",
907                    next_nonce,
908                    m.sequence()
909                );
910                next_nonce += 1;
911            }
912        }
913
914        // Test 3b: 10 messages from a1 to a2, with gasPerf decreasing in groups of 3
915        // with a bias for the earlier chains; it should make 4 chains,
916        // the first 3 with 3 messages and the last with a single
917        // message
918        let mut mset = HashMap::new();
919        let mut smsg_vec = Vec::new();
920        for i in 0..10 {
921            let bias = (12 - i) / 3;
922            let msg = create_smsg(
923                &a2,
924                &a1,
925                wallet.borrow_mut(),
926                i,
927                gas_limit,
928                1 + i % 3 + bias,
929            );
930            smsg_vec.push(msg.clone());
931            mset.insert(i, msg);
932        }
933
934        let mut chains = Chains::new();
935        create_message_chains(
936            &tma,
937            &a1,
938            &mset,
939            &TokenAmount::zero(),
940            &ts,
941            &mut chains,
942            &chain_config,
943        )
944        .unwrap();
945
946        for i in 0..chains.len() {
947            let expected_len = if i > 2 { 1 } else { 3 };
948            assert_eq!(
949                chains[i].msgs.len(),
950                expected_len,
951                "expected {} message in chain {} but got {}",
952                expected_len,
953                i,
954                chains[i].msgs.len()
955            );
956        }
957
958        let mut next_nonce = 0;
959        for i in 0..chains.len() {
960            for m in chains[i].msgs.iter() {
961                assert_eq!(
962                    next_nonce,
963                    m.sequence(),
964                    "expected nonce {} but got {}",
965                    next_nonce,
966                    m.sequence()
967                );
968                next_nonce += 1;
969            }
970        }
971
972        // --- Test Chain Breaks ---
973        // Test 4: 10 messages with non-consecutive nonces; it should make a single
974        // chain with just the first message
975        let mut mset = HashMap::new();
976        let mut smsg_vec = Vec::new();
977        for i in 0..10 {
978            let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i * 2, gas_limit, 1 + i);
979            smsg_vec.push(msg.clone());
980            mset.insert(i, msg);
981        }
982
983        let mut chains = Chains::new();
984        create_message_chains(
985            &tma,
986            &a1,
987            &mset,
988            &TokenAmount::zero(),
989            &ts,
990            &mut chains,
991            &chain_config,
992        )
993        .unwrap();
994        assert_eq!(chains.len(), 1, "expected a single chain");
995        for (i, m) in chains[0].msgs.iter().enumerate() {
996            assert_eq!(
997                m.sequence(),
998                i as u64,
999                "expected nonce {} but got {}",
1000                i,
1001                m.sequence()
1002            );
1003        }
1004
1005        // Test 5: 10 messages with increasing gasLimit, except for the 6th message
1006        // which has less than the epoch gasLimit; it should create a
1007        // single chain with the first 5 messages
1008        let mut mset = HashMap::new();
1009        let mut smsg_vec = Vec::new();
1010        tma.set_state_balance_raw(&a1, TokenAmount::from_atto(1_000_000_000_000_000_000_u64));
1011        for i in 0..10 {
1012            let msg = if i != 5 {
1013                create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i)
1014            } else {
1015                create_smsg(&a2, &a1, wallet.borrow_mut(), i, 1, 1 + i)
1016            };
1017            smsg_vec.push(msg.clone());
1018            mset.insert(i, msg);
1019        }
1020        let mut chains = Chains::new();
1021        create_message_chains(
1022            &tma,
1023            &a1,
1024            &mset,
1025            &TokenAmount::zero(),
1026            &ts,
1027            &mut chains,
1028            &chain_config,
1029        )
1030        .unwrap();
1031        assert_eq!(chains.len(), 1, "expected a single chain");
1032        assert_eq!(chains[0].msgs.len(), 5);
1033        for (i, m) in chains[0].msgs.iter().enumerate() {
1034            assert_eq!(
1035                m.sequence(),
1036                i as u64,
1037                "expected nonce {} but got {}",
1038                i,
1039                m.sequence()
1040            );
1041        }
1042
1043        // Test 6: one more message than what can fit in a block according to gas limit,
1044        // with increasing gasPerf; it should create a single chain with
1045        // the max messages
1046        let mut mset = HashMap::new();
1047        let mut smsg_vec = Vec::new();
1048        let max_messages = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / gas_limit;
1049        let n_messages = max_messages + 1;
1050        for i in 0..n_messages {
1051            let msg = create_smsg(
1052                &a2,
1053                &a1,
1054                wallet.borrow_mut(),
1055                i as u64,
1056                gas_limit,
1057                (1 + i) as u64,
1058            );
1059            smsg_vec.push(msg.clone());
1060            mset.insert(i as u64, msg);
1061        }
1062        let mut chains = Chains::new();
1063        create_message_chains(
1064            &tma,
1065            &a1,
1066            &mset,
1067            &TokenAmount::zero(),
1068            &ts,
1069            &mut chains,
1070            &chain_config,
1071        )
1072        .unwrap();
1073        assert_eq!(chains.len(), 1, "expected a single chain");
1074        assert_eq!(chains[0].msgs.len(), max_messages as usize);
1075        for (i, m) in chains[0].msgs.iter().enumerate() {
1076            assert_eq!(
1077                m.sequence(),
1078                i as u64,
1079                "expected nonce {} but got {}",
1080                i,
1081                m.sequence()
1082            );
1083        }
1084
1085        // Test 7: insufficient balance for all messages
1086        tma.set_state_balance_raw(&a1, TokenAmount::from_atto(300 * gas_limit + 1));
1087        let mut mset = HashMap::new();
1088        let mut smsg_vec = Vec::new();
1089        for i in 0..10 {
1090            let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i);
1091            smsg_vec.push(msg.clone());
1092            mset.insert(i, msg);
1093        }
1094        let mut chains = Chains::new();
1095        create_message_chains(
1096            &tma,
1097            &a1,
1098            &mset,
1099            &TokenAmount::zero(),
1100            &ts,
1101            &mut chains,
1102            &chain_config,
1103        )
1104        .unwrap();
1105        assert_eq!(chains.len(), 1, "expected a single chain");
1106        assert_eq!(chains[0].msgs.len(), 2);
1107        for (i, m) in chains[0].msgs.iter().enumerate() {
1108            assert_eq!(
1109                m.sequence(),
1110                i as u64,
1111                "expected nonce {} but got {}",
1112                i,
1113                m.sequence()
1114            );
1115        }
1116    }
1117}