1pub(in crate::message_pool) mod metrics;
5pub(in crate::message_pool) mod msg_pool;
6pub(in crate::message_pool) mod provider;
7pub mod selection;
8#[cfg(test)]
9pub mod test_provider;
10pub(in crate::message_pool) mod utils;
11
12use std::{borrow::BorrowMut, cmp::Ordering};
13
14use crate::blocks::Tipset;
15use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic};
16use crate::message::{MessageRead as _, SignedMessage};
17use crate::networks::ChainConfig;
18use crate::shim::{address::Address, crypto::Signature};
19use crate::state_manager::IdToAddressCache;
20use crate::utils::ShallowClone as _;
21use crate::utils::cache::SizeTrackingLruCache;
22use crate::utils::get_size::CidWrapper;
23use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
24use cid::Cid;
25use fvm_ipld_encoding::to_vec;
26use parking_lot::RwLock as SyncRwLock;
27use tracing::error;
28use utils::{get_base_fee_lower_bound, recover_sig};
29
30use super::errors::Error;
31use crate::message_pool::{
32 msg_chain::{Chains, create_message_chains},
33 msg_pool::{
34 MsgSet, StateNonceCacheKey, StrictnessPolicy, TrustPolicy, add_helper, remove,
35 resolve_to_key,
36 },
37 provider::Provider,
38};
39
// NOTE(review): the three replace-by-fee constants below are not referenced in
// the visible part of this file; presumably they are consumed elsewhere in the
// message pool when a pending message is replaced — confirm against the users.
/// Ratio from which `RBF_NUM` is derived.
const REPLACE_BY_FEE_RATIO: f32 = 1.25;
/// `REPLACE_BY_FEE_RATIO - 1` expressed as a numerator over `RBF_DENOM`
/// (0.25 * 256 = 64).
const RBF_NUM: u64 = ((REPLACE_BY_FEE_RATIO - 1f32) * 256f32) as u64;
/// Denominator that pairs with `RBF_NUM`.
const RBF_DENOM: u64 = 256;
// NOTE(review): not used in the visible part of this file; presumably a
// stricter variant of `BASE_FEE_LOWER_BOUND_FACTOR` — confirm.
const BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE: i64 = 100;
/// Factor handed to `get_base_fee_lower_bound` when selecting messages for
/// republishing.
const BASE_FEE_LOWER_BOUND_FACTOR: i64 = 10;
/// Maximum number of messages returned by `select_messages_for_block`.
const REPUB_MSG_LIMIT: usize = 30;
/// Message selection stops once the remaining block gas is at or below this value.
const MIN_GAS: u64 = 1298450;
47
48#[allow(clippy::too_many_arguments)]
49async fn republish_pending_messages<T>(
50 api: &T,
51 network_sender: &flume::Sender<NetworkMessage>,
52 pending: &SyncRwLock<HashMap<Address, MsgSet>>,
53 cur_tipset: &SyncRwLock<Tipset>,
54 republished: &SyncRwLock<HashSet<Cid>>,
55 local_addrs: &SyncRwLock<Vec<Address>>,
56 key_cache: &IdToAddressCache,
57 chain_config: &ChainConfig,
58) -> Result<(), Error>
59where
60 T: Provider,
61{
62 let ts = cur_tipset.read().shallow_clone();
63 let mut pending_map: HashMap<Address, HashMap<u64, SignedMessage>> = HashMap::new();
64
65 republished.write().clear();
66
67 for actor in local_addrs.read().iter() {
70 let Ok(resolved) = resolve_to_key(api, key_cache, actor, &ts).inspect_err(|e| {
71 tracing::debug!(%actor, "republish: failed to resolve address: {e:#}");
72 }) else {
73 continue;
74 };
75 if let Some(mset) = pending.read().get(&resolved) {
76 if mset.msgs.is_empty() {
77 continue;
78 }
79 let mut pend: HashMap<u64, SignedMessage> = HashMap::with_capacity(mset.msgs.len());
80 for (nonce, m) in mset.msgs.clone().into_iter() {
81 pend.insert(nonce, m);
82 }
83 pending_map.insert(resolved, pend);
84 }
85 }
86
87 let msgs = select_messages_for_block(api, chain_config, &ts, pending_map)?;
88
89 let network_name = chain_config.network.genesis_name();
90 for m in msgs.iter() {
91 let mb = to_vec(m)?;
92 network_sender
93 .send_async(NetworkMessage::PubsubMessage {
94 topic: Topic::new(format!("{PUBSUB_MSG_STR}/{network_name}")),
95 message: mb,
96 })
97 .await
98 .map_err(|_| Error::Other("Network receiver dropped".to_string()))?;
99 }
100
101 let mut republished_t = HashSet::new();
102 for m in msgs.iter() {
103 republished_t.insert(m.cid());
104 }
105 *republished.write() = republished_t;
106
107 Ok(())
108}
109
110fn select_messages_for_block<T>(
115 api: &T,
116 chain_config: &ChainConfig,
117 base: &Tipset,
118 pending: HashMap<Address, HashMap<u64, SignedMessage>>,
119) -> Result<Vec<SignedMessage>, Error>
120where
121 T: Provider,
122{
123 let mut msgs: Vec<SignedMessage> = vec![];
124
125 let base_fee = api.chain_compute_base_fee(base)?;
126 let base_fee_lower_bound = get_base_fee_lower_bound(&base_fee, BASE_FEE_LOWER_BOUND_FACTOR);
127
128 if pending.is_empty() {
129 return Ok(msgs);
130 }
131
132 let mut chains = Chains::new();
133 for (actor, mset) in pending.iter() {
134 create_message_chains(
135 api,
136 actor,
137 mset,
138 &base_fee_lower_bound,
139 base,
140 &mut chains,
141 chain_config,
142 )?;
143 }
144
145 if chains.is_empty() {
146 return Ok(msgs);
147 }
148
149 chains.sort(false);
150
151 let mut gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT;
152 let mut i = 0;
153 'l: while let Some(chain) = chains.get_mut_at(i) {
154 if msgs.len() > REPUB_MSG_LIMIT {
156 break;
157 }
158
159 if gas_limit <= MIN_GAS {
160 break;
161 }
162
163 if !chain.valid {
165 i += 1;
166 continue;
167 }
168
169 if chain.gas_limit <= gas_limit {
171 for m in chain.msgs.iter() {
174 if m.gas_fee_cap() < base_fee_lower_bound {
175 let key = chains.get_key_at(i);
176 chains.invalidate(key);
177 continue 'l;
178 }
179 gas_limit = gas_limit.saturating_sub(m.gas_limit());
180 msgs.push(m.clone());
181 }
182
183 i += 1;
184 continue;
185 }
186
187 chains.trim_msgs_at(i, gas_limit, REPUB_MSG_LIMIT, &base_fee);
190 let mut j = i;
191 while j < chains.len() - 1 {
192 #[allow(clippy::indexing_slicing)]
193 if chains[j].compare(&chains[j + 1]) == Ordering::Less {
194 break;
195 }
196 chains.key_vec.swap(i, i + 1);
197 j += 1;
198 }
199 }
200
201 if msgs.len() > REPUB_MSG_LIMIT {
202 msgs.truncate(REPUB_MSG_LIMIT);
203 }
204
205 Ok(msgs)
206}
207
208#[allow(clippy::too_many_arguments)]
220pub async fn head_change<T>(
221 api: &T,
222 bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
223 repub_trigger: flume::Sender<()>,
224 republished: &SyncRwLock<HashSet<Cid>>,
225 pending: &SyncRwLock<HashMap<Address, MsgSet>>,
226 cur_tipset: &SyncRwLock<Tipset>,
227 key_cache: &IdToAddressCache,
228 state_nonce_cache: &SizeTrackingLruCache<StateNonceCacheKey, u64>,
229 revert: Vec<Tipset>,
230 apply: Vec<Tipset>,
231) -> Result<(), Error>
232where
233 T: Provider + 'static,
234{
235 let mut repub = false;
236 let mut rmsgs: HashMap<Address, HashMap<u64, SignedMessage>> = HashMap::new();
237 for ts in revert {
238 let Ok(pts) = api.load_tipset(ts.parents()) else {
239 tracing::error!("error loading reverted tipset parent");
240 continue;
241 };
242 *cur_tipset.write() = pts;
243
244 let mut msgs: Vec<SignedMessage> = Vec::new();
245 for block in ts.block_headers() {
246 let Ok((umsg, smsgs)) = api.messages_for_block(block) else {
247 tracing::error!("error retrieving messages for reverted block");
248 continue;
249 };
250 msgs.extend(smsgs);
251 for msg in umsg {
252 let msg_cid = msg.cid();
253 let Ok(smsg) = recover_sig(bls_sig_cache, msg) else {
254 tracing::debug!("could not recover signature for bls message {}", msg_cid);
255 continue;
256 };
257 msgs.push(smsg)
258 }
259 }
260
261 for msg in msgs {
262 add_to_selected_msgs(msg, rmsgs.borrow_mut());
263 }
264 }
265
266 for ts in apply {
267 let mpool_ctx = MpoolCtx {
268 api,
269 key_cache,
270 pending,
271 ts: &ts,
272 };
273 for b in ts.block_headers() {
274 let Ok((msgs, smsgs)) = api.messages_for_block(b) else {
275 tracing::error!("error retrieving messages for block");
276 continue;
277 };
278
279 for msg in smsgs {
280 mpool_ctx.remove_from_selected_msgs(&msg.from(), msg.sequence(), &mut rmsgs)?;
281 if !repub && republished.write().insert(msg.cid()) {
282 repub = true;
283 }
284 }
285 for msg in msgs {
286 mpool_ctx.remove_from_selected_msgs(&msg.from, msg.sequence, &mut rmsgs)?;
287 if !repub && republished.write().insert(msg.cid()) {
288 repub = true;
289 }
290 }
291 }
292 *cur_tipset.write() = ts;
293 }
294 if repub {
295 repub_trigger
296 .send_async(())
297 .await
298 .map_err(|e| Error::Other(format!("Republish receiver dropped: {e}")))?;
299 }
300 let cur_ts = cur_tipset.read().shallow_clone();
301 let mpool_ctx = MpoolCtx {
302 api,
303 key_cache,
304 pending,
305 ts: &cur_ts,
306 };
307 for (_, hm) in rmsgs {
308 for (_, msg) in hm {
309 let sequence = mpool_ctx.get_state_sequence(state_nonce_cache, &msg.from())?;
310 if let Err(e) = add_helper(
311 api,
312 bls_sig_cache,
313 pending,
314 key_cache,
315 &cur_ts,
316 msg,
317 sequence,
318 TrustPolicy::Trusted,
319 StrictnessPolicy::Relaxed,
320 ) {
321 error!("Failed to read message from reorg to mpool: {}", e);
322 }
323 }
324 }
325 Ok(())
326}
327
/// Bundle of borrowed message pool state that the methods on this type pass on
/// to the `msg_pool` helpers, evaluated against one specific tipset.
pub(in crate::message_pool) struct MpoolCtx<'a, T> {
    /// Provider used for chain and state lookups.
    pub api: &'a T,
    /// Cache handed to `resolve_to_key` and `get_state_sequence`.
    pub key_cache: &'a IdToAddressCache,
    /// Pending message sets keyed by address.
    pub pending: &'a SyncRwLock<HashMap<Address, MsgSet>>,
    /// Tipset against which addresses and state sequences are resolved.
    pub ts: &'a Tipset,
}
334
335impl<T: Provider> MpoolCtx<'_, T> {
336 pub(in crate::message_pool) fn remove_from_selected_msgs(
339 &self,
340 from: &Address,
341 sequence: u64,
342 rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
343 ) -> Result<(), Error> {
344 if rmsgs
345 .get_mut(from)
346 .and_then(|temp| temp.remove(&sequence))
347 .is_none()
348 && let Ok(resolved) = resolve_to_key(self.api, self.key_cache, from, self.ts)
349 .inspect_err(|e| tracing::debug!(%from, "remove: failed to resolve address: {e:#}"))
350 {
351 remove(&resolved, self.pending, sequence, true)?;
352 }
353 Ok(())
354 }
355
356 pub(in crate::message_pool) fn get_state_sequence(
359 &self,
360 state_nonce_cache: &SizeTrackingLruCache<StateNonceCacheKey, u64>,
361 addr: &Address,
362 ) -> Result<u64, Error> {
363 msg_pool::get_state_sequence(self.api, self.key_cache, state_nonce_cache, addr, self.ts)
364 }
365}
366
367pub(in crate::message_pool) fn add_to_selected_msgs(
370 m: SignedMessage,
371 rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
372) {
373 rmsgs.entry(m.from()).or_default().insert(m.sequence(), m);
374}
375
376#[cfg(test)]
377pub mod tests {
378 use std::{borrow::BorrowMut, time::Duration};
379
380 use crate::blocks::Tipset;
381 use crate::key_management::{KeyStore, KeyStoreConfig, Wallet};
382 use crate::message::SignedMessage;
383 use crate::networks::ChainConfig;
384 use crate::shim::{
385 address::Address,
386 crypto::SignatureType,
387 econ::TokenAmount,
388 message::{Message, Message_v3},
389 };
390 use num_traits::Zero;
391 use test_provider::*;
392 use tokio::task::JoinSet;
393
394 use super::*;
395 use crate::message_pool::{
396 msg_chain::{Chains, create_message_chains},
397 msg_pool::MessagePool,
398 };
399
    /// Everything `make_test_setup` creates for a test: the pool plus the
    /// wallet, addresses and handles that belong to it.
    struct TestMpool {
        /// Message pool backed by the test provider.
        mpool: MessagePool<TestApi>,
        /// Wallet holding the keys of `sender` and `target`.
        wallet: Wallet,
        /// Address whose state sequence is initialised to 0 in the provider.
        sender: Address,
        /// Second wallet address, used by tests as the message recipient.
        target: Address,
        /// Task set passed to `MessagePool::new`.
        services: JoinSet<anyhow::Result<()>>,
        /// Receiving end of the channel whose sender is given to the pool.
        network_rx: flume::Receiver<NetworkMessage>,
    }
408
409 fn make_test_mpool(
410 tma: TestApi,
411 ) -> (
412 MessagePool<TestApi>,
413 JoinSet<anyhow::Result<()>>,
414 flume::Receiver<NetworkMessage>,
415 ) {
416 let (tx, rx) = flume::bounded(50);
417 let mut services = JoinSet::new();
418 let mpool = MessagePool::new(
419 tma,
420 tx,
421 Default::default(),
422 Default::default(),
423 &mut services,
424 )
425 .unwrap();
426 (mpool, services, rx)
427 }
428
429 fn make_test_setup() -> TestMpool {
430 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
431 let mut wallet = Wallet::new(keystore);
432 let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
433 let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
434 let tma = TestApi::default();
435 tma.set_state_sequence(&sender, 0);
436 let (mpool, services, network_rx) = make_test_mpool(tma);
437 TestMpool {
438 mpool,
439 wallet,
440 sender,
441 target,
442 services,
443 network_rx,
444 }
445 }
446
447 #[tokio::test]
448 async fn test_per_actor_limit() {
449 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
450 let mut wallet = Wallet::new(keystore);
451 let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
452 let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
453 let tma = TestApi::with_max_actor_pending_messages(200);
454 tma.set_state_sequence(&sender, 0);
455 let (mpool, _services, _rx) = make_test_mpool(tma);
456
457 let mut smsg_vec = Vec::new();
458 for i in 0..(mpool.api.max_actor_pending_messages() + 1) {
459 let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
460 smsg_vec.push(msg);
461 }
462
463 let (last, body) = smsg_vec.split_last().unwrap();
464 for smsg in body {
465 mpool.add(smsg.clone()).unwrap();
466 }
467 assert_eq!(
468 mpool.add(last.clone()),
469 Err(Error::TooManyPendingMessages(sender.to_string(), true))
470 );
471 }
472
473 pub fn create_smsg(
474 to: &Address,
475 from: &Address,
476 wallet: &mut Wallet,
477 sequence: u64,
478 gas_limit: i64,
479 gas_price: u64,
480 ) -> SignedMessage {
481 let umsg: Message = Message_v3 {
482 to: to.into(),
483 from: from.into(),
484 sequence,
485 gas_limit: gas_limit as u64,
486 gas_fee_cap: TokenAmount::from_atto(gas_price + 100).into(),
487 gas_premium: TokenAmount::from_atto(gas_price).into(),
488 ..Message_v3::default()
489 }
490 .into();
491 let msg_signing_bytes = umsg.cid().to_bytes();
492 let sig = wallet.sign(from, msg_signing_bytes.as_slice()).unwrap();
493 SignedMessage::new_unchecked(umsg, sig)
494 }
495
496 pub fn create_fake_smsg(
500 pool: &MessagePool<TestApi>,
501 to: &Address,
502 from: &Address,
503 sequence: u64,
504 gas_limit: i64,
505 gas_price: u64,
506 ) -> SignedMessage {
507 let umsg: Message = Message_v3 {
508 to: to.into(),
509 from: from.into(),
510 sequence,
511 gas_limit: gas_limit as u64,
512 gas_fee_cap: TokenAmount::from_atto(gas_price + 100).into(),
513 gas_premium: TokenAmount::from_atto(gas_price).into(),
514 ..Message_v3::default()
515 }
516 .into();
517 let sig = Signature::new_secp256k1(vec![]);
518 let signed = SignedMessage::new_unchecked(umsg, sig);
519 let cid = signed.cid();
520 pool.sig_val_cache.push(cid.into(), ());
521 signed
522 }
523
524 #[tokio::test]
525 async fn test_message_pool() {
526 let TestMpool {
527 mpool,
528 mut wallet,
529 sender,
530 target,
531 ..
532 } = make_test_setup();
533
534 let mut smsg_vec = Vec::new();
535 for i in 0..2 {
536 let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
537 smsg_vec.push(msg);
538 }
539
540 mpool.api.inner.lock().set_state_sequence(&sender, 0);
541 assert_eq!(mpool.get_sequence(&sender).unwrap(), 0);
542 mpool.add(smsg_vec[0].clone()).unwrap();
543 assert_eq!(mpool.get_sequence(&sender).unwrap(), 1);
544 mpool.add(smsg_vec[1].clone()).unwrap();
545 assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
546
547 let a = mock_block(1, 1);
548
549 mpool.api.inner.lock().set_block_messages(&a, smsg_vec);
550 mpool
551 .apply_head_change(Vec::new(), vec![Tipset::from(a)])
552 .await
553 .unwrap();
554
555 assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
556 }
557
558 #[tokio::test]
559 async fn test_revert_messages() {
560 let tma = TestApi::default();
561 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
562 let mut wallet = Wallet::new(keystore);
563
564 let a = mock_block(1, 1);
565 let tipset = Tipset::from(&a);
566 let b = mock_block_with_parents(&tipset, 1, 1);
567
568 let sender = wallet.generate_addr(SignatureType::Bls).unwrap();
569 let target = Address::new_id(1001);
570
571 let mut smsg_vec = Vec::new();
572
573 for i in 0..4 {
574 let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
575 smsg_vec.push(msg);
576 }
577 let (mpool, _services, _rx) = make_test_mpool(tma);
578
579 {
580 let mut api_temp = mpool.api.inner.lock();
581 api_temp.set_block_messages(&a, vec![smsg_vec[0].clone()]);
582 api_temp.set_block_messages(&b.clone(), smsg_vec[1..4].to_vec());
583 api_temp.set_state_sequence(&sender, 0);
584 drop(api_temp);
585 }
586
587 mpool.add(smsg_vec[0].clone()).unwrap();
588 mpool.add(smsg_vec[1].clone()).unwrap();
589 mpool.add(smsg_vec[2].clone()).unwrap();
590 mpool.add(smsg_vec[3].clone()).unwrap();
591
592 mpool.api.set_state_sequence(&sender, 0);
593
594 mpool
595 .apply_head_change(Vec::new(), vec![Tipset::from(a)])
596 .await
597 .unwrap();
598
599 assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
600
601 mpool.api.set_state_sequence(&sender, 1);
602
603 mpool
604 .apply_head_change(Vec::new(), vec![Tipset::from(&b)])
605 .await
606 .unwrap();
607
608 assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
609
610 mpool.api.set_state_sequence(&sender, 0);
611
612 mpool
613 .apply_head_change(vec![Tipset::from(b)], Vec::new())
614 .await
615 .unwrap();
616
617 assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
618
619 let (p, _) = mpool.pending();
620 assert_eq!(p.len(), 3);
621 }
622
623 #[tokio::test]
624 async fn test_get_sequence_resolves_id_address() {
625 let tma = TestApi::default();
626 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
627 let mut wallet = Wallet::new(keystore);
628 let key_addr = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
629 let id_addr = Address::new_id(999);
630
631 tma.set_key_address_mapping(&id_addr, &key_addr);
632 tma.set_state_sequence(&key_addr, 0);
633 let (mpool, _services, _rx) = make_test_mpool(tma);
634
635 let target = Address::new_id(1001);
636 for i in 0..3 {
637 let msg = create_smsg(&target, &key_addr, wallet.borrow_mut(), i, 1000000, 1);
638 mpool.add(msg).unwrap();
639 }
640
641 assert_eq!(mpool.get_sequence(&id_addr).unwrap(), 3);
643 assert_eq!(mpool.get_sequence(&key_addr).unwrap(), 3);
644 }
645
646 #[tokio::test]
647 async fn test_pending_for_resolves_id_address() {
648 let tma = TestApi::default();
649 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
650 let mut wallet = Wallet::new(keystore);
651 let key_addr = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
652 let id_addr = Address::new_id(888);
653
654 tma.set_key_address_mapping(&id_addr, &key_addr);
655 tma.set_state_sequence(&key_addr, 0);
656 let (mpool, _services, _rx) = make_test_mpool(tma);
657
658 let target = Address::new_id(1001);
659 for i in 0..2 {
660 let msg = create_smsg(&target, &key_addr, wallet.borrow_mut(), i, 1000000, 1);
661 mpool.add(msg).unwrap();
662 }
663
664 let msgs = mpool
666 .pending_for(&id_addr)
667 .expect("should find pending messages");
668 assert_eq!(msgs.len(), 2);
669
670 let msgs2 = mpool
672 .pending_for(&key_addr)
673 .expect("should find pending messages");
674 assert_eq!(msgs2.len(), 2);
675 }
676
677 #[tokio::test]
678 async fn test_add_with_id_from_resolves_to_key_in_pending() {
679 let tma = TestApi::default();
680 let key_addr = Address::new_bls(&[11u8; 48]).unwrap();
681 let id_addr = Address::new_id(777);
682
683 tma.set_key_address_mapping(&id_addr, &key_addr);
684 tma.set_state_sequence(&key_addr, 0);
685 let (mpool, _services, _rx) = make_test_mpool(tma);
686
687 let msg = create_fake_smsg(&mpool, &Address::new_id(1001), &id_addr, 0, 1000000, 1);
689 mpool.add(msg).unwrap();
690
691 let pending = mpool.pending.read();
693 assert!(
694 pending.get(&key_addr).is_some(),
695 "pending should be keyed by resolved key address"
696 );
697 assert!(
698 pending.get(&id_addr).is_none(),
699 "pending should NOT have entry under raw ID address"
700 );
701 }
702
703 #[tokio::test]
704 async fn test_head_change_removes_via_resolved_address() {
705 let tma = TestApi::default();
706 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
707 let mut wallet = Wallet::new(keystore);
708 let key_addr = wallet.generate_addr(SignatureType::Bls).unwrap();
709 let id_addr = Address::new_id(555);
710
711 tma.set_key_address_mapping(&id_addr, &key_addr);
712 tma.set_state_sequence(&key_addr, 0);
713
714 let a = mock_block(1, 1);
715 let (mpool, _services, _rx) = make_test_mpool(tma);
716
717 let target = Address::new_id(1001);
718 let msg0 = create_smsg(&target, &key_addr, wallet.borrow_mut(), 0, 1000000, 1);
719 let msg1 = create_smsg(&target, &key_addr, wallet.borrow_mut(), 1, 1000000, 1);
720 mpool.add(msg0.clone()).unwrap();
721 mpool.add(msg1).unwrap();
722 assert_eq!(mpool.get_sequence(&key_addr).unwrap(), 2);
723
724 mpool.api.inner.lock().set_block_messages(&a, vec![msg0]);
727
728 mpool.api.set_state_sequence(&key_addr, 1);
729
730 mpool
731 .apply_head_change(Vec::new(), vec![Tipset::from(a)])
732 .await
733 .unwrap();
734
735 assert_eq!(mpool.get_sequence(&id_addr).unwrap(), 2);
737 let msgs = mpool
738 .pending_for(&key_addr)
739 .expect("should have remaining msg");
740 assert_eq!(msgs.len(), 1);
741 assert_eq!(msgs[0].sequence(), 1);
742 }
743
744 #[tokio::test]
745 async fn test_async_message_pool() {
746 let TestMpool {
747 mpool,
748 mut wallet,
749 sender,
750 target,
751 services: _services,
752 network_rx: _network_rx,
753 } = make_test_setup();
754
755 let mut smsg_vec = Vec::new();
756 for i in 0..3 {
757 let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
758 smsg_vec.push(msg);
759 }
760
761 assert_eq!(mpool.get_sequence(&sender).unwrap(), 0);
762 mpool.push(smsg_vec[0].clone()).await.unwrap();
763 assert_eq!(mpool.get_sequence(&sender).unwrap(), 1);
764 mpool.push(smsg_vec[1].clone()).await.unwrap();
765 assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
766 mpool.push(smsg_vec[2].clone()).await.unwrap();
767 assert_eq!(mpool.get_sequence(&sender).unwrap(), 3);
768
769 let header = mock_block(1, 1);
770 let tipset = Tipset::from(&header.clone());
771
772 mpool.api.set_heaviest_tipset(tipset.clone());
773
774 tokio::time::sleep(Duration::new(2, 0)).await;
776
777 let cur_ts = mpool.current_tipset();
778 assert_eq!(cur_ts, tipset);
779 }
780
781 #[tokio::test]
782 async fn test_msg_chains() {
783 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
784 let mut wallet = Wallet::new(keystore);
785 let a1 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
786 let a2 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
787 let tma = TestApi::default();
788 let gas_limit = 6955002;
789
790 let a = mock_block(1, 1);
791 let ts = Tipset::from(a);
792 let chain_config = ChainConfig::default();
793
794 let mut mset = HashMap::new();
798 let mut smsg_vec = Vec::new();
799 for i in 0..10 {
800 let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i);
801 smsg_vec.push(msg.clone());
802 mset.insert(i, msg);
803 }
804
805 let mut chains = Chains::new();
806 create_message_chains(
807 &tma,
808 &a1,
809 &mset,
810 &TokenAmount::zero(),
811 &ts,
812 &mut chains,
813 &chain_config,
814 )
815 .unwrap();
816 assert_eq!(chains.len(), 1, "expected a single chain");
817 assert_eq!(
818 chains[0].msgs.len(),
819 10,
820 "expected 10 messages in single chain, got: {}",
821 chains[0].msgs.len()
822 );
823 for (i, m) in chains[0].msgs.iter().enumerate() {
824 assert_eq!(
825 m.sequence(),
826 i as u64,
827 "expected sequence {} but got {}",
828 i,
829 m.sequence()
830 );
831 }
832
833 let mut mset = HashMap::new();
836 let mut smsg_vec = Vec::new();
837 for i in 0..10 {
838 let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 10 - i);
839 smsg_vec.push(msg.clone());
840 mset.insert(i, msg);
841 }
842 let mut chains = Chains::new();
843 create_message_chains(
844 &tma,
845 &a1,
846 &mset,
847 &TokenAmount::zero(),
848 &ts,
849 &mut chains,
850 &chain_config,
851 )
852 .unwrap();
853 assert_eq!(chains.len(), 10, "expected 10 chains");
854
855 for i in 0..chains.len() {
856 assert_eq!(
857 chains[i].msgs.len(),
858 1,
859 "expected 1 message in chain {} but got {}",
860 i,
861 chains[i].msgs.len()
862 );
863 }
864
865 for i in 0..chains.len() {
866 let m = &chains[i].msgs[0];
867 assert_eq!(
868 m.sequence(),
869 i as u64,
870 "expected sequence {} but got {}",
871 i,
872 m.sequence()
873 );
874 }
875
876 let mut mset = HashMap::new();
880 let mut smsg_vec = Vec::new();
881 for i in 0..10 {
882 let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i % 3);
883 smsg_vec.push(msg.clone());
884 mset.insert(i, msg);
885 }
886 let mut chains = Chains::new();
887 create_message_chains(
888 &tma,
889 &a1,
890 &mset,
891 &TokenAmount::zero(),
892 &ts,
893 &mut chains,
894 &chain_config,
895 )
896 .unwrap();
897 assert_eq!(chains.len(), 2, "expected 2 chains");
898 assert_eq!(chains[0].msgs.len(), 9);
899 assert_eq!(chains[1].msgs.len(), 1);
900 let mut next_nonce = 0;
901 for i in 0..chains.len() {
902 for m in chains[i].msgs.iter() {
903 assert_eq!(
904 next_nonce,
905 m.sequence(),
906 "expected nonce {} but got {}",
907 next_nonce,
908 m.sequence()
909 );
910 next_nonce += 1;
911 }
912 }
913
914 let mut mset = HashMap::new();
919 let mut smsg_vec = Vec::new();
920 for i in 0..10 {
921 let bias = (12 - i) / 3;
922 let msg = create_smsg(
923 &a2,
924 &a1,
925 wallet.borrow_mut(),
926 i,
927 gas_limit,
928 1 + i % 3 + bias,
929 );
930 smsg_vec.push(msg.clone());
931 mset.insert(i, msg);
932 }
933
934 let mut chains = Chains::new();
935 create_message_chains(
936 &tma,
937 &a1,
938 &mset,
939 &TokenAmount::zero(),
940 &ts,
941 &mut chains,
942 &chain_config,
943 )
944 .unwrap();
945
946 for i in 0..chains.len() {
947 let expected_len = if i > 2 { 1 } else { 3 };
948 assert_eq!(
949 chains[i].msgs.len(),
950 expected_len,
951 "expected {} message in chain {} but got {}",
952 expected_len,
953 i,
954 chains[i].msgs.len()
955 );
956 }
957
958 let mut next_nonce = 0;
959 for i in 0..chains.len() {
960 for m in chains[i].msgs.iter() {
961 assert_eq!(
962 next_nonce,
963 m.sequence(),
964 "expected nonce {} but got {}",
965 next_nonce,
966 m.sequence()
967 );
968 next_nonce += 1;
969 }
970 }
971
972 let mut mset = HashMap::new();
976 let mut smsg_vec = Vec::new();
977 for i in 0..10 {
978 let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i * 2, gas_limit, 1 + i);
979 smsg_vec.push(msg.clone());
980 mset.insert(i, msg);
981 }
982
983 let mut chains = Chains::new();
984 create_message_chains(
985 &tma,
986 &a1,
987 &mset,
988 &TokenAmount::zero(),
989 &ts,
990 &mut chains,
991 &chain_config,
992 )
993 .unwrap();
994 assert_eq!(chains.len(), 1, "expected a single chain");
995 for (i, m) in chains[0].msgs.iter().enumerate() {
996 assert_eq!(
997 m.sequence(),
998 i as u64,
999 "expected nonce {} but got {}",
1000 i,
1001 m.sequence()
1002 );
1003 }
1004
1005 let mut mset = HashMap::new();
1009 let mut smsg_vec = Vec::new();
1010 tma.set_state_balance_raw(&a1, TokenAmount::from_atto(1_000_000_000_000_000_000_u64));
1011 for i in 0..10 {
1012 let msg = if i != 5 {
1013 create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i)
1014 } else {
1015 create_smsg(&a2, &a1, wallet.borrow_mut(), i, 1, 1 + i)
1016 };
1017 smsg_vec.push(msg.clone());
1018 mset.insert(i, msg);
1019 }
1020 let mut chains = Chains::new();
1021 create_message_chains(
1022 &tma,
1023 &a1,
1024 &mset,
1025 &TokenAmount::zero(),
1026 &ts,
1027 &mut chains,
1028 &chain_config,
1029 )
1030 .unwrap();
1031 assert_eq!(chains.len(), 1, "expected a single chain");
1032 assert_eq!(chains[0].msgs.len(), 5);
1033 for (i, m) in chains[0].msgs.iter().enumerate() {
1034 assert_eq!(
1035 m.sequence(),
1036 i as u64,
1037 "expected nonce {} but got {}",
1038 i,
1039 m.sequence()
1040 );
1041 }
1042
1043 let mut mset = HashMap::new();
1047 let mut smsg_vec = Vec::new();
1048 let max_messages = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / gas_limit;
1049 let n_messages = max_messages + 1;
1050 for i in 0..n_messages {
1051 let msg = create_smsg(
1052 &a2,
1053 &a1,
1054 wallet.borrow_mut(),
1055 i as u64,
1056 gas_limit,
1057 (1 + i) as u64,
1058 );
1059 smsg_vec.push(msg.clone());
1060 mset.insert(i as u64, msg);
1061 }
1062 let mut chains = Chains::new();
1063 create_message_chains(
1064 &tma,
1065 &a1,
1066 &mset,
1067 &TokenAmount::zero(),
1068 &ts,
1069 &mut chains,
1070 &chain_config,
1071 )
1072 .unwrap();
1073 assert_eq!(chains.len(), 1, "expected a single chain");
1074 assert_eq!(chains[0].msgs.len(), max_messages as usize);
1075 for (i, m) in chains[0].msgs.iter().enumerate() {
1076 assert_eq!(
1077 m.sequence(),
1078 i as u64,
1079 "expected nonce {} but got {}",
1080 i,
1081 m.sequence()
1082 );
1083 }
1084
1085 tma.set_state_balance_raw(&a1, TokenAmount::from_atto(300 * gas_limit + 1));
1087 let mut mset = HashMap::new();
1088 let mut smsg_vec = Vec::new();
1089 for i in 0..10 {
1090 let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i);
1091 smsg_vec.push(msg.clone());
1092 mset.insert(i, msg);
1093 }
1094 let mut chains = Chains::new();
1095 create_message_chains(
1096 &tma,
1097 &a1,
1098 &mset,
1099 &TokenAmount::zero(),
1100 &ts,
1101 &mut chains,
1102 &chain_config,
1103 )
1104 .unwrap();
1105 assert_eq!(chains.len(), 1, "expected a single chain");
1106 assert_eq!(chains[0].msgs.len(), 2);
1107 for (i, m) in chains[0].msgs.iter().enumerate() {
1108 assert_eq!(
1109 m.sequence(),
1110 i as u64,
1111 "expected nonce {} but got {}",
1112 i,
1113 m.sequence()
1114 );
1115 }
1116 }
1117}