1pub(in crate::message_pool) mod metrics;
5pub(in crate::message_pool) mod msg_pool;
6pub(in crate::message_pool) mod provider;
7pub mod selection;
8#[cfg(test)]
9pub mod test_provider;
10pub(in crate::message_pool) mod utils;
11
12use std::{borrow::BorrowMut, cmp::Ordering};
13
14use crate::blocks::Tipset;
15use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic};
16use crate::message::{Message as MessageTrait, SignedMessage};
17use crate::networks::ChainConfig;
18use crate::shim::{address::Address, crypto::Signature};
19use crate::utils::cache::SizeTrackingLruCache;
20use crate::utils::get_size::CidWrapper;
21use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
22use cid::Cid;
23use fvm_ipld_encoding::to_vec;
24use parking_lot::RwLock as SyncRwLock;
25use tracing::error;
26use utils::{get_base_fee_lower_bound, recover_sig};
27
28use super::errors::Error;
29use crate::message_pool::{
30 msg_chain::{Chains, create_message_chains},
31 msg_pool::{MsgSet, TrustPolicy, add_helper, remove},
32 provider::Provider,
33};
34
/// Replace-by-fee ratio; only its fractional part feeds `RBF_NUM` below.
const REPLACE_BY_FEE_RATIO: f32 = 1.25;
/// Fractional part of `REPLACE_BY_FEE_RATIO` scaled by 256 (evaluates to 64).
const RBF_NUM: u64 = ((REPLACE_BY_FEE_RATIO - 1.0_f32) * 256.0_f32) as u64;
/// Scale that `RBF_NUM` is expressed in.
const RBF_DENOM: u64 = 256;
/// Conservative variant of the base-fee lower-bound factor (not used in this
/// part of the file).
const BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE: i64 = 100;
/// Factor handed to `get_base_fee_lower_bound` when selecting messages to
/// republish.
const BASE_FEE_LOWER_BOUND_FACTOR: i64 = 10;
/// Upper bound on the number of messages returned by a republish selection.
const REPUB_MSG_LIMIT: usize = 30;
/// Republish selection stops once the remaining gas budget is at or below
/// this value.
const MIN_GAS: u64 = 1_298_450;
42
43fn get_state_sequence<T>(api: &T, addr: &Address, cur_ts: &Tipset) -> Result<u64, Error>
46where
47 T: Provider,
48{
49 let actor = api.get_actor_after(addr, cur_ts)?;
50 let base_sequence = actor.sequence;
51
52 Ok(base_sequence)
53}
54
55#[allow(clippy::too_many_arguments)]
56async fn republish_pending_messages<T>(
57 api: &T,
58 network_sender: &flume::Sender<NetworkMessage>,
59 pending: &SyncRwLock<HashMap<Address, MsgSet>>,
60 cur_tipset: &SyncRwLock<Tipset>,
61 republished: &SyncRwLock<HashSet<Cid>>,
62 local_addrs: &SyncRwLock<Vec<Address>>,
63 chain_config: &ChainConfig,
64) -> Result<(), Error>
65where
66 T: Provider,
67{
68 let ts = cur_tipset.read().clone();
69 let mut pending_map: HashMap<Address, HashMap<u64, SignedMessage>> = HashMap::new();
70
71 republished.write().clear();
72
73 for actor in local_addrs.read().iter() {
76 if let Some(mset) = pending.read().get(actor) {
77 if mset.msgs.is_empty() {
78 continue;
79 }
80 let mut pend: HashMap<u64, SignedMessage> = HashMap::with_capacity(mset.msgs.len());
81 for (nonce, m) in mset.msgs.clone().into_iter() {
82 pend.insert(nonce, m);
83 }
84 pending_map.insert(*actor, pend);
85 }
86 }
87
88 let msgs = select_messages_for_block(api, chain_config, &ts, pending_map)?;
89
90 let network_name = chain_config.network.genesis_name();
91 for m in msgs.iter() {
92 let mb = to_vec(m)?;
93 network_sender
94 .send_async(NetworkMessage::PubsubMessage {
95 topic: Topic::new(format!("{PUBSUB_MSG_STR}/{network_name}")),
96 message: mb,
97 })
98 .await
99 .map_err(|_| Error::Other("Network receiver dropped".to_string()))?;
100 }
101
102 let mut republished_t = HashSet::new();
103 for m in msgs.iter() {
104 republished_t.insert(m.cid());
105 }
106 *republished.write() = republished_t;
107
108 Ok(())
109}
110
111fn select_messages_for_block<T>(
116 api: &T,
117 chain_config: &ChainConfig,
118 base: &Tipset,
119 pending: HashMap<Address, HashMap<u64, SignedMessage>>,
120) -> Result<Vec<SignedMessage>, Error>
121where
122 T: Provider,
123{
124 let mut msgs: Vec<SignedMessage> = vec![];
125
126 let base_fee = api.chain_compute_base_fee(base)?;
127 let base_fee_lower_bound = get_base_fee_lower_bound(&base_fee, BASE_FEE_LOWER_BOUND_FACTOR);
128
129 if pending.is_empty() {
130 return Ok(msgs);
131 }
132
133 let mut chains = Chains::new();
134 for (actor, mset) in pending.iter() {
135 create_message_chains(
136 api,
137 actor,
138 mset,
139 &base_fee_lower_bound,
140 base,
141 &mut chains,
142 chain_config,
143 )?;
144 }
145
146 if chains.is_empty() {
147 return Ok(msgs);
148 }
149
150 chains.sort(false);
151
152 let mut gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT;
153 let mut i = 0;
154 'l: while let Some(chain) = chains.get_mut_at(i) {
155 if msgs.len() > REPUB_MSG_LIMIT {
157 break;
158 }
159
160 if gas_limit <= MIN_GAS {
161 break;
162 }
163
164 if !chain.valid {
166 i += 1;
167 continue;
168 }
169
170 if chain.gas_limit <= gas_limit {
172 for m in chain.msgs.iter() {
175 if m.gas_fee_cap() < base_fee_lower_bound {
176 let key = chains.get_key_at(i);
177 chains.invalidate(key);
178 continue 'l;
179 }
180 gas_limit = gas_limit.saturating_sub(m.gas_limit());
181 msgs.push(m.clone());
182 }
183
184 i += 1;
185 continue;
186 }
187
188 chains.trim_msgs_at(i, gas_limit, REPUB_MSG_LIMIT, &base_fee);
191 let mut j = i;
192 while j < chains.len() - 1 {
193 #[allow(clippy::indexing_slicing)]
194 if chains[j].compare(&chains[j + 1]) == Ordering::Less {
195 break;
196 }
197 chains.key_vec.swap(i, i + 1);
198 j += 1;
199 }
200 }
201
202 if msgs.len() > REPUB_MSG_LIMIT {
203 msgs.truncate(REPUB_MSG_LIMIT);
204 }
205
206 Ok(msgs)
207}
208
209#[allow(clippy::too_many_arguments)]
213pub async fn head_change<T>(
214 api: &T,
215 bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
216 repub_trigger: flume::Sender<()>,
217 republished: &SyncRwLock<HashSet<Cid>>,
218 pending: &SyncRwLock<HashMap<Address, MsgSet>>,
219 cur_tipset: &SyncRwLock<Tipset>,
220 revert: Vec<Tipset>,
221 apply: Vec<Tipset>,
222) -> Result<(), Error>
223where
224 T: Provider + 'static,
225{
226 let mut repub = false;
227 let mut rmsgs: HashMap<Address, HashMap<u64, SignedMessage>> = HashMap::new();
228 for ts in revert {
229 let pts = api.load_tipset(ts.parents())?;
230 *cur_tipset.write() = pts;
231
232 let mut msgs: Vec<SignedMessage> = Vec::new();
233 for block in ts.block_headers() {
234 let (umsg, smsgs) = api.messages_for_block(block)?;
235 msgs.extend(smsgs);
236 for msg in umsg {
237 let smsg = recover_sig(bls_sig_cache, msg)?;
238 msgs.push(smsg)
239 }
240 }
241
242 for msg in msgs {
243 add_to_selected_msgs(msg, rmsgs.borrow_mut());
244 }
245 }
246
247 for ts in apply {
248 for b in ts.block_headers() {
249 let (msgs, smsgs) = api.messages_for_block(b)?;
250
251 for msg in smsgs {
252 remove_from_selected_msgs(
253 &msg.from(),
254 pending,
255 msg.sequence(),
256 rmsgs.borrow_mut(),
257 )?;
258 if !repub && republished.write().insert(msg.cid()) {
259 repub = true;
260 }
261 }
262 for msg in msgs {
263 remove_from_selected_msgs(&msg.from, pending, msg.sequence, rmsgs.borrow_mut())?;
264 if !repub && republished.write().insert(msg.cid()) {
265 repub = true;
266 }
267 }
268 }
269 *cur_tipset.write() = ts;
270 }
271 if repub {
272 repub_trigger
273 .send_async(())
274 .await
275 .map_err(|e| Error::Other(format!("Republish receiver dropped: {e}")))?;
276 }
277 for (_, hm) in rmsgs {
278 for (_, msg) in hm {
279 let sequence = get_state_sequence(api, &msg.from(), &cur_tipset.read().clone())?;
280 if let Err(e) = add_helper(
281 api,
282 bls_sig_cache,
283 pending,
284 msg,
285 sequence,
286 TrustPolicy::Trusted,
287 ) {
288 error!("Failed to read message from reorg to mpool: {}", e);
289 }
290 }
291 }
292 Ok(())
293}
294
295pub(in crate::message_pool) fn remove_from_selected_msgs(
299 from: &Address,
300 pending: &SyncRwLock<HashMap<Address, MsgSet>>,
301 sequence: u64,
302 rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
303) -> Result<(), Error> {
304 if let Some(temp) = rmsgs.get_mut(from) {
305 if temp.get_mut(&sequence).is_some() {
306 temp.remove(&sequence);
307 } else {
308 remove(from, pending, sequence, true)?;
309 }
310 } else {
311 remove(from, pending, sequence, true)?;
312 }
313 Ok(())
314}
315
316pub(in crate::message_pool) fn add_to_selected_msgs(
319 m: SignedMessage,
320 rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
321) {
322 let s = rmsgs.get_mut(&m.from());
323 if let Some(s) = s {
324 s.insert(m.sequence(), m);
325 } else {
326 rmsgs.insert(m.from(), HashMap::new());
327 rmsgs.get_mut(&m.from()).unwrap().insert(m.sequence(), m);
328 }
329}
330
331#[cfg(test)]
332pub mod tests {
333 use std::{borrow::BorrowMut, time::Duration};
334
335 use crate::blocks::Tipset;
336 use crate::key_management::{KeyStore, KeyStoreConfig, Wallet};
337 use crate::message::SignedMessage;
338 use crate::networks::ChainConfig;
339 use crate::shim::{
340 address::Address,
341 crypto::SignatureType,
342 econ::TokenAmount,
343 message::{Message, Message_v3},
344 };
345 use num_traits::Zero;
346 use test_provider::*;
347 use tokio::task::JoinSet;
348
349 use super::*;
350 use crate::message_pool::{
351 msg_chain::{Chains, create_message_chains},
352 msg_pool::MessagePool,
353 };
354
355 #[tokio::test]
356 async fn test_per_actor_limit() {
357 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
358 let mut wallet = Wallet::new(keystore);
359 let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
360 let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
361 let tma = TestApi::with_max_actor_pending_messages(200);
362 tma.set_state_sequence(&sender, 0);
363
364 let (tx, _rx) = flume::bounded(50);
365 let mut services = JoinSet::new();
366 let mpool = MessagePool::new(
367 tma,
368 tx,
369 Default::default(),
370 Default::default(),
371 &mut services,
372 )
373 .unwrap();
374 let mut smsg_vec = Vec::new();
375 for i in 0..(mpool.api.max_actor_pending_messages() + 1) {
376 let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
377 smsg_vec.push(msg);
378 }
379
380 let (last, body) = smsg_vec.split_last().unwrap();
381 for smsg in body {
382 mpool.add(smsg.clone()).unwrap();
383 }
384 assert_eq!(
385 mpool.add(last.clone()),
386 Err(Error::TooManyPendingMessages(sender.to_string(), true))
387 );
388 }
389
390 pub fn create_smsg(
391 to: &Address,
392 from: &Address,
393 wallet: &mut Wallet,
394 sequence: u64,
395 gas_limit: i64,
396 gas_price: u64,
397 ) -> SignedMessage {
398 let umsg: Message = Message_v3 {
399 to: to.into(),
400 from: from.into(),
401 sequence,
402 gas_limit: gas_limit as u64,
403 gas_fee_cap: TokenAmount::from_atto(gas_price + 100).into(),
404 gas_premium: TokenAmount::from_atto(gas_price).into(),
405 ..Message_v3::default()
406 }
407 .into();
408 let msg_signing_bytes = umsg.cid().to_bytes();
409 let sig = wallet.sign(from, msg_signing_bytes.as_slice()).unwrap();
410 SignedMessage::new_unchecked(umsg, sig)
411 }
412
413 pub fn create_fake_smsg(
417 pool: &MessagePool<TestApi>,
418 to: &Address,
419 from: &Address,
420 sequence: u64,
421 gas_limit: i64,
422 gas_price: u64,
423 ) -> SignedMessage {
424 let umsg: Message = Message_v3 {
425 to: to.into(),
426 from: from.into(),
427 sequence,
428 gas_limit: gas_limit as u64,
429 gas_fee_cap: TokenAmount::from_atto(gas_price + 100).into(),
430 gas_premium: TokenAmount::from_atto(gas_price).into(),
431 ..Message_v3::default()
432 }
433 .into();
434 let sig = Signature::new_secp256k1(vec![]);
435 let signed = SignedMessage::new_unchecked(umsg, sig);
436 let cid = signed.cid();
437 pool.sig_val_cache.push(cid.into(), ());
438 signed
439 }
440
441 #[tokio::test]
442 async fn test_message_pool() {
443 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
444 let mut wallet = Wallet::new(keystore);
445 let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
446 let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
447 let tma = TestApi::default();
448 tma.set_state_sequence(&sender, 0);
449
450 let (tx, _rx) = flume::bounded(50);
451 let mut services = JoinSet::new();
452 let mpool = MessagePool::new(
453 tma,
454 tx,
455 Default::default(),
456 Default::default(),
457 &mut services,
458 )
459 .unwrap();
460 let mut smsg_vec = Vec::new();
461 for i in 0..2 {
462 let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
463 smsg_vec.push(msg);
464 }
465
466 mpool.api.inner.lock().set_state_sequence(&sender, 0);
467 assert_eq!(mpool.get_sequence(&sender).unwrap(), 0);
468 mpool.add(smsg_vec[0].clone()).unwrap();
469 assert_eq!(mpool.get_sequence(&sender).unwrap(), 1);
470 mpool.add(smsg_vec[1].clone()).unwrap();
471 assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
472
473 let a = mock_block(1, 1);
474
475 mpool.api.inner.lock().set_block_messages(&a, smsg_vec);
476 let api = mpool.api.clone();
477 let bls_sig_cache = mpool.bls_sig_cache.clone();
478 let pending = mpool.pending.clone();
479 let cur_tipset = mpool.cur_tipset.clone();
480 let repub_trigger = mpool.repub_trigger.clone();
481 let republished = mpool.republished.clone();
482 head_change(
483 api.as_ref(),
484 bls_sig_cache.as_ref(),
485 repub_trigger,
486 republished.as_ref(),
487 pending.as_ref(),
488 cur_tipset.as_ref(),
489 Vec::new(),
490 vec![Tipset::from(a)],
491 )
492 .await
493 .unwrap();
494
495 assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
496 }
497
498 #[tokio::test]
499 async fn test_revert_messages() {
500 let tma = TestApi::default();
501 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
502 let mut wallet = Wallet::new(keystore);
503
504 let a = mock_block(1, 1);
505 let tipset = Tipset::from(&a);
506 let b = mock_block_with_parents(&tipset, 1, 1);
507
508 let sender = wallet.generate_addr(SignatureType::Bls).unwrap();
509 let target = Address::new_id(1001);
510
511 let mut smsg_vec = Vec::new();
512
513 for i in 0..4 {
514 let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
515 smsg_vec.push(msg);
516 }
517 let (tx, _rx) = flume::bounded(50);
518 let mut services = JoinSet::new();
519 let mpool = MessagePool::new(
520 tma,
521 tx,
522 Default::default(),
523 Default::default(),
524 &mut services,
525 )
526 .unwrap();
527
528 {
529 let mut api_temp = mpool.api.inner.lock();
530 api_temp.set_block_messages(&a, vec![smsg_vec[0].clone()]);
531 api_temp.set_block_messages(&b.clone(), smsg_vec[1..4].to_vec());
532 api_temp.set_state_sequence(&sender, 0);
533 drop(api_temp);
534 }
535
536 mpool.add(smsg_vec[0].clone()).unwrap();
537 mpool.add(smsg_vec[1].clone()).unwrap();
538 mpool.add(smsg_vec[2].clone()).unwrap();
539 mpool.add(smsg_vec[3].clone()).unwrap();
540
541 mpool.api.set_state_sequence(&sender, 0);
542
543 let api = mpool.api.clone();
544 let bls_sig_cache = mpool.bls_sig_cache.clone();
545 let pending = mpool.pending.clone();
546 let cur_tipset = mpool.cur_tipset.clone();
547 let repub_trigger = mpool.repub_trigger.clone();
548 let republished = mpool.republished.clone();
549 head_change(
550 api.as_ref(),
551 bls_sig_cache.as_ref(),
552 repub_trigger.clone(),
553 republished.as_ref(),
554 pending.as_ref(),
555 cur_tipset.as_ref(),
556 Vec::new(),
557 vec![Tipset::from(a)],
558 )
559 .await
560 .unwrap();
561
562 assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
563
564 mpool.api.set_state_sequence(&sender, 1);
565
566 let api = mpool.api.clone();
567 let bls_sig_cache = mpool.bls_sig_cache.clone();
568 let pending = mpool.pending.clone();
569 let cur_tipset = mpool.cur_tipset.clone();
570
571 head_change(
572 api.as_ref(),
573 bls_sig_cache.as_ref(),
574 repub_trigger.clone(),
575 republished.as_ref(),
576 pending.as_ref(),
577 cur_tipset.as_ref(),
578 Vec::new(),
579 vec![Tipset::from(&b)],
580 )
581 .await
582 .unwrap();
583
584 assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
585
586 mpool.api.set_state_sequence(&sender, 0);
587
588 head_change(
589 api.as_ref(),
590 bls_sig_cache.as_ref(),
591 repub_trigger.clone(),
592 republished.as_ref(),
593 pending.as_ref(),
594 cur_tipset.as_ref(),
595 vec![Tipset::from(b)],
596 Vec::new(),
597 )
598 .await
599 .unwrap();
600
601 assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
602
603 let (p, _) = mpool.pending().unwrap();
604 assert_eq!(p.len(), 3);
605 }
606
607 #[tokio::test]
608 async fn test_async_message_pool() {
609 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
610 let mut wallet = Wallet::new(keystore);
611 let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
612 let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
613
614 let tma = TestApi::default();
615 tma.set_state_sequence(&sender, 0);
616 let (tx, _rx) = flume::bounded(50);
617 let mut services = JoinSet::new();
618 let mpool = MessagePool::new(
619 tma,
620 tx,
621 Default::default(),
622 Default::default(),
623 &mut services,
624 )
625 .unwrap();
626
627 let mut smsg_vec = Vec::new();
628 for i in 0..3 {
629 let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
630 smsg_vec.push(msg);
631 }
632
633 assert_eq!(mpool.get_sequence(&sender).unwrap(), 0);
634 mpool.push(smsg_vec[0].clone()).await.unwrap();
635 assert_eq!(mpool.get_sequence(&sender).unwrap(), 1);
636 mpool.push(smsg_vec[1].clone()).await.unwrap();
637 assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
638 mpool.push(smsg_vec[2].clone()).await.unwrap();
639 assert_eq!(mpool.get_sequence(&sender).unwrap(), 3);
640
641 let header = mock_block(1, 1);
642 let tipset = Tipset::from(&header.clone());
643
644 mpool.api.set_heaviest_tipset(tipset.clone());
645
646 tokio::time::sleep(Duration::new(2, 0)).await;
648
649 let cur_ts = mpool.current_tipset();
650 assert_eq!(cur_ts, tipset);
651 }
652
653 #[tokio::test]
654 async fn test_msg_chains() {
655 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
656 let mut wallet = Wallet::new(keystore);
657 let a1 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
658 let a2 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
659 let tma = TestApi::default();
660 let gas_limit = 6955002;
661
662 let a = mock_block(1, 1);
663 let ts = Tipset::from(a);
664 let chain_config = ChainConfig::default();
665
666 let mut mset = HashMap::new();
670 let mut smsg_vec = Vec::new();
671 for i in 0..10 {
672 let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i);
673 smsg_vec.push(msg.clone());
674 mset.insert(i, msg);
675 }
676
677 let mut chains = Chains::new();
678 create_message_chains(
679 &tma,
680 &a1,
681 &mset,
682 &TokenAmount::zero(),
683 &ts,
684 &mut chains,
685 &chain_config,
686 )
687 .unwrap();
688 assert_eq!(chains.len(), 1, "expected a single chain");
689 assert_eq!(
690 chains[0].msgs.len(),
691 10,
692 "expected 10 messages in single chain, got: {}",
693 chains[0].msgs.len()
694 );
695 for (i, m) in chains[0].msgs.iter().enumerate() {
696 assert_eq!(
697 m.sequence(),
698 i as u64,
699 "expected sequence {} but got {}",
700 i,
701 m.sequence()
702 );
703 }
704
705 let mut mset = HashMap::new();
708 let mut smsg_vec = Vec::new();
709 for i in 0..10 {
710 let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 10 - i);
711 smsg_vec.push(msg.clone());
712 mset.insert(i, msg);
713 }
714 let mut chains = Chains::new();
715 create_message_chains(
716 &tma,
717 &a1,
718 &mset,
719 &TokenAmount::zero(),
720 &ts,
721 &mut chains,
722 &chain_config,
723 )
724 .unwrap();
725 assert_eq!(chains.len(), 10, "expected 10 chains");
726
727 for i in 0..chains.len() {
728 assert_eq!(
729 chains[i].msgs.len(),
730 1,
731 "expected 1 message in chain {} but got {}",
732 i,
733 chains[i].msgs.len()
734 );
735 }
736
737 for i in 0..chains.len() {
738 let m = &chains[i].msgs[0];
739 assert_eq!(
740 m.sequence(),
741 i as u64,
742 "expected sequence {} but got {}",
743 i,
744 m.sequence()
745 );
746 }
747
748 let mut mset = HashMap::new();
752 let mut smsg_vec = Vec::new();
753 for i in 0..10 {
754 let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i % 3);
755 smsg_vec.push(msg.clone());
756 mset.insert(i, msg);
757 }
758 let mut chains = Chains::new();
759 create_message_chains(
760 &tma,
761 &a1,
762 &mset,
763 &TokenAmount::zero(),
764 &ts,
765 &mut chains,
766 &chain_config,
767 )
768 .unwrap();
769 assert_eq!(chains.len(), 2, "expected 2 chains");
770 assert_eq!(chains[0].msgs.len(), 9);
771 assert_eq!(chains[1].msgs.len(), 1);
772 let mut next_nonce = 0;
773 for i in 0..chains.len() {
774 for m in chains[i].msgs.iter() {
775 assert_eq!(
776 next_nonce,
777 m.sequence(),
778 "expected nonce {} but got {}",
779 next_nonce,
780 m.sequence()
781 );
782 next_nonce += 1;
783 }
784 }
785
786 let mut mset = HashMap::new();
791 let mut smsg_vec = Vec::new();
792 for i in 0..10 {
793 let bias = (12 - i) / 3;
794 let msg = create_smsg(
795 &a2,
796 &a1,
797 wallet.borrow_mut(),
798 i,
799 gas_limit,
800 1 + i % 3 + bias,
801 );
802 smsg_vec.push(msg.clone());
803 mset.insert(i, msg);
804 }
805
806 let mut chains = Chains::new();
807 create_message_chains(
808 &tma,
809 &a1,
810 &mset,
811 &TokenAmount::zero(),
812 &ts,
813 &mut chains,
814 &chain_config,
815 )
816 .unwrap();
817
818 for i in 0..chains.len() {
819 let expected_len = if i > 2 { 1 } else { 3 };
820 assert_eq!(
821 chains[i].msgs.len(),
822 expected_len,
823 "expected {} message in chain {} but got {}",
824 expected_len,
825 i,
826 chains[i].msgs.len()
827 );
828 }
829
830 let mut next_nonce = 0;
831 for i in 0..chains.len() {
832 for m in chains[i].msgs.iter() {
833 assert_eq!(
834 next_nonce,
835 m.sequence(),
836 "expected nonce {} but got {}",
837 next_nonce,
838 m.sequence()
839 );
840 next_nonce += 1;
841 }
842 }
843
844 let mut mset = HashMap::new();
848 let mut smsg_vec = Vec::new();
849 for i in 0..10 {
850 let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i * 2, gas_limit, 1 + i);
851 smsg_vec.push(msg.clone());
852 mset.insert(i, msg);
853 }
854
855 let mut chains = Chains::new();
856 create_message_chains(
857 &tma,
858 &a1,
859 &mset,
860 &TokenAmount::zero(),
861 &ts,
862 &mut chains,
863 &chain_config,
864 )
865 .unwrap();
866 assert_eq!(chains.len(), 1, "expected a single chain");
867 for (i, m) in chains[0].msgs.iter().enumerate() {
868 assert_eq!(
869 m.sequence(),
870 i as u64,
871 "expected nonce {} but got {}",
872 i,
873 m.sequence()
874 );
875 }
876
877 let mut mset = HashMap::new();
881 let mut smsg_vec = Vec::new();
882 tma.set_state_balance_raw(&a1, TokenAmount::from_atto(1_000_000_000_000_000_000_u64));
883 for i in 0..10 {
884 let msg = if i != 5 {
885 create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i)
886 } else {
887 create_smsg(&a2, &a1, wallet.borrow_mut(), i, 1, 1 + i)
888 };
889 smsg_vec.push(msg.clone());
890 mset.insert(i, msg);
891 }
892 let mut chains = Chains::new();
893 create_message_chains(
894 &tma,
895 &a1,
896 &mset,
897 &TokenAmount::zero(),
898 &ts,
899 &mut chains,
900 &chain_config,
901 )
902 .unwrap();
903 assert_eq!(chains.len(), 1, "expected a single chain");
904 assert_eq!(chains[0].msgs.len(), 5);
905 for (i, m) in chains[0].msgs.iter().enumerate() {
906 assert_eq!(
907 m.sequence(),
908 i as u64,
909 "expected nonce {} but got {}",
910 i,
911 m.sequence()
912 );
913 }
914
915 let mut mset = HashMap::new();
919 let mut smsg_vec = Vec::new();
920 let max_messages = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / gas_limit;
921 let n_messages = max_messages + 1;
922 for i in 0..n_messages {
923 let msg = create_smsg(
924 &a2,
925 &a1,
926 wallet.borrow_mut(),
927 i as u64,
928 gas_limit,
929 (1 + i) as u64,
930 );
931 smsg_vec.push(msg.clone());
932 mset.insert(i as u64, msg);
933 }
934 let mut chains = Chains::new();
935 create_message_chains(
936 &tma,
937 &a1,
938 &mset,
939 &TokenAmount::zero(),
940 &ts,
941 &mut chains,
942 &chain_config,
943 )
944 .unwrap();
945 assert_eq!(chains.len(), 1, "expected a single chain");
946 assert_eq!(chains[0].msgs.len(), max_messages as usize);
947 for (i, m) in chains[0].msgs.iter().enumerate() {
948 assert_eq!(
949 m.sequence(),
950 i as u64,
951 "expected nonce {} but got {}",
952 i,
953 m.sequence()
954 );
955 }
956
957 tma.set_state_balance_raw(&a1, TokenAmount::from_atto(300 * gas_limit + 1));
959 let mut mset = HashMap::new();
960 let mut smsg_vec = Vec::new();
961 for i in 0..10 {
962 let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i);
963 smsg_vec.push(msg.clone());
964 mset.insert(i, msg);
965 }
966 let mut chains = Chains::new();
967 create_message_chains(
968 &tma,
969 &a1,
970 &mset,
971 &TokenAmount::zero(),
972 &ts,
973 &mut chains,
974 &chain_config,
975 )
976 .unwrap();
977 assert_eq!(chains.len(), 1, "expected a single chain");
978 assert_eq!(chains[0].msgs.len(), 2);
979 for (i, m) in chains[0].msgs.iter().enumerate() {
980 assert_eq!(
981 m.sequence(),
982 i as u64,
983 "expected nonce {} but got {}",
984 i,
985 m.sequence()
986 );
987 }
988 }
989}