Skip to main content

forest/message_pool/msgpool/
mod.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4pub(in crate::message_pool) mod metrics;
5pub(in crate::message_pool) mod msg_pool;
6pub(in crate::message_pool) mod provider;
7pub mod selection;
8#[cfg(test)]
9pub mod test_provider;
10pub(in crate::message_pool) mod utils;
11
12use std::{borrow::BorrowMut, cmp::Ordering};
13
14use crate::blocks::Tipset;
15use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic};
16use crate::message::{Message as MessageTrait, SignedMessage};
17use crate::networks::ChainConfig;
18use crate::shim::{address::Address, crypto::Signature};
19use crate::utils::cache::SizeTrackingLruCache;
20use crate::utils::get_size::CidWrapper;
21use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
22use cid::Cid;
23use fvm_ipld_encoding::to_vec;
24use parking_lot::RwLock as SyncRwLock;
25use tracing::error;
26use utils::{get_base_fee_lower_bound, recover_sig};
27
28use super::errors::Error;
29use crate::message_pool::{
30    msg_chain::{Chains, create_message_chains},
31    msg_pool::{MsgSet, add_helper, remove},
32    provider::Provider,
33};
34
35const REPLACE_BY_FEE_RATIO: f32 = 1.25;
36const RBF_NUM: u64 = ((REPLACE_BY_FEE_RATIO - 1f32) * 256f32) as u64;
37const RBF_DENOM: u64 = 256;
38const BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE: i64 = 100;
39const BASE_FEE_LOWER_BOUND_FACTOR: i64 = 10;
40const REPUB_MSG_LIMIT: usize = 30;
41const MIN_GAS: u64 = 1298450;
42
43/// Get the state of the `base_sequence` for a given address in the current
44/// Tipset
45fn get_state_sequence<T>(api: &T, addr: &Address, cur_ts: &Tipset) -> Result<u64, Error>
46where
47    T: Provider,
48{
49    let actor = api.get_actor_after(addr, cur_ts)?;
50    let base_sequence = actor.sequence;
51
52    Ok(base_sequence)
53}
54
55#[allow(clippy::too_many_arguments)]
56async fn republish_pending_messages<T>(
57    api: &T,
58    network_sender: &flume::Sender<NetworkMessage>,
59    pending: &SyncRwLock<HashMap<Address, MsgSet>>,
60    cur_tipset: &SyncRwLock<Tipset>,
61    republished: &SyncRwLock<HashSet<Cid>>,
62    local_addrs: &SyncRwLock<Vec<Address>>,
63    chain_config: &ChainConfig,
64) -> Result<(), Error>
65where
66    T: Provider,
67{
68    let ts = cur_tipset.read().clone();
69    let mut pending_map: HashMap<Address, HashMap<u64, SignedMessage>> = HashMap::new();
70
71    republished.write().clear();
72
73    // Only republish messages from local addresses, ie. transactions which were
74    // sent to this node directly.
75    for actor in local_addrs.read().iter() {
76        if let Some(mset) = pending.read().get(actor) {
77            if mset.msgs.is_empty() {
78                continue;
79            }
80            let mut pend: HashMap<u64, SignedMessage> = HashMap::with_capacity(mset.msgs.len());
81            for (nonce, m) in mset.msgs.clone().into_iter() {
82                pend.insert(nonce, m);
83            }
84            pending_map.insert(*actor, pend);
85        }
86    }
87
88    let msgs = select_messages_for_block(api, chain_config, &ts, pending_map)?;
89
90    let network_name = chain_config.network.genesis_name();
91    for m in msgs.iter() {
92        let mb = to_vec(m)?;
93        network_sender
94            .send_async(NetworkMessage::PubsubMessage {
95                topic: Topic::new(format!("{PUBSUB_MSG_STR}/{network_name}")),
96                message: mb,
97            })
98            .await
99            .map_err(|_| Error::Other("Network receiver dropped".to_string()))?;
100    }
101
102    let mut republished_t = HashSet::new();
103    for m in msgs.iter() {
104        republished_t.insert(m.cid());
105    }
106    *republished.write() = republished_t;
107
108    Ok(())
109}
110
111/// Select messages from the mempool to be included in the next block that
112/// builds on a given base tipset. The messages should be eligible for inclusion
113/// based on their sequences and the overall number of them should observe block
114/// gas limits.
115fn select_messages_for_block<T>(
116    api: &T,
117    chain_config: &ChainConfig,
118    base: &Tipset,
119    pending: HashMap<Address, HashMap<u64, SignedMessage>>,
120) -> Result<Vec<SignedMessage>, Error>
121where
122    T: Provider,
123{
124    let mut msgs: Vec<SignedMessage> = vec![];
125
126    let base_fee = api.chain_compute_base_fee(base)?;
127    let base_fee_lower_bound = get_base_fee_lower_bound(&base_fee, BASE_FEE_LOWER_BOUND_FACTOR);
128
129    if pending.is_empty() {
130        return Ok(msgs);
131    }
132
133    let mut chains = Chains::new();
134    for (actor, mset) in pending.iter() {
135        create_message_chains(
136            api,
137            actor,
138            mset,
139            &base_fee_lower_bound,
140            base,
141            &mut chains,
142            chain_config,
143        )?;
144    }
145
146    if chains.is_empty() {
147        return Ok(msgs);
148    }
149
150    chains.sort(false);
151
152    let mut gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT;
153    let mut i = 0;
154    'l: while let Some(chain) = chains.get_mut_at(i) {
155        // we can exceed this if we have picked (some) longer chain already
156        if msgs.len() > REPUB_MSG_LIMIT {
157            break;
158        }
159
160        if gas_limit <= MIN_GAS {
161            break;
162        }
163
164        // check if chain has been invalidated
165        if !chain.valid {
166            i += 1;
167            continue;
168        }
169
170        // check if fits in block
171        if chain.gas_limit <= gas_limit {
172            // check the baseFee lower bound -- only republish messages that can be included
173            // in the chain within the next 20 blocks.
174            for m in chain.msgs.iter() {
175                if m.gas_fee_cap() < base_fee_lower_bound {
176                    let key = chains.get_key_at(i);
177                    chains.invalidate(key);
178                    continue 'l;
179                }
180                gas_limit = gas_limit.saturating_sub(m.gas_limit());
181                msgs.push(m.clone());
182            }
183
184            i += 1;
185            continue;
186        }
187
188        // we can't fit the current chain but there is gas to spare
189        // trim it and push it down
190        chains.trim_msgs_at(i, gas_limit, REPUB_MSG_LIMIT, &base_fee);
191        let mut j = i;
192        while j < chains.len() - 1 {
193            #[allow(clippy::indexing_slicing)]
194            if chains[j].compare(&chains[j + 1]) == Ordering::Less {
195                break;
196            }
197            chains.key_vec.swap(i, i + 1);
198            j += 1;
199        }
200    }
201
202    if msgs.len() > REPUB_MSG_LIMIT {
203        msgs.truncate(REPUB_MSG_LIMIT);
204    }
205
206    Ok(msgs)
207}
208
209/// This function will revert and/or apply tipsets to the message pool. This
210/// function should be called every time that there is a head change in the
211/// message pool.
212#[allow(clippy::too_many_arguments)]
213pub async fn head_change<T>(
214    api: &T,
215    bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
216    repub_trigger: flume::Sender<()>,
217    republished: &SyncRwLock<HashSet<Cid>>,
218    pending: &SyncRwLock<HashMap<Address, MsgSet>>,
219    cur_tipset: &SyncRwLock<Tipset>,
220    revert: Vec<Tipset>,
221    apply: Vec<Tipset>,
222) -> Result<(), Error>
223where
224    T: Provider + 'static,
225{
226    let mut repub = false;
227    let mut rmsgs: HashMap<Address, HashMap<u64, SignedMessage>> = HashMap::new();
228    for ts in revert {
229        let pts = api.load_tipset(ts.parents())?;
230        *cur_tipset.write() = pts;
231
232        let mut msgs: Vec<SignedMessage> = Vec::new();
233        for block in ts.block_headers() {
234            let (umsg, smsgs) = api.messages_for_block(block)?;
235            msgs.extend(smsgs);
236            for msg in umsg {
237                let smsg = recover_sig(bls_sig_cache, msg)?;
238                msgs.push(smsg)
239            }
240        }
241
242        for msg in msgs {
243            add_to_selected_msgs(msg, rmsgs.borrow_mut());
244        }
245    }
246
247    for ts in apply {
248        for b in ts.block_headers() {
249            let (msgs, smsgs) = api.messages_for_block(b)?;
250
251            for msg in smsgs {
252                remove_from_selected_msgs(
253                    &msg.from(),
254                    pending,
255                    msg.sequence(),
256                    rmsgs.borrow_mut(),
257                )?;
258                if !repub && republished.write().insert(msg.cid()) {
259                    repub = true;
260                }
261            }
262            for msg in msgs {
263                remove_from_selected_msgs(&msg.from, pending, msg.sequence, rmsgs.borrow_mut())?;
264                if !repub && republished.write().insert(msg.cid()) {
265                    repub = true;
266                }
267            }
268        }
269        *cur_tipset.write() = ts;
270    }
271    if repub {
272        repub_trigger
273            .send_async(())
274            .await
275            .map_err(|e| Error::Other(format!("Republish receiver dropped: {e}")))?;
276    }
277    for (_, hm) in rmsgs {
278        for (_, msg) in hm {
279            let sequence = get_state_sequence(api, &msg.from(), &cur_tipset.read().clone())?;
280            if let Err(e) = add_helper(api, bls_sig_cache, pending, msg, sequence) {
281                error!("Failed to read message from reorg to mpool: {}", e);
282            }
283        }
284    }
285    Ok(())
286}
287
288/// This is a helper function for `head_change`. This method will remove a
289/// sequence for a from address from the messages selected by priority hash-map.
290/// It also removes the 'from' address and sequence from the `MessagePool`.
291pub(in crate::message_pool) fn remove_from_selected_msgs(
292    from: &Address,
293    pending: &SyncRwLock<HashMap<Address, MsgSet>>,
294    sequence: u64,
295    rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
296) -> Result<(), Error> {
297    if let Some(temp) = rmsgs.get_mut(from) {
298        if temp.get_mut(&sequence).is_some() {
299            temp.remove(&sequence);
300        } else {
301            remove(from, pending, sequence, true)?;
302        }
303    } else {
304        remove(from, pending, sequence, true)?;
305    }
306    Ok(())
307}
308
309/// This is a helper function for `head_change`. This method will add a signed
310/// message to the given messages selected by priority `HashMap`.
311pub(in crate::message_pool) fn add_to_selected_msgs(
312    m: SignedMessage,
313    rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
314) {
315    let s = rmsgs.get_mut(&m.from());
316    if let Some(s) = s {
317        s.insert(m.sequence(), m);
318    } else {
319        rmsgs.insert(m.from(), HashMap::new());
320        rmsgs.get_mut(&m.from()).unwrap().insert(m.sequence(), m);
321    }
322}
323
324#[cfg(test)]
325pub mod tests {
326    use std::{borrow::BorrowMut, time::Duration};
327
328    use crate::blocks::Tipset;
329    use crate::key_management::{KeyStore, KeyStoreConfig, Wallet};
330    use crate::message::SignedMessage;
331    use crate::networks::ChainConfig;
332    use crate::shim::{
333        address::Address,
334        crypto::SignatureType,
335        econ::TokenAmount,
336        message::{Message, Message_v3},
337    };
338    use num_traits::Zero;
339    use test_provider::*;
340    use tokio::task::JoinSet;
341
342    use super::*;
343    use crate::message_pool::{
344        msg_chain::{Chains, create_message_chains},
345        msg_pool::MessagePool,
346    };
347
348    #[tokio::test]
349    async fn test_per_actor_limit() {
350        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
351        let mut wallet = Wallet::new(keystore);
352        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
353        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
354        let tma = TestApi::with_max_actor_pending_messages(200);
355        tma.set_state_sequence(&sender, 0);
356
357        let (tx, _rx) = flume::bounded(50);
358        let mut services = JoinSet::new();
359        let mpool = MessagePool::new(
360            tma,
361            tx,
362            Default::default(),
363            Default::default(),
364            &mut services,
365        )
366        .unwrap();
367        let mut smsg_vec = Vec::new();
368        for i in 0..(mpool.api.max_actor_pending_messages() + 1) {
369            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
370            smsg_vec.push(msg);
371        }
372
373        let (last, body) = smsg_vec.split_last().unwrap();
374        for smsg in body {
375            mpool.add(smsg.clone()).unwrap();
376        }
377        assert_eq!(
378            mpool.add(last.clone()),
379            Err(Error::TooManyPendingMessages(sender.to_string(), true))
380        );
381    }
382
383    pub fn create_smsg(
384        to: &Address,
385        from: &Address,
386        wallet: &mut Wallet,
387        sequence: u64,
388        gas_limit: i64,
389        gas_price: u64,
390    ) -> SignedMessage {
391        let umsg: Message = Message_v3 {
392            to: to.into(),
393            from: from.into(),
394            sequence,
395            gas_limit: gas_limit as u64,
396            gas_fee_cap: TokenAmount::from_atto(gas_price + 100).into(),
397            gas_premium: TokenAmount::from_atto(gas_price).into(),
398            ..Message_v3::default()
399        }
400        .into();
401        let msg_signing_bytes = umsg.cid().to_bytes();
402        let sig = wallet.sign(from, msg_signing_bytes.as_slice()).unwrap();
403        SignedMessage::new_unchecked(umsg, sig)
404    }
405
406    // Create a fake signed message with a dummy signature. While the signature is
407    // not valid, it has been added to the validation cache and the message will
408    // appear authentic.
409    pub fn create_fake_smsg(
410        pool: &MessagePool<TestApi>,
411        to: &Address,
412        from: &Address,
413        sequence: u64,
414        gas_limit: i64,
415        gas_price: u64,
416    ) -> SignedMessage {
417        let umsg: Message = Message_v3 {
418            to: to.into(),
419            from: from.into(),
420            sequence,
421            gas_limit: gas_limit as u64,
422            gas_fee_cap: TokenAmount::from_atto(gas_price + 100).into(),
423            gas_premium: TokenAmount::from_atto(gas_price).into(),
424            ..Message_v3::default()
425        }
426        .into();
427        let sig = Signature::new_secp256k1(vec![]);
428        let signed = SignedMessage::new_unchecked(umsg, sig);
429        let cid = signed.cid();
430        pool.sig_val_cache.push(cid.into(), ());
431        signed
432    }
433
434    #[tokio::test]
435    async fn test_message_pool() {
436        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
437        let mut wallet = Wallet::new(keystore);
438        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
439        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
440        let tma = TestApi::default();
441        tma.set_state_sequence(&sender, 0);
442
443        let (tx, _rx) = flume::bounded(50);
444        let mut services = JoinSet::new();
445        let mpool = MessagePool::new(
446            tma,
447            tx,
448            Default::default(),
449            Default::default(),
450            &mut services,
451        )
452        .unwrap();
453        let mut smsg_vec = Vec::new();
454        for i in 0..2 {
455            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
456            smsg_vec.push(msg);
457        }
458
459        mpool.api.inner.lock().set_state_sequence(&sender, 0);
460        assert_eq!(mpool.get_sequence(&sender).unwrap(), 0);
461        mpool.add(smsg_vec[0].clone()).unwrap();
462        assert_eq!(mpool.get_sequence(&sender).unwrap(), 1);
463        mpool.add(smsg_vec[1].clone()).unwrap();
464        assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
465
466        let a = mock_block(1, 1);
467
468        mpool.api.inner.lock().set_block_messages(&a, smsg_vec);
469        let api = mpool.api.clone();
470        let bls_sig_cache = mpool.bls_sig_cache.clone();
471        let pending = mpool.pending.clone();
472        let cur_tipset = mpool.cur_tipset.clone();
473        let repub_trigger = mpool.repub_trigger.clone();
474        let republished = mpool.republished.clone();
475        head_change(
476            api.as_ref(),
477            bls_sig_cache.as_ref(),
478            repub_trigger,
479            republished.as_ref(),
480            pending.as_ref(),
481            cur_tipset.as_ref(),
482            Vec::new(),
483            vec![Tipset::from(a)],
484        )
485        .await
486        .unwrap();
487
488        assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
489    }
490
491    #[tokio::test]
492    async fn test_revert_messages() {
493        let tma = TestApi::default();
494        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
495        let mut wallet = Wallet::new(keystore);
496
497        let a = mock_block(1, 1);
498        let tipset = Tipset::from(&a);
499        let b = mock_block_with_parents(&tipset, 1, 1);
500
501        let sender = wallet.generate_addr(SignatureType::Bls).unwrap();
502        let target = Address::new_id(1001);
503
504        let mut smsg_vec = Vec::new();
505
506        for i in 0..4 {
507            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
508            smsg_vec.push(msg);
509        }
510        let (tx, _rx) = flume::bounded(50);
511        let mut services = JoinSet::new();
512        let mpool = MessagePool::new(
513            tma,
514            tx,
515            Default::default(),
516            Default::default(),
517            &mut services,
518        )
519        .unwrap();
520
521        {
522            let mut api_temp = mpool.api.inner.lock();
523            api_temp.set_block_messages(&a, vec![smsg_vec[0].clone()]);
524            api_temp.set_block_messages(&b.clone(), smsg_vec[1..4].to_vec());
525            api_temp.set_state_sequence(&sender, 0);
526            drop(api_temp);
527        }
528
529        mpool.add(smsg_vec[0].clone()).unwrap();
530        mpool.add(smsg_vec[1].clone()).unwrap();
531        mpool.add(smsg_vec[2].clone()).unwrap();
532        mpool.add(smsg_vec[3].clone()).unwrap();
533
534        mpool.api.set_state_sequence(&sender, 0);
535
536        let api = mpool.api.clone();
537        let bls_sig_cache = mpool.bls_sig_cache.clone();
538        let pending = mpool.pending.clone();
539        let cur_tipset = mpool.cur_tipset.clone();
540        let repub_trigger = mpool.repub_trigger.clone();
541        let republished = mpool.republished.clone();
542        head_change(
543            api.as_ref(),
544            bls_sig_cache.as_ref(),
545            repub_trigger.clone(),
546            republished.as_ref(),
547            pending.as_ref(),
548            cur_tipset.as_ref(),
549            Vec::new(),
550            vec![Tipset::from(a)],
551        )
552        .await
553        .unwrap();
554
555        assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
556
557        mpool.api.set_state_sequence(&sender, 1);
558
559        let api = mpool.api.clone();
560        let bls_sig_cache = mpool.bls_sig_cache.clone();
561        let pending = mpool.pending.clone();
562        let cur_tipset = mpool.cur_tipset.clone();
563
564        head_change(
565            api.as_ref(),
566            bls_sig_cache.as_ref(),
567            repub_trigger.clone(),
568            republished.as_ref(),
569            pending.as_ref(),
570            cur_tipset.as_ref(),
571            Vec::new(),
572            vec![Tipset::from(&b)],
573        )
574        .await
575        .unwrap();
576
577        assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
578
579        mpool.api.set_state_sequence(&sender, 0);
580
581        head_change(
582            api.as_ref(),
583            bls_sig_cache.as_ref(),
584            repub_trigger.clone(),
585            republished.as_ref(),
586            pending.as_ref(),
587            cur_tipset.as_ref(),
588            vec![Tipset::from(b)],
589            Vec::new(),
590        )
591        .await
592        .unwrap();
593
594        assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
595
596        let (p, _) = mpool.pending().unwrap();
597        assert_eq!(p.len(), 3);
598    }
599
600    #[tokio::test]
601    async fn test_async_message_pool() {
602        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
603        let mut wallet = Wallet::new(keystore);
604        let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
605        let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
606
607        let tma = TestApi::default();
608        tma.set_state_sequence(&sender, 0);
609        let (tx, _rx) = flume::bounded(50);
610        let mut services = JoinSet::new();
611        let mpool = MessagePool::new(
612            tma,
613            tx,
614            Default::default(),
615            Default::default(),
616            &mut services,
617        )
618        .unwrap();
619
620        let mut smsg_vec = Vec::new();
621        for i in 0..3 {
622            let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
623            smsg_vec.push(msg);
624        }
625
626        assert_eq!(mpool.get_sequence(&sender).unwrap(), 0);
627        mpool.push(smsg_vec[0].clone()).await.unwrap();
628        assert_eq!(mpool.get_sequence(&sender).unwrap(), 1);
629        mpool.push(smsg_vec[1].clone()).await.unwrap();
630        assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
631        mpool.push(smsg_vec[2].clone()).await.unwrap();
632        assert_eq!(mpool.get_sequence(&sender).unwrap(), 3);
633
634        let header = mock_block(1, 1);
635        let tipset = Tipset::from(&header.clone());
636
637        mpool.api.set_heaviest_tipset(tipset.clone());
638
639        // sleep allows for async block to update mpool's cur_tipset
640        tokio::time::sleep(Duration::new(2, 0)).await;
641
642        let cur_ts = mpool.current_tipset();
643        assert_eq!(cur_ts, tipset);
644    }
645
646    #[tokio::test]
647    async fn test_msg_chains() {
648        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
649        let mut wallet = Wallet::new(keystore);
650        let a1 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
651        let a2 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
652        let tma = TestApi::default();
653        let gas_limit = 6955002;
654
655        let a = mock_block(1, 1);
656        let ts = Tipset::from(a);
657        let chain_config = ChainConfig::default();
658
659        // --- Test Chain Aggregations ---
660        // Test 1: 10 messages from a1 to a2, with increasing gasPerf; it should
661        // 	       make a single chain with 10 messages given enough balance
662        let mut mset = HashMap::new();
663        let mut smsg_vec = Vec::new();
664        for i in 0..10 {
665            let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i);
666            smsg_vec.push(msg.clone());
667            mset.insert(i, msg);
668        }
669
670        let mut chains = Chains::new();
671        create_message_chains(
672            &tma,
673            &a1,
674            &mset,
675            &TokenAmount::zero(),
676            &ts,
677            &mut chains,
678            &chain_config,
679        )
680        .unwrap();
681        assert_eq!(chains.len(), 1, "expected a single chain");
682        assert_eq!(
683            chains[0].msgs.len(),
684            10,
685            "expected 10 messages in single chain, got: {}",
686            chains[0].msgs.len()
687        );
688        for (i, m) in chains[0].msgs.iter().enumerate() {
689            assert_eq!(
690                m.sequence(),
691                i as u64,
692                "expected sequence {} but got {}",
693                i,
694                m.sequence()
695            );
696        }
697
698        // Test 2: 10 messages from a1 to a2, with decreasing gasPerf; it should
699        // 	         make 10 chains with 1 message each
700        let mut mset = HashMap::new();
701        let mut smsg_vec = Vec::new();
702        for i in 0..10 {
703            let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 10 - i);
704            smsg_vec.push(msg.clone());
705            mset.insert(i, msg);
706        }
707        let mut chains = Chains::new();
708        create_message_chains(
709            &tma,
710            &a1,
711            &mset,
712            &TokenAmount::zero(),
713            &ts,
714            &mut chains,
715            &chain_config,
716        )
717        .unwrap();
718        assert_eq!(chains.len(), 10, "expected 10 chains");
719
720        for i in 0..chains.len() {
721            assert_eq!(
722                chains[i].msgs.len(),
723                1,
724                "expected 1 message in chain {} but got {}",
725                i,
726                chains[i].msgs.len()
727            );
728        }
729
730        for i in 0..chains.len() {
731            let m = &chains[i].msgs[0];
732            assert_eq!(
733                m.sequence(),
734                i as u64,
735                "expected sequence {} but got {}",
736                i,
737                m.sequence()
738            );
739        }
740
741        // Test 3a: 10 messages from a1 to a2, with gasPerf increasing in groups of 3;
742        // it should          merge them in two chains, one with 9 messages and
743        // one with the last message
744        let mut mset = HashMap::new();
745        let mut smsg_vec = Vec::new();
746        for i in 0..10 {
747            let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i % 3);
748            smsg_vec.push(msg.clone());
749            mset.insert(i, msg);
750        }
751        let mut chains = Chains::new();
752        create_message_chains(
753            &tma,
754            &a1,
755            &mset,
756            &TokenAmount::zero(),
757            &ts,
758            &mut chains,
759            &chain_config,
760        )
761        .unwrap();
762        assert_eq!(chains.len(), 2, "expected 2 chains");
763        assert_eq!(chains[0].msgs.len(), 9);
764        assert_eq!(chains[1].msgs.len(), 1);
765        let mut next_nonce = 0;
766        for i in 0..chains.len() {
767            for m in chains[i].msgs.iter() {
768                assert_eq!(
769                    next_nonce,
770                    m.sequence(),
771                    "expected nonce {} but got {}",
772                    next_nonce,
773                    m.sequence()
774                );
775                next_nonce += 1;
776            }
777        }
778
779        // Test 3b: 10 messages from a1 to a2, with gasPerf decreasing in groups of 3
780        // with a bias for the          earlier chains; it should make 4 chains,
781        // the first 3 with 3 messages and the last with          a single
782        // message
783        let mut mset = HashMap::new();
784        let mut smsg_vec = Vec::new();
785        for i in 0..10 {
786            let bias = (12 - i) / 3;
787            let msg = create_smsg(
788                &a2,
789                &a1,
790                wallet.borrow_mut(),
791                i,
792                gas_limit,
793                1 + i % 3 + bias,
794            );
795            smsg_vec.push(msg.clone());
796            mset.insert(i, msg);
797        }
798
799        let mut chains = Chains::new();
800        create_message_chains(
801            &tma,
802            &a1,
803            &mset,
804            &TokenAmount::zero(),
805            &ts,
806            &mut chains,
807            &chain_config,
808        )
809        .unwrap();
810
811        for i in 0..chains.len() {
812            let expected_len = if i > 2 { 1 } else { 3 };
813            assert_eq!(
814                chains[i].msgs.len(),
815                expected_len,
816                "expected {} message in chain {} but got {}",
817                expected_len,
818                i,
819                chains[i].msgs.len()
820            );
821        }
822
823        let mut next_nonce = 0;
824        for i in 0..chains.len() {
825            for m in chains[i].msgs.iter() {
826                assert_eq!(
827                    next_nonce,
828                    m.sequence(),
829                    "expected nonce {} but got {}",
830                    next_nonce,
831                    m.sequence()
832                );
833                next_nonce += 1;
834            }
835        }
836
837        // --- Test Chain Breaks ---
838        // Test 4: 10 messages with non-consecutive nonces; it should make a single
839        // chain with just         the first message
840        let mut mset = HashMap::new();
841        let mut smsg_vec = Vec::new();
842        for i in 0..10 {
843            let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i * 2, gas_limit, 1 + i);
844            smsg_vec.push(msg.clone());
845            mset.insert(i, msg);
846        }
847
848        let mut chains = Chains::new();
849        create_message_chains(
850            &tma,
851            &a1,
852            &mset,
853            &TokenAmount::zero(),
854            &ts,
855            &mut chains,
856            &chain_config,
857        )
858        .unwrap();
859        assert_eq!(chains.len(), 1, "expected a single chain");
860        for (i, m) in chains[0].msgs.iter().enumerate() {
861            assert_eq!(
862                m.sequence(),
863                i as u64,
864                "expected nonce {} but got {}",
865                i,
866                m.sequence()
867            );
868        }
869
870        // Test 5: 10 messages with increasing gasLimit, except for the 6th message
871        // which has less than         the epoch gasLimit; it should create a
872        // single chain with the first 5 messages
873        let mut mset = HashMap::new();
874        let mut smsg_vec = Vec::new();
875        tma.set_state_balance_raw(&a1, TokenAmount::from_atto(1_000_000_000_000_000_000_u64));
876        for i in 0..10 {
877            let msg = if i != 5 {
878                create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i)
879            } else {
880                create_smsg(&a2, &a1, wallet.borrow_mut(), i, 1, 1 + i)
881            };
882            smsg_vec.push(msg.clone());
883            mset.insert(i, msg);
884        }
885        let mut chains = Chains::new();
886        create_message_chains(
887            &tma,
888            &a1,
889            &mset,
890            &TokenAmount::zero(),
891            &ts,
892            &mut chains,
893            &chain_config,
894        )
895        .unwrap();
896        assert_eq!(chains.len(), 1, "expected a single chain");
897        assert_eq!(chains[0].msgs.len(), 5);
898        for (i, m) in chains[0].msgs.iter().enumerate() {
899            assert_eq!(
900                m.sequence(),
901                i as u64,
902                "expected nonce {} but got {}",
903                i,
904                m.sequence()
905            );
906        }
907
908        // Test 6: one more message than what can fit in a block according to gas limit,
909        // with increasing         gasPerf; it should create a single chain with
910        // the max messages
911        let mut mset = HashMap::new();
912        let mut smsg_vec = Vec::new();
913        let max_messages = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / gas_limit;
914        let n_messages = max_messages + 1;
915        for i in 0..n_messages {
916            let msg = create_smsg(
917                &a2,
918                &a1,
919                wallet.borrow_mut(),
920                i as u64,
921                gas_limit,
922                (1 + i) as u64,
923            );
924            smsg_vec.push(msg.clone());
925            mset.insert(i as u64, msg);
926        }
927        let mut chains = Chains::new();
928        create_message_chains(
929            &tma,
930            &a1,
931            &mset,
932            &TokenAmount::zero(),
933            &ts,
934            &mut chains,
935            &chain_config,
936        )
937        .unwrap();
938        assert_eq!(chains.len(), 1, "expected a single chain");
939        assert_eq!(chains[0].msgs.len(), max_messages as usize);
940        for (i, m) in chains[0].msgs.iter().enumerate() {
941            assert_eq!(
942                m.sequence(),
943                i as u64,
944                "expected nonce {} but got {}",
945                i,
946                m.sequence()
947            );
948        }
949
950        // Test 7: insufficient balance for all messages
951        tma.set_state_balance_raw(&a1, TokenAmount::from_atto(300 * gas_limit + 1));
952        let mut mset = HashMap::new();
953        let mut smsg_vec = Vec::new();
954        for i in 0..10 {
955            let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i);
956            smsg_vec.push(msg.clone());
957            mset.insert(i, msg);
958        }
959        let mut chains = Chains::new();
960        create_message_chains(
961            &tma,
962            &a1,
963            &mset,
964            &TokenAmount::zero(),
965            &ts,
966            &mut chains,
967            &chain_config,
968        )
969        .unwrap();
970        assert_eq!(chains.len(), 1, "expected a single chain");
971        assert_eq!(chains[0].msgs.len(), 2);
972        for (i, m) in chains[0].msgs.iter().enumerate() {
973            assert_eq!(
974                m.sequence(),
975                i as u64,
976                "expected nonce {} but got {}",
977                i,
978                m.sequence()
979            );
980        }
981    }
982}