1pub(in crate::message_pool) mod events;
5pub(in crate::message_pool) mod metrics;
6pub(in crate::message_pool) mod msg_pool;
7pub(in crate::message_pool) mod msg_set;
8pub(in crate::message_pool) mod pending_store;
9pub(in crate::message_pool) mod provider;
10pub mod selection;
11#[cfg(test)]
12pub mod test_provider;
13pub(in crate::message_pool) mod utils;
14#[allow(unused_imports)]
16pub use events::MpoolUpdate;
17
18use std::{borrow::BorrowMut, cmp::Ordering};
19
20use crate::blocks::Tipset;
21use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic};
22use crate::message::{MessageRead as _, SignedMessage};
23use crate::networks::ChainConfig;
24use crate::shim::{address::Address, crypto::Signature};
25use crate::state_manager::IdToAddressCache;
26use crate::utils::ShallowClone as _;
27use crate::utils::cache::SizeTrackingLruCache;
28use crate::utils::get_size::CidWrapper;
29use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
30use cid::Cid;
31use fvm_ipld_encoding::to_vec;
32use parking_lot::RwLock as SyncRwLock;
33use tracing::error;
34use utils::{get_base_fee_lower_bound, recover_sig};
35
36use super::errors::Error;
37use crate::message_pool::{
38 msg_chain::{Chains, create_message_chains},
39 msg_pool::{StateNonceCacheKey, StrictnessPolicy, TrustPolicy, add_helper, resolve_to_key},
40 msgpool::pending_store::PendingStore,
41 provider::Provider,
42};
43
/// Replace-by-fee ratio from which the `RBF_NUM / RBF_DENOM` fraction is derived.
// NOTE(review): not referenced in this file; presumably consumed by sibling modules — confirm.
const REPLACE_BY_FEE_RATIO: f32 = 1.25;
/// Denominator of the fixed-point replace-by-fee fraction.
const RBF_DENOM: u64 = 256;
/// Numerator of the fixed-point replace-by-fee fraction: the part of the ratio above 1,
/// scaled by `RBF_DENOM` (evaluates to 64 for a ratio of 1.25).
const RBF_NUM: u64 = ((REPLACE_BY_FEE_RATIO - 1f32) * RBF_DENOM as f32) as u64;
/// Conservative factor for computing a base-fee lower bound.
// NOTE(review): not referenced in this file — confirm the call sites.
const BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE: i64 = 100;
/// Factor handed to `get_base_fee_lower_bound` when selecting messages to republish.
const BASE_FEE_LOWER_BOUND_FACTOR: i64 = 10;
/// Maximum number of messages returned by one republish selection.
const REPUB_MSG_LIMIT: usize = 30;
/// Republish selection stops once the remaining gas budget is at or below this value.
const MIN_GAS: u64 = 1_298_450;
51
52#[allow(clippy::too_many_arguments)]
53async fn republish_pending_messages<T>(
54 api: &T,
55 network_sender: &flume::Sender<NetworkMessage>,
56 pending_store: &PendingStore,
57 cur_tipset: &SyncRwLock<Tipset>,
58 republished: &SyncRwLock<HashSet<Cid>>,
59 local_addrs: &SyncRwLock<Vec<Address>>,
60 key_cache: &IdToAddressCache,
61 chain_config: &ChainConfig,
62) -> Result<(), Error>
63where
64 T: Provider,
65{
66 let ts = cur_tipset.read().shallow_clone();
67 let mut pending_map: HashMap<Address, HashMap<u64, SignedMessage>> = HashMap::new();
68
69 republished.write().clear();
70
71 for actor in local_addrs.read().iter() {
74 let Ok(resolved) = resolve_to_key(api, key_cache, actor, &ts).inspect_err(|e| {
75 tracing::debug!(%actor, "republish: failed to resolve address: {e:#}");
76 }) else {
77 continue;
78 };
79 if let Some(mset) = pending_store.snapshot_for(&resolved) {
80 if mset.msgs.is_empty() {
81 continue;
82 }
83 pending_map.insert(resolved, mset.msgs);
84 }
85 }
86
87 let msgs = select_messages_for_block(api, chain_config, &ts, pending_map)?;
88
89 let network_name = chain_config.network.genesis_name();
90 for m in msgs.iter() {
91 let mb = to_vec(m)?;
92 network_sender
93 .send_async(NetworkMessage::PubsubMessage {
94 topic: Topic::new(format!("{PUBSUB_MSG_STR}/{network_name}")),
95 message: mb,
96 })
97 .await
98 .map_err(|_| Error::Other("Network receiver dropped".to_string()))?;
99 }
100
101 let mut republished_t = HashSet::new();
102 for m in msgs.iter() {
103 republished_t.insert(m.cid());
104 }
105 *republished.write() = republished_t;
106
107 Ok(())
108}
109
110fn select_messages_for_block<T>(
115 api: &T,
116 chain_config: &ChainConfig,
117 base: &Tipset,
118 pending: HashMap<Address, HashMap<u64, SignedMessage>>,
119) -> Result<Vec<SignedMessage>, Error>
120where
121 T: Provider,
122{
123 let mut msgs: Vec<SignedMessage> = vec![];
124
125 let base_fee = api.chain_compute_base_fee(base)?;
126 let base_fee_lower_bound = get_base_fee_lower_bound(&base_fee, BASE_FEE_LOWER_BOUND_FACTOR);
127
128 if pending.is_empty() {
129 return Ok(msgs);
130 }
131
132 let mut chains = Chains::new();
133 for (actor, mset) in pending.iter() {
134 create_message_chains(
135 api,
136 actor,
137 mset,
138 &base_fee_lower_bound,
139 base,
140 &mut chains,
141 chain_config,
142 )?;
143 }
144
145 if chains.is_empty() {
146 return Ok(msgs);
147 }
148
149 chains.sort(false);
150
151 let mut gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT;
152 let mut i = 0;
153 'l: while let Some(chain) = chains.get_mut_at(i) {
154 if msgs.len() > REPUB_MSG_LIMIT {
156 break;
157 }
158
159 if gas_limit <= MIN_GAS {
160 break;
161 }
162
163 if !chain.valid {
165 i += 1;
166 continue;
167 }
168
169 if chain.gas_limit <= gas_limit {
171 for m in chain.msgs.iter() {
174 if m.gas_fee_cap() < base_fee_lower_bound {
175 let key = chains.get_key_at(i);
176 chains.invalidate(key);
177 continue 'l;
178 }
179 gas_limit = gas_limit.saturating_sub(m.gas_limit());
180 msgs.push(m.clone());
181 }
182
183 i += 1;
184 continue;
185 }
186
187 chains.trim_msgs_at(i, gas_limit, REPUB_MSG_LIMIT, &base_fee);
190 let mut j = i;
191 while j < chains.len() - 1 {
192 #[allow(clippy::indexing_slicing)]
193 if chains[j].compare(&chains[j + 1]) == Ordering::Less {
194 break;
195 }
196 chains.key_vec.swap(i, i + 1);
197 j += 1;
198 }
199 }
200
201 if msgs.len() > REPUB_MSG_LIMIT {
202 msgs.truncate(REPUB_MSG_LIMIT);
203 }
204
205 Ok(msgs)
206}
207
208#[allow(clippy::too_many_arguments)]
220pub(in crate::message_pool) async fn head_change<T>(
221 api: &T,
222 bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
223 repub_trigger: flume::Sender<()>,
224 republished: &SyncRwLock<HashSet<Cid>>,
225 pending_store: &PendingStore,
226 cur_tipset: &SyncRwLock<Tipset>,
227 key_cache: &IdToAddressCache,
228 state_nonce_cache: &SizeTrackingLruCache<StateNonceCacheKey, u64>,
229 revert: Vec<Tipset>,
230 apply: Vec<Tipset>,
231) -> Result<(), Error>
232where
233 T: Provider + 'static,
234{
235 let mut repub = false;
236 let mut rmsgs: HashMap<Address, HashMap<u64, SignedMessage>> = HashMap::new();
237 for ts in revert {
238 let Ok(pts) = api.load_tipset(ts.parents()) else {
239 tracing::error!("error loading reverted tipset parent");
240 continue;
241 };
242 *cur_tipset.write() = pts;
243
244 let mut msgs: Vec<SignedMessage> = Vec::new();
245 for block in ts.block_headers() {
246 let Ok((umsg, smsgs)) = api.messages_for_block(block) else {
247 tracing::error!("error retrieving messages for reverted block");
248 continue;
249 };
250 msgs.extend(smsgs);
251 for msg in umsg {
252 let msg_cid = msg.cid();
253 let Ok(smsg) = recover_sig(bls_sig_cache, msg) else {
254 tracing::debug!("could not recover signature for bls message {}", msg_cid);
255 continue;
256 };
257 msgs.push(smsg)
258 }
259 }
260
261 for msg in msgs {
262 add_to_selected_msgs(msg, rmsgs.borrow_mut());
263 }
264 }
265
266 for ts in apply {
267 let mpool_ctx = MpoolCtx {
268 api,
269 key_cache,
270 pending_store,
271 ts: &ts,
272 };
273 for b in ts.block_headers() {
274 let Ok((msgs, smsgs)) = api.messages_for_block(b) else {
275 tracing::error!("error retrieving messages for block");
276 continue;
277 };
278
279 for msg in smsgs {
280 mpool_ctx.remove_from_selected_msgs(&msg.from(), msg.sequence(), &mut rmsgs)?;
281 if !repub && republished.write().insert(msg.cid()) {
282 repub = true;
283 }
284 }
285 for msg in msgs {
286 mpool_ctx.remove_from_selected_msgs(&msg.from, msg.sequence, &mut rmsgs)?;
287 if !repub && republished.write().insert(msg.cid()) {
288 repub = true;
289 }
290 }
291 }
292 *cur_tipset.write() = ts;
293 }
294 if repub {
295 repub_trigger
296 .send_async(())
297 .await
298 .map_err(|e| Error::Other(format!("Republish receiver dropped: {e}")))?;
299 }
300 let cur_ts = cur_tipset.read().shallow_clone();
301 let mpool_ctx = MpoolCtx {
302 api,
303 key_cache,
304 pending_store,
305 ts: &cur_ts,
306 };
307 for (_, hm) in rmsgs {
308 for (_, msg) in hm {
309 let sequence = match mpool_ctx.get_state_sequence(state_nonce_cache, &msg.from()) {
310 Ok(seq) => seq,
311 Err(e) => {
312 tracing::debug!("Failed to get the state sequence: {}", e);
313 continue;
314 }
315 };
316 if let Err(e) = add_helper(
317 api,
318 bls_sig_cache,
319 pending_store,
320 key_cache,
321 &cur_ts,
322 msg,
323 sequence,
324 TrustPolicy::Trusted,
325 StrictnessPolicy::Relaxed,
326 ) {
327 error!("Failed to read message from reorg to mpool: {}", e);
328 }
329 }
330 }
331 Ok(())
332}
333
334pub(in crate::message_pool) struct MpoolCtx<'a, T> {
335 pub api: &'a T,
336 pub key_cache: &'a IdToAddressCache,
337 pub pending_store: &'a PendingStore,
338 pub ts: &'a Tipset,
339}
340
341impl<T: Provider> MpoolCtx<'_, T> {
342 pub(in crate::message_pool) fn remove_from_selected_msgs(
345 &self,
346 from: &Address,
347 sequence: u64,
348 rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
349 ) -> Result<(), Error> {
350 if rmsgs
351 .get_mut(from)
352 .and_then(|temp| temp.remove(&sequence))
353 .is_none()
354 && let Ok(resolved) = resolve_to_key(self.api, self.key_cache, from, self.ts)
355 .inspect_err(|e| tracing::debug!(%from, "remove: failed to resolve address: {e:#}"))
356 {
357 let _ = self.pending_store.remove(&resolved, sequence, true);
358 }
359 Ok(())
360 }
361
362 pub(in crate::message_pool) fn get_state_sequence(
365 &self,
366 state_nonce_cache: &SizeTrackingLruCache<StateNonceCacheKey, u64>,
367 addr: &Address,
368 ) -> Result<u64, Error> {
369 msg_pool::get_state_sequence(self.api, self.key_cache, state_nonce_cache, addr, self.ts)
370 }
371}
372
373pub(in crate::message_pool) fn add_to_selected_msgs(
376 m: SignedMessage,
377 rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
378) {
379 rmsgs.entry(m.from()).or_default().insert(m.sequence(), m);
380}
381
382#[cfg(test)]
383pub mod tests {
384 use std::{borrow::BorrowMut, time::Duration};
385
386 use crate::blocks::Tipset;
387 use crate::key_management::{KeyStore, KeyStoreConfig, Wallet};
388 use crate::message::SignedMessage;
389 use crate::networks::ChainConfig;
390 use crate::shim::{
391 address::Address,
392 crypto::SignatureType,
393 econ::TokenAmount,
394 message::{Message, Message_v3},
395 };
396 use num_traits::Zero;
397 use test_provider::*;
398 use tokio::task::JoinSet;
399
400 use super::*;
401 use crate::message_pool::{
402 msg_chain::{Chains, create_message_chains},
403 msg_pool::MessagePool,
404 };
405
406 struct TestMpool {
407 mpool: MessagePool<TestApi>,
408 wallet: Wallet,
409 sender: Address,
410 target: Address,
411 services: JoinSet<anyhow::Result<()>>,
412 network_rx: flume::Receiver<NetworkMessage>,
413 }
414
415 fn make_test_mpool(
416 tma: TestApi,
417 ) -> (
418 MessagePool<TestApi>,
419 JoinSet<anyhow::Result<()>>,
420 flume::Receiver<NetworkMessage>,
421 ) {
422 let (tx, rx) = flume::bounded(50);
423 let mut services = JoinSet::new();
424 let mpool = MessagePool::new(
425 tma,
426 tx,
427 Default::default(),
428 Default::default(),
429 &mut services,
430 )
431 .unwrap();
432 (mpool, services, rx)
433 }
434
435 fn make_test_setup() -> TestMpool {
436 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
437 let mut wallet = Wallet::new(keystore);
438 let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
439 let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
440 let tma = TestApi::default();
441 tma.set_state_sequence(&sender, 0);
442 let (mpool, services, network_rx) = make_test_mpool(tma);
443 TestMpool {
444 mpool,
445 wallet,
446 sender,
447 target,
448 services,
449 network_rx,
450 }
451 }
452
453 #[tokio::test]
454 async fn test_per_actor_limit() {
455 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
456 let mut wallet = Wallet::new(keystore);
457 let sender = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
458 let target = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
459 let tma = TestApi::with_max_actor_pending_messages(200);
460 tma.set_state_sequence(&sender, 0);
461 let (mpool, _services, _rx) = make_test_mpool(tma);
462
463 let mut smsg_vec = Vec::new();
464 for i in 0..(mpool.api.max_actor_pending_messages() + 1) {
465 let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
466 smsg_vec.push(msg);
467 }
468
469 let (last, body) = smsg_vec.split_last().unwrap();
470 for smsg in body {
471 mpool.add(smsg.clone()).unwrap();
472 }
473 assert_eq!(
474 mpool.add(last.clone()),
475 Err(Error::TooManyPendingMessages(sender.to_string(), true))
476 );
477 }
478
479 pub fn create_smsg(
480 to: &Address,
481 from: &Address,
482 wallet: &mut Wallet,
483 sequence: u64,
484 gas_limit: i64,
485 gas_price: u64,
486 ) -> SignedMessage {
487 let umsg: Message = Message_v3 {
488 to: to.into(),
489 from: from.into(),
490 sequence,
491 gas_limit: gas_limit as u64,
492 gas_fee_cap: TokenAmount::from_atto(gas_price + 100).into(),
493 gas_premium: TokenAmount::from_atto(gas_price).into(),
494 ..Message_v3::default()
495 }
496 .into();
497 let msg_signing_bytes = umsg.cid().to_bytes();
498 let sig = wallet.sign(from, msg_signing_bytes.as_slice()).unwrap();
499 SignedMessage::new_unchecked(umsg, sig)
500 }
501
502 pub fn create_fake_smsg(
506 pool: &MessagePool<TestApi>,
507 to: &Address,
508 from: &Address,
509 sequence: u64,
510 gas_limit: i64,
511 gas_price: u64,
512 ) -> SignedMessage {
513 let umsg: Message = Message_v3 {
514 to: to.into(),
515 from: from.into(),
516 sequence,
517 gas_limit: gas_limit as u64,
518 gas_fee_cap: TokenAmount::from_atto(gas_price + 100).into(),
519 gas_premium: TokenAmount::from_atto(gas_price).into(),
520 ..Message_v3::default()
521 }
522 .into();
523 let sig = Signature::new_secp256k1(vec![]);
524 let signed = SignedMessage::new_unchecked(umsg, sig);
525 let cid = signed.cid();
526 pool.sig_val_cache.push(cid.into(), ());
527 signed
528 }
529
530 #[tokio::test]
531 async fn test_message_pool() {
532 let TestMpool {
533 mpool,
534 mut wallet,
535 sender,
536 target,
537 ..
538 } = make_test_setup();
539
540 let mut smsg_vec = Vec::new();
541 for i in 0..2 {
542 let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
543 smsg_vec.push(msg);
544 }
545
546 mpool.api.inner.lock().set_state_sequence(&sender, 0);
547 assert_eq!(mpool.get_sequence(&sender).unwrap(), 0);
548 mpool.add(smsg_vec[0].clone()).unwrap();
549 assert_eq!(mpool.get_sequence(&sender).unwrap(), 1);
550 mpool.add(smsg_vec[1].clone()).unwrap();
551 assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
552
553 let a = mock_block(1, 1);
554
555 mpool.api.inner.lock().set_block_messages(&a, smsg_vec);
556 mpool
557 .apply_head_change(Vec::new(), vec![Tipset::from(a)])
558 .await
559 .unwrap();
560
561 assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
562 }
563
564 #[tokio::test]
565 async fn test_revert_messages() {
566 let tma = TestApi::default();
567 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
568 let mut wallet = Wallet::new(keystore);
569
570 let a = mock_block(1, 1);
571 let tipset = Tipset::from(&a);
572 let b = mock_block_with_parents(&tipset, 1, 1);
573
574 let sender = wallet.generate_addr(SignatureType::Bls).unwrap();
575 let target = Address::new_id(1001);
576
577 let mut smsg_vec = Vec::new();
578
579 for i in 0..4 {
580 let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
581 smsg_vec.push(msg);
582 }
583 let (mpool, _services, _rx) = make_test_mpool(tma);
584
585 {
586 let mut api_temp = mpool.api.inner.lock();
587 api_temp.set_block_messages(&a, vec![smsg_vec[0].clone()]);
588 api_temp.set_block_messages(&b.clone(), smsg_vec[1..4].to_vec());
589 api_temp.set_state_sequence(&sender, 0);
590 drop(api_temp);
591 }
592
593 mpool.add(smsg_vec[0].clone()).unwrap();
594 mpool.add(smsg_vec[1].clone()).unwrap();
595 mpool.add(smsg_vec[2].clone()).unwrap();
596 mpool.add(smsg_vec[3].clone()).unwrap();
597
598 mpool.api.set_state_sequence(&sender, 0);
599
600 mpool
601 .apply_head_change(Vec::new(), vec![Tipset::from(a)])
602 .await
603 .unwrap();
604
605 assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
606
607 mpool.api.set_state_sequence(&sender, 1);
608
609 mpool
610 .apply_head_change(Vec::new(), vec![Tipset::from(&b)])
611 .await
612 .unwrap();
613
614 assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
615
616 mpool.api.set_state_sequence(&sender, 0);
617
618 mpool
619 .apply_head_change(vec![Tipset::from(b)], Vec::new())
620 .await
621 .unwrap();
622
623 assert_eq!(mpool.get_sequence(&sender).unwrap(), 4);
624
625 let (p, _) = mpool.pending();
626 assert_eq!(p.len(), 3);
627 }
628
629 #[tokio::test]
630 async fn test_get_sequence_resolves_id_address() {
631 let tma = TestApi::default();
632 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
633 let mut wallet = Wallet::new(keystore);
634 let key_addr = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
635 let id_addr = Address::new_id(999);
636
637 tma.set_key_address_mapping(&id_addr, &key_addr);
638 tma.set_state_sequence(&key_addr, 0);
639 let (mpool, _services, _rx) = make_test_mpool(tma);
640
641 let target = Address::new_id(1001);
642 for i in 0..3 {
643 let msg = create_smsg(&target, &key_addr, wallet.borrow_mut(), i, 1000000, 1);
644 mpool.add(msg).unwrap();
645 }
646
647 assert_eq!(mpool.get_sequence(&id_addr).unwrap(), 3);
649 assert_eq!(mpool.get_sequence(&key_addr).unwrap(), 3);
650 }
651
652 #[tokio::test]
653 async fn test_pending_for_resolves_id_address() {
654 let tma = TestApi::default();
655 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
656 let mut wallet = Wallet::new(keystore);
657 let key_addr = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
658 let id_addr = Address::new_id(888);
659
660 tma.set_key_address_mapping(&id_addr, &key_addr);
661 tma.set_state_sequence(&key_addr, 0);
662 let (mpool, _services, _rx) = make_test_mpool(tma);
663
664 let target = Address::new_id(1001);
665 for i in 0..2 {
666 let msg = create_smsg(&target, &key_addr, wallet.borrow_mut(), i, 1000000, 1);
667 mpool.add(msg).unwrap();
668 }
669
670 let msgs = mpool
672 .pending_for(&id_addr)
673 .expect("should find pending messages");
674 assert_eq!(msgs.len(), 2);
675
676 let msgs2 = mpool
678 .pending_for(&key_addr)
679 .expect("should find pending messages");
680 assert_eq!(msgs2.len(), 2);
681 }
682
683 #[tokio::test]
684 async fn test_add_with_id_from_resolves_to_key_in_pending() {
685 let tma = TestApi::default();
686 let key_addr = Address::new_bls(&[11u8; 48]).unwrap();
687 let id_addr = Address::new_id(777);
688
689 tma.set_key_address_mapping(&id_addr, &key_addr);
690 tma.set_state_sequence(&key_addr, 0);
691 let (mpool, _services, _rx) = make_test_mpool(tma);
692
693 let msg = create_fake_smsg(&mpool, &Address::new_id(1001), &id_addr, 0, 1000000, 1);
695 mpool.add(msg).unwrap();
696
697 assert!(
699 mpool.pending_store.snapshot_for(&key_addr).is_some(),
700 "pending should be keyed by resolved key address"
701 );
702 assert!(
703 mpool.pending_store.snapshot_for(&id_addr).is_none(),
704 "pending should NOT have entry under raw ID address"
705 );
706 }
707
708 #[tokio::test]
709 async fn test_head_change_removes_via_resolved_address() {
710 let tma = TestApi::default();
711 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
712 let mut wallet = Wallet::new(keystore);
713 let key_addr = wallet.generate_addr(SignatureType::Bls).unwrap();
714 let id_addr = Address::new_id(555);
715
716 tma.set_key_address_mapping(&id_addr, &key_addr);
717 tma.set_state_sequence(&key_addr, 0);
718
719 let a = mock_block(1, 1);
720 let (mpool, _services, _rx) = make_test_mpool(tma);
721
722 let target = Address::new_id(1001);
723 let msg0 = create_smsg(&target, &key_addr, wallet.borrow_mut(), 0, 1000000, 1);
724 let msg1 = create_smsg(&target, &key_addr, wallet.borrow_mut(), 1, 1000000, 1);
725 mpool.add(msg0.clone()).unwrap();
726 mpool.add(msg1).unwrap();
727 assert_eq!(mpool.get_sequence(&key_addr).unwrap(), 2);
728
729 mpool.api.inner.lock().set_block_messages(&a, vec![msg0]);
732
733 mpool.api.set_state_sequence(&key_addr, 1);
734
735 mpool
736 .apply_head_change(Vec::new(), vec![Tipset::from(a)])
737 .await
738 .unwrap();
739
740 assert_eq!(mpool.get_sequence(&id_addr).unwrap(), 2);
742 let msgs = mpool
743 .pending_for(&key_addr)
744 .expect("should have remaining msg");
745 assert_eq!(msgs.len(), 1);
746 assert_eq!(msgs[0].sequence(), 1);
747 }
748
749 #[tokio::test]
750 async fn test_async_message_pool() {
751 let TestMpool {
752 mpool,
753 mut wallet,
754 sender,
755 target,
756 services: _services,
757 network_rx: _network_rx,
758 } = make_test_setup();
759
760 let mut smsg_vec = Vec::new();
761 for i in 0..3 {
762 let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
763 smsg_vec.push(msg);
764 }
765
766 assert_eq!(mpool.get_sequence(&sender).unwrap(), 0);
767 mpool.push(smsg_vec[0].clone()).await.unwrap();
768 assert_eq!(mpool.get_sequence(&sender).unwrap(), 1);
769 mpool.push(smsg_vec[1].clone()).await.unwrap();
770 assert_eq!(mpool.get_sequence(&sender).unwrap(), 2);
771 mpool.push(smsg_vec[2].clone()).await.unwrap();
772 assert_eq!(mpool.get_sequence(&sender).unwrap(), 3);
773
774 let header = mock_block(1, 1);
775 let tipset = Tipset::from(&header.clone());
776
777 mpool.api.set_heaviest_tipset(tipset.clone());
778
779 tokio::time::sleep(Duration::new(2, 0)).await;
781
782 let cur_ts = mpool.current_tipset();
783 assert_eq!(cur_ts, tipset);
784 }
785
786 #[tokio::test]
787 async fn test_msg_chains() {
788 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
789 let mut wallet = Wallet::new(keystore);
790 let a1 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
791 let a2 = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
792 let tma = TestApi::default();
793 let gas_limit = 6955002;
794
795 let a = mock_block(1, 1);
796 let ts = Tipset::from(a);
797 let chain_config = ChainConfig::default();
798
799 let mut mset = HashMap::new();
803 let mut smsg_vec = Vec::new();
804 for i in 0..10 {
805 let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i);
806 smsg_vec.push(msg.clone());
807 mset.insert(i, msg);
808 }
809
810 let mut chains = Chains::new();
811 create_message_chains(
812 &tma,
813 &a1,
814 &mset,
815 &TokenAmount::zero(),
816 &ts,
817 &mut chains,
818 &chain_config,
819 )
820 .unwrap();
821 assert_eq!(chains.len(), 1, "expected a single chain");
822 assert_eq!(
823 chains[0].msgs.len(),
824 10,
825 "expected 10 messages in single chain, got: {}",
826 chains[0].msgs.len()
827 );
828 for (i, m) in chains[0].msgs.iter().enumerate() {
829 assert_eq!(
830 m.sequence(),
831 i as u64,
832 "expected sequence {} but got {}",
833 i,
834 m.sequence()
835 );
836 }
837
838 let mut mset = HashMap::new();
841 let mut smsg_vec = Vec::new();
842 for i in 0..10 {
843 let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 10 - i);
844 smsg_vec.push(msg.clone());
845 mset.insert(i, msg);
846 }
847 let mut chains = Chains::new();
848 create_message_chains(
849 &tma,
850 &a1,
851 &mset,
852 &TokenAmount::zero(),
853 &ts,
854 &mut chains,
855 &chain_config,
856 )
857 .unwrap();
858 assert_eq!(chains.len(), 10, "expected 10 chains");
859
860 for i in 0..chains.len() {
861 assert_eq!(
862 chains[i].msgs.len(),
863 1,
864 "expected 1 message in chain {} but got {}",
865 i,
866 chains[i].msgs.len()
867 );
868 }
869
870 for i in 0..chains.len() {
871 let m = &chains[i].msgs[0];
872 assert_eq!(
873 m.sequence(),
874 i as u64,
875 "expected sequence {} but got {}",
876 i,
877 m.sequence()
878 );
879 }
880
881 let mut mset = HashMap::new();
885 let mut smsg_vec = Vec::new();
886 for i in 0..10 {
887 let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i % 3);
888 smsg_vec.push(msg.clone());
889 mset.insert(i, msg);
890 }
891 let mut chains = Chains::new();
892 create_message_chains(
893 &tma,
894 &a1,
895 &mset,
896 &TokenAmount::zero(),
897 &ts,
898 &mut chains,
899 &chain_config,
900 )
901 .unwrap();
902 assert_eq!(chains.len(), 2, "expected 2 chains");
903 assert_eq!(chains[0].msgs.len(), 9);
904 assert_eq!(chains[1].msgs.len(), 1);
905 let mut next_nonce = 0;
906 for i in 0..chains.len() {
907 for m in chains[i].msgs.iter() {
908 assert_eq!(
909 next_nonce,
910 m.sequence(),
911 "expected nonce {} but got {}",
912 next_nonce,
913 m.sequence()
914 );
915 next_nonce += 1;
916 }
917 }
918
919 let mut mset = HashMap::new();
924 let mut smsg_vec = Vec::new();
925 for i in 0..10 {
926 let bias = (12 - i) / 3;
927 let msg = create_smsg(
928 &a2,
929 &a1,
930 wallet.borrow_mut(),
931 i,
932 gas_limit,
933 1 + i % 3 + bias,
934 );
935 smsg_vec.push(msg.clone());
936 mset.insert(i, msg);
937 }
938
939 let mut chains = Chains::new();
940 create_message_chains(
941 &tma,
942 &a1,
943 &mset,
944 &TokenAmount::zero(),
945 &ts,
946 &mut chains,
947 &chain_config,
948 )
949 .unwrap();
950
951 for i in 0..chains.len() {
952 let expected_len = if i > 2 { 1 } else { 3 };
953 assert_eq!(
954 chains[i].msgs.len(),
955 expected_len,
956 "expected {} message in chain {} but got {}",
957 expected_len,
958 i,
959 chains[i].msgs.len()
960 );
961 }
962
963 let mut next_nonce = 0;
964 for i in 0..chains.len() {
965 for m in chains[i].msgs.iter() {
966 assert_eq!(
967 next_nonce,
968 m.sequence(),
969 "expected nonce {} but got {}",
970 next_nonce,
971 m.sequence()
972 );
973 next_nonce += 1;
974 }
975 }
976
977 let mut mset = HashMap::new();
981 let mut smsg_vec = Vec::new();
982 for i in 0..10 {
983 let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i * 2, gas_limit, 1 + i);
984 smsg_vec.push(msg.clone());
985 mset.insert(i, msg);
986 }
987
988 let mut chains = Chains::new();
989 create_message_chains(
990 &tma,
991 &a1,
992 &mset,
993 &TokenAmount::zero(),
994 &ts,
995 &mut chains,
996 &chain_config,
997 )
998 .unwrap();
999 assert_eq!(chains.len(), 1, "expected a single chain");
1000 for (i, m) in chains[0].msgs.iter().enumerate() {
1001 assert_eq!(
1002 m.sequence(),
1003 i as u64,
1004 "expected nonce {} but got {}",
1005 i,
1006 m.sequence()
1007 );
1008 }
1009
1010 let mut mset = HashMap::new();
1014 let mut smsg_vec = Vec::new();
1015 tma.set_state_balance_raw(&a1, TokenAmount::from_atto(1_000_000_000_000_000_000_u64));
1016 for i in 0..10 {
1017 let msg = if i != 5 {
1018 create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i)
1019 } else {
1020 create_smsg(&a2, &a1, wallet.borrow_mut(), i, 1, 1 + i)
1021 };
1022 smsg_vec.push(msg.clone());
1023 mset.insert(i, msg);
1024 }
1025 let mut chains = Chains::new();
1026 create_message_chains(
1027 &tma,
1028 &a1,
1029 &mset,
1030 &TokenAmount::zero(),
1031 &ts,
1032 &mut chains,
1033 &chain_config,
1034 )
1035 .unwrap();
1036 assert_eq!(chains.len(), 1, "expected a single chain");
1037 assert_eq!(chains[0].msgs.len(), 5);
1038 for (i, m) in chains[0].msgs.iter().enumerate() {
1039 assert_eq!(
1040 m.sequence(),
1041 i as u64,
1042 "expected nonce {} but got {}",
1043 i,
1044 m.sequence()
1045 );
1046 }
1047
1048 let mut mset = HashMap::new();
1052 let mut smsg_vec = Vec::new();
1053 let max_messages = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / gas_limit;
1054 let n_messages = max_messages + 1;
1055 for i in 0..n_messages {
1056 let msg = create_smsg(
1057 &a2,
1058 &a1,
1059 wallet.borrow_mut(),
1060 i as u64,
1061 gas_limit,
1062 (1 + i) as u64,
1063 );
1064 smsg_vec.push(msg.clone());
1065 mset.insert(i as u64, msg);
1066 }
1067 let mut chains = Chains::new();
1068 create_message_chains(
1069 &tma,
1070 &a1,
1071 &mset,
1072 &TokenAmount::zero(),
1073 &ts,
1074 &mut chains,
1075 &chain_config,
1076 )
1077 .unwrap();
1078 assert_eq!(chains.len(), 1, "expected a single chain");
1079 assert_eq!(chains[0].msgs.len(), max_messages as usize);
1080 for (i, m) in chains[0].msgs.iter().enumerate() {
1081 assert_eq!(
1082 m.sequence(),
1083 i as u64,
1084 "expected nonce {} but got {}",
1085 i,
1086 m.sequence()
1087 );
1088 }
1089
1090 tma.set_state_balance_raw(&a1, TokenAmount::from_atto(300 * gas_limit + 1));
1092 let mut mset = HashMap::new();
1093 let mut smsg_vec = Vec::new();
1094 for i in 0..10 {
1095 let msg = create_smsg(&a2, &a1, wallet.borrow_mut(), i, gas_limit, 1 + i);
1096 smsg_vec.push(msg.clone());
1097 mset.insert(i, msg);
1098 }
1099 let mut chains = Chains::new();
1100 create_message_chains(
1101 &tma,
1102 &a1,
1103 &mset,
1104 &TokenAmount::zero(),
1105 &ts,
1106 &mut chains,
1107 &chain_config,
1108 )
1109 .unwrap();
1110 assert_eq!(chains.len(), 1, "expected a single chain");
1111 assert_eq!(chains[0].msgs.len(), 2);
1112 for (i, m) in chains[0].msgs.iter().enumerate() {
1113 assert_eq!(
1114 m.sequence(),
1115 i as u64,
1116 "expected nonce {} but got {}",
1117 i,
1118 m.sequence()
1119 );
1120 }
1121 }
1122}