1pub(in crate::message_pool) mod metrics;
5pub(in crate::message_pool) mod msg_pool;
6pub(in crate::message_pool) mod provider;
7pub mod selection;
8#[cfg(test)]
9pub mod test_provider;
10pub(in crate::message_pool) mod utils;
11
12use std::{borrow::BorrowMut, cmp::Ordering};
13
14use crate::blocks::Tipset;
15use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic};
16use crate::message::{Message as MessageTrait, SignedMessage};
17use crate::networks::ChainConfig;
18use crate::shim::{address::Address, crypto::Signature};
19use crate::utils::cache::SizeTrackingLruCache;
20use crate::utils::get_size::CidWrapper;
21use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
22use cid::Cid;
23use fvm_ipld_encoding::to_vec;
24use parking_lot::RwLock as SyncRwLock;
25use tracing::error;
26use utils::{get_base_fee_lower_bound, recover_sig};
27
28use super::errors::Error;
29use crate::message_pool::{
30 msg_chain::{Chains, create_message_chains},
31 msg_pool::{MsgSet, add_helper, remove},
32 provider::Provider,
33};
34
35const REPLACE_BY_FEE_RATIO: f32 = 1.25;
36const RBF_NUM: u64 = ((REPLACE_BY_FEE_RATIO - 1f32) * 256f32) as u64;
37const RBF_DENOM: u64 = 256;
38const BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE: i64 = 100;
39const BASE_FEE_LOWER_BOUND_FACTOR: i64 = 10;
40const REPUB_MSG_LIMIT: usize = 30;
41const MIN_GAS: u64 = 1298450;
42
43fn get_state_sequence<T>(api: &T, addr: &Address, cur_ts: &Tipset) -> Result<u64, Error>
46where
47 T: Provider,
48{
49 let actor = api.get_actor_after(addr, cur_ts)?;
50 let base_sequence = actor.sequence;
51
52 Ok(base_sequence)
53}
54
55#[allow(clippy::too_many_arguments)]
56async fn republish_pending_messages<T>(
57 api: &T,
58 network_sender: &flume::Sender<NetworkMessage>,
59 pending: &SyncRwLock<HashMap<Address, MsgSet>>,
60 cur_tipset: &SyncRwLock<Tipset>,
61 republished: &SyncRwLock<HashSet<Cid>>,
62 local_addrs: &SyncRwLock<Vec<Address>>,
63 chain_config: &ChainConfig,
64) -> Result<(), Error>
65where
66 T: Provider,
67{
68 let ts = cur_tipset.read().clone();
69 let mut pending_map: HashMap<Address, HashMap<u64, SignedMessage>> = HashMap::new();
70
71 republished.write().clear();
72
73 for actor in local_addrs.read().iter() {
76 if let Some(mset) = pending.read().get(actor) {
77 if mset.msgs.is_empty() {
78 continue;
79 }
80 let mut pend: HashMap<u64, SignedMessage> = HashMap::with_capacity(mset.msgs.len());
81 for (nonce, m) in mset.msgs.clone().into_iter() {
82 pend.insert(nonce, m);
83 }
84 pending_map.insert(*actor, pend);
85 }
86 }
87
88 let msgs = select_messages_for_block(api, chain_config, &ts, pending_map)?;
89
90 let network_name = chain_config.network.genesis_name();
91 for m in msgs.iter() {
92 let mb = to_vec(m)?;
93 network_sender
94 .send_async(NetworkMessage::PubsubMessage {
95 topic: Topic::new(format!("{PUBSUB_MSG_STR}/{network_name}")),
96 message: mb,
97 })
98 .await
99 .map_err(|_| Error::Other("Network receiver dropped".to_string()))?;
100 }
101
102 let mut republished_t = HashSet::new();
103 for m in msgs.iter() {
104 republished_t.insert(m.cid());
105 }
106 *republished.write() = republished_t;
107
108 Ok(())
109}
110
111fn select_messages_for_block<T>(
116 api: &T,
117 chain_config: &ChainConfig,
118 base: &Tipset,
119 pending: HashMap<Address, HashMap<u64, SignedMessage>>,
120) -> Result<Vec<SignedMessage>, Error>
121where
122 T: Provider,
123{
124 let mut msgs: Vec<SignedMessage> = vec![];
125
126 let base_fee = api.chain_compute_base_fee(base)?;
127 let base_fee_lower_bound = get_base_fee_lower_bound(&base_fee, BASE_FEE_LOWER_BOUND_FACTOR);
128
129 if pending.is_empty() {
130 return Ok(msgs);
131 }
132
133 let mut chains = Chains::new();
134 for (actor, mset) in pending.iter() {
135 create_message_chains(
136 api,
137 actor,
138 mset,
139 &base_fee_lower_bound,
140 base,
141 &mut chains,
142 chain_config,
143 )?;
144 }
145
146 if chains.is_empty() {
147 return Ok(msgs);
148 }
149
150 chains.sort(false);
151
152 let mut gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT;
153 let mut i = 0;
154 'l: while let Some(chain) = chains.get_mut_at(i) {
155 if msgs.len() > REPUB_MSG_LIMIT {
157 break;
158 }
159
160 if gas_limit <= MIN_GAS {
161 break;
162 }
163
164 if !chain.valid {
166 i += 1;
167 continue;
168 }
169
170 if chain.gas_limit <= gas_limit {
172 for m in chain.msgs.iter() {
175 if m.gas_fee_cap() < base_fee_lower_bound {
176 let key = chains.get_key_at(i);
177 chains.invalidate(key);
178 continue 'l;
179 }
180 gas_limit = gas_limit.saturating_sub(m.gas_limit());
181 msgs.push(m.clone());
182 }
183
184 i += 1;
185 continue;
186 }
187
188 chains.trim_msgs_at(i, gas_limit, REPUB_MSG_LIMIT, &base_fee);
191 let mut j = i;
192 while j < chains.len() - 1 {
193 #[allow(clippy::indexing_slicing)]
194 if chains[j].compare(&chains[j + 1]) == Ordering::Less {
195 break;
196 }
197 chains.key_vec.swap(i, i + 1);
198 j += 1;
199 }
200 }
201
202 if msgs.len() > REPUB_MSG_LIMIT {
203 msgs.truncate(REPUB_MSG_LIMIT);
204 }
205
206 Ok(msgs)
207}
208
209#[allow(clippy::too_many_arguments)]
213pub async fn head_change<T>(
214 api: &T,
215 bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
216 repub_trigger: flume::Sender<()>,
217 republished: &SyncRwLock<HashSet<Cid>>,
218 pending: &SyncRwLock<HashMap<Address, MsgSet>>,
219 cur_tipset: &SyncRwLock<Tipset>,
220 revert: Vec<Tipset>,
221 apply: Vec<Tipset>,
222) -> Result<(), Error>
223where
224 T: Provider + 'static,
225{
226 let mut repub = false;
227 let mut rmsgs: HashMap<Address, HashMap<u64, SignedMessage>> = HashMap::new();
228 for ts in revert {
229 let pts = api.load_tipset(ts.parents())?;
230 *cur_tipset.write() = pts;
231
232 let mut msgs: Vec<SignedMessage> = Vec::new();
233 for block in ts.block_headers() {
234 let (umsg, smsgs) = api.messages_for_block(block)?;
235 msgs.extend(smsgs);
236 for msg in umsg {
237 let smsg = recover_sig(bls_sig_cache, msg)?;
238 msgs.push(smsg)
239 }
240 }
241
242 for msg in msgs {
243 add_to_selected_msgs(msg, rmsgs.borrow_mut());
244 }
245 }
246
247 for ts in apply {
248 for b in ts.block_headers() {
249 let (msgs, smsgs) = api.messages_for_block(b)?;
250
251 for msg in smsgs {
252 remove_from_selected_msgs(
253 &msg.from(),
254 pending,
255 msg.sequence(),
256 rmsgs.borrow_mut(),
257 )?;
258 if !repub && republished.write().insert(msg.cid()) {
259 repub = true;
260 }
261 }
262 for msg in msgs {
263 remove_from_selected_msgs(&msg.from, pending, msg.sequence, rmsgs.borrow_mut())?;
264 if !repub && republished.write().insert(msg.cid()) {
265 repub = true;
266 }
267 }
268 }
269 *cur_tipset.write() = ts;
270 }
271 if repub {
272 repub_trigger
273 .send_async(())
274 .await
275 .map_err(|e| Error::Other(format!("Republish receiver dropped: {e}")))?;
276 }
277 for (_, hm) in rmsgs {
278 for (_, msg) in hm {
279 let sequence = get_state_sequence(api, &msg.from(), &cur_tipset.read().clone())?;
280 if let Err(e) = add_helper(api, bls_sig_cache, pending, msg, sequence) {
281 error!("Failed to read message from reorg to mpool: {}", e);
282 }
283 }
284 }
285 Ok(())
286}
287
288pub(in crate::message_pool) fn remove_from_selected_msgs(
292 from: &Address,
293 pending: &SyncRwLock<HashMap<Address, MsgSet>>,
294 sequence: u64,
295 rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
296) -> Result<(), Error> {
297 if let Some(temp) = rmsgs.get_mut(from) {
298 if temp.get_mut(&sequence).is_some() {
299 temp.remove(&sequence);
300 } else {
301 remove(from, pending, sequence, true)?;
302 }
303 } else {
304 remove(from, pending, sequence, true)?;
305 }
306 Ok(())
307}
308
309pub(in crate::message_pool) fn add_to_selected_msgs(
312 m: SignedMessage,
313 rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
314) {
315 let s = rmsgs.get_mut(&m.from());
316 if let Some(s) = s {
317 s.insert(m.sequence(), m);
318 } else {
319 rmsgs.insert(m.from(), HashMap::new());
320 rmsgs.get_mut(&m.from()).unwrap().insert(m.sequence(), m);
321 }
322}
323
324#[cfg(test)]
325pub mod tests {
326 use std::{borrow::BorrowMut, time::Duration};
327
328 use crate::blocks::Tipset;
329 use crate::key_management::{KeyStore, KeyStoreConfig, Wallet};
330 use crate::message::SignedMessage;
331 use crate::networks::ChainConfig;
332 use crate::shim::{
333 address::Address,
334 crypto::SignatureType,
335 econ::TokenAmount,
336 message::{Message, Message_v3},
337 };
338 use num_traits::Zero;
339 use test_provider::*;
340 use tokio::task::JoinSet;
341
342 use super::*;
343 use crate::message_pool::{
344 msg_chain::{Chains, create_message_chains},
345 msg_pool::MessagePool,
346 };
347
348 #[tokio::test]
349 async fn test_per_actor_limit() {
350 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
351 let mut wallet = Wallet::new(keystore);
352 let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
353 let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
354 let tma = TestApi::with_max_actor_pending_messages(200);
355 tma.set_state_sequence(&sender, 0);
356
357 let (tx, _rx) = flume::bounded(50);
358 let mut services = JoinSet::new();
359 let mpool = MessagePool::new(
360 tma,
361 tx,
362 Default::default(),
363 Default::default(),
364 &mut services,
365 )
366 .unwrap();
367 let mut smsg_vec = Vec::new();
368 for i in 0..(mpool.api.max_actor_pending_messages() + 1) {
369 let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
370 smsg_vec.push(msg);
371 }
372
373 let (last, body) = smsg_vec.split_last().unwrap();
374 for smsg in body {
375 mpool.add(smsg.clone()).unwrap();
376 }
377 assert_eq!(
378 mpool.add(last.clone()),
379 Err(Error::TooManyPendingMessages(sender.to_string(), true))
380 );
381 }
382
383 pub fn create_smsg(
384 to: &Address,
385 from: &Address,
386 wallet: &mut Wallet,
387 sequence: u64,
388 gas_limit: i64,
389 gas_price: u64,
390 ) -> SignedMessage {
391 let umsg: Message = Message_v3 {
392 to: to.into(),
393 from: from.into(),
394 sequence,
395 gas_limit: gas_limit as u64,
396 gas_fee_cap: TokenAmount::from_atto(gas_price + 100).into(),
397 gas_premium: TokenAmount::from_atto(gas_price).into(),
398 ..Message_v3::default()
399 }
400 .into();
401 let msg_signing_bytes = umsg.cid().to_bytes();
402 let sig = wallet.sign(from, msg_signing_bytes.as_slice()).unwrap();
403 SignedMessage::new_unchecked(umsg, sig)
404 }
405
406 pub fn create_fake_smsg(
410 pool: &MessagePool<TestApi>,
411 to: &Address,
412 from: &Address,
413 sequence: u64,
414 gas_limit: i64,
415 gas_price: u64,
416 ) -> SignedMessage {
417 let umsg: Message = Message_v3 {
418 to: to.into(),
419 from: from.into(),
420 sequence,
421 gas_limit: gas_limit as u64,
422 gas_fee_cap: TokenAmount::from_atto(gas_price + 100).into(),
423 gas_premium: TokenAmount::from_atto(gas_price).into(),
424 ..Message_v3::default()
425 }
426 .into();
427 let sig = Signature::new_secp256k1(vec![]);
428 let signed = SignedMessage::new_unchecked(umsg, sig);
429 let cid = signed.cid();
430 pool.sig_val_cache.push(cid.into(), ());
431 signed
432 }
433
434 #[tokio::test]
435 async fn test_message_pool() {
436 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
437 let mut wallet = Wallet::new(keystore);
438 let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
439 let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
440 let tma = TestApi::default();
441 tma.set_state_sequence(&sender, 0);
442
443 let (tx, _rx) = flume::bounded(50);
444 let mut services = JoinSet::new();
445 let mpool = MessagePool::new(
446 tma,
447 tx,
448 Default::default(),
449 Default::default(),
450 &mut services,
451 )
452 .unwrap();
453 let mut smsg_vec = Vec::new();
454 for i in 0..2 {
455 let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
456 smsg_vec.push(msg);
457 }
458
459 mpool.api.inner.lock().set_state_sequence(&sender, 0);
460 assert_eq!(mpool.get_sequence(&sender).unwrap(), 0);
461 mpool.add(smsg_vec[0].clone()).unwrap();
462 assert_eq!(mpool.get_sequence(&sender).unwrap(), 1);
463 mpool.add(smsg_vec[1].clone()).unwrap();
464 assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
465
466 let a = mock_block(1, 1);
467
468 mpool.api.inner.lock().set_block_messages(&a, smsg_vec);
469 let api = mpool.api.clone();
470 let bls_sig_cache = mpool.bls_sig_cache.clone();
471 let pending = mpool.pending.clone();
472 let cur_tipset = mpool.cur_tipset.clone();
473 let repub_trigger = mpool.repub_trigger.clone();
474 let republished = mpool.republished.clone();
475 head_change(
476 api.as_ref(),
477 bls_sig_cache.as_ref(),
478 repub_trigger,
479 republished.as_ref(),
480 pending.as_ref(),
481 cur_tipset.as_ref(),
482 Vec::new(),
483 vec![Tipset::from(a)],
484 )
485 .await
486 .unwrap();
487
488 assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
489 }
490
491 #[tokio::test]
492 async fn test_revert_messages() {
493 let tma = TestApi::default();
494 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
495 let mut wallet = Wallet::new(keystore);
496
497 let a = mock_block(1, 1);
498 let tipset = Tipset::from(&a);
499 let b = mock_block_with_parents(&tipset, 1, 1);
500
501 let sender = wallet.generate_addr(SignatureType::Bls).unwrap();
502 let target = Address::new_id(1001);
503
504 let mut smsg_vec = Vec::new();
505
506 for i in 0..4 {
507 let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
508 smsg_vec.push(msg);
509 }
510 let (tx, _rx) = flume::bounded(50);
511 let mut services = JoinSet::new();
512 let mpool = MessagePool::new(
513 tma,
514 tx,
515 Default::default(),
516 Default::default(),
517 &mut services,
518 )
519 .unwrap();
520
521 {
522 let mut api_temp = mpool.api.inner.lock();
523 api_temp.set_block_messages(&a, vec![smsg_vec[0].clone()]);
524 api_temp.set_block_messages(&b.clone(), smsg_vec[1..4].to_vec());
525 api_temp.set_state_sequence(&sender, 0);
526 drop(api_temp);
527 }
528
529 mpool.add(smsg_vec[0].clone()).unwrap();
530 mpool.add(smsg_vec[1].clone()).unwrap();
531 mpool.add(smsg_vec[2].clone()).unwrap();
532 mpool.add(smsg_vec[3].clone()).unwrap();
533
534 mpool.api.set_state_sequence(&sender, 0);
535
536 let api = mpool.api.clone();
537 let bls_sig_cache = mpool.bls_sig_cache.clone();
538 let pending = mpool.pending.clone();
539 let cur_tipset = mpool.cur_tipset.clone();
540 let repub_trigger = mpool.repub_trigger.clone();
541 let republished = mpool.republished.clone();
542 head_change(
543 api.as_ref(),
544 bls_sig_cache.as_ref(),
545 repub_trigger.clone(),
546 republished.as_ref(),
547 pending.as_ref(),
548 cur_tipset.as_ref(),
549 Vec::new(),
550 vec![Tipset::from(a)],
551 )
552 .await
553 .unwrap();
554
555 assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
556
557 mpool.api.set_state_sequence(&sender, 1);
558
559 let api = mpool.api.clone();
560 let bls_sig_cache = mpool.bls_sig_cache.clone();
561 let pending = mpool.pending.clone();
562 let cur_tipset = mpool.cur_tipset.clone();
563
564 head_change(
565 api.as_ref(),
566 bls_sig_cache.as_ref(),
567 repub_trigger.clone(),
568 republished.as_ref(),
569 pending.as_ref(),
570 cur_tipset.as_ref(),
571 Vec::new(),
572 vec![Tipset::from(&b)],
573 )
574 .await
575 .unwrap();
576
577 assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
578
579 mpool.api.set_state_sequence(&sender, 0);
580
581 head_change(
582 api.as_ref(),
583 bls_sig_cache.as_ref(),
584 repub_trigger.clone(),
585 republished.as_ref(),
586 pending.as_ref(),
587 cur_tipset.as_ref(),
588 vec![Tipset::from(b)],
589 Vec::new(),
590 )
591 .await
592 .unwrap();
593
594 assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
595
596 let (p, _) = mpool.pending().unwrap();
597 assert_eq!(p.len(), 3);
598 }
599
600 #[tokio::test]
601 async fn test_async_message_pool() {
602 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
603 let mut wallet = Wallet::new(keystore);
604 let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
605 let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
606
607 let tma = TestApi::default();
608 tma.set_state_sequence(&sender, 0);
609 let (tx, _rx) = flume::bounded(50);
610 let mut services = JoinSet::new();
611 let mpool = MessagePool::new(
612 tma,
613 tx,
614 Default::default(),
615 Default::default(),
616 &mut services,
617 )
618 .unwrap();
619
620 let mut smsg_vec = Vec::new();
621 for i in 0..3 {
622 let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
623 smsg_vec.push(msg);
624 }
625
626 assert_eq!(mpool.get_sequence(&sender).unwrap(), 0);
627 mpool.push(smsg_vec[0].clone()).await.unwrap();
628 assert_eq!(mpool.get_sequence(&sender).unwrap(), 1);
629 mpool.push(smsg_vec[1].clone()).await.unwrap();
630 assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
631 mpool.push(smsg_vec[2].clone()).await.unwrap();
632 assert_eq!(mpool.get_sequence(&sender).unwrap(), 3);
633
634 let header = mock_block(1, 1);
635 let tipset = Tipset::from(&header.clone());
636
637 mpool.api.set_heaviest_tipset(tipset.clone());
638
639 tokio::time::sleep(Duration::new(2, 0)).await;
641
642 let cur_ts = mpool.current_tipset();
643 assert_eq!(cur_ts, tipset);
644 }
645
646 #[tokio::test]
647 async fn test_msg_chains() {
648 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
649 let mut wallet = Wallet::new(keystore);
650 let a1 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
651 let a2 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
652 let tma = TestApi::default();
653 let gas_limit = 6955002;
654
655 let a = mock_block(1, 1);
656 let ts = Tipset::from(a);
657 let chain_config = ChainConfig::default();
658
659 let mut mset = HashMap::new();
663 let mut smsg_vec = Vec::new();
664 for i in 0..10 {
665 let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i);
666 smsg_vec.push(msg.clone());
667 mset.insert(i, msg);
668 }
669
670 let mut chains = Chains::new();
671 create_message_chains(
672 &tma,
673 &a1,
674 &mset,
675 &TokenAmount::zero(),
676 &ts,
677 &mut chains,
678 &chain_config,
679 )
680 .unwrap();
681 assert_eq!(chains.len(), 1, "expected a single chain");
682 assert_eq!(
683 chains[0].msgs.len(),
684 10,
685 "expected 10 messages in single chain, got: {}",
686 chains[0].msgs.len()
687 );
688 for (i, m) in chains[0].msgs.iter().enumerate() {
689 assert_eq!(
690 m.sequence(),
691 i as u64,
692 "expected sequence {} but got {}",
693 i,
694 m.sequence()
695 );
696 }
697
698 let mut mset = HashMap::new();
701 let mut smsg_vec = Vec::new();
702 for i in 0..10 {
703 let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 10 - i);
704 smsg_vec.push(msg.clone());
705 mset.insert(i, msg);
706 }
707 let mut chains = Chains::new();
708 create_message_chains(
709 &tma,
710 &a1,
711 &mset,
712 &TokenAmount::zero(),
713 &ts,
714 &mut chains,
715 &chain_config,
716 )
717 .unwrap();
718 assert_eq!(chains.len(), 10, "expected 10 chains");
719
720 for i in 0..chains.len() {
721 assert_eq!(
722 chains[i].msgs.len(),
723 1,
724 "expected 1 message in chain {} but got {}",
725 i,
726 chains[i].msgs.len()
727 );
728 }
729
730 for i in 0..chains.len() {
731 let m = &chains[i].msgs[0];
732 assert_eq!(
733 m.sequence(),
734 i as u64,
735 "expected sequence {} but got {}",
736 i,
737 m.sequence()
738 );
739 }
740
741 let mut mset = HashMap::new();
745 let mut smsg_vec = Vec::new();
746 for i in 0..10 {
747 let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i % 3);
748 smsg_vec.push(msg.clone());
749 mset.insert(i, msg);
750 }
751 let mut chains = Chains::new();
752 create_message_chains(
753 &tma,
754 &a1,
755 &mset,
756 &TokenAmount::zero(),
757 &ts,
758 &mut chains,
759 &chain_config,
760 )
761 .unwrap();
762 assert_eq!(chains.len(), 2, "expected 2 chains");
763 assert_eq!(chains[0].msgs.len(), 9);
764 assert_eq!(chains[1].msgs.len(), 1);
765 let mut next_nonce = 0;
766 for i in 0..chains.len() {
767 for m in chains[i].msgs.iter() {
768 assert_eq!(
769 next_nonce,
770 m.sequence(),
771 "expected nonce {} but got {}",
772 next_nonce,
773 m.sequence()
774 );
775 next_nonce += 1;
776 }
777 }
778
779 let mut mset = HashMap::new();
784 let mut smsg_vec = Vec::new();
785 for i in 0..10 {
786 let bias = (12 - i) / 3;
787 let msg = create_smsg(
788 &a2,
789 &a1,
790 wallet.borrow_mut(),
791 i,
792 gas_limit,
793 1 + i % 3 + bias,
794 );
795 smsg_vec.push(msg.clone());
796 mset.insert(i, msg);
797 }
798
799 let mut chains = Chains::new();
800 create_message_chains(
801 &tma,
802 &a1,
803 &mset,
804 &TokenAmount::zero(),
805 &ts,
806 &mut chains,
807 &chain_config,
808 )
809 .unwrap();
810
811 for i in 0..chains.len() {
812 let expected_len = if i > 2 { 1 } else { 3 };
813 assert_eq!(
814 chains[i].msgs.len(),
815 expected_len,
816 "expected {} message in chain {} but got {}",
817 expected_len,
818 i,
819 chains[i].msgs.len()
820 );
821 }
822
823 let mut next_nonce = 0;
824 for i in 0..chains.len() {
825 for m in chains[i].msgs.iter() {
826 assert_eq!(
827 next_nonce,
828 m.sequence(),
829 "expected nonce {} but got {}",
830 next_nonce,
831 m.sequence()
832 );
833 next_nonce += 1;
834 }
835 }
836
837 let mut mset = HashMap::new();
841 let mut smsg_vec = Vec::new();
842 for i in 0..10 {
843 let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i * 2, gas_limit, 1 + i);
844 smsg_vec.push(msg.clone());
845 mset.insert(i, msg);
846 }
847
848 let mut chains = Chains::new();
849 create_message_chains(
850 &tma,
851 &a1,
852 &mset,
853 &TokenAmount::zero(),
854 &ts,
855 &mut chains,
856 &chain_config,
857 )
858 .unwrap();
859 assert_eq!(chains.len(), 1, "expected a single chain");
860 for (i, m) in chains[0].msgs.iter().enumerate() {
861 assert_eq!(
862 m.sequence(),
863 i as u64,
864 "expected nonce {} but got {}",
865 i,
866 m.sequence()
867 );
868 }
869
870 let mut mset = HashMap::new();
874 let mut smsg_vec = Vec::new();
875 tma.set_state_balance_raw(&a1, TokenAmount::from_atto(1_000_000_000_000_000_000_u64));
876 for i in 0..10 {
877 let msg = if i != 5 {
878 create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i)
879 } else {
880 create_smsg(&a2, &a1, wallet.borrow_mut(), i, 1, 1 + i)
881 };
882 smsg_vec.push(msg.clone());
883 mset.insert(i, msg);
884 }
885 let mut chains = Chains::new();
886 create_message_chains(
887 &tma,
888 &a1,
889 &mset,
890 &TokenAmount::zero(),
891 &ts,
892 &mut chains,
893 &chain_config,
894 )
895 .unwrap();
896 assert_eq!(chains.len(), 1, "expected a single chain");
897 assert_eq!(chains[0].msgs.len(), 5);
898 for (i, m) in chains[0].msgs.iter().enumerate() {
899 assert_eq!(
900 m.sequence(),
901 i as u64,
902 "expected nonce {} but got {}",
903 i,
904 m.sequence()
905 );
906 }
907
908 let mut mset = HashMap::new();
912 let mut smsg_vec = Vec::new();
913 let max_messages = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / gas_limit;
914 let n_messages = max_messages + 1;
915 for i in 0..n_messages {
916 let msg = create_smsg(
917 &a2,
918 &a1,
919 wallet.borrow_mut(),
920 i as u64,
921 gas_limit,
922 (1 + i) as u64,
923 );
924 smsg_vec.push(msg.clone());
925 mset.insert(i as u64, msg);
926 }
927 let mut chains = Chains::new();
928 create_message_chains(
929 &tma,
930 &a1,
931 &mset,
932 &TokenAmount::zero(),
933 &ts,
934 &mut chains,
935 &chain_config,
936 )
937 .unwrap();
938 assert_eq!(chains.len(), 1, "expected a single chain");
939 assert_eq!(chains[0].msgs.len(), max_messages as usize);
940 for (i, m) in chains[0].msgs.iter().enumerate() {
941 assert_eq!(
942 m.sequence(),
943 i as u64,
944 "expected nonce {} but got {}",
945 i,
946 m.sequence()
947 );
948 }
949
950 tma.set_state_balance_raw(&a1, TokenAmount::from_atto(300 * gas_limit + 1));
952 let mut mset = HashMap::new();
953 let mut smsg_vec = Vec::new();
954 for i in 0..10 {
955 let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i);
956 smsg_vec.push(msg.clone());
957 mset.insert(i, msg);
958 }
959 let mut chains = Chains::new();
960 create_message_chains(
961 &tma,
962 &a1,
963 &mset,
964 &TokenAmount::zero(),
965 &ts,
966 &mut chains,
967 &chain_config,
968 )
969 .unwrap();
970 assert_eq!(chains.len(), 1, "expected a single chain");
971 assert_eq!(chains[0].msgs.len(), 2);
972 for (i, m) in chains[0].msgs.iter().enumerate() {
973 assert_eq!(
974 m.sequence(),
975 i as u64,
976 "expected nonce {} but got {}",
977 i,
978 m.sequence()
979 );
980 }
981 }
982}