1use std::cmp::Ordering;
10
11use crate::blocks::{BLOCK_MESSAGE_LIMIT, Tipset};
12use crate::message::{MessageRead as _, SignedMessage};
13use crate::message_pool::msg_chain::MsgChainNode;
14use crate::shim::crypto::SignatureType;
15use crate::shim::{address::Address, econ::TokenAmount};
16use crate::state_manager::IdToAddressCache;
17use ahash::{HashMap, HashMapExt};
18use anyhow::{Context, bail, ensure};
19use rand::prelude::SliceRandom;
20use tracing::{debug, error, warn};
21
22use crate::shim::crypto::Signature;
23use crate::utils::cache::SizeTrackingLruCache;
24use crate::utils::get_size::CidWrapper;
25
26use super::{MpoolCtx, msg_pool::MessagePool, provider::Provider, utils::recover_sig};
27use crate::message_pool::{
28 Error, add_to_selected_msgs,
29 msg_chain::{Chains, NodeKey, create_message_chains},
30 msgpool::{MIN_GAS, pending_store::PendingStore},
31};
32
33type Pending = HashMap<Address, HashMap<u64, SignedMessage>>;
34
35const MAX_BLOCK_MSGS: usize = 16000;
37const MAX_BLOCKS: usize = 15;
38const CBOR_GEN_LIMIT: usize = 8192; struct SelectedMessages {
45 msgs: Vec<SignedMessage>,
47 gas_limit: u64,
49 secp_limit: u64,
51 bls_limit: u64,
53}
54
55impl Default for SelectedMessages {
56 fn default() -> Self {
57 SelectedMessages {
58 msgs: Vec::new(),
59 gas_limit: crate::shim::econ::BLOCK_GAS_LIMIT,
60 secp_limit: CBOR_GEN_LIMIT as u64,
61 bls_limit: CBOR_GEN_LIMIT as u64,
62 }
63 }
64}
65
66impl SelectedMessages {
67 fn new(msgs: Vec<SignedMessage>, gas_limit: u64) -> Self {
68 SelectedMessages {
69 msgs,
70 gas_limit,
71 ..Default::default()
72 }
73 }
74
75 fn len(&self) -> usize {
77 self.msgs.len()
78 }
79
80 fn truncate(&mut self, len: usize) {
82 self.msgs.truncate(len);
83 }
84
85 fn reduce_gas_limit(&mut self, gas: u64) {
88 self.gas_limit = self.gas_limit.saturating_sub(gas);
89 }
90
91 fn reduce_bls_limit(&mut self, bls: u64) {
94 self.bls_limit = self.bls_limit.saturating_sub(bls);
95 }
96
97 fn reduce_secp_limit(&mut self, secp: u64) {
100 self.secp_limit = self.secp_limit.saturating_sub(secp);
101 }
102
103 fn extend(&mut self, msgs: Vec<SignedMessage>) {
105 self.msgs.extend(msgs);
106 }
107
108 fn try_to_add(&mut self, message_chain: MsgChainNode) -> anyhow::Result<()> {
111 let msg_chain_len = message_chain.msgs.len();
112 ensure!(
113 BLOCK_MESSAGE_LIMIT >= msg_chain_len + self.len(),
114 "Message chain is too long to fit in the block: {} messages, limit is {BLOCK_MESSAGE_LIMIT}",
115 msg_chain_len + self.len(),
116 );
117 ensure!(
118 self.gas_limit >= message_chain.gas_limit,
119 "Message chain gas limit is too high: {} gas, limit is {}",
120 message_chain.gas_limit,
121 self.gas_limit
122 );
123
124 match message_chain.sig_type {
125 Some(SignatureType::Bls) => {
126 ensure!(
127 self.bls_limit >= msg_chain_len as u64,
128 "BLS limit is too low: {msg_chain_len} messages, limit is {}",
129 self.bls_limit
130 );
131
132 self.extend(message_chain.msgs);
133 self.reduce_bls_limit(msg_chain_len as u64);
134 self.reduce_gas_limit(message_chain.gas_limit);
135 }
136 Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
137 ensure!(
138 self.secp_limit >= msg_chain_len as u64,
139 "Secp256k1 limit is too low: {msg_chain_len} messages, limit is {}",
140 self.secp_limit
141 );
142
143 self.extend(message_chain.msgs);
144 self.reduce_secp_limit(msg_chain_len as u64);
145 self.reduce_gas_limit(message_chain.gas_limit);
146 }
147 None => {
148 warn!("Tried to add a message chain with no signature type");
151 }
152 }
153 Ok(())
154 }
155
156 fn try_to_add_with_deps(
159 &mut self,
160 idx: usize,
161 chains: &mut Chains,
162 base_fee: &TokenAmount,
163 ) -> anyhow::Result<()> {
164 let message_chain = chains
165 .get_mut_at(idx)
166 .context("Couldn't find required message chain")?;
167 let mut chain_gas_limit = message_chain.gas_limit;
169 let mut chain_msg_limit = message_chain.msgs.len();
170 let mut dep_gas_limit = 0;
171 let mut dep_message_limit = 0;
172 let selected_messages_limit = match message_chain.sig_type {
173 Some(SignatureType::Bls) => self.bls_limit as usize,
174 Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
175 self.secp_limit as usize
176 }
177 None => {
178 bail!("Tried to add a message chain with no signature type");
181 }
182 };
183 let selected_messages_limit =
184 selected_messages_limit.min(BLOCK_MESSAGE_LIMIT.saturating_sub(self.len()));
185
186 let mut chain_deps = vec![];
187 let mut cur_chain = message_chain.prev;
188 let _ = message_chain; while let Some(cur_chn) = cur_chain {
190 let node = chains.get(cur_chn).unwrap();
191 if !node.merged {
192 chain_deps.push(cur_chn);
193 chain_gas_limit += node.gas_limit;
194 chain_msg_limit += node.msgs.len();
195 dep_gas_limit += node.gas_limit;
196 dep_message_limit += node.msgs.len();
197 cur_chain = node.prev;
198 } else {
199 break;
200 }
201 }
202
203 if chain_gas_limit > self.gas_limit || chain_msg_limit > selected_messages_limit {
205 if dep_gas_limit > self.gas_limit || dep_message_limit >= selected_messages_limit {
211 chains.invalidate(chains.get_key_at(idx));
212 } else {
213 chains.trim_msgs_at(
215 idx,
216 self.gas_limit.saturating_sub(dep_gas_limit),
217 selected_messages_limit.saturating_sub(dep_message_limit),
218 base_fee,
219 );
220 }
221
222 bail!("Chain doesn't fit in the block");
223 }
224 for dep in chain_deps.iter().rev() {
225 let cur_chain = chains.get_mut(*dep);
226 if let Some(node) = cur_chain {
227 node.merged = true;
228 self.extend(node.msgs.clone());
229 } else {
230 bail!("Couldn't find required dependent message chain");
231 }
232 }
233
234 let message_chain = chains
235 .get_mut_at(idx)
236 .context("Couldn't find required message chain")?;
237 message_chain.merged = true;
238 self.extend(message_chain.msgs.clone());
239 self.reduce_gas_limit(chain_gas_limit);
240
241 match message_chain.sig_type {
242 Some(SignatureType::Bls) => {
243 self.reduce_bls_limit(chain_msg_limit as u64);
244 }
245 Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
246 self.reduce_secp_limit(chain_msg_limit as u64);
247 }
248 None => {
249 bail!("Tried to add a message chain with no signature type");
252 }
253 }
254
255 Ok(())
256 }
257
258 fn trim_chain_at(&mut self, chains: &mut Chains, idx: usize, base_fee: &TokenAmount) {
259 let message_chain = match chains.get_at(idx) {
260 Some(message_chain) => message_chain,
261 None => {
262 error!("Tried to trim a message chain that doesn't exist");
263 return;
264 }
265 };
266 let msg_limit = BLOCK_MESSAGE_LIMIT.saturating_sub(self.len());
267 let msg_limit = match message_chain.sig_type {
268 Some(SignatureType::Bls) => std::cmp::min(self.bls_limit, msg_limit as u64),
269 Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
270 std::cmp::min(self.secp_limit, msg_limit as u64)
271 }
272 _ => {
273 error!("Tried to trim a message chain with no signature type");
276 return;
277 }
278 };
279
280 if message_chain.gas_limit > self.gas_limit || message_chain.msgs.len() > msg_limit as usize
281 {
282 chains.trim_msgs_at(idx, self.gas_limit, msg_limit as usize, base_fee);
283 }
284 }
285}
286
287impl<T> MessagePool<T>
288where
289 T: Provider,
290{
291 pub fn select_messages(&self, ts: &Tipset, tq: f64) -> Result<Vec<SignedMessage>, Error> {
295 let cur_ts = self.current_tipset();
296 let mut msgs = if tq > 0.84 {
302 self.select_messages_greedy(&cur_ts, ts)
303 } else {
304 self.select_messages_optimal(&cur_ts, ts, tq)
305 }?;
306
307 if msgs.len() > MAX_BLOCK_MSGS {
308 warn!(
309 "Message selection chose too many messages: {} > {MAX_BLOCK_MSGS}",
310 msgs.len(),
311 );
312 msgs.truncate(MAX_BLOCK_MSGS)
313 }
314
315 Ok(msgs.msgs)
316 }
317
318 fn select_messages_greedy(
319 &self,
320 cur_ts: &Tipset,
321 ts: &Tipset,
322 ) -> Result<SelectedMessages, Error> {
323 let base_fee = self.api.chain_compute_base_fee(ts)?;
324
325 let mut pending = self.get_pending_messages(cur_ts, ts)?;
328
329 if pending.is_empty() {
330 return Ok(SelectedMessages::default());
331 }
332
333 let selected_msgs = self.select_priority_messages(&mut pending, &base_fee, ts)?;
335
336 if selected_msgs.gas_limit < MIN_GAS || selected_msgs.len() >= BLOCK_MESSAGE_LIMIT {
338 return Ok(selected_msgs);
339 }
340
341 let mut chains = Chains::new();
344 for (actor, mset) in pending.into_iter() {
345 create_message_chains(
346 self.api.as_ref(),
347 &actor,
348 &mset,
349 &base_fee,
350 ts,
351 &mut chains,
352 &self.chain_config,
353 )?;
354 }
355
356 Ok(merge_and_trim(
357 &mut chains,
358 selected_msgs,
359 &base_fee,
360 MIN_GAS,
361 ))
362 }
363
364 #[allow(clippy::indexing_slicing)]
365 fn select_messages_optimal(
366 &self,
367 cur_ts: &Tipset,
368 target_tipset: &Tipset,
369 ticket_quality: f64,
370 ) -> Result<SelectedMessages, Error> {
371 let base_fee = self.api.chain_compute_base_fee(target_tipset)?;
372
373 let mut pending = self.get_pending_messages(cur_ts, target_tipset)?;
376
377 if pending.is_empty() {
378 return Ok(SelectedMessages::default());
379 }
380
381 let mut selected_msgs =
383 self.select_priority_messages(&mut pending, &base_fee, target_tipset)?;
384
385 if selected_msgs.gas_limit < MIN_GAS || selected_msgs.len() >= BLOCK_MESSAGE_LIMIT {
387 return Ok(selected_msgs);
388 }
389
390 let mut chains = Chains::new();
393 for (actor, mset) in pending.into_iter() {
394 create_message_chains(
395 self.api.as_ref(),
396 &actor,
397 &mset,
398 &base_fee,
399 target_tipset,
400 &mut chains,
401 &self.chain_config,
402 )?;
403 }
404
405 chains.sort(false);
407
408 if chains.get_at(0).is_some_and(|it| it.gas_perf < 0.0) {
409 tracing::warn!(
410 "all messages in mpool have non-positive gas performance {}",
411 chains[0].gas_perf
412 );
413 return Ok(selected_msgs);
414 }
415
416 let mut next_chain = 0;
421 let mut partitions: Vec<Vec<NodeKey>> = vec![vec![]; MAX_BLOCKS];
422 let mut i = 0;
423 while i < MAX_BLOCKS && next_chain < chains.len() {
424 let mut gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT;
425 let mut msg_limit = BLOCK_MESSAGE_LIMIT;
426 while next_chain < chains.len() {
427 let chain_key = chains.key_vec[next_chain];
428 next_chain += 1;
429 partitions[i].push(chain_key);
430 let chain = chains.get(chain_key).unwrap();
431 let chain_gas_limit = chain.gas_limit;
432 if gas_limit < chain_gas_limit {
433 break;
434 }
435 gas_limit = gas_limit.saturating_sub(chain_gas_limit);
436 msg_limit = msg_limit.saturating_sub(chain.msgs.len());
437 if gas_limit < MIN_GAS || msg_limit == 0 {
438 break;
439 }
440 }
441 i += 1;
442 }
443
444 let block_prob = crate::message_pool::block_probabilities(ticket_quality);
448 let mut eff_chains = 0;
449 for i in 0..MAX_BLOCKS {
450 for k in &partitions[i] {
451 if let Some(node) = chains.get_mut(*k) {
452 node.eff_perf = node.gas_perf * block_prob[i];
453 }
454 }
455 eff_chains += partitions[i].len();
456 }
457
458 for i in eff_chains..chains.len() {
460 if let Some(node) = chains.get_mut_at(i) {
461 node.set_null_effective_perf();
462 }
463 }
464
465 chains.sort_effective();
467
468 let mut last = chains.len();
473 for i in 0..chains.len() {
474 if chains[i].gas_perf < 0.0 {
476 break;
477 }
478
479 if chains[i].merged {
481 continue;
482 }
483
484 match selected_msgs.try_to_add_with_deps(i, &mut chains, &base_fee) {
485 Ok(_) => {
486 if let Some(next_key) = chains[i].next {
488 let next_node = chains.get_mut(next_key).unwrap();
489 if next_node.eff_perf > 0.0 {
490 next_node.eff_perf += next_node.parent_offset;
491 let mut next_next_key = next_node.next;
492 while let Some(nnk) = next_next_key {
493 let (nn_node, prev_perfs) = chains.get_mut_with_prev_eff(nnk);
494 if let Some(nn_node) = nn_node {
495 if nn_node.eff_perf > 0.0 {
496 nn_node.set_eff_perf(prev_perfs);
497 next_next_key = nn_node.next;
498 } else {
499 break;
500 }
501 } else {
502 break;
503 }
504 }
505 }
506 }
507
508 chains.sort_range_effective(i + 1..);
512
513 continue;
514 }
515 Err(e) => {
516 debug!("Failed to add message chain with dependencies: {e:#}");
517 }
518 }
519
520 last = i;
523 break;
524 }
525
526 'tail_loop: while selected_msgs.gas_limit >= MIN_GAS && last < chains.len() {
535 if !chains[last].valid {
536 last += 1;
538 continue;
539 }
540
541 selected_msgs.trim_chain_at(&mut chains, last, &base_fee);
543
544 if chains[last].valid {
546 for i in last..chains.len() - 1 {
547 if chains[i].cmp_effective(&chains[i + 1]) == Ordering::Greater {
548 break;
549 }
550 chains.key_vec.swap(i, i + 1);
551 }
552 }
553
554 let lst = last; for i in lst..chains.len() {
557 let chain = &mut chains[i];
558 if !chain.valid {
560 continue;
561 }
562
563 if chain.merged {
565 continue;
566 }
567
568 if chain.gas_perf < 0.0 {
570 break 'tail_loop;
571 }
572
573 match selected_msgs.try_to_add_with_deps(i, &mut chains, &base_fee) {
574 Ok(_) => continue,
575 Err(e) => debug!("Failed to add message chain with dependencies: {e:#}"),
576 }
577
578 continue 'tail_loop;
579 }
580
581 break;
584 }
585
586 if selected_msgs.gas_limit >= MIN_GAS && selected_msgs.msgs.len() <= BLOCK_MESSAGE_LIMIT {
591 let pre_random_length = selected_msgs.len();
592
593 chains
594 .key_vec
595 .shuffle(&mut crate::utils::rand::forest_rng());
596
597 for i in 0..chains.len() {
598 if selected_msgs.gas_limit < MIN_GAS || selected_msgs.len() >= BLOCK_MESSAGE_LIMIT {
599 break;
600 }
601
602 if chains[i].merged || !chains[i].valid {
604 continue;
605 }
606
607 if chains[i].gas_perf < 0.0 {
609 continue;
610 }
611
612 if selected_msgs
613 .try_to_add_with_deps(i, &mut chains, &base_fee)
614 .is_ok()
615 {
616 continue;
618 }
619
620 if chains[i].valid {
621 selected_msgs
624 .try_to_add_with_deps(i, &mut chains, &base_fee)
625 .context("Failed to add message chain with dependencies")?;
626 continue;
627 }
628 }
629
630 if selected_msgs.len() != pre_random_length {
631 tracing::warn!(
632 "optimal selection failed to pack a block; picked {} messages with random selection",
633 selected_msgs.len() - pre_random_length
634 );
635 }
636 }
637
638 Ok(selected_msgs)
639 }
640
641 fn get_pending_messages(&self, cur_ts: &Tipset, ts: &Tipset) -> Result<Pending, Error> {
642 let snapshot = self.pending_store.snapshot();
643 let mut result: Pending = HashMap::with_capacity(snapshot.len());
644 for (a, mset) in snapshot {
645 result.insert(a, mset.msgs);
646 }
647
648 if cur_ts.epoch() == ts.epoch() && cur_ts == ts {
649 return Ok(result);
650 }
651
652 run_head_change(
654 self.api.as_ref(),
655 &self.bls_sig_cache,
656 &self.pending_store,
657 &self.key_cache,
658 cur_ts.clone(),
659 ts.clone(),
660 &mut result,
661 )?;
662
663 Ok(result)
664 }
665
666 fn select_priority_messages(
667 &self,
668 pending: &mut Pending,
669 base_fee: &TokenAmount,
670 ts: &Tipset,
671 ) -> Result<SelectedMessages, Error> {
672 let result = Vec::with_capacity(self.config.size_limit_low() as usize);
673 let gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT;
674 let min_gas = 1298450;
675
676 let priority = self.config.priority_addrs();
678 let mut chains = Chains::new();
679 for actor in priority.iter() {
680 if let Some(mset) = pending.remove(actor) {
682 create_message_chains(
684 self.api.as_ref(),
685 actor,
686 &mset,
687 base_fee,
688 ts,
689 &mut chains,
690 &self.chain_config,
691 )?;
692 }
693 }
694
695 if chains.is_empty() {
696 return Ok(SelectedMessages::new(Vec::new(), gas_limit));
697 }
698
699 Ok(merge_and_trim(
700 &mut chains,
701 SelectedMessages::new(result, gas_limit),
702 base_fee,
703 min_gas,
704 ))
705 }
706}
707
708#[allow(clippy::indexing_slicing)]
710fn merge_and_trim(
711 chains: &mut Chains,
712 mut selected_msgs: SelectedMessages,
713 base_fee: &TokenAmount,
714 min_gas: u64,
715) -> SelectedMessages {
716 if chains.is_empty() {
717 return selected_msgs;
718 }
719
720 chains.sort(true);
722
723 let first_chain_gas_perf = chains[0].gas_perf;
724
725 if !chains.is_empty() && first_chain_gas_perf < 0.0 {
726 warn!(
727 "all priority messages in mpool have negative gas performance bestGasPerf: {}",
728 first_chain_gas_perf
729 );
730 return selected_msgs;
731 }
732
733 let mut last = chains.len();
736 for i in 0..chains.len() {
737 let node = &chains[i];
738
739 if node.gas_perf < 0.0 {
740 break;
741 }
742
743 if selected_msgs.try_to_add(node.clone()).is_ok() {
744 continue;
746 }
747
748 last = i;
750 break;
751 }
752
753 'tail_loop: while selected_msgs.gas_limit >= min_gas && last < chains.len() {
754 selected_msgs.trim_chain_at(chains, last, base_fee);
756
757 let node = &chains[last];
759 if node.valid {
760 for i in last..chains.len() - 1 {
761 let cur_node = &chains[i];
763 let next_node = &chains[i + 1];
764 if cur_node.compare(next_node) == Ordering::Greater {
765 break;
766 }
767
768 chains.key_vec.swap(i, i + 1);
769 }
770 }
771
772 let lst = last; for i in lst..chains.len() {
775 let chain = chains[i].clone();
776 if !chain.valid {
777 continue;
778 }
779
780 if chain.gas_perf < 0.0 {
782 break 'tail_loop;
783 }
784
785 if selected_msgs.try_to_add(chain).is_ok() {
787 continue;
789 }
790
791 last += i;
793 continue 'tail_loop;
794 }
795
796 break;
797 }
798
799 selected_msgs
800}
801
802pub(in crate::message_pool) fn run_head_change<T>(
807 api: &T,
808 bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
809 pending_store: &PendingStore,
810 key_cache: &IdToAddressCache,
811 from: Tipset,
812 to: Tipset,
813 rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
814) -> Result<(), Error>
815where
816 T: Provider,
817{
818 let mut left = from;
819 let mut right = to;
820 let mut left_chain = Vec::new();
821 let mut right_chain = Vec::new();
822 while left != right {
823 if left.epoch() > right.epoch() {
824 left_chain.push(left.clone());
825 let par = api.load_tipset(left.parents())?;
826 left = par;
827 } else {
828 right_chain.push(right.clone());
829 let par = api.load_tipset(right.parents())?;
830 right = par;
831 }
832 }
833 for ts in left_chain {
834 let mut msgs: Vec<SignedMessage> = Vec::new();
835 for block in ts.block_headers() {
836 let (umsg, smsgs) = api.messages_for_block(block)?;
837 msgs.extend(smsgs);
838 for msg in umsg {
839 let msg_cid = msg.cid();
840 if let Ok(smsg) = recover_sig(bls_sig_cache, msg) {
841 msgs.push(smsg);
842 } else {
843 tracing::debug!("could not recover signature for bls message {msg_cid}");
844 }
845 }
846 }
847 for msg in msgs {
848 add_to_selected_msgs(msg, rmsgs);
849 }
850 }
851
852 for ts in right_chain {
853 let mpool_ctx = MpoolCtx {
854 api,
855 key_cache,
856 pending_store,
857 ts: &ts,
858 };
859 for b in ts.block_headers() {
860 let (msgs, smsgs) = api.messages_for_block(b)?;
861
862 for msg in smsgs {
863 mpool_ctx.remove_from_selected_msgs(&msg.from(), msg.sequence(), rmsgs)?;
864 }
865 for msg in msgs {
866 mpool_ctx.remove_from_selected_msgs(&msg.from, msg.sequence, rmsgs)?;
867 }
868 }
869 }
870 Ok(())
871}
872
873#[cfg(test)]
874mod test_selection {
875 use std::sync::Arc;
876
877 use super::*;
878 use crate::db::MemoryDB;
879 use crate::key_management::{KeyStore, KeyStoreConfig, Wallet};
880 use crate::message_pool::msgpool::{
881 test_provider::{TestApi, mock_block},
882 tests::{create_fake_smsg, create_smsg},
883 };
884 use crate::shim::crypto::SignatureType;
885 use crate::shim::econ::BLOCK_GAS_LIMIT;
886 use tokio::task::JoinSet;
887
888 const TEST_GAS_LIMIT: i64 = 6955002;
889
890 fn make_test_mpool(joinset: &mut JoinSet<anyhow::Result<()>>) -> MessagePool<TestApi> {
891 let tma = TestApi::default();
892 let (tx, _rx) = flume::bounded(50);
893 MessagePool::new(tma, tx, Default::default(), Arc::default(), joinset).unwrap()
894 }
895
896 async fn mock_tipset(mpool: &MessagePool<TestApi>) -> Tipset {
899 let b1 = mock_block(1, 1);
900 let ts = Tipset::from(&b1);
901 mpool
902 .apply_head_change(Vec::new(), vec![Tipset::from(b1)])
903 .await
904 .unwrap();
905 ts
906 }
907
908 #[tokio::test]
909 async fn basic_message_selection() {
910 let mut joinset = JoinSet::new();
911 let mpool = make_test_mpool(&mut joinset);
912
913 let ks1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
914 let mut w1 = Wallet::new(ks1);
915 let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
916
917 let ks2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
918 let mut w2 = Wallet::new(ks2);
919 let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
920
921 let ts = mock_tipset(&mpool).await;
922
923 mpool
924 .api
925 .set_state_balance_raw(&a1, TokenAmount::from_whole(1));
926 mpool
927 .api
928 .set_state_balance_raw(&a2, TokenAmount::from_whole(1));
929
930 for i in 0..10 {
934 let m = create_smsg(&a2, &a1, &mut w1, i, TEST_GAS_LIMIT, 2 * i + 1);
935 mpool.add(m).unwrap();
936 }
937 for i in 0..10 {
938 let m = create_smsg(&a1, &a2, &mut w2, i, TEST_GAS_LIMIT, i + 1);
939 mpool.add(m).unwrap();
940 }
941
942 let msgs = mpool.select_messages(&ts, 1.0).unwrap();
943
944 assert_eq!(msgs.len(), 20, "Expected 20 messages, got {}", msgs.len());
945
946 let mut next_nonce = 0;
947 for (i, msg) in msgs.iter().enumerate().take(10) {
948 assert_eq!(
949 msg.from(),
950 a1,
951 "first 10 returned messages should be from actor a1 {i}",
952 );
953 assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
954 next_nonce += 1;
955 }
956
957 next_nonce = 0;
958 for (i, msg) in msgs.iter().enumerate().take(20).skip(10) {
959 assert_eq!(
960 msg.from(),
961 a2,
962 "next 10 returned messages should be from actor a2 {i}",
963 );
964 assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
965 next_nonce += 1;
966 }
967
968 let b2 = mpool.api.next_block();
970 mpool.api.set_block_messages(&b2, msgs);
971 mpool
972 .apply_head_change(Vec::new(), vec![Tipset::from(b2)])
973 .await
974 .unwrap();
975
976 let remaining = mpool.pending_store.snapshot();
978 assert!(
979 remaining.is_empty(),
980 "Expected no pending messages, but got {}",
981 remaining.len()
982 );
983
984 let mut msgs = Vec::with_capacity(20);
986 for i in 10..20 {
987 msgs.push(create_smsg(&a2, &a1, &mut w1, i, TEST_GAS_LIMIT, 2 * i + 1));
988 msgs.push(create_smsg(&a1, &a2, &mut w2, i, TEST_GAS_LIMIT, i + 1));
989 }
990 let b3 = mpool.api.next_block();
991 let ts3 = Tipset::from(&b3);
992 mpool.api.set_block_messages(&b3, msgs);
993
994 mpool.api.set_state_sequence(&a1, 20);
996 mpool.api.set_state_sequence(&a2, 20);
997
998 for i in 20..30 {
1000 mpool
1001 .add(create_smsg(
1002 &a2,
1003 &a1,
1004 &mut w1,
1005 i,
1006 TEST_GAS_LIMIT,
1007 2 * i + 200,
1008 ))
1009 .unwrap();
1010 mpool
1011 .add(create_smsg(&a1, &a2, &mut w2, i, TEST_GAS_LIMIT, i + 1))
1012 .unwrap();
1013 }
1014 mpool.api.set_state_sequence(&a1, 10);
1018 mpool.api.set_state_sequence(&a2, 10);
1019 let msgs = mpool.select_messages(&ts3, 1.0).unwrap();
1020
1021 assert_eq!(
1022 msgs.len(),
1023 20,
1024 "Expected 20 messages, but got {}",
1025 msgs.len()
1026 );
1027
1028 let mut next_nonce = 20;
1029 for msg in msgs.iter().take(10) {
1030 assert_eq!(
1031 msg.from(),
1032 a1,
1033 "first 10 returned messages should be from actor a1"
1034 );
1035 assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1036 next_nonce += 1;
1037 }
1038 next_nonce = 20;
1039 for msg in msgs.iter().take(20).skip(10) {
1040 assert_eq!(
1041 msg.from(),
1042 a2,
1043 "next 10 returned messages should be from actor a2"
1044 );
1045 assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1046 next_nonce += 1;
1047 }
1048 }
1049
1050 #[tokio::test]
1051 async fn message_selection_trimming_gas() {
1052 let mut joinset = JoinSet::new();
1053 let mpool = make_test_mpool(&mut joinset);
1054 let ts = mock_tipset(&mpool).await;
1055 let api = mpool.api.clone();
1056
1057 let ks1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1058 let mut w1 = Wallet::new(ks1);
1059 let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1060
1061 let ks2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1062 let mut w2 = Wallet::new(ks2);
1063 let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1064
1065 api.set_state_balance_raw(&a1, TokenAmount::from_whole(1));
1066 api.set_state_balance_raw(&a2, TokenAmount::from_whole(1));
1067
1068 let nmsgs = (crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT) + 1;
1069
1070 for i in 0..nmsgs {
1072 let bias = (nmsgs - i) / 3;
1073 let m = create_fake_smsg(
1074 &mpool,
1075 &a2,
1076 &a1,
1077 i as u64,
1078 TEST_GAS_LIMIT,
1079 (1 + i % 3 + bias) as u64,
1080 );
1081 mpool.add(m).unwrap();
1082 let m = create_fake_smsg(
1083 &mpool,
1084 &a1,
1085 &a2,
1086 i as u64,
1087 TEST_GAS_LIMIT,
1088 (1 + i % 3 + bias) as u64,
1089 );
1090 mpool.add(m).unwrap();
1091 }
1092
1093 let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1094
1095 let expected = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1096 assert_eq!(msgs.len(), expected as usize);
1097 let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1098 assert!(m_gas_limit <= crate::shim::econ::BLOCK_GAS_LIMIT);
1099 }
1100
1101 #[tokio::test]
1102 async fn message_selection_trimming_msgs_basic() {
1103 let mut joinset = JoinSet::new();
1104 let mpool = make_test_mpool(&mut joinset);
1105 let ts = mock_tipset(&mpool).await;
1106 let api = mpool.api.clone();
1107
1108 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1109 let mut wallet = Wallet::new(keystore);
1110 let address = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
1111
1112 api.set_state_balance_raw(&address, TokenAmount::from_whole(1));
1113
1114 for i in 0..BLOCK_MESSAGE_LIMIT {
1116 let msg = create_fake_smsg(&mpool, &address, &address, i as u64, 200_000, 100);
1117 mpool.add(msg).unwrap();
1118 }
1119
1120 let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1121 assert_eq!(
1122 msgs.len(),
1123 CBOR_GEN_LIMIT,
1124 "Expected {CBOR_GEN_LIMIT} messages, got {}",
1125 msgs.len()
1126 );
1127
1128 let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1130 assert!(
1131 m_gas_limit <= BLOCK_GAS_LIMIT,
1132 "Selected messages gas limit {m_gas_limit} exceeds block gas limit {BLOCK_GAS_LIMIT}",
1133 );
1134 }
1135
1136 #[tokio::test]
1137 async fn message_selection_trimming_msgs_two_senders() {
1138 let mut joinset = JoinSet::new();
1139 let mpool = make_test_mpool(&mut joinset);
1140 let ts = mock_tipset(&mpool).await;
1141 let api = mpool.api.clone();
1142
1143 let keystore_1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1144 let mut wallet_1 = Wallet::new(keystore_1);
1145 let address_1 = wallet_1.generate_addr(SignatureType::Secp256k1).unwrap();
1146
1147 let keystore_2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1148 let mut wallet_2 = Wallet::new(keystore_2);
1149 let address_2 = wallet_2.generate_addr(SignatureType::Bls).unwrap();
1150
1151 api.set_state_balance_raw(&address_1, TokenAmount::from_whole(1));
1152 api.set_state_balance_raw(&address_2, TokenAmount::from_whole(1));
1153
1154 for i in 0..BLOCK_MESSAGE_LIMIT {
1156 let msg = create_smsg(
1157 &address_2,
1158 &address_1,
1159 &mut wallet_1,
1160 i as u64,
1161 300_000,
1162 100,
1163 );
1164 mpool.add(msg).unwrap();
1165 let msg = create_smsg(
1168 &address_1,
1169 &address_2,
1170 &mut wallet_2,
1171 i as u64,
1172 300_000,
1173 1000,
1174 );
1175 mpool.add(msg).unwrap();
1176 }
1177 let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1178 let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1180 assert!(
1181 m_gas_limit <= BLOCK_GAS_LIMIT,
1182 "Selected messages gas limit {m_gas_limit} exceeds block gas limit {BLOCK_GAS_LIMIT}",
1183 );
1184 let bls_msgs = msgs.iter().filter(|m| m.is_bls()).count();
1185 assert_eq!(
1186 CBOR_GEN_LIMIT, bls_msgs,
1187 "Expected {CBOR_GEN_LIMIT} bls messages, got {bls_msgs}."
1188 );
1189 assert_eq!(
1190 msgs.len(),
1191 BLOCK_MESSAGE_LIMIT,
1192 "Expected {BLOCK_MESSAGE_LIMIT} messages, got {}",
1193 msgs.len()
1194 );
1195 }
1196
1197 #[tokio::test]
1198 async fn message_selection_trimming_msgs_two_senders_complex() {
1199 let mut joinset = JoinSet::new();
1200 let mpool = make_test_mpool(&mut joinset);
1201 let ts = mock_tipset(&mpool).await;
1202 let api = mpool.api.clone();
1203
1204 let keystore_1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1205 let mut wallet_1 = Wallet::new(keystore_1);
1206 let address_1 = wallet_1.generate_addr(SignatureType::Secp256k1).unwrap();
1207
1208 let keystore_2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1209 let mut wallet_2 = Wallet::new(keystore_2);
1210 let address_2 = wallet_2.generate_addr(SignatureType::Bls).unwrap();
1211
1212 api.set_state_balance_raw(&address_1, TokenAmount::from_whole(1));
1213 api.set_state_balance_raw(&address_2, TokenAmount::from_whole(1));
1214
1215 let mut counter = 0;
1217 for i in 0..CBOR_GEN_LIMIT {
1218 counter += 1;
1219 let msg = create_smsg(
1220 &address_2,
1221 &address_1,
1222 &mut wallet_1,
1223 i as u64,
1224 300_000,
1225 100,
1226 );
1227 mpool.add(msg).unwrap();
1228 let msg = create_smsg(
1231 &address_1,
1232 &address_2,
1233 &mut wallet_2,
1234 i as u64,
1235 300_000,
1236 100,
1237 );
1238 mpool.add(msg).unwrap();
1239 }
1240
1241 let msg = create_smsg(
1243 &address_2,
1244 &address_1,
1245 &mut wallet_1,
1246 counter as u64,
1247 300_000,
1248 1000,
1249 );
1250 mpool.add(msg).unwrap();
1251
1252 let msg = create_smsg(
1253 &address_1,
1254 &address_2,
1255 &mut wallet_2,
1256 counter as u64,
1257 300_000,
1258 100,
1259 );
1260 mpool.add(msg).unwrap();
1261
1262 counter += 1;
1263
1264 let msg = create_smsg(
1266 &address_2,
1267 &address_1,
1268 &mut wallet_1,
1269 counter as u64,
1270 400_000,
1271 1_000_000,
1272 );
1273 mpool.add(msg).unwrap();
1274
1275 let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1276 let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1278 assert!(
1279 m_gas_limit <= BLOCK_GAS_LIMIT,
1280 "Selected messages gas limit {m_gas_limit} exceeds block gas limit {BLOCK_GAS_LIMIT}",
1281 );
1282 let secps_len = msgs.iter().filter(|m| m.is_secp256k1()).count();
1284 assert_eq!(
1285 CBOR_GEN_LIMIT, secps_len,
1286 "Expected {CBOR_GEN_LIMIT} secp messages, got {secps_len}."
1287 );
1288 assert_eq!(
1290 msgs.len(),
1291 BLOCK_MESSAGE_LIMIT,
1292 "Expected {BLOCK_MESSAGE_LIMIT} messages, got {}",
1293 msgs.len()
1294 );
1295 }
1296
1297 #[tokio::test]
1298 async fn message_selection_priority() {
1299 let db = MemoryDB::default();
1300
1301 let mut joinset = JoinSet::new();
1302 let mut mpool = make_test_mpool(&mut joinset);
1303 let ts = mock_tipset(&mpool).await;
1304 let api = mpool.api.clone();
1305
1306 let ks1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1307 let mut w1 = Wallet::new(ks1);
1308 let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1309
1310 let ks2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1311 let mut w2 = Wallet::new(ks2);
1312 let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1313
1314 let mut mpool_cfg = mpool.get_config().clone();
1316 mpool_cfg.priority_addrs.push(a1);
1317 mpool.set_config(&db, mpool_cfg).unwrap();
1318
1319 api.set_state_balance_raw(&a1, TokenAmount::from_whole(1));
1321 api.set_state_balance_raw(&a2, TokenAmount::from_whole(1));
1322
1323 let nmsgs = 10;
1324
1325 for i in 0..nmsgs {
1327 let bias = (nmsgs - i) / 3;
1328 let m = create_smsg(
1329 &a2,
1330 &a1,
1331 &mut w1,
1332 i as u64,
1333 TEST_GAS_LIMIT,
1334 (1 + i % 3 + bias) as u64,
1335 );
1336 mpool.add(m).unwrap();
1337 let m = create_smsg(
1338 &a1,
1339 &a2,
1340 &mut w2,
1341 i as u64,
1342 TEST_GAS_LIMIT,
1343 (1 + i % 3 + bias) as u64,
1344 );
1345 mpool.add(m).unwrap();
1346 }
1347
1348 let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1349
1350 assert_eq!(msgs.len(), 20);
1351
1352 let mut next_nonce = 0;
1353 for msg in msgs.iter().take(10) {
1354 assert_eq!(
1355 msg.from(),
1356 a1,
1357 "first 10 returned messages should be from actor a1"
1358 );
1359 assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1360 next_nonce += 1;
1361 }
1362 next_nonce = 0;
1363 for msg in msgs.iter().take(20).skip(10) {
1364 assert_eq!(
1365 msg.from(),
1366 a2,
1367 "next 10 returned messages should be from actor a2"
1368 );
1369 assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1370 next_nonce += 1;
1371 }
1372 }
1373
1374 #[tokio::test]
1375 async fn test_optimal_msg_selection1() {
1376 let mut joinset = JoinSet::new();
1380 let mpool = make_test_mpool(&mut joinset);
1381 let ts = mock_tipset(&mpool).await;
1382 let api = mpool.api.clone();
1383
1384 let mut w1 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1386 let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1387 let mut w2 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1388 let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1389
1390 api.set_state_balance_raw(&a1, TokenAmount::from_whole(1));
1391 api.set_state_balance_raw(&a2, TokenAmount::from_whole(1));
1392
1393 let n_msgs = 10 * crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1394
1395 for i in 0..(n_msgs as usize) {
1399 let bias = (n_msgs as usize - i) / 3;
1400 let m = create_fake_smsg(
1401 &mpool,
1402 &a2,
1403 &a1,
1404 i as u64,
1405 TEST_GAS_LIMIT,
1406 (1 + i % 3 + bias) as u64,
1407 );
1408 mpool.add(m).unwrap();
1409 }
1410
1411 let msgs = mpool.select_messages(&ts, 0.25).unwrap();
1412
1413 let expected_msgs = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1414
1415 assert_eq!(msgs.len(), expected_msgs as usize);
1416
1417 for (next_nonce, m) in msgs.into_iter().enumerate() {
1418 assert_eq!(m.from(), a1, "Expected message from a1");
1419 assert_eq!(
1420 m.message().sequence,
1421 next_nonce as u64,
1422 "expected nonce {} but got {}",
1423 next_nonce,
1424 m.message().sequence
1425 );
1426 }
1427 }
1428
1429 #[tokio::test]
1430 async fn test_optimal_msg_selection2() {
1431 let mut joinset = JoinSet::new();
1432 let mpool = make_test_mpool(&mut joinset);
1437 let ts = mock_tipset(&mpool).await;
1438 let api = mpool.api.clone();
1439
1440 let mut w1 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1442 let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1443 let mut w2 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1444 let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1445
1446 api.set_state_balance_raw(&a1, TokenAmount::from_whole(1)); api.set_state_balance_raw(&a2, TokenAmount::from_whole(1)); let n_msgs = 5 * crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1450 for i in 0..n_msgs as usize {
1451 let bias = (n_msgs as usize - i) / 3;
1452 let m = create_fake_smsg(
1453 &mpool,
1454 &a2,
1455 &a1,
1456 i as u64,
1457 TEST_GAS_LIMIT,
1458 (200000 + i % 3 + bias) as u64,
1459 );
1460 mpool.add(m).unwrap();
1461 let m = create_fake_smsg(
1462 &mpool,
1463 &a1,
1464 &a2,
1465 i as u64,
1466 TEST_GAS_LIMIT,
1467 (190000 + i % 3 + bias) as u64,
1468 );
1469 mpool.add(m).unwrap();
1470 }
1471
1472 let msgs = mpool.select_messages(&ts, 0.1).unwrap();
1473
1474 let expected_msgs = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1475 assert_eq!(
1476 msgs.len(),
1477 expected_msgs as usize,
1478 "Expected {} messages, but got {}",
1479 expected_msgs,
1480 msgs.len()
1481 );
1482
1483 let mut n_from1 = 0;
1484 let mut n_from2 = 0;
1485 let mut next_nonce1 = 0;
1486 let mut next_nonce2 = 0;
1487
1488 for m in msgs {
1489 if m.from() == a1 {
1490 if m.message.sequence != next_nonce1 {
1491 panic!(
1492 "Expected nonce {}, but got {}",
1493 next_nonce1, m.message.sequence
1494 );
1495 }
1496 next_nonce1 += 1;
1497 n_from1 += 1;
1498 } else {
1499 if m.message.sequence != next_nonce2 {
1500 panic!(
1501 "Expected nonce {}, but got {}",
1502 next_nonce2, m.message.sequence
1503 );
1504 }
1505 next_nonce2 += 1;
1506 n_from2 += 1;
1507 }
1508 }
1509
1510 if n_from1 > n_from2 {
1511 panic!("Expected more msgs from a2 than a1");
1512 }
1513 }
1514
1515 #[tokio::test]
1516 async fn test_optimal_msg_selection3() {
1517 let mut joinset = JoinSet::new();
1518 let mpool = make_test_mpool(&mut joinset);
1524 let ts = mock_tipset(&mpool).await;
1525 let api = mpool.api.clone();
1526
1527 let n_actors = 10;
1528
1529 let mut actors = vec![];
1530 let mut wallets = vec![];
1531
1532 for _ in 0..n_actors {
1533 let mut wallet = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1534 let actor = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
1535
1536 actors.push(actor);
1537 wallets.push(wallet);
1538 }
1539
1540 for a in &mut actors {
1541 api.set_state_balance_raw(a, TokenAmount::from_whole(1));
1542 }
1543
1544 let n_msgs = 1 + crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1545 for i in 0..n_msgs {
1546 for j in 0..n_actors {
1547 let premium =
1548 500000 + 10000 * (n_actors - j) + (n_msgs + 2 - i) / (30 * n_actors) + i % 3;
1549 let m = create_fake_smsg(
1550 &mpool,
1551 &actors[j as usize],
1552 &actors[j as usize],
1553 i as u64,
1554 TEST_GAS_LIMIT,
1555 premium as u64,
1556 );
1557 mpool.add(m).unwrap();
1558 }
1559 }
1560
1561 let msgs = mpool.select_messages(&ts, 0.1).unwrap();
1562 let expected_msgs = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1563
1564 assert_eq!(
1565 msgs.len(),
1566 expected_msgs as usize,
1567 "Expected {} messages, but got {}",
1568 expected_msgs,
1569 msgs.len()
1570 );
1571
1572 let who_is = |addr| -> usize {
1573 for (i, a) in actors.iter().enumerate() {
1574 if a == &addr {
1575 return i;
1576 }
1577 }
1578 9999999
1581 };
1582
1583 let mut nonces = vec![0; n_actors as usize];
1584 for m in &msgs {
1585 let who = who_is(m.from());
1586 if who < 3 {
1587 panic!("got message from {who}th actor",);
1588 }
1589
1590 let next_nonce: u64 = nonces[who];
1591 if m.message.sequence != next_nonce {
1592 panic!(
1593 "expected nonce {} but got {}",
1594 next_nonce, m.message.sequence
1595 );
1596 }
1597 nonces[who] += 1;
1598 }
1599 }
1600}