1use std::cmp::Ordering;
10
11use crate::blocks::{BLOCK_MESSAGE_LIMIT, Tipset};
12use crate::message::{MessageRead as _, SignedMessage};
13use crate::message_pool::msg_chain::MsgChainNode;
14use crate::shim::crypto::SignatureType;
15use crate::shim::{address::Address, econ::TokenAmount};
16use crate::state_manager::IdToAddressCache;
17use ahash::{HashMap, HashMapExt};
18use anyhow::{Context, bail, ensure};
19use parking_lot::RwLock;
20use rand::prelude::SliceRandom;
21use tracing::{debug, error, warn};
22
23use crate::shim::crypto::Signature;
24use crate::utils::cache::SizeTrackingLruCache;
25use crate::utils::get_size::CidWrapper;
26
27use super::{MpoolCtx, msg_pool::MessagePool, provider::Provider, utils::recover_sig};
28use crate::message_pool::{
29 Error, add_to_selected_msgs,
30 msg_chain::{Chains, NodeKey, create_message_chains},
31 msg_pool::MsgSet,
32 msgpool::MIN_GAS,
33};
34
35type Pending = HashMap<Address, HashMap<u64, SignedMessage>>;
36
37const MAX_BLOCK_MSGS: usize = 16000;
39const MAX_BLOCKS: usize = 15;
40const CBOR_GEN_LIMIT: usize = 8192; struct SelectedMessages {
47 msgs: Vec<SignedMessage>,
49 gas_limit: u64,
51 secp_limit: u64,
53 bls_limit: u64,
55}
56
57impl Default for SelectedMessages {
58 fn default() -> Self {
59 SelectedMessages {
60 msgs: Vec::new(),
61 gas_limit: crate::shim::econ::BLOCK_GAS_LIMIT,
62 secp_limit: CBOR_GEN_LIMIT as u64,
63 bls_limit: CBOR_GEN_LIMIT as u64,
64 }
65 }
66}
67
68impl SelectedMessages {
69 fn new(msgs: Vec<SignedMessage>, gas_limit: u64) -> Self {
70 SelectedMessages {
71 msgs,
72 gas_limit,
73 ..Default::default()
74 }
75 }
76
77 fn len(&self) -> usize {
79 self.msgs.len()
80 }
81
82 fn truncate(&mut self, len: usize) {
84 self.msgs.truncate(len);
85 }
86
87 fn reduce_gas_limit(&mut self, gas: u64) {
90 self.gas_limit = self.gas_limit.saturating_sub(gas);
91 }
92
93 fn reduce_bls_limit(&mut self, bls: u64) {
96 self.bls_limit = self.bls_limit.saturating_sub(bls);
97 }
98
99 fn reduce_secp_limit(&mut self, secp: u64) {
102 self.secp_limit = self.secp_limit.saturating_sub(secp);
103 }
104
105 fn extend(&mut self, msgs: Vec<SignedMessage>) {
107 self.msgs.extend(msgs);
108 }
109
110 fn try_to_add(&mut self, message_chain: MsgChainNode) -> anyhow::Result<()> {
113 let msg_chain_len = message_chain.msgs.len();
114 ensure!(
115 BLOCK_MESSAGE_LIMIT >= msg_chain_len + self.len(),
116 "Message chain is too long to fit in the block: {} messages, limit is {BLOCK_MESSAGE_LIMIT}",
117 msg_chain_len + self.len(),
118 );
119 ensure!(
120 self.gas_limit >= message_chain.gas_limit,
121 "Message chain gas limit is too high: {} gas, limit is {}",
122 message_chain.gas_limit,
123 self.gas_limit
124 );
125
126 match message_chain.sig_type {
127 Some(SignatureType::Bls) => {
128 ensure!(
129 self.bls_limit >= msg_chain_len as u64,
130 "BLS limit is too low: {msg_chain_len} messages, limit is {}",
131 self.bls_limit
132 );
133
134 self.extend(message_chain.msgs);
135 self.reduce_bls_limit(msg_chain_len as u64);
136 self.reduce_gas_limit(message_chain.gas_limit);
137 }
138 Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
139 ensure!(
140 self.secp_limit >= msg_chain_len as u64,
141 "Secp256k1 limit is too low: {msg_chain_len} messages, limit is {}",
142 self.secp_limit
143 );
144
145 self.extend(message_chain.msgs);
146 self.reduce_secp_limit(msg_chain_len as u64);
147 self.reduce_gas_limit(message_chain.gas_limit);
148 }
149 None => {
150 warn!("Tried to add a message chain with no signature type");
153 }
154 }
155 Ok(())
156 }
157
158 fn try_to_add_with_deps(
161 &mut self,
162 idx: usize,
163 chains: &mut Chains,
164 base_fee: &TokenAmount,
165 ) -> anyhow::Result<()> {
166 let message_chain = chains
167 .get_mut_at(idx)
168 .context("Couldn't find required message chain")?;
169 let mut chain_gas_limit = message_chain.gas_limit;
171 let mut chain_msg_limit = message_chain.msgs.len();
172 let mut dep_gas_limit = 0;
173 let mut dep_message_limit = 0;
174 let selected_messages_limit = match message_chain.sig_type {
175 Some(SignatureType::Bls) => self.bls_limit as usize,
176 Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
177 self.secp_limit as usize
178 }
179 None => {
180 bail!("Tried to add a message chain with no signature type");
183 }
184 };
185 let selected_messages_limit =
186 selected_messages_limit.min(BLOCK_MESSAGE_LIMIT.saturating_sub(self.len()));
187
188 let mut chain_deps = vec![];
189 let mut cur_chain = message_chain.prev;
190 let _ = message_chain; while let Some(cur_chn) = cur_chain {
192 let node = chains.get(cur_chn).unwrap();
193 if !node.merged {
194 chain_deps.push(cur_chn);
195 chain_gas_limit += node.gas_limit;
196 chain_msg_limit += node.msgs.len();
197 dep_gas_limit += node.gas_limit;
198 dep_message_limit += node.msgs.len();
199 cur_chain = node.prev;
200 } else {
201 break;
202 }
203 }
204
205 if chain_gas_limit > self.gas_limit || chain_msg_limit > selected_messages_limit {
207 if dep_gas_limit > self.gas_limit || dep_message_limit >= selected_messages_limit {
213 chains.invalidate(chains.get_key_at(idx));
214 } else {
215 chains.trim_msgs_at(
217 idx,
218 self.gas_limit.saturating_sub(dep_gas_limit),
219 selected_messages_limit.saturating_sub(dep_message_limit),
220 base_fee,
221 );
222 }
223
224 bail!("Chain doesn't fit in the block");
225 }
226 for dep in chain_deps.iter().rev() {
227 let cur_chain = chains.get_mut(*dep);
228 if let Some(node) = cur_chain {
229 node.merged = true;
230 self.extend(node.msgs.clone());
231 } else {
232 bail!("Couldn't find required dependent message chain");
233 }
234 }
235
236 let message_chain = chains
237 .get_mut_at(idx)
238 .context("Couldn't find required message chain")?;
239 message_chain.merged = true;
240 self.extend(message_chain.msgs.clone());
241 self.reduce_gas_limit(chain_gas_limit);
242
243 match message_chain.sig_type {
244 Some(SignatureType::Bls) => {
245 self.reduce_bls_limit(chain_msg_limit as u64);
246 }
247 Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
248 self.reduce_secp_limit(chain_msg_limit as u64);
249 }
250 None => {
251 bail!("Tried to add a message chain with no signature type");
254 }
255 }
256
257 Ok(())
258 }
259
260 fn trim_chain_at(&mut self, chains: &mut Chains, idx: usize, base_fee: &TokenAmount) {
261 let message_chain = match chains.get_at(idx) {
262 Some(message_chain) => message_chain,
263 None => {
264 error!("Tried to trim a message chain that doesn't exist");
265 return;
266 }
267 };
268 let msg_limit = BLOCK_MESSAGE_LIMIT.saturating_sub(self.len());
269 let msg_limit = match message_chain.sig_type {
270 Some(SignatureType::Bls) => std::cmp::min(self.bls_limit, msg_limit as u64),
271 Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
272 std::cmp::min(self.secp_limit, msg_limit as u64)
273 }
274 _ => {
275 error!("Tried to trim a message chain with no signature type");
278 return;
279 }
280 };
281
282 if message_chain.gas_limit > self.gas_limit || message_chain.msgs.len() > msg_limit as usize
283 {
284 chains.trim_msgs_at(idx, self.gas_limit, msg_limit as usize, base_fee);
285 }
286 }
287}
288
289impl<T> MessagePool<T>
290where
291 T: Provider,
292{
293 pub fn select_messages(&self, ts: &Tipset, tq: f64) -> Result<Vec<SignedMessage>, Error> {
297 let cur_ts = self.current_tipset();
298 let mut msgs = if tq > 0.84 {
304 self.select_messages_greedy(&cur_ts, ts)
305 } else {
306 self.select_messages_optimal(&cur_ts, ts, tq)
307 }?;
308
309 if msgs.len() > MAX_BLOCK_MSGS {
310 warn!(
311 "Message selection chose too many messages: {} > {MAX_BLOCK_MSGS}",
312 msgs.len(),
313 );
314 msgs.truncate(MAX_BLOCK_MSGS)
315 }
316
317 Ok(msgs.msgs)
318 }
319
320 fn select_messages_greedy(
321 &self,
322 cur_ts: &Tipset,
323 ts: &Tipset,
324 ) -> Result<SelectedMessages, Error> {
325 let base_fee = self.api.chain_compute_base_fee(ts)?;
326
327 let mut pending = self.get_pending_messages(cur_ts, ts)?;
330
331 if pending.is_empty() {
332 return Ok(SelectedMessages::default());
333 }
334
335 let selected_msgs = self.select_priority_messages(&mut pending, &base_fee, ts)?;
337
338 if selected_msgs.gas_limit < MIN_GAS || selected_msgs.len() >= BLOCK_MESSAGE_LIMIT {
340 return Ok(selected_msgs);
341 }
342
343 let mut chains = Chains::new();
346 for (actor, mset) in pending.into_iter() {
347 create_message_chains(
348 self.api.as_ref(),
349 &actor,
350 &mset,
351 &base_fee,
352 ts,
353 &mut chains,
354 &self.chain_config,
355 )?;
356 }
357
358 Ok(merge_and_trim(
359 &mut chains,
360 selected_msgs,
361 &base_fee,
362 MIN_GAS,
363 ))
364 }
365
366 #[allow(clippy::indexing_slicing)]
367 fn select_messages_optimal(
368 &self,
369 cur_ts: &Tipset,
370 target_tipset: &Tipset,
371 ticket_quality: f64,
372 ) -> Result<SelectedMessages, Error> {
373 let base_fee = self.api.chain_compute_base_fee(target_tipset)?;
374
375 let mut pending = self.get_pending_messages(cur_ts, target_tipset)?;
378
379 if pending.is_empty() {
380 return Ok(SelectedMessages::default());
381 }
382
383 let mut selected_msgs =
385 self.select_priority_messages(&mut pending, &base_fee, target_tipset)?;
386
387 if selected_msgs.gas_limit < MIN_GAS || selected_msgs.len() >= BLOCK_MESSAGE_LIMIT {
389 return Ok(selected_msgs);
390 }
391
392 let mut chains = Chains::new();
395 for (actor, mset) in pending.into_iter() {
396 create_message_chains(
397 self.api.as_ref(),
398 &actor,
399 &mset,
400 &base_fee,
401 target_tipset,
402 &mut chains,
403 &self.chain_config,
404 )?;
405 }
406
407 chains.sort(false);
409
410 if chains.get_at(0).is_some_and(|it| it.gas_perf < 0.0) {
411 tracing::warn!(
412 "all messages in mpool have non-positive gas performance {}",
413 chains[0].gas_perf
414 );
415 return Ok(selected_msgs);
416 }
417
418 let mut next_chain = 0;
423 let mut partitions: Vec<Vec<NodeKey>> = vec![vec![]; MAX_BLOCKS];
424 let mut i = 0;
425 while i < MAX_BLOCKS && next_chain < chains.len() {
426 let mut gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT;
427 let mut msg_limit = BLOCK_MESSAGE_LIMIT;
428 while next_chain < chains.len() {
429 let chain_key = chains.key_vec[next_chain];
430 next_chain += 1;
431 partitions[i].push(chain_key);
432 let chain = chains.get(chain_key).unwrap();
433 let chain_gas_limit = chain.gas_limit;
434 if gas_limit < chain_gas_limit {
435 break;
436 }
437 gas_limit = gas_limit.saturating_sub(chain_gas_limit);
438 msg_limit = msg_limit.saturating_sub(chain.msgs.len());
439 if gas_limit < MIN_GAS || msg_limit == 0 {
440 break;
441 }
442 }
443 i += 1;
444 }
445
446 let block_prob = crate::message_pool::block_probabilities(ticket_quality);
450 let mut eff_chains = 0;
451 for i in 0..MAX_BLOCKS {
452 for k in &partitions[i] {
453 if let Some(node) = chains.get_mut(*k) {
454 node.eff_perf = node.gas_perf * block_prob[i];
455 }
456 }
457 eff_chains += partitions[i].len();
458 }
459
460 for i in eff_chains..chains.len() {
462 if let Some(node) = chains.get_mut_at(i) {
463 node.set_null_effective_perf();
464 }
465 }
466
467 chains.sort_effective();
469
470 let mut last = chains.len();
475 for i in 0..chains.len() {
476 if chains[i].gas_perf < 0.0 {
478 break;
479 }
480
481 if chains[i].merged {
483 continue;
484 }
485
486 match selected_msgs.try_to_add_with_deps(i, &mut chains, &base_fee) {
487 Ok(_) => {
488 if let Some(next_key) = chains[i].next {
490 let next_node = chains.get_mut(next_key).unwrap();
491 if next_node.eff_perf > 0.0 {
492 next_node.eff_perf += next_node.parent_offset;
493 let mut next_next_key = next_node.next;
494 while let Some(nnk) = next_next_key {
495 let (nn_node, prev_perfs) = chains.get_mut_with_prev_eff(nnk);
496 if let Some(nn_node) = nn_node {
497 if nn_node.eff_perf > 0.0 {
498 nn_node.set_eff_perf(prev_perfs);
499 next_next_key = nn_node.next;
500 } else {
501 break;
502 }
503 } else {
504 break;
505 }
506 }
507 }
508 }
509
510 chains.sort_range_effective(i + 1..);
514
515 continue;
516 }
517 Err(e) => {
518 debug!("Failed to add message chain with dependencies: {e:#}");
519 }
520 }
521
522 last = i;
525 break;
526 }
527
528 'tail_loop: while selected_msgs.gas_limit >= MIN_GAS && last < chains.len() {
537 if !chains[last].valid {
538 last += 1;
540 continue;
541 }
542
543 selected_msgs.trim_chain_at(&mut chains, last, &base_fee);
545
546 if chains[last].valid {
548 for i in last..chains.len() - 1 {
549 if chains[i].cmp_effective(&chains[i + 1]) == Ordering::Greater {
550 break;
551 }
552 chains.key_vec.swap(i, i + 1);
553 }
554 }
555
556 let lst = last; for i in lst..chains.len() {
559 let chain = &mut chains[i];
560 if !chain.valid {
562 continue;
563 }
564
565 if chain.merged {
567 continue;
568 }
569
570 if chain.gas_perf < 0.0 {
572 break 'tail_loop;
573 }
574
575 match selected_msgs.try_to_add_with_deps(i, &mut chains, &base_fee) {
576 Ok(_) => continue,
577 Err(e) => debug!("Failed to add message chain with dependencies: {e:#}"),
578 }
579
580 continue 'tail_loop;
581 }
582
583 break;
586 }
587
588 if selected_msgs.gas_limit >= MIN_GAS && selected_msgs.msgs.len() <= BLOCK_MESSAGE_LIMIT {
593 let pre_random_length = selected_msgs.len();
594
595 chains
596 .key_vec
597 .shuffle(&mut crate::utils::rand::forest_rng());
598
599 for i in 0..chains.len() {
600 if selected_msgs.gas_limit < MIN_GAS || selected_msgs.len() >= BLOCK_MESSAGE_LIMIT {
601 break;
602 }
603
604 if chains[i].merged || !chains[i].valid {
606 continue;
607 }
608
609 if chains[i].gas_perf < 0.0 {
611 continue;
612 }
613
614 if selected_msgs
615 .try_to_add_with_deps(i, &mut chains, &base_fee)
616 .is_ok()
617 {
618 continue;
620 }
621
622 if chains[i].valid {
623 selected_msgs
626 .try_to_add_with_deps(i, &mut chains, &base_fee)
627 .context("Failed to add message chain with dependencies")?;
628 continue;
629 }
630 }
631
632 if selected_msgs.len() != pre_random_length {
633 tracing::warn!(
634 "optimal selection failed to pack a block; picked {} messages with random selection",
635 selected_msgs.len() - pre_random_length
636 );
637 }
638 }
639
640 Ok(selected_msgs)
641 }
642
643 fn get_pending_messages(&self, cur_ts: &Tipset, ts: &Tipset) -> Result<Pending, Error> {
644 let mut result: Pending = HashMap::new();
645 let mut in_sync = false;
646 if cur_ts.epoch() == ts.epoch() && cur_ts == ts {
647 in_sync = true;
648 }
649
650 for (a, mset) in self.pending.read().iter() {
651 if in_sync {
652 result.insert(*a, mset.msgs.clone());
653 } else {
654 let mut mset_copy = HashMap::new();
655 for (nonce, m) in mset.msgs.iter() {
656 mset_copy.insert(*nonce, m.clone());
657 }
658 result.insert(*a, mset_copy);
659 }
660 }
661
662 if in_sync {
663 return Ok(result);
664 }
665
666 run_head_change(
668 self.api.as_ref(),
669 &self.bls_sig_cache,
670 &self.pending,
671 &self.key_cache,
672 cur_ts.clone(),
673 ts.clone(),
674 &mut result,
675 )?;
676
677 Ok(result)
678 }
679
680 fn select_priority_messages(
681 &self,
682 pending: &mut Pending,
683 base_fee: &TokenAmount,
684 ts: &Tipset,
685 ) -> Result<SelectedMessages, Error> {
686 let result = Vec::with_capacity(self.config.size_limit_low() as usize);
687 let gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT;
688 let min_gas = 1298450;
689
690 let priority = self.config.priority_addrs();
692 let mut chains = Chains::new();
693 for actor in priority.iter() {
694 if let Some(mset) = pending.remove(actor) {
696 create_message_chains(
698 self.api.as_ref(),
699 actor,
700 &mset,
701 base_fee,
702 ts,
703 &mut chains,
704 &self.chain_config,
705 )?;
706 }
707 }
708
709 if chains.is_empty() {
710 return Ok(SelectedMessages::new(Vec::new(), gas_limit));
711 }
712
713 Ok(merge_and_trim(
714 &mut chains,
715 SelectedMessages::new(result, gas_limit),
716 base_fee,
717 min_gas,
718 ))
719 }
720}
721
722#[allow(clippy::indexing_slicing)]
724fn merge_and_trim(
725 chains: &mut Chains,
726 mut selected_msgs: SelectedMessages,
727 base_fee: &TokenAmount,
728 min_gas: u64,
729) -> SelectedMessages {
730 if chains.is_empty() {
731 return selected_msgs;
732 }
733
734 chains.sort(true);
736
737 let first_chain_gas_perf = chains[0].gas_perf;
738
739 if !chains.is_empty() && first_chain_gas_perf < 0.0 {
740 warn!(
741 "all priority messages in mpool have negative gas performance bestGasPerf: {}",
742 first_chain_gas_perf
743 );
744 return selected_msgs;
745 }
746
747 let mut last = chains.len();
750 for i in 0..chains.len() {
751 let node = &chains[i];
752
753 if node.gas_perf < 0.0 {
754 break;
755 }
756
757 if selected_msgs.try_to_add(node.clone()).is_ok() {
758 continue;
760 }
761
762 last = i;
764 break;
765 }
766
767 'tail_loop: while selected_msgs.gas_limit >= min_gas && last < chains.len() {
768 selected_msgs.trim_chain_at(chains, last, base_fee);
770
771 let node = &chains[last];
773 if node.valid {
774 for i in last..chains.len() - 1 {
775 let cur_node = &chains[i];
777 let next_node = &chains[i + 1];
778 if cur_node.compare(next_node) == Ordering::Greater {
779 break;
780 }
781
782 chains.key_vec.swap(i, i + 1);
783 }
784 }
785
786 let lst = last; for i in lst..chains.len() {
789 let chain = chains[i].clone();
790 if !chain.valid {
791 continue;
792 }
793
794 if chain.gas_perf < 0.0 {
796 break 'tail_loop;
797 }
798
799 if selected_msgs.try_to_add(chain).is_ok() {
801 continue;
803 }
804
805 last += i;
807 continue 'tail_loop;
808 }
809
810 break;
811 }
812
813 selected_msgs
814}
815
816pub(in crate::message_pool) fn run_head_change<T>(
821 api: &T,
822 bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
823 pending: &RwLock<HashMap<Address, MsgSet>>,
824 key_cache: &IdToAddressCache,
825 from: Tipset,
826 to: Tipset,
827 rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
828) -> Result<(), Error>
829where
830 T: Provider,
831{
832 let mut left = from;
833 let mut right = to;
834 let mut left_chain = Vec::new();
835 let mut right_chain = Vec::new();
836 while left != right {
837 if left.epoch() > right.epoch() {
838 left_chain.push(left.clone());
839 let par = api.load_tipset(left.parents())?;
840 left = par;
841 } else {
842 right_chain.push(right.clone());
843 let par = api.load_tipset(right.parents())?;
844 right = par;
845 }
846 }
847 for ts in left_chain {
848 let mut msgs: Vec<SignedMessage> = Vec::new();
849 for block in ts.block_headers() {
850 let (umsg, smsgs) = api.messages_for_block(block)?;
851 msgs.extend(smsgs);
852 for msg in umsg {
853 let msg_cid = msg.cid();
854 if let Ok(smsg) = recover_sig(bls_sig_cache, msg) {
855 msgs.push(smsg);
856 } else {
857 tracing::debug!("could not recover signature for bls message {msg_cid}");
858 }
859 }
860 }
861 for msg in msgs {
862 add_to_selected_msgs(msg, rmsgs);
863 }
864 }
865
866 for ts in right_chain {
867 let mpool_ctx = MpoolCtx {
868 api,
869 key_cache,
870 pending,
871 ts: &ts,
872 };
873 for b in ts.block_headers() {
874 let (msgs, smsgs) = api.messages_for_block(b)?;
875
876 for msg in smsgs {
877 mpool_ctx.remove_from_selected_msgs(&msg.from(), msg.sequence(), rmsgs)?;
878 }
879 for msg in msgs {
880 mpool_ctx.remove_from_selected_msgs(&msg.from, msg.sequence, rmsgs)?;
881 }
882 }
883 }
884 Ok(())
885}
886
887#[cfg(test)]
888mod test_selection {
889 use std::sync::Arc;
890
891 use super::*;
892 use crate::db::MemoryDB;
893 use crate::key_management::{KeyStore, KeyStoreConfig, Wallet};
894 use crate::message_pool::msgpool::{
895 test_provider::{TestApi, mock_block},
896 tests::{create_fake_smsg, create_smsg},
897 };
898 use crate::shim::crypto::SignatureType;
899 use crate::shim::econ::BLOCK_GAS_LIMIT;
900 use tokio::task::JoinSet;
901
902 const TEST_GAS_LIMIT: i64 = 6955002;
903
904 fn make_test_mpool(joinset: &mut JoinSet<anyhow::Result<()>>) -> MessagePool<TestApi> {
905 let tma = TestApi::default();
906 let (tx, _rx) = flume::bounded(50);
907 MessagePool::new(tma, tx, Default::default(), Arc::default(), joinset).unwrap()
908 }
909
910 async fn mock_tipset(mpool: &MessagePool<TestApi>) -> Tipset {
913 let b1 = mock_block(1, 1);
914 let ts = Tipset::from(&b1);
915 mpool
916 .apply_head_change(Vec::new(), vec![Tipset::from(b1)])
917 .await
918 .unwrap();
919 ts
920 }
921
922 #[tokio::test]
923 async fn basic_message_selection() {
924 let mut joinset = JoinSet::new();
925 let mpool = make_test_mpool(&mut joinset);
926
927 let ks1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
928 let mut w1 = Wallet::new(ks1);
929 let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
930
931 let ks2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
932 let mut w2 = Wallet::new(ks2);
933 let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
934
935 let ts = mock_tipset(&mpool).await;
936
937 mpool
938 .api
939 .set_state_balance_raw(&a1, TokenAmount::from_whole(1));
940 mpool
941 .api
942 .set_state_balance_raw(&a2, TokenAmount::from_whole(1));
943
944 for i in 0..10 {
948 let m = create_smsg(&a2, &a1, &mut w1, i, TEST_GAS_LIMIT, 2 * i + 1);
949 mpool.add(m).unwrap();
950 }
951 for i in 0..10 {
952 let m = create_smsg(&a1, &a2, &mut w2, i, TEST_GAS_LIMIT, i + 1);
953 mpool.add(m).unwrap();
954 }
955
956 let msgs = mpool.select_messages(&ts, 1.0).unwrap();
957
958 assert_eq!(msgs.len(), 20, "Expected 20 messages, got {}", msgs.len());
959
960 let mut next_nonce = 0;
961 for (i, msg) in msgs.iter().enumerate().take(10) {
962 assert_eq!(
963 msg.from(),
964 a1,
965 "first 10 returned messages should be from actor a1 {i}",
966 );
967 assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
968 next_nonce += 1;
969 }
970
971 next_nonce = 0;
972 for (i, msg) in msgs.iter().enumerate().take(20).skip(10) {
973 assert_eq!(
974 msg.from(),
975 a2,
976 "next 10 returned messages should be from actor a2 {i}",
977 );
978 assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
979 next_nonce += 1;
980 }
981
982 let b2 = mpool.api.next_block();
984 mpool.api.set_block_messages(&b2, msgs);
985 mpool
986 .apply_head_change(Vec::new(), vec![Tipset::from(b2)])
987 .await
988 .unwrap();
989
990 assert!(
993 mpool.pending.read().is_empty(),
994 "Expected no pending messages, but got {}",
995 mpool.pending.read().len()
996 );
997
998 let mut msgs = Vec::with_capacity(20);
1000 for i in 10..20 {
1001 msgs.push(create_smsg(&a2, &a1, &mut w1, i, TEST_GAS_LIMIT, 2 * i + 1));
1002 msgs.push(create_smsg(&a1, &a2, &mut w2, i, TEST_GAS_LIMIT, i + 1));
1003 }
1004 let b3 = mpool.api.next_block();
1005 let ts3 = Tipset::from(&b3);
1006 mpool.api.set_block_messages(&b3, msgs);
1007
1008 mpool.api.set_state_sequence(&a1, 20);
1010 mpool.api.set_state_sequence(&a2, 20);
1011
1012 for i in 20..30 {
1014 mpool
1015 .add(create_smsg(
1016 &a2,
1017 &a1,
1018 &mut w1,
1019 i,
1020 TEST_GAS_LIMIT,
1021 2 * i + 200,
1022 ))
1023 .unwrap();
1024 mpool
1025 .add(create_smsg(&a1, &a2, &mut w2, i, TEST_GAS_LIMIT, i + 1))
1026 .unwrap();
1027 }
1028 mpool.api.set_state_sequence(&a1, 10);
1032 mpool.api.set_state_sequence(&a2, 10);
1033 let msgs = mpool.select_messages(&ts3, 1.0).unwrap();
1034
1035 assert_eq!(
1036 msgs.len(),
1037 20,
1038 "Expected 20 messages, but got {}",
1039 msgs.len()
1040 );
1041
1042 let mut next_nonce = 20;
1043 for msg in msgs.iter().take(10) {
1044 assert_eq!(
1045 msg.from(),
1046 a1,
1047 "first 10 returned messages should be from actor a1"
1048 );
1049 assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1050 next_nonce += 1;
1051 }
1052 next_nonce = 20;
1053 for msg in msgs.iter().take(20).skip(10) {
1054 assert_eq!(
1055 msg.from(),
1056 a2,
1057 "next 10 returned messages should be from actor a2"
1058 );
1059 assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1060 next_nonce += 1;
1061 }
1062 }
1063
1064 #[tokio::test]
1065 async fn message_selection_trimming_gas() {
1066 let mut joinset = JoinSet::new();
1067 let mpool = make_test_mpool(&mut joinset);
1068 let ts = mock_tipset(&mpool).await;
1069 let api = mpool.api.clone();
1070
1071 let ks1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1072 let mut w1 = Wallet::new(ks1);
1073 let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1074
1075 let ks2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1076 let mut w2 = Wallet::new(ks2);
1077 let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1078
1079 api.set_state_balance_raw(&a1, TokenAmount::from_whole(1));
1080 api.set_state_balance_raw(&a2, TokenAmount::from_whole(1));
1081
1082 let nmsgs = (crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT) + 1;
1083
1084 for i in 0..nmsgs {
1086 let bias = (nmsgs - i) / 3;
1087 let m = create_fake_smsg(
1088 &mpool,
1089 &a2,
1090 &a1,
1091 i as u64,
1092 TEST_GAS_LIMIT,
1093 (1 + i % 3 + bias) as u64,
1094 );
1095 mpool.add(m).unwrap();
1096 let m = create_fake_smsg(
1097 &mpool,
1098 &a1,
1099 &a2,
1100 i as u64,
1101 TEST_GAS_LIMIT,
1102 (1 + i % 3 + bias) as u64,
1103 );
1104 mpool.add(m).unwrap();
1105 }
1106
1107 let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1108
1109 let expected = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1110 assert_eq!(msgs.len(), expected as usize);
1111 let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1112 assert!(m_gas_limit <= crate::shim::econ::BLOCK_GAS_LIMIT);
1113 }
1114
1115 #[tokio::test]
1116 async fn message_selection_trimming_msgs_basic() {
1117 let mut joinset = JoinSet::new();
1118 let mpool = make_test_mpool(&mut joinset);
1119 let ts = mock_tipset(&mpool).await;
1120 let api = mpool.api.clone();
1121
1122 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1123 let mut wallet = Wallet::new(keystore);
1124 let address = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
1125
1126 api.set_state_balance_raw(&address, TokenAmount::from_whole(1));
1127
1128 for i in 0..BLOCK_MESSAGE_LIMIT {
1130 let msg = create_fake_smsg(&mpool, &address, &address, i as u64, 200_000, 100);
1131 mpool.add(msg).unwrap();
1132 }
1133
1134 let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1135 assert_eq!(
1136 msgs.len(),
1137 CBOR_GEN_LIMIT,
1138 "Expected {CBOR_GEN_LIMIT} messages, got {}",
1139 msgs.len()
1140 );
1141
1142 let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1144 assert!(
1145 m_gas_limit <= BLOCK_GAS_LIMIT,
1146 "Selected messages gas limit {m_gas_limit} exceeds block gas limit {BLOCK_GAS_LIMIT}",
1147 );
1148 }
1149
1150 #[tokio::test]
1151 async fn message_selection_trimming_msgs_two_senders() {
1152 let mut joinset = JoinSet::new();
1153 let mpool = make_test_mpool(&mut joinset);
1154 let ts = mock_tipset(&mpool).await;
1155 let api = mpool.api.clone();
1156
1157 let keystore_1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1158 let mut wallet_1 = Wallet::new(keystore_1);
1159 let address_1 = wallet_1.generate_addr(SignatureType::Secp256k1).unwrap();
1160
1161 let keystore_2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1162 let mut wallet_2 = Wallet::new(keystore_2);
1163 let address_2 = wallet_2.generate_addr(SignatureType::Bls).unwrap();
1164
1165 api.set_state_balance_raw(&address_1, TokenAmount::from_whole(1));
1166 api.set_state_balance_raw(&address_2, TokenAmount::from_whole(1));
1167
1168 for i in 0..BLOCK_MESSAGE_LIMIT {
1170 let msg = create_smsg(
1171 &address_2,
1172 &address_1,
1173 &mut wallet_1,
1174 i as u64,
1175 300_000,
1176 100,
1177 );
1178 mpool.add(msg).unwrap();
1179 let msg = create_smsg(
1182 &address_1,
1183 &address_2,
1184 &mut wallet_2,
1185 i as u64,
1186 300_000,
1187 1000,
1188 );
1189 mpool.add(msg).unwrap();
1190 }
1191 let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1192 let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1194 assert!(
1195 m_gas_limit <= BLOCK_GAS_LIMIT,
1196 "Selected messages gas limit {m_gas_limit} exceeds block gas limit {BLOCK_GAS_LIMIT}",
1197 );
1198 let bls_msgs = msgs.iter().filter(|m| m.is_bls()).count();
1199 assert_eq!(
1200 CBOR_GEN_LIMIT, bls_msgs,
1201 "Expected {CBOR_GEN_LIMIT} bls messages, got {bls_msgs}."
1202 );
1203 assert_eq!(
1204 msgs.len(),
1205 BLOCK_MESSAGE_LIMIT,
1206 "Expected {BLOCK_MESSAGE_LIMIT} messages, got {}",
1207 msgs.len()
1208 );
1209 }
1210
1211 #[tokio::test]
1212 async fn message_selection_trimming_msgs_two_senders_complex() {
1213 let mut joinset = JoinSet::new();
1214 let mpool = make_test_mpool(&mut joinset);
1215 let ts = mock_tipset(&mpool).await;
1216 let api = mpool.api.clone();
1217
1218 let keystore_1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1219 let mut wallet_1 = Wallet::new(keystore_1);
1220 let address_1 = wallet_1.generate_addr(SignatureType::Secp256k1).unwrap();
1221
1222 let keystore_2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1223 let mut wallet_2 = Wallet::new(keystore_2);
1224 let address_2 = wallet_2.generate_addr(SignatureType::Bls).unwrap();
1225
1226 api.set_state_balance_raw(&address_1, TokenAmount::from_whole(1));
1227 api.set_state_balance_raw(&address_2, TokenAmount::from_whole(1));
1228
1229 let mut counter = 0;
1231 for i in 0..CBOR_GEN_LIMIT {
1232 counter += 1;
1233 let msg = create_smsg(
1234 &address_2,
1235 &address_1,
1236 &mut wallet_1,
1237 i as u64,
1238 300_000,
1239 100,
1240 );
1241 mpool.add(msg).unwrap();
1242 let msg = create_smsg(
1245 &address_1,
1246 &address_2,
1247 &mut wallet_2,
1248 i as u64,
1249 300_000,
1250 100,
1251 );
1252 mpool.add(msg).unwrap();
1253 }
1254
1255 let msg = create_smsg(
1257 &address_2,
1258 &address_1,
1259 &mut wallet_1,
1260 counter as u64,
1261 300_000,
1262 1000,
1263 );
1264 mpool.add(msg).unwrap();
1265
1266 let msg = create_smsg(
1267 &address_1,
1268 &address_2,
1269 &mut wallet_2,
1270 counter as u64,
1271 300_000,
1272 100,
1273 );
1274 mpool.add(msg).unwrap();
1275
1276 counter += 1;
1277
1278 let msg = create_smsg(
1280 &address_2,
1281 &address_1,
1282 &mut wallet_1,
1283 counter as u64,
1284 400_000,
1285 1_000_000,
1286 );
1287 mpool.add(msg).unwrap();
1288
1289 let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1290 let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1292 assert!(
1293 m_gas_limit <= BLOCK_GAS_LIMIT,
1294 "Selected messages gas limit {m_gas_limit} exceeds block gas limit {BLOCK_GAS_LIMIT}",
1295 );
1296 let secps_len = msgs.iter().filter(|m| m.is_secp256k1()).count();
1298 assert_eq!(
1299 CBOR_GEN_LIMIT, secps_len,
1300 "Expected {CBOR_GEN_LIMIT} secp messages, got {secps_len}."
1301 );
1302 assert_eq!(
1304 msgs.len(),
1305 BLOCK_MESSAGE_LIMIT,
1306 "Expected {BLOCK_MESSAGE_LIMIT} messages, got {}",
1307 msgs.len()
1308 );
1309 }
1310
1311 #[tokio::test]
1312 async fn message_selection_priority() {
1313 let db = MemoryDB::default();
1314
1315 let mut joinset = JoinSet::new();
1316 let mut mpool = make_test_mpool(&mut joinset);
1317 let ts = mock_tipset(&mpool).await;
1318 let api = mpool.api.clone();
1319
1320 let ks1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1321 let mut w1 = Wallet::new(ks1);
1322 let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1323
1324 let ks2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1325 let mut w2 = Wallet::new(ks2);
1326 let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1327
1328 let mut mpool_cfg = mpool.get_config().clone();
1330 mpool_cfg.priority_addrs.push(a1);
1331 mpool.set_config(&db, mpool_cfg).unwrap();
1332
1333 api.set_state_balance_raw(&a1, TokenAmount::from_whole(1));
1335 api.set_state_balance_raw(&a2, TokenAmount::from_whole(1));
1336
1337 let nmsgs = 10;
1338
1339 for i in 0..nmsgs {
1341 let bias = (nmsgs - i) / 3;
1342 let m = create_smsg(
1343 &a2,
1344 &a1,
1345 &mut w1,
1346 i as u64,
1347 TEST_GAS_LIMIT,
1348 (1 + i % 3 + bias) as u64,
1349 );
1350 mpool.add(m).unwrap();
1351 let m = create_smsg(
1352 &a1,
1353 &a2,
1354 &mut w2,
1355 i as u64,
1356 TEST_GAS_LIMIT,
1357 (1 + i % 3 + bias) as u64,
1358 );
1359 mpool.add(m).unwrap();
1360 }
1361
1362 let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1363
1364 assert_eq!(msgs.len(), 20);
1365
1366 let mut next_nonce = 0;
1367 for msg in msgs.iter().take(10) {
1368 assert_eq!(
1369 msg.from(),
1370 a1,
1371 "first 10 returned messages should be from actor a1"
1372 );
1373 assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1374 next_nonce += 1;
1375 }
1376 next_nonce = 0;
1377 for msg in msgs.iter().take(20).skip(10) {
1378 assert_eq!(
1379 msg.from(),
1380 a2,
1381 "next 10 returned messages should be from actor a2"
1382 );
1383 assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1384 next_nonce += 1;
1385 }
1386 }
1387
1388 #[tokio::test]
1389 async fn test_optimal_msg_selection1() {
1390 let mut joinset = JoinSet::new();
1394 let mpool = make_test_mpool(&mut joinset);
1395 let ts = mock_tipset(&mpool).await;
1396 let api = mpool.api.clone();
1397
1398 let mut w1 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1400 let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1401 let mut w2 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1402 let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1403
1404 api.set_state_balance_raw(&a1, TokenAmount::from_whole(1));
1405 api.set_state_balance_raw(&a2, TokenAmount::from_whole(1));
1406
1407 let n_msgs = 10 * crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1408
1409 for i in 0..(n_msgs as usize) {
1413 let bias = (n_msgs as usize - i) / 3;
1414 let m = create_fake_smsg(
1415 &mpool,
1416 &a2,
1417 &a1,
1418 i as u64,
1419 TEST_GAS_LIMIT,
1420 (1 + i % 3 + bias) as u64,
1421 );
1422 mpool.add(m).unwrap();
1423 }
1424
1425 let msgs = mpool.select_messages(&ts, 0.25).unwrap();
1426
1427 let expected_msgs = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1428
1429 assert_eq!(msgs.len(), expected_msgs as usize);
1430
1431 for (next_nonce, m) in msgs.into_iter().enumerate() {
1432 assert_eq!(m.from(), a1, "Expected message from a1");
1433 assert_eq!(
1434 m.message().sequence,
1435 next_nonce as u64,
1436 "expected nonce {} but got {}",
1437 next_nonce,
1438 m.message().sequence
1439 );
1440 }
1441 }
1442
1443 #[tokio::test]
1444 async fn test_optimal_msg_selection2() {
1445 let mut joinset = JoinSet::new();
1446 let mpool = make_test_mpool(&mut joinset);
1451 let ts = mock_tipset(&mpool).await;
1452 let api = mpool.api.clone();
1453
1454 let mut w1 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1456 let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1457 let mut w2 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1458 let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1459
1460 api.set_state_balance_raw(&a1, TokenAmount::from_whole(1)); api.set_state_balance_raw(&a2, TokenAmount::from_whole(1)); let n_msgs = 5 * crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1464 for i in 0..n_msgs as usize {
1465 let bias = (n_msgs as usize - i) / 3;
1466 let m = create_fake_smsg(
1467 &mpool,
1468 &a2,
1469 &a1,
1470 i as u64,
1471 TEST_GAS_LIMIT,
1472 (200000 + i % 3 + bias) as u64,
1473 );
1474 mpool.add(m).unwrap();
1475 let m = create_fake_smsg(
1476 &mpool,
1477 &a1,
1478 &a2,
1479 i as u64,
1480 TEST_GAS_LIMIT,
1481 (190000 + i % 3 + bias) as u64,
1482 );
1483 mpool.add(m).unwrap();
1484 }
1485
1486 let msgs = mpool.select_messages(&ts, 0.1).unwrap();
1487
1488 let expected_msgs = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1489 assert_eq!(
1490 msgs.len(),
1491 expected_msgs as usize,
1492 "Expected {} messages, but got {}",
1493 expected_msgs,
1494 msgs.len()
1495 );
1496
1497 let mut n_from1 = 0;
1498 let mut n_from2 = 0;
1499 let mut next_nonce1 = 0;
1500 let mut next_nonce2 = 0;
1501
1502 for m in msgs {
1503 if m.from() == a1 {
1504 if m.message.sequence != next_nonce1 {
1505 panic!(
1506 "Expected nonce {}, but got {}",
1507 next_nonce1, m.message.sequence
1508 );
1509 }
1510 next_nonce1 += 1;
1511 n_from1 += 1;
1512 } else {
1513 if m.message.sequence != next_nonce2 {
1514 panic!(
1515 "Expected nonce {}, but got {}",
1516 next_nonce2, m.message.sequence
1517 );
1518 }
1519 next_nonce2 += 1;
1520 n_from2 += 1;
1521 }
1522 }
1523
1524 if n_from1 > n_from2 {
1525 panic!("Expected more msgs from a2 than a1");
1526 }
1527 }
1528
1529 #[tokio::test]
1530 async fn test_optimal_msg_selection3() {
1531 let mut joinset = JoinSet::new();
1532 let mpool = make_test_mpool(&mut joinset);
1538 let ts = mock_tipset(&mpool).await;
1539 let api = mpool.api.clone();
1540
1541 let n_actors = 10;
1542
1543 let mut actors = vec![];
1544 let mut wallets = vec![];
1545
1546 for _ in 0..n_actors {
1547 let mut wallet = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1548 let actor = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
1549
1550 actors.push(actor);
1551 wallets.push(wallet);
1552 }
1553
1554 for a in &mut actors {
1555 api.set_state_balance_raw(a, TokenAmount::from_whole(1));
1556 }
1557
1558 let n_msgs = 1 + crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1559 for i in 0..n_msgs {
1560 for j in 0..n_actors {
1561 let premium =
1562 500000 + 10000 * (n_actors - j) + (n_msgs + 2 - i) / (30 * n_actors) + i % 3;
1563 let m = create_fake_smsg(
1564 &mpool,
1565 &actors[j as usize],
1566 &actors[j as usize],
1567 i as u64,
1568 TEST_GAS_LIMIT,
1569 premium as u64,
1570 );
1571 mpool.add(m).unwrap();
1572 }
1573 }
1574
1575 let msgs = mpool.select_messages(&ts, 0.1).unwrap();
1576 let expected_msgs = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1577
1578 assert_eq!(
1579 msgs.len(),
1580 expected_msgs as usize,
1581 "Expected {} messages, but got {}",
1582 expected_msgs,
1583 msgs.len()
1584 );
1585
1586 let who_is = |addr| -> usize {
1587 for (i, a) in actors.iter().enumerate() {
1588 if a == &addr {
1589 return i;
1590 }
1591 }
1592 9999999
1595 };
1596
1597 let mut nonces = vec![0; n_actors as usize];
1598 for m in &msgs {
1599 let who = who_is(m.from());
1600 if who < 3 {
1601 panic!("got message from {who}th actor",);
1602 }
1603
1604 let next_nonce: u64 = nonces[who];
1605 if m.message.sequence != next_nonce {
1606 panic!(
1607 "expected nonce {} but got {}",
1608 next_nonce, m.message.sequence
1609 );
1610 }
1611 nonces[who] += 1;
1612 }
1613 }
1614}