Skip to main content

forest/message_pool/msgpool/
mod.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4pub(in crate::message_pool) mod metrics;
5pub(in crate::message_pool) mod msg_pool;
6pub(in crate::message_pool) mod provider;
7pub mod selection;
8#[cfg(test)]
9pub mod test_provider;
10pub(in crate::message_pool) mod utils;
11
12use std::{borrow::BorrowMut, cmp::Ordering};
13
14use crate::blocks::Tipset;
15use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic};
16use crate::message::{Message as MessageTrait, SignedMessage};
17use crate::networks::ChainConfig;
18use crate::shim::{address::Address, crypto::Signature};
19use crate::utils::cache::SizeTrackingLruCache;
20use crate::utils::get_size::CidWrapper;
21use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
22use cid::Cid;
23use fvm_ipld_encoding::to_vec;
24use parking_lot::RwLock as SyncRwLock;
25use tracing::error;
26use utils::{get_base_fee_lower_bound, recover_sig};
27
28use super::errors::Error;
29use crate::message_pool::{
30    msg_chain::{Chains, create_message_chains},
31    msg_pool::{MsgSet, TrustPolicy, add_helper, remove},
32    provider::Provider,
33};
34
35const REPLACE_BY_FEE_RATIO: f32 = 1.25;
36const RBF_NUM: u64 = ((REPLACE_BY_FEE_RATIO - 1f32) * 256f32) as u64;
37const RBF_DENOM: u64 = 256;
38const BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE: i64 = 100;
39const BASE_FEE_LOWER_BOUND_FACTOR: i64 = 10;
40const REPUB_MSG_LIMIT: usize = 30;
41const MIN_GAS: u64 = 1298450;
42
43/// Get the state of the `base_sequence` for a given address in the current
44/// Tipset
45fn get_state_sequence<T>(api: &T, addr: &Address, cur_ts: &Tipset) -> Result<u64, Error>
46where
47    T: Provider,
48{
49    let actor = api.get_actor_after(addr, cur_ts)?;
50    let base_sequence = actor.sequence;
51
52    Ok(base_sequence)
53}
54
55#[allow(clippy::too_many_arguments)]
56async fn republish_pending_messages<T>(
57    api: &T,
58    network_sender: &flume::Sender<NetworkMessage>,
59    pending: &SyncRwLock<HashMap<Address, MsgSet>>,
60    cur_tipset: &SyncRwLock<Tipset>,
61    republished: &SyncRwLock<HashSet<Cid>>,
62    local_addrs: &SyncRwLock<Vec<Address>>,
63    chain_config: &ChainConfig,
64) -> Result<(), Error>
65where
66    T: Provider,
67{
68    let ts = cur_tipset.read().clone();
69    let mut pending_map: HashMap<Address, HashMap<u64, SignedMessage>> = HashMap::new();
70
71    republished.write().clear();
72
73    // Only republish messages from local addresses, ie. transactions which were
74    // sent to this node directly.
75    for actor in local_addrs.read().iter() {
76        if let Some(mset) = pending.read().get(actor) {
77            if mset.msgs.is_empty() {
78                continue;
79            }
80            let mut pend: HashMap<u64, SignedMessage> = HashMap::with_capacity(mset.msgs.len());
81            for (nonce, m) in mset.msgs.clone().into_iter() {
82                pend.insert(nonce, m);
83            }
84            pending_map.insert(*actor, pend);
85        }
86    }
87
88    let msgs = select_messages_for_block(api, chain_config, &ts, pending_map)?;
89
90    let network_name = chain_config.network.genesis_name();
91    for m in msgs.iter() {
92        let mb = to_vec(m)?;
93        network_sender
94            .send_async(NetworkMessage::PubsubMessage {
95                topic: Topic::new(format!("{PUBSUB_MSG_STR}/{network_name}")),
96                message: mb,
97            })
98            .await
99            .map_err(|_| Error::Other("Network receiver dropped".to_string()))?;
100    }
101
102    let mut republished_t = HashSet::new();
103    for m in msgs.iter() {
104        republished_t.insert(m.cid());
105    }
106    *republished.write() = republished_t;
107
108    Ok(())
109}
110
111/// Select messages from the mempool to be included in the next block that
112/// builds on a given base tipset. The messages should be eligible for inclusion
113/// based on their sequences and the overall number of them should observe block
114/// gas limits.
115fn select_messages_for_block<T>(
116    api: &T,
117    chain_config: &ChainConfig,
118    base: &Tipset,
119    pending: HashMap<Address, HashMap<u64, SignedMessage>>,
120) -> Result<Vec<SignedMessage>, Error>
121where
122    T: Provider,
123{
124    let mut msgs: Vec<SignedMessage> = vec![];
125
126    let base_fee = api.chain_compute_base_fee(base)?;
127    let base_fee_lower_bound = get_base_fee_lower_bound(&base_fee, BASE_FEE_LOWER_BOUND_FACTOR);
128
129    if pending.is_empty() {
130        return Ok(msgs);
131    }
132
133    let mut chains = Chains::new();
134    for (actor, mset) in pending.iter() {
135        create_message_chains(
136            api,
137            actor,
138            mset,
139            &base_fee_lower_bound,
140            base,
141            &mut chains,
142            chain_config,
143        )?;
144    }
145
146    if chains.is_empty() {
147        return Ok(msgs);
148    }
149
150    chains.sort(false);
151
152    let mut gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT;
153    let mut i = 0;
154    'l: while let Some(chain) = chains.get_mut_at(i) {
155        // we can exceed this if we have picked (some) longer chain already
156        if msgs.len() > REPUB_MSG_LIMIT {
157            break;
158        }
159
160        if gas_limit <= MIN_GAS {
161            break;
162        }
163
164        // check if chain has been invalidated
165        if !chain.valid {
166            i += 1;
167            continue;
168        }
169
170        // check if fits in block
171        if chain.gas_limit <= gas_limit {
172            // check the baseFee lower bound -- only republish messages that can be included
173            // in the chain within the next 20 blocks.
174            for m in chain.msgs.iter() {
175                if m.gas_fee_cap() < base_fee_lower_bound {
176                    let key = chains.get_key_at(i);
177                    chains.invalidate(key);
178                    continue 'l;
179                }
180                gas_limit = gas_limit.saturating_sub(m.gas_limit());
181                msgs.push(m.clone());
182            }
183
184            i += 1;
185            continue;
186        }
187
188        // we can't fit the current chain but there is gas to spare
189        // trim it and push it down
190        chains.trim_msgs_at(i, gas_limit, REPUB_MSG_LIMIT, &base_fee);
191        let mut j = i;
192        while j < chains.len() - 1 {
193            #[allow(clippy::indexing_slicing)]
194            if chains[j].compare(&chains[j + 1]) == Ordering::Less {
195                break;
196            }
197            chains.key_vec.swap(i, i + 1);
198            j += 1;
199        }
200    }
201
202    if msgs.len() > REPUB_MSG_LIMIT {
203        msgs.truncate(REPUB_MSG_LIMIT);
204    }
205
206    Ok(msgs)
207}
208
209/// This function will revert and/or apply tipsets to the message pool. This
210/// function should be called every time that there is a head change in the
211/// message pool.
212#[allow(clippy::too_many_arguments)]
213pub async fn head_change<T>(
214    api: &T,
215    bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
216    repub_trigger: flume::Sender<()>,
217    republished: &SyncRwLock<HashSet<Cid>>,
218    pending: &SyncRwLock<HashMap<Address, MsgSet>>,
219    cur_tipset: &SyncRwLock<Tipset>,
220    revert: Vec<Tipset>,
221    apply: Vec<Tipset>,
222) -> Result<(), Error>
223where
224    T: Provider + 'static,
225{
226    let mut repub = false;
227    let mut rmsgs: HashMap<Address, HashMap<u64, SignedMessage>> = HashMap::new();
228    for ts in revert {
229        let pts = api.load_tipset(ts.parents())?;
230        *cur_tipset.write() = pts;
231
232        let mut msgs: Vec<SignedMessage> = Vec::new();
233        for block in ts.block_headers() {
234            let (umsg, smsgs) = api.messages_for_block(block)?;
235            msgs.extend(smsgs);
236            for msg in umsg {
237                let smsg = recover_sig(bls_sig_cache, msg)?;
238                msgs.push(smsg)
239            }
240        }
241
242        for msg in msgs {
243            add_to_selected_msgs(msg, rmsgs.borrow_mut());
244        }
245    }
246
247    for ts in apply {
248        for b in ts.block_headers() {
249            let (msgs, smsgs) = api.messages_for_block(b)?;
250
251            for msg in smsgs {
252                remove_from_selected_msgs(
253                    &msg.from(),
254                    pending,
255                    msg.sequence(),
256                    rmsgs.borrow_mut(),
257                )?;
258                if !repub && republished.write().insert(msg.cid()) {
259                    repub = true;
260                }
261            }
262            for msg in msgs {
263                remove_from_selected_msgs(&msg.from, pending, msg.sequence, rmsgs.borrow_mut())?;
264                if !repub && republished.write().insert(msg.cid()) {
265                    repub = true;
266                }
267            }
268        }
269        *cur_tipset.write() = ts;
270    }
271    if repub {
272        repub_trigger
273            .send_async(())
274            .await
275            .map_err(|e| Error::Other(format!("Republish receiver dropped: {e}")))?;
276    }
277    for (_, hm) in rmsgs {
278        for (_, msg) in hm {
279            let sequence = get_state_sequence(api, &msg.from(), &cur_tipset.read().clone())?;
280            if let Err(e) = add_helper(
281                api,
282                bls_sig_cache,
283                pending,
284                msg,
285                sequence,
286                TrustPolicy::Trusted,
287            ) {
288                error!("Failed to read message from reorg to mpool: {}", e);
289            }
290        }
291    }
292    Ok(())
293}
294
295/// This is a helper function for `head_change`. This method will remove a
296/// sequence for a from address from the messages selected by priority hash-map.
297/// It also removes the 'from' address and sequence from the `MessagePool`.
298pub(in crate::message_pool) fn remove_from_selected_msgs(
299    from: &Address,
300    pending: &SyncRwLock<HashMap<Address, MsgSet>>,
301    sequence: u64,
302    rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
303) -> Result<(), Error> {
304    if let Some(temp) = rmsgs.get_mut(from) {
305        if temp.get_mut(&sequence).is_some() {
306            temp.remove(&sequence);
307        } else {
308            remove(from, pending, sequence, true)?;
309        }
310    } else {
311        remove(from, pending, sequence, true)?;
312    }
313    Ok(())
314}
315
316/// This is a helper function for `head_change`. This method will add a signed
317/// message to the given messages selected by priority `HashMap`.
318pub(in crate::message_pool) fn add_to_selected_msgs(
319    m: SignedMessage,
320    rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
321) {
322    let s = rmsgs.get_mut(&m.from());
323    if let Some(s) = s {
324        s.insert(m.sequence(), m);
325    } else {
326        rmsgs.insert(m.from(), HashMap::new());
327        rmsgs.get_mut(&m.from()).unwrap().insert(m.sequence(), m);
328    }
329}
330
331#[cfg(test)]
332pub mod tests {
333    use std::{borrow::BorrowMut, time::Duration};
334
335    use crate::blocks::Tipset;
336    use crate::key_management::{KeyStore, KeyStoreConfig, Wallet};
337    use crate::message::SignedMessage;
338    use crate::networks::ChainConfig;
339    use crate::shim::{
340        address::Address,
341        crypto::SignatureType,
342        econ::TokenAmount,
343        message::{Message, Message_v3},
344    };
345    use num_traits::Zero;
346    use test_provider::*;
347    use tokio::task::JoinSet;
348
349    use super::*;
350    use crate::message_pool::{
351        msg_chain::{Chains, create_message_chains},
352        msg_pool::MessagePool,
353    };
354
355    #[tokio::test]
356    async fn test_per_actor_limit() {
357        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
358        let mut wallet = Wallet::new(keystore);
359        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
360        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
361        let tma = TestApi::with_max_actor_pending_messages(200);
362        tma.set_state_sequence(&sender, 0);
363
364        let (tx, _rx) = flume::bounded(50);
365        let mut services = JoinSet::new();
366        let mpool = MessagePool::new(
367            tma,
368            tx,
369            Default::default(),
370            Default::default(),
371            &mut services,
372        )
373        .unwrap();
374        let mut smsg_vec = Vec::new();
375        for i in 0..(mpool.api.max_actor_pending_messages() + 1) {
376            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
377            smsg_vec.push(msg);
378        }
379
380        let (last, body) = smsg_vec.split_last().unwrap();
381        for smsg in body {
382            mpool.add(smsg.clone()).unwrap();
383        }
384        assert_eq!(
385            mpool.add(last.clone()),
386            Err(Error::TooManyPendingMessages(sender.to_string(), true))
387        );
388    }
389
390    pub fn create_smsg(
391        to: &Address,
392        from: &Address,
393        wallet: &mut Wallet,
394        sequence: u64,
395        gas_limit: i64,
396        gas_price: u64,
397    ) -> SignedMessage {
398        let umsg: Message = Message_v3 {
399            to: to.into(),
400            from: from.into(),
401            sequence,
402            gas_limit: gas_limit as u64,
403            gas_fee_cap: TokenAmount::from_atto(gas_price + 100).into(),
404            gas_premium: TokenAmount::from_atto(gas_price).into(),
405            ..Message_v3::default()
406        }
407        .into();
408        let msg_signing_bytes = umsg.cid().to_bytes();
409        let sig = wallet.sign(from, msg_signing_bytes.as_slice()).unwrap();
410        SignedMessage::new_unchecked(umsg, sig)
411    }
412
413    // Create a fake signed message with a dummy signature. While the signature is
414    // not valid, it has been added to the validation cache and the message will
415    // appear authentic.
416    pub fn create_fake_smsg(
417        pool: &MessagePool<TestApi>,
418        to: &Address,
419        from: &Address,
420        sequence: u64,
421        gas_limit: i64,
422        gas_price: u64,
423    ) -> SignedMessage {
424        let umsg: Message = Message_v3 {
425            to: to.into(),
426            from: from.into(),
427            sequence,
428            gas_limit: gas_limit as u64,
429            gas_fee_cap: TokenAmount::from_atto(gas_price + 100).into(),
430            gas_premium: TokenAmount::from_atto(gas_price).into(),
431            ..Message_v3::default()
432        }
433        .into();
434        let sig = Signature::new_secp256k1(vec![]);
435        let signed = SignedMessage::new_unchecked(umsg, sig);
436        let cid = signed.cid();
437        pool.sig_val_cache.push(cid.into(), ());
438        signed
439    }
440
441    #[tokio::test]
442    async fn test_message_pool() {
443        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
444        let mut wallet = Wallet::new(keystore);
445        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
446        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
447        let tma = TestApi::default();
448        tma.set_state_sequence(&sender, 0);
449
450        let (tx, _rx) = flume::bounded(50);
451        let mut services = JoinSet::new();
452        let mpool = MessagePool::new(
453            tma,
454            tx,
455            Default::default(),
456            Default::default(),
457            &mut services,
458        )
459        .unwrap();
460        let mut smsg_vec = Vec::new();
461        for i in 0..2 {
462            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
463            smsg_vec.push(msg);
464        }
465
466        mpool.api.inner.lock().set_state_sequence(&sender, 0);
467        assert_eq!(mpool.get_sequence(&sender).unwrap(), 0);
468        mpool.add(smsg_vec[0].clone()).unwrap();
469        assert_eq!(mpool.get_sequence(&sender).unwrap(), 1);
470        mpool.add(smsg_vec[1].clone()).unwrap();
471        assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
472
473        let a = mock_block(1, 1);
474
475        mpool.api.inner.lock().set_block_messages(&a, smsg_vec);
476        let api = mpool.api.clone();
477        let bls_sig_cache = mpool.bls_sig_cache.clone();
478        let pending = mpool.pending.clone();
479        let cur_tipset = mpool.cur_tipset.clone();
480        let repub_trigger = mpool.repub_trigger.clone();
481        let republished = mpool.republished.clone();
482        head_change(
483            api.as_ref(),
484            bls_sig_cache.as_ref(),
485            repub_trigger,
486            republished.as_ref(),
487            pending.as_ref(),
488            cur_tipset.as_ref(),
489            Vec::new(),
490            vec![Tipset::from(a)],
491        )
492        .await
493        .unwrap();
494
495        assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
496    }
497
498    #[tokio::test]
499    async fn test_revert_messages() {
500        let tma = TestApi::default();
501        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
502        let mut wallet = Wallet::new(keystore);
503
504        let a = mock_block(1, 1);
505        let tipset = Tipset::from(&a);
506        let b = mock_block_with_parents(&tipset, 1, 1);
507
508        let sender = wallet.generate_addr(SignatureType::Bls).unwrap();
509        let target = Address::new_id(1001);
510
511        let mut smsg_vec = Vec::new();
512
513        for i in 0..4 {
514            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
515            smsg_vec.push(msg);
516        }
517        let (tx, _rx) = flume::bounded(50);
518        let mut services = JoinSet::new();
519        let mpool = MessagePool::new(
520            tma,
521            tx,
522            Default::default(),
523            Default::default(),
524            &mut services,
525        )
526        .unwrap();
527
528        {
529            let mut api_temp = mpool.api.inner.lock();
530            api_temp.set_block_messages(&a, vec![smsg_vec[0].clone()]);
531            api_temp.set_block_messages(&b.clone(), smsg_vec[1..4].to_vec());
532            api_temp.set_state_sequence(&sender, 0);
533            drop(api_temp);
534        }
535
536        mpool.add(smsg_vec[0].clone()).unwrap();
537        mpool.add(smsg_vec[1].clone()).unwrap();
538        mpool.add(smsg_vec[2].clone()).unwrap();
539        mpool.add(smsg_vec[3].clone()).unwrap();
540
541        mpool.api.set_state_sequence(&sender, 0);
542
543        let api = mpool.api.clone();
544        let bls_sig_cache = mpool.bls_sig_cache.clone();
545        let pending = mpool.pending.clone();
546        let cur_tipset = mpool.cur_tipset.clone();
547        let repub_trigger = mpool.repub_trigger.clone();
548        let republished = mpool.republished.clone();
549        head_change(
550            api.as_ref(),
551            bls_sig_cache.as_ref(),
552            repub_trigger.clone(),
553            republished.as_ref(),
554            pending.as_ref(),
555            cur_tipset.as_ref(),
556            Vec::new(),
557            vec![Tipset::from(a)],
558        )
559        .await
560        .unwrap();
561
562        assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
563
564        mpool.api.set_state_sequence(&sender, 1);
565
566        let api = mpool.api.clone();
567        let bls_sig_cache = mpool.bls_sig_cache.clone();
568        let pending = mpool.pending.clone();
569        let cur_tipset = mpool.cur_tipset.clone();
570
571        head_change(
572            api.as_ref(),
573            bls_sig_cache.as_ref(),
574            repub_trigger.clone(),
575            republished.as_ref(),
576            pending.as_ref(),
577            cur_tipset.as_ref(),
578            Vec::new(),
579            vec![Tipset::from(&b)],
580        )
581        .await
582        .unwrap();
583
584        assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
585
586        mpool.api.set_state_sequence(&sender, 0);
587
588        head_change(
589            api.as_ref(),
590            bls_sig_cache.as_ref(),
591            repub_trigger.clone(),
592            republished.as_ref(),
593            pending.as_ref(),
594            cur_tipset.as_ref(),
595            vec![Tipset::from(b)],
596            Vec::new(),
597        )
598        .await
599        .unwrap();
600
601        assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
602
603        let (p, _) = mpool.pending().unwrap();
604        assert_eq!(p.len(), 3);
605    }
606
607    #[tokio::test]
608    async fn test_async_message_pool() {
609        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
610        let mut wallet = Wallet::new(keystore);
611        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
612        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
613
614        let tma = TestApi::default();
615        tma.set_state_sequence(&sender, 0);
616        let (tx, _rx) = flume::bounded(50);
617        let mut services = JoinSet::new();
618        let mpool = MessagePool::new(
619            tma,
620            tx,
621            Default::default(),
622            Default::default(),
623            &mut services,
624        )
625        .unwrap();
626
627        let mut smsg_vec = Vec::new();
628        for i in 0..3 {
629            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
630            smsg_vec.push(msg);
631        }
632
633        assert_eq!(mpool.get_sequence(&sender).unwrap(), 0);
634        mpool.push(smsg_vec[0].clone()).await.unwrap();
635        assert_eq!(mpool.get_sequence(&sender).unwrap(), 1);
636        mpool.push(smsg_vec[1].clone()).await.unwrap();
637        assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
638        mpool.push(smsg_vec[2].clone()).await.unwrap();
639        assert_eq!(mpool.get_sequence(&sender).unwrap(), 3);
640
641        let header = mock_block(1, 1);
642        let tipset = Tipset::from(&header.clone());
643
644        mpool.api.set_heaviest_tipset(tipset.clone());
645
646        // sleep allows for async block to update mpool's cur_tipset
647        tokio::time::sleep(Duration::new(2, 0)).await;
648
649        let cur_ts = mpool.current_tipset();
650        assert_eq!(cur_ts, tipset);
651    }
652
653    #[tokio::test]
654    async fn test_msg_chains() {
655        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
656        let mut wallet = Wallet::new(keystore);
657        let a1 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
658        let a2 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
659        let tma = TestApi::default();
660        let gas_limit = 6955002;
661
662        let a = mock_block(1, 1);
663        let ts = Tipset::from(a);
664        let chain_config = ChainConfig::default();
665
666        // --- Test Chain Aggregations ---
667        // Test 1: 10 messages from a1 to a2, with increasing gasPerf; it should
668        // 	       make a single chain with 10 messages given enough balance
669        let mut mset = HashMap::new();
670        let mut smsg_vec = Vec::new();
671        for i in 0..10 {
672            let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i);
673            smsg_vec.push(msg.clone());
674            mset.insert(i, msg);
675        }
676
677        let mut chains = Chains::new();
678        create_message_chains(
679            &tma,
680            &a1,
681            &mset,
682            &TokenAmount::zero(),
683            &ts,
684            &mut chains,
685            &chain_config,
686        )
687        .unwrap();
688        assert_eq!(chains.len(), 1, "expected a single chain");
689        assert_eq!(
690            chains[0].msgs.len(),
691            10,
692            "expected 10 messages in single chain, got: {}",
693            chains[0].msgs.len()
694        );
695        for (i, m) in chains[0].msgs.iter().enumerate() {
696            assert_eq!(
697                m.sequence(),
698                i as u64,
699                "expected sequence {} but got {}",
700                i,
701                m.sequence()
702            );
703        }
704
705        // Test 2: 10 messages from a1 to a2, with decreasing gasPerf; it should
706        // 	         make 10 chains with 1 message each
707        let mut mset = HashMap::new();
708        let mut smsg_vec = Vec::new();
709        for i in 0..10 {
710            let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 10 - i);
711            smsg_vec.push(msg.clone());
712            mset.insert(i, msg);
713        }
714        let mut chains = Chains::new();
715        create_message_chains(
716            &tma,
717            &a1,
718            &mset,
719            &TokenAmount::zero(),
720            &ts,
721            &mut chains,
722            &chain_config,
723        )
724        .unwrap();
725        assert_eq!(chains.len(), 10, "expected 10 chains");
726
727        for i in 0..chains.len() {
728            assert_eq!(
729                chains[i].msgs.len(),
730                1,
731                "expected 1 message in chain {} but got {}",
732                i,
733                chains[i].msgs.len()
734            );
735        }
736
737        for i in 0..chains.len() {
738            let m = &chains[i].msgs[0];
739            assert_eq!(
740                m.sequence(),
741                i as u64,
742                "expected sequence {} but got {}",
743                i,
744                m.sequence()
745            );
746        }
747
748        // Test 3a: 10 messages from a1 to a2, with gasPerf increasing in groups of 3;
749        // it should          merge them in two chains, one with 9 messages and
750        // one with the last message
751        let mut mset = HashMap::new();
752        let mut smsg_vec = Vec::new();
753        for i in 0..10 {
754            let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i % 3);
755            smsg_vec.push(msg.clone());
756            mset.insert(i, msg);
757        }
758        let mut chains = Chains::new();
759        create_message_chains(
760            &tma,
761            &a1,
762            &mset,
763            &TokenAmount::zero(),
764            &ts,
765            &mut chains,
766            &chain_config,
767        )
768        .unwrap();
769        assert_eq!(chains.len(), 2, "expected 2 chains");
770        assert_eq!(chains[0].msgs.len(), 9);
771        assert_eq!(chains[1].msgs.len(), 1);
772        let mut next_nonce = 0;
773        for i in 0..chains.len() {
774            for m in chains[i].msgs.iter() {
775                assert_eq!(
776                    next_nonce,
777                    m.sequence(),
778                    "expected nonce {} but got {}",
779                    next_nonce,
780                    m.sequence()
781                );
782                next_nonce += 1;
783            }
784        }
785
786        // Test 3b: 10 messages from a1 to a2, with gasPerf decreasing in groups of 3
787        // with a bias for the          earlier chains; it should make 4 chains,
788        // the first 3 with 3 messages and the last with          a single
789        // message
790        let mut mset = HashMap::new();
791        let mut smsg_vec = Vec::new();
792        for i in 0..10 {
793            let bias = (12 - i) / 3;
794            let msg = create_smsg(
795                &a2,
796                &a1,
797                wallet.borrow_mut(),
798                i,
799                gas_limit,
800                1 + i % 3 + bias,
801            );
802            smsg_vec.push(msg.clone());
803            mset.insert(i, msg);
804        }
805
806        let mut chains = Chains::new();
807        create_message_chains(
808            &tma,
809            &a1,
810            &mset,
811            &TokenAmount::zero(),
812            &ts,
813            &mut chains,
814            &chain_config,
815        )
816        .unwrap();
817
818        for i in 0..chains.len() {
819            let expected_len = if i > 2 { 1 } else { 3 };
820            assert_eq!(
821                chains[i].msgs.len(),
822                expected_len,
823                "expected {} message in chain {} but got {}",
824                expected_len,
825                i,
826                chains[i].msgs.len()
827            );
828        }
829
830        let mut next_nonce = 0;
831        for i in 0..chains.len() {
832            for m in chains[i].msgs.iter() {
833                assert_eq!(
834                    next_nonce,
835                    m.sequence(),
836                    "expected nonce {} but got {}",
837                    next_nonce,
838                    m.sequence()
839                );
840                next_nonce += 1;
841            }
842        }
843
844        // --- Test Chain Breaks ---
845        // Test 4: 10 messages with non-consecutive nonces; it should make a single
846        // chain with just         the first message
847        let mut mset = HashMap::new();
848        let mut smsg_vec = Vec::new();
849        for i in 0..10 {
850            let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i * 2, gas_limit, 1 + i);
851            smsg_vec.push(msg.clone());
852            mset.insert(i, msg);
853        }
854
855        let mut chains = Chains::new();
856        create_message_chains(
857            &tma,
858            &a1,
859            &mset,
860            &TokenAmount::zero(),
861            &ts,
862            &mut chains,
863            &chain_config,
864        )
865        .unwrap();
866        assert_eq!(chains.len(), 1, "expected a single chain");
867        for (i, m) in chains[0].msgs.iter().enumerate() {
868            assert_eq!(
869                m.sequence(),
870                i as u64,
871                "expected nonce {} but got {}",
872                i,
873                m.sequence()
874            );
875        }
876
877        // Test 5: 10 messages with increasing gasLimit, except for the 6th message
878        // which has less than         the epoch gasLimit; it should create a
879        // single chain with the first 5 messages
880        let mut mset = HashMap::new();
881        let mut smsg_vec = Vec::new();
882        tma.set_state_balance_raw(&a1, TokenAmount::from_atto(1_000_000_000_000_000_000_u64));
883        for i in 0..10 {
884            let msg = if i != 5 {
885                create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i)
886            } else {
887                create_smsg(&a2, &a1, wallet.borrow_mut(), i, 1, 1 + i)
888            };
889            smsg_vec.push(msg.clone());
890            mset.insert(i, msg);
891        }
892        let mut chains = Chains::new();
893        create_message_chains(
894            &tma,
895            &a1,
896            &mset,
897            &TokenAmount::zero(),
898            &ts,
899            &mut chains,
900            &chain_config,
901        )
902        .unwrap();
903        assert_eq!(chains.len(), 1, "expected a single chain");
904        assert_eq!(chains[0].msgs.len(), 5);
905        for (i, m) in chains[0].msgs.iter().enumerate() {
906            assert_eq!(
907                m.sequence(),
908                i as u64,
909                "expected nonce {} but got {}",
910                i,
911                m.sequence()
912            );
913        }
914
915        // Test 6: one more message than what can fit in a block according to gas limit,
916        // with increasing         gasPerf; it should create a single chain with
917        // the max messages
918        let mut mset = HashMap::new();
919        let mut smsg_vec = Vec::new();
920        let max_messages = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / gas_limit;
921        let n_messages = max_messages + 1;
922        for i in 0..n_messages {
923            let msg = create_smsg(
924                &a2,
925                &a1,
926                wallet.borrow_mut(),
927                i as u64,
928                gas_limit,
929                (1 + i) as u64,
930            );
931            smsg_vec.push(msg.clone());
932            mset.insert(i as u64, msg);
933        }
934        let mut chains = Chains::new();
935        create_message_chains(
936            &tma,
937            &a1,
938            &mset,
939            &TokenAmount::zero(),
940            &ts,
941            &mut chains,
942            &chain_config,
943        )
944        .unwrap();
945        assert_eq!(chains.len(), 1, "expected a single chain");
946        assert_eq!(chains[0].msgs.len(), max_messages as usize);
947        for (i, m) in chains[0].msgs.iter().enumerate() {
948            assert_eq!(
949                m.sequence(),
950                i as u64,
951                "expected nonce {} but got {}",
952                i,
953                m.sequence()
954            );
955        }
956
957        // Test 7: insufficient balance for all messages
958        tma.set_state_balance_raw(&a1, TokenAmount::from_atto(300 * gas_limit + 1));
959        let mut mset = HashMap::new();
960        let mut smsg_vec = Vec::new();
961        for i in 0..10 {
962            let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i);
963            smsg_vec.push(msg.clone());
964            mset.insert(i, msg);
965        }
966        let mut chains = Chains::new();
967        create_message_chains(
968            &tma,
969            &a1,
970            &mset,
971            &TokenAmount::zero(),
972            &ts,
973            &mut chains,
974            &chain_config,
975        )
976        .unwrap();
977        assert_eq!(chains.len(), 1, "expected a single chain");
978        assert_eq!(chains[0].msgs.len(), 2);
979        for (i, m) in chains[0].msgs.iter().enumerate() {
980            assert_eq!(
981                m.sequence(),
982                i as u64,
983                "expected nonce {} but got {}",
984                i,
985                m.sequence()
986            );
987        }
988    }
989}