1use std::{borrow::BorrowMut, cmp::Ordering};
10
11use crate::blocks::{BLOCK_MESSAGE_LIMIT, Tipset};
12use crate::message::{Message, SignedMessage};
13use crate::message_pool::msg_chain::MsgChainNode;
14use crate::shim::crypto::SignatureType;
15use crate::shim::{address::Address, econ::TokenAmount};
16use ahash::{HashMap, HashMapExt};
17use anyhow::{Context, bail, ensure};
18use parking_lot::RwLock;
19use rand::prelude::SliceRandom;
20use tracing::{debug, error, warn};
21
22use super::{msg_pool::MessagePool, provider::Provider};
23use crate::message_pool::{
24 Error, add_to_selected_msgs,
25 msg_chain::{Chains, NodeKey, create_message_chains},
26 msg_pool::MsgSet,
27 msgpool::MIN_GAS,
28 remove_from_selected_msgs,
29};
30
31type Pending = HashMap<Address, HashMap<u64, SignedMessage>>;
32
33const MAX_BLOCK_MSGS: usize = 16000;
35const MAX_BLOCKS: usize = 15;
36const CBOR_GEN_LIMIT: usize = 8192; struct SelectedMessages {
43 msgs: Vec<SignedMessage>,
45 gas_limit: u64,
47 secp_limit: u64,
49 bls_limit: u64,
51}
52
53impl Default for SelectedMessages {
54 fn default() -> Self {
55 SelectedMessages {
56 msgs: Vec::new(),
57 gas_limit: crate::shim::econ::BLOCK_GAS_LIMIT,
58 secp_limit: CBOR_GEN_LIMIT as u64,
59 bls_limit: CBOR_GEN_LIMIT as u64,
60 }
61 }
62}
63
64impl SelectedMessages {
65 fn new(msgs: Vec<SignedMessage>, gas_limit: u64) -> Self {
66 SelectedMessages {
67 msgs,
68 gas_limit,
69 ..Default::default()
70 }
71 }
72
73 fn len(&self) -> usize {
75 self.msgs.len()
76 }
77
78 fn truncate(&mut self, len: usize) {
80 self.msgs.truncate(len);
81 }
82
83 fn reduce_gas_limit(&mut self, gas: u64) {
86 self.gas_limit = self.gas_limit.saturating_sub(gas);
87 }
88
89 fn reduce_bls_limit(&mut self, bls: u64) {
92 self.bls_limit = self.bls_limit.saturating_sub(bls);
93 }
94
95 fn reduce_secp_limit(&mut self, secp: u64) {
98 self.secp_limit = self.secp_limit.saturating_sub(secp);
99 }
100
101 fn extend(&mut self, msgs: Vec<SignedMessage>) {
103 self.msgs.extend(msgs);
104 }
105
106 fn try_to_add(&mut self, message_chain: MsgChainNode) -> anyhow::Result<()> {
109 let msg_chain_len = message_chain.msgs.len();
110 ensure!(
111 BLOCK_MESSAGE_LIMIT >= msg_chain_len + self.len(),
112 "Message chain is too long to fit in the block: {} messages, limit is {BLOCK_MESSAGE_LIMIT}",
113 msg_chain_len + self.len(),
114 );
115 ensure!(
116 self.gas_limit >= message_chain.gas_limit,
117 "Message chain gas limit is too high: {} gas, limit is {}",
118 message_chain.gas_limit,
119 self.gas_limit
120 );
121
122 match message_chain.sig_type {
123 Some(SignatureType::Bls) => {
124 ensure!(
125 self.bls_limit >= msg_chain_len as u64,
126 "BLS limit is too low: {msg_chain_len} messages, limit is {}",
127 self.bls_limit
128 );
129
130 self.extend(message_chain.msgs);
131 self.reduce_bls_limit(msg_chain_len as u64);
132 self.reduce_gas_limit(message_chain.gas_limit);
133 }
134 Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
135 ensure!(
136 self.secp_limit >= msg_chain_len as u64,
137 "Secp256k1 limit is too low: {msg_chain_len} messages, limit is {}",
138 self.secp_limit
139 );
140
141 self.extend(message_chain.msgs);
142 self.reduce_secp_limit(msg_chain_len as u64);
143 self.reduce_gas_limit(message_chain.gas_limit);
144 }
145 None => {
146 warn!("Tried to add a message chain with no signature type");
149 }
150 }
151 Ok(())
152 }
153
154 fn try_to_add_with_deps(
157 &mut self,
158 idx: usize,
159 chains: &mut Chains,
160 base_fee: &TokenAmount,
161 ) -> anyhow::Result<()> {
162 let message_chain = chains
163 .get_mut_at(idx)
164 .context("Couldn't find required message chain")?;
165 let mut chain_gas_limit = message_chain.gas_limit;
167 let mut chain_msg_limit = message_chain.msgs.len();
168 let mut dep_gas_limit = 0;
169 let mut dep_message_limit = 0;
170 let selected_messages_limit = match message_chain.sig_type {
171 Some(SignatureType::Bls) => self.bls_limit as usize,
172 Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
173 self.secp_limit as usize
174 }
175 None => {
176 bail!("Tried to add a message chain with no signature type");
179 }
180 };
181 let selected_messages_limit =
182 selected_messages_limit.min(BLOCK_MESSAGE_LIMIT.saturating_sub(self.len()));
183
184 let mut chain_deps = vec![];
185 let mut cur_chain = message_chain.prev;
186 let _ = message_chain; while let Some(cur_chn) = cur_chain {
188 let node = chains.get(cur_chn).unwrap();
189 if !node.merged {
190 chain_deps.push(cur_chn);
191 chain_gas_limit += node.gas_limit;
192 chain_msg_limit += node.msgs.len();
193 dep_gas_limit += node.gas_limit;
194 dep_message_limit += node.msgs.len();
195 cur_chain = node.prev;
196 } else {
197 break;
198 }
199 }
200
201 if chain_gas_limit > self.gas_limit || chain_msg_limit > selected_messages_limit {
203 if dep_gas_limit > self.gas_limit || dep_message_limit >= selected_messages_limit {
209 chains.invalidate(chains.get_key_at(idx));
210 } else {
211 chains.trim_msgs_at(
213 idx,
214 self.gas_limit.saturating_sub(dep_gas_limit),
215 selected_messages_limit.saturating_sub(dep_message_limit),
216 base_fee,
217 );
218 }
219
220 bail!("Chain doesn't fit in the block");
221 }
222 for dep in chain_deps.iter().rev() {
223 let cur_chain = chains.get_mut(*dep);
224 if let Some(node) = cur_chain {
225 node.merged = true;
226 self.extend(node.msgs.clone());
227 } else {
228 bail!("Couldn't find required dependent message chain");
229 }
230 }
231
232 let message_chain = chains
233 .get_mut_at(idx)
234 .context("Couldn't find required message chain")?;
235 message_chain.merged = true;
236 self.extend(message_chain.msgs.clone());
237 self.reduce_gas_limit(chain_gas_limit);
238
239 match message_chain.sig_type {
240 Some(SignatureType::Bls) => {
241 self.reduce_bls_limit(chain_msg_limit as u64);
242 }
243 Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
244 self.reduce_secp_limit(chain_msg_limit as u64);
245 }
246 None => {
247 bail!("Tried to add a message chain with no signature type");
250 }
251 }
252
253 Ok(())
254 }
255
256 fn trim_chain_at(&mut self, chains: &mut Chains, idx: usize, base_fee: &TokenAmount) {
257 let message_chain = match chains.get_at(idx) {
258 Some(message_chain) => message_chain,
259 None => {
260 error!("Tried to trim a message chain that doesn't exist");
261 return;
262 }
263 };
264 let msg_limit = BLOCK_MESSAGE_LIMIT.saturating_sub(self.len());
265 let msg_limit = match message_chain.sig_type {
266 Some(SignatureType::Bls) => std::cmp::min(self.bls_limit, msg_limit as u64),
267 Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
268 std::cmp::min(self.secp_limit, msg_limit as u64)
269 }
270 _ => {
271 error!("Tried to trim a message chain with no signature type");
274 return;
275 }
276 };
277
278 if message_chain.gas_limit > self.gas_limit || message_chain.msgs.len() > msg_limit as usize
279 {
280 chains.trim_msgs_at(idx, self.gas_limit, msg_limit as usize, base_fee);
281 }
282 }
283}
284
285impl<T> MessagePool<T>
286where
287 T: Provider,
288{
289 pub fn select_messages(&self, ts: &Tipset, tq: f64) -> Result<Vec<SignedMessage>, Error> {
293 let cur_ts = self.current_tipset();
294 let mut msgs = if tq > 0.84 {
300 self.select_messages_greedy(&cur_ts, ts)
301 } else {
302 self.select_messages_optimal(&cur_ts, ts, tq)
303 }?;
304
305 if msgs.len() > MAX_BLOCK_MSGS {
306 warn!(
307 "Message selection chose too many messages: {} > {MAX_BLOCK_MSGS}",
308 msgs.len(),
309 );
310 msgs.truncate(MAX_BLOCK_MSGS)
311 }
312
313 Ok(msgs.msgs)
314 }
315
316 fn select_messages_greedy(
317 &self,
318 cur_ts: &Tipset,
319 ts: &Tipset,
320 ) -> Result<SelectedMessages, Error> {
321 let base_fee = self.api.chain_compute_base_fee(ts)?;
322
323 let mut pending = self.get_pending_messages(cur_ts, ts)?;
326
327 if pending.is_empty() {
328 return Ok(SelectedMessages::default());
329 }
330
331 let selected_msgs = self.select_priority_messages(&mut pending, &base_fee, ts)?;
333
334 if selected_msgs.gas_limit < MIN_GAS || selected_msgs.len() >= BLOCK_MESSAGE_LIMIT {
336 return Ok(selected_msgs);
337 }
338
339 let mut chains = Chains::new();
342 for (actor, mset) in pending.into_iter() {
343 create_message_chains(
344 self.api.as_ref(),
345 &actor,
346 &mset,
347 &base_fee,
348 ts,
349 &mut chains,
350 &self.chain_config,
351 )?;
352 }
353
354 Ok(merge_and_trim(
355 &mut chains,
356 selected_msgs,
357 &base_fee,
358 MIN_GAS,
359 ))
360 }
361
362 #[allow(clippy::indexing_slicing)]
363 fn select_messages_optimal(
364 &self,
365 cur_ts: &Tipset,
366 target_tipset: &Tipset,
367 ticket_quality: f64,
368 ) -> Result<SelectedMessages, Error> {
369 let base_fee = self.api.chain_compute_base_fee(target_tipset)?;
370
371 let mut pending = self.get_pending_messages(cur_ts, target_tipset)?;
374
375 if pending.is_empty() {
376 return Ok(SelectedMessages::default());
377 }
378
379 let mut selected_msgs =
381 self.select_priority_messages(&mut pending, &base_fee, target_tipset)?;
382
383 if selected_msgs.gas_limit < MIN_GAS || selected_msgs.len() >= BLOCK_MESSAGE_LIMIT {
385 return Ok(selected_msgs);
386 }
387
388 let mut chains = Chains::new();
391 for (actor, mset) in pending.into_iter() {
392 create_message_chains(
393 self.api.as_ref(),
394 &actor,
395 &mset,
396 &base_fee,
397 target_tipset,
398 &mut chains,
399 &self.chain_config,
400 )?;
401 }
402
403 chains.sort(false);
405
406 if chains.get_at(0).is_some_and(|it| it.gas_perf < 0.0) {
407 tracing::warn!(
408 "all messages in mpool have non-positive gas performance {}",
409 chains[0].gas_perf
410 );
411 return Ok(selected_msgs);
412 }
413
414 let mut next_chain = 0;
419 let mut partitions: Vec<Vec<NodeKey>> = vec![vec![]; MAX_BLOCKS];
420 let mut i = 0;
421 while i < MAX_BLOCKS && next_chain < chains.len() {
422 let mut gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT;
423 let mut msg_limit = BLOCK_MESSAGE_LIMIT;
424 while next_chain < chains.len() {
425 let chain_key = chains.key_vec[next_chain];
426 next_chain += 1;
427 partitions[i].push(chain_key);
428 let chain = chains.get(chain_key).unwrap();
429 let chain_gas_limit = chain.gas_limit;
430 if gas_limit < chain_gas_limit {
431 break;
432 }
433 gas_limit = gas_limit.saturating_sub(chain_gas_limit);
434 msg_limit = msg_limit.saturating_sub(chain.msgs.len());
435 if gas_limit < MIN_GAS || msg_limit == 0 {
436 break;
437 }
438 }
439 i += 1;
440 }
441
442 let block_prob = crate::message_pool::block_probabilities(ticket_quality);
446 let mut eff_chains = 0;
447 for i in 0..MAX_BLOCKS {
448 for k in &partitions[i] {
449 if let Some(node) = chains.get_mut(*k) {
450 node.eff_perf = node.gas_perf * block_prob[i];
451 }
452 }
453 eff_chains += partitions[i].len();
454 }
455
456 for i in eff_chains..chains.len() {
458 if let Some(node) = chains.get_mut_at(i) {
459 node.set_null_effective_perf();
460 }
461 }
462
463 chains.sort_effective();
465
466 let mut last = chains.len();
471 for i in 0..chains.len() {
472 if chains[i].gas_perf < 0.0 {
474 break;
475 }
476
477 if chains[i].merged {
479 continue;
480 }
481
482 match selected_msgs.try_to_add_with_deps(i, &mut chains, &base_fee) {
483 Ok(_) => {
484 if let Some(next_key) = chains[i].next {
486 let next_node = chains.get_mut(next_key).unwrap();
487 if next_node.eff_perf > 0.0 {
488 next_node.eff_perf += next_node.parent_offset;
489 let mut next_next_key = next_node.next;
490 while let Some(nnk) = next_next_key {
491 let (nn_node, prev_perfs) = chains.get_mut_with_prev_eff(nnk);
492 if let Some(nn_node) = nn_node {
493 if nn_node.eff_perf > 0.0 {
494 nn_node.set_eff_perf(prev_perfs);
495 next_next_key = nn_node.next;
496 } else {
497 break;
498 }
499 } else {
500 break;
501 }
502 }
503 }
504 }
505
506 chains.sort_range_effective(i + 1..);
510
511 continue;
512 }
513 Err(e) => {
514 debug!("Failed to add message chain with dependencies: {e}");
515 }
516 }
517
518 last = i;
521 break;
522 }
523
524 'tail_loop: while selected_msgs.gas_limit >= MIN_GAS && last < chains.len() {
533 if !chains[last].valid {
534 last += 1;
536 continue;
537 }
538
539 selected_msgs.trim_chain_at(&mut chains, last, &base_fee);
541
542 if chains[last].valid {
544 for i in last..chains.len() - 1 {
545 if chains[i].cmp_effective(&chains[i + 1]) == Ordering::Greater {
546 break;
547 }
548 chains.key_vec.swap(i, i + 1);
549 }
550 }
551
552 let lst = last; for i in lst..chains.len() {
555 let chain = &mut chains[i];
556 if !chain.valid {
558 continue;
559 }
560
561 if chain.merged {
563 continue;
564 }
565
566 if chain.gas_perf < 0.0 {
568 break 'tail_loop;
569 }
570
571 match selected_msgs.try_to_add_with_deps(i, &mut chains, &base_fee) {
572 Ok(_) => continue,
573 Err(e) => debug!("Failed to add message chain with dependencies: {e}"),
574 }
575
576 continue 'tail_loop;
577 }
578
579 break;
582 }
583
584 if selected_msgs.gas_limit >= MIN_GAS && selected_msgs.msgs.len() <= BLOCK_MESSAGE_LIMIT {
589 let pre_random_length = selected_msgs.len();
590
591 chains
592 .key_vec
593 .shuffle(&mut crate::utils::rand::forest_rng());
594
595 for i in 0..chains.len() {
596 if selected_msgs.gas_limit < MIN_GAS || selected_msgs.len() >= BLOCK_MESSAGE_LIMIT {
597 break;
598 }
599
600 if chains[i].merged || !chains[i].valid {
602 continue;
603 }
604
605 if chains[i].gas_perf < 0.0 {
607 continue;
608 }
609
610 if selected_msgs
611 .try_to_add_with_deps(i, &mut chains, &base_fee)
612 .is_ok()
613 {
614 continue;
616 }
617
618 if chains[i].valid {
619 selected_msgs
622 .try_to_add_with_deps(i, &mut chains, &base_fee)
623 .context("Failed to add message chain with dependencies")?;
624 continue;
625 }
626 }
627
628 if selected_msgs.len() != pre_random_length {
629 tracing::warn!(
630 "optimal selection failed to pack a block; picked {} messages with random selection",
631 selected_msgs.len() - pre_random_length
632 );
633 }
634 }
635
636 Ok(selected_msgs)
637 }
638
639 fn get_pending_messages(&self, cur_ts: &Tipset, ts: &Tipset) -> Result<Pending, Error> {
640 let mut result: Pending = HashMap::new();
641 let mut in_sync = false;
642 if cur_ts.epoch() == ts.epoch() && cur_ts == ts {
643 in_sync = true;
644 }
645
646 for (a, mset) in self.pending.read().iter() {
647 if in_sync {
648 result.insert(*a, mset.msgs.clone());
649 } else {
650 let mut mset_copy = HashMap::new();
651 for (nonce, m) in mset.msgs.iter() {
652 mset_copy.insert(*nonce, m.clone());
653 }
654 result.insert(*a, mset_copy);
655 }
656 }
657
658 if in_sync {
659 return Ok(result);
660 }
661
662 run_head_change(
664 self.api.as_ref(),
665 &self.pending,
666 cur_ts.clone(),
667 ts.clone(),
668 &mut result,
669 )?;
670
671 Ok(result)
672 }
673
674 fn select_priority_messages(
675 &self,
676 pending: &mut Pending,
677 base_fee: &TokenAmount,
678 ts: &Tipset,
679 ) -> Result<SelectedMessages, Error> {
680 let result = Vec::with_capacity(self.config.size_limit_low() as usize);
681 let gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT;
682 let min_gas = 1298450;
683
684 let priority = self.config.priority_addrs();
686 let mut chains = Chains::new();
687 for actor in priority.iter() {
688 if let Some(mset) = pending.remove(actor) {
690 create_message_chains(
692 self.api.as_ref(),
693 actor,
694 &mset,
695 base_fee,
696 ts,
697 &mut chains,
698 &self.chain_config,
699 )?;
700 }
701 }
702
703 if chains.is_empty() {
704 return Ok(SelectedMessages::new(Vec::new(), gas_limit));
705 }
706
707 Ok(merge_and_trim(
708 &mut chains,
709 SelectedMessages::new(result, gas_limit),
710 base_fee,
711 min_gas,
712 ))
713 }
714}
715
716#[allow(clippy::indexing_slicing)]
718fn merge_and_trim(
719 chains: &mut Chains,
720 mut selected_msgs: SelectedMessages,
721 base_fee: &TokenAmount,
722 min_gas: u64,
723) -> SelectedMessages {
724 if chains.is_empty() {
725 return selected_msgs;
726 }
727
728 chains.sort(true);
730
731 let first_chain_gas_perf = chains[0].gas_perf;
732
733 if !chains.is_empty() && first_chain_gas_perf < 0.0 {
734 warn!(
735 "all priority messages in mpool have negative gas performance bestGasPerf: {}",
736 first_chain_gas_perf
737 );
738 return selected_msgs;
739 }
740
741 let mut last = chains.len();
744 for i in 0..chains.len() {
745 let node = &chains[i];
746
747 if node.gas_perf < 0.0 {
748 break;
749 }
750
751 if selected_msgs.try_to_add(node.clone()).is_ok() {
752 continue;
754 }
755
756 last = i;
758 break;
759 }
760
761 'tail_loop: while selected_msgs.gas_limit >= min_gas && last < chains.len() {
762 selected_msgs.trim_chain_at(chains, last, base_fee);
764
765 let node = &chains[last];
767 if node.valid {
768 for i in last..chains.len() - 1 {
769 let cur_node = &chains[i];
771 let next_node = &chains[i + 1];
772 if cur_node.compare(next_node) == Ordering::Greater {
773 break;
774 }
775
776 chains.key_vec.swap(i, i + 1);
777 }
778 }
779
780 let lst = last; for i in lst..chains.len() {
783 let chain = chains[i].clone();
784 if !chain.valid {
785 continue;
786 }
787
788 if chain.gas_perf < 0.0 {
790 break 'tail_loop;
791 }
792
793 if selected_msgs.try_to_add(chain).is_ok() {
795 continue;
797 }
798
799 last += i;
801 continue 'tail_loop;
802 }
803
804 break;
805 }
806
807 selected_msgs
808}
809
810pub(in crate::message_pool) fn run_head_change<T>(
815 api: &T,
816 pending: &RwLock<HashMap<Address, MsgSet>>,
817 from: Tipset,
818 to: Tipset,
819 rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
820) -> Result<(), Error>
821where
822 T: Provider,
823{
824 let mut left = from;
825 let mut right = to;
826 let mut left_chain = Vec::new();
827 let mut right_chain = Vec::new();
828 while left != right {
829 if left.epoch() > right.epoch() {
830 left_chain.push(left.clone());
831 let par = api.load_tipset(left.parents())?;
832 left = par;
833 } else {
834 right_chain.push(right.clone());
835 let par = api.load_tipset(right.parents())?;
836 right = par;
837 }
838 }
839 for ts in left_chain {
840 let mut msgs: Vec<SignedMessage> = Vec::new();
841 for block in ts.block_headers() {
842 let (_, smsgs) = api.messages_for_block(block)?;
843 msgs.extend(smsgs);
844 }
845 for msg in msgs {
846 add_to_selected_msgs(msg, rmsgs);
847 }
848 }
849
850 for ts in right_chain {
851 for b in ts.block_headers() {
852 let (msgs, smsgs) = api.messages_for_block(b)?;
853
854 for msg in smsgs {
855 remove_from_selected_msgs(
856 &msg.from(),
857 pending,
858 msg.sequence(),
859 rmsgs.borrow_mut(),
860 )?;
861 }
862 for msg in msgs {
863 remove_from_selected_msgs(&msg.from, pending, msg.sequence, rmsgs.borrow_mut())?;
864 }
865 }
866 }
867 Ok(())
868}
869
870#[cfg(test)]
871mod test_selection {
872 use std::sync::Arc;
873
874 use crate::db::MemoryDB;
875 use crate::key_management::{KeyStore, KeyStoreConfig, Wallet};
876 use crate::message::Message;
877 use crate::shim::crypto::SignatureType;
878 use crate::shim::econ::BLOCK_GAS_LIMIT;
879 use tokio::task::JoinSet;
880
881 use super::*;
882 use crate::message_pool::{
883 head_change,
884 msgpool::{
885 test_provider::{TestApi, mock_block},
886 tests::{create_fake_smsg, create_smsg},
887 },
888 };
889
890 const TEST_GAS_LIMIT: i64 = 6955002;
891
892 fn make_test_mpool(joinset: &mut JoinSet<anyhow::Result<()>>) -> MessagePool<TestApi> {
893 let tma = TestApi::default();
894 let (tx, _rx) = flume::bounded(50);
895 MessagePool::new(tma, tx, Default::default(), Arc::default(), joinset).unwrap()
896 }
897
898 async fn mock_tipset(mpool: &mut MessagePool<TestApi>) -> Tipset {
901 let b1 = mock_block(1, 1);
902 let ts = Tipset::from(&b1);
903 let api = mpool.api.clone();
904 let bls_sig_cache = mpool.bls_sig_cache.clone();
905 let pending = mpool.pending.clone();
906 let cur_tipset = mpool.cur_tipset.clone();
907 let repub_trigger = mpool.repub_trigger.clone();
908 let republished = mpool.republished.clone();
909
910 head_change(
911 api.as_ref(),
912 bls_sig_cache.as_ref(),
913 repub_trigger.clone(),
914 republished.as_ref(),
915 pending.as_ref(),
916 cur_tipset.as_ref(),
917 Vec::new(),
918 vec![Tipset::from(b1)],
919 )
920 .await
921 .unwrap();
922
923 ts
924 }
925
926 #[tokio::test]
927 async fn basic_message_selection() {
928 let mut joinset = JoinSet::new();
929 let mpool = make_test_mpool(&mut joinset);
930
931 let ks1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
932 let mut w1 = Wallet::new(ks1);
933 let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
934
935 let ks2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
936 let mut w2 = Wallet::new(ks2);
937 let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
938
939 let b1 = mock_block(1, 1);
940 let ts = Tipset::from(&b1);
941 let api = mpool.api.clone();
942 let bls_sig_cache = mpool.bls_sig_cache.clone();
943 let pending = mpool.pending.clone();
944 let cur_tipset = mpool.cur_tipset.clone();
945 let repub_trigger = mpool.repub_trigger.clone();
946 let republished = mpool.republished.clone();
947
948 head_change(
949 api.as_ref(),
950 bls_sig_cache.as_ref(),
951 repub_trigger.clone(),
952 republished.as_ref(),
953 pending.as_ref(),
954 cur_tipset.as_ref(),
955 Vec::new(),
956 vec![Tipset::from(b1)],
957 )
958 .await
959 .unwrap();
960
961 api.set_state_balance_raw(&a1, TokenAmount::from_whole(1));
962 api.set_state_balance_raw(&a2, TokenAmount::from_whole(1));
963
964 for i in 0..10 {
968 let m = create_smsg(&a2, &a1, &mut w1, i, TEST_GAS_LIMIT, 2 * i + 1);
969 mpool.add(m).unwrap();
970 }
971 for i in 0..10 {
972 let m = create_smsg(&a1, &a2, &mut w2, i, TEST_GAS_LIMIT, i + 1);
973 mpool.add(m).unwrap();
974 }
975
976 let msgs = mpool.select_messages(&ts, 1.0).unwrap();
977
978 assert_eq!(msgs.len(), 20, "Expected 20 messages, got {}", msgs.len());
979
980 let mut next_nonce = 0;
981 for (i, msg) in msgs.iter().enumerate().take(10) {
982 assert_eq!(
983 msg.from(),
984 a1,
985 "first 10 returned messages should be from actor a1 {i}",
986 );
987 assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
988 next_nonce += 1;
989 }
990
991 next_nonce = 0;
992 for (i, msg) in msgs.iter().enumerate().take(20).skip(10) {
993 assert_eq!(
994 msg.from(),
995 a2,
996 "next 10 returned messages should be from actor a2 {i}",
997 );
998 assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
999 next_nonce += 1;
1000 }
1001
1002 let b2 = mpool.api.next_block();
1004 mpool.api.set_block_messages(&b2, msgs);
1005 head_change(
1006 api.as_ref(),
1007 bls_sig_cache.as_ref(),
1008 repub_trigger.clone(),
1009 republished.as_ref(),
1010 pending.as_ref(),
1011 cur_tipset.as_ref(),
1012 Vec::new(),
1013 vec![Tipset::from(b2)],
1014 )
1015 .await
1016 .unwrap();
1017
1018 assert!(
1021 mpool.pending.read().is_empty(),
1022 "Expected no pending messages, but got {}",
1023 mpool.pending.read().len()
1024 );
1025
1026 let mut msgs = Vec::with_capacity(20);
1028 for i in 10..20 {
1029 msgs.push(create_smsg(&a2, &a1, &mut w1, i, TEST_GAS_LIMIT, 2 * i + 1));
1030 msgs.push(create_smsg(&a1, &a2, &mut w2, i, TEST_GAS_LIMIT, i + 1));
1031 }
1032 let b3 = mpool.api.next_block();
1033 let ts3 = Tipset::from(&b3);
1034 mpool.api.set_block_messages(&b3, msgs);
1035
1036 for i in 20..30 {
1038 mpool
1039 .add(create_smsg(
1040 &a2,
1041 &a1,
1042 &mut w1,
1043 i,
1044 TEST_GAS_LIMIT,
1045 2 * i + 200,
1046 ))
1047 .unwrap();
1048 mpool
1049 .add(create_smsg(&a1, &a2, &mut w2, i, TEST_GAS_LIMIT, i + 1))
1050 .unwrap();
1051 }
1052 mpool.api.set_state_sequence(&a1, 10);
1056 mpool.api.set_state_sequence(&a2, 10);
1057 let msgs = mpool.select_messages(&ts3, 1.0).unwrap();
1058
1059 assert_eq!(
1060 msgs.len(),
1061 20,
1062 "Expected 20 messages, but got {}",
1063 msgs.len()
1064 );
1065
1066 let mut next_nonce = 20;
1067 for msg in msgs.iter().take(10) {
1068 assert_eq!(
1069 msg.from(),
1070 a1,
1071 "first 10 returned messages should be from actor a1"
1072 );
1073 assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1074 next_nonce += 1;
1075 }
1076 next_nonce = 20;
1077 for msg in msgs.iter().take(20).skip(10) {
1078 assert_eq!(
1079 msg.from(),
1080 a2,
1081 "next 10 returned messages should be from actor a2"
1082 );
1083 assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1084 next_nonce += 1;
1085 }
1086 }
1087
1088 #[tokio::test]
1089 async fn message_selection_trimming_gas() {
1090 let mut joinset = JoinSet::new();
1091 let mut mpool = make_test_mpool(&mut joinset);
1092 let ts = mock_tipset(&mut mpool).await;
1093 let api = mpool.api.clone();
1094
1095 let ks1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1096 let mut w1 = Wallet::new(ks1);
1097 let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1098
1099 let ks2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1100 let mut w2 = Wallet::new(ks2);
1101 let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1102
1103 api.set_state_balance_raw(&a1, TokenAmount::from_whole(1));
1104 api.set_state_balance_raw(&a2, TokenAmount::from_whole(1));
1105
1106 let nmsgs = (crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT) + 1;
1107
1108 for i in 0..nmsgs {
1110 let bias = (nmsgs - i) / 3;
1111 let m = create_fake_smsg(
1112 &mpool,
1113 &a2,
1114 &a1,
1115 i as u64,
1116 TEST_GAS_LIMIT,
1117 (1 + i % 3 + bias) as u64,
1118 );
1119 mpool.add(m).unwrap();
1120 let m = create_fake_smsg(
1121 &mpool,
1122 &a1,
1123 &a2,
1124 i as u64,
1125 TEST_GAS_LIMIT,
1126 (1 + i % 3 + bias) as u64,
1127 );
1128 mpool.add(m).unwrap();
1129 }
1130
1131 let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1132
1133 let expected = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1134 assert_eq!(msgs.len(), expected as usize);
1135 let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1136 assert!(m_gas_limit <= crate::shim::econ::BLOCK_GAS_LIMIT);
1137 }
1138
1139 #[tokio::test]
1140 async fn message_selection_trimming_msgs_basic() {
1141 let mut joinset = JoinSet::new();
1142 let mut mpool = make_test_mpool(&mut joinset);
1143 let ts = mock_tipset(&mut mpool).await;
1144 let api = mpool.api.clone();
1145
1146 let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1147 let mut wallet = Wallet::new(keystore);
1148 let address = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
1149
1150 api.set_state_balance_raw(&address, TokenAmount::from_whole(1));
1151
1152 for i in 0..BLOCK_MESSAGE_LIMIT {
1154 let msg = create_fake_smsg(&mpool, &address, &address, i as u64, 200_000, 100);
1155 mpool.add(msg).unwrap();
1156 }
1157
1158 let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1159 assert_eq!(
1160 msgs.len(),
1161 CBOR_GEN_LIMIT,
1162 "Expected {CBOR_GEN_LIMIT} messages, got {}",
1163 msgs.len()
1164 );
1165
1166 let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1168 assert!(
1169 m_gas_limit <= BLOCK_GAS_LIMIT,
1170 "Selected messages gas limit {m_gas_limit} exceeds block gas limit {BLOCK_GAS_LIMIT}",
1171 );
1172 }
1173
1174 #[tokio::test]
1175 async fn message_selection_trimming_msgs_two_senders() {
1176 let mut joinset = JoinSet::new();
1177 let mut mpool = make_test_mpool(&mut joinset);
1178 let ts = mock_tipset(&mut mpool).await;
1179 let api = mpool.api.clone();
1180
1181 let keystore_1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1182 let mut wallet_1 = Wallet::new(keystore_1);
1183 let address_1 = wallet_1.generate_addr(SignatureType::Secp256k1).unwrap();
1184
1185 let keystore_2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1186 let mut wallet_2 = Wallet::new(keystore_2);
1187 let address_2 = wallet_2.generate_addr(SignatureType::Bls).unwrap();
1188
1189 api.set_state_balance_raw(&address_1, TokenAmount::from_whole(1));
1190 api.set_state_balance_raw(&address_2, TokenAmount::from_whole(1));
1191
1192 for i in 0..BLOCK_MESSAGE_LIMIT {
1194 let msg = create_smsg(
1195 &address_2,
1196 &address_1,
1197 &mut wallet_1,
1198 i as u64,
1199 300_000,
1200 100,
1201 );
1202 mpool.add(msg).unwrap();
1203 let msg = create_smsg(
1206 &address_1,
1207 &address_2,
1208 &mut wallet_2,
1209 i as u64,
1210 300_000,
1211 1000,
1212 );
1213 mpool.add(msg).unwrap();
1214 }
1215 let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1216 let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1218 assert!(
1219 m_gas_limit <= BLOCK_GAS_LIMIT,
1220 "Selected messages gas limit {m_gas_limit} exceeds block gas limit {BLOCK_GAS_LIMIT}",
1221 );
1222 let bls_msgs = msgs.iter().filter(|m| m.is_bls()).count();
1223 assert_eq!(
1224 CBOR_GEN_LIMIT, bls_msgs,
1225 "Expected {CBOR_GEN_LIMIT} bls messages, got {bls_msgs}."
1226 );
1227 assert_eq!(
1228 msgs.len(),
1229 BLOCK_MESSAGE_LIMIT,
1230 "Expected {BLOCK_MESSAGE_LIMIT} messages, got {}",
1231 msgs.len()
1232 );
1233 }
1234
1235 #[tokio::test]
1236 async fn message_selection_trimming_msgs_two_senders_complex() {
1237 let mut joinset = JoinSet::new();
1238 let mut mpool = make_test_mpool(&mut joinset);
1239 let ts = mock_tipset(&mut mpool).await;
1240 let api = mpool.api.clone();
1241
1242 let keystore_1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1243 let mut wallet_1 = Wallet::new(keystore_1);
1244 let address_1 = wallet_1.generate_addr(SignatureType::Secp256k1).unwrap();
1245
1246 let keystore_2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1247 let mut wallet_2 = Wallet::new(keystore_2);
1248 let address_2 = wallet_2.generate_addr(SignatureType::Bls).unwrap();
1249
1250 api.set_state_balance_raw(&address_1, TokenAmount::from_whole(1));
1251 api.set_state_balance_raw(&address_2, TokenAmount::from_whole(1));
1252
1253 let mut counter = 0;
1255 for i in 0..CBOR_GEN_LIMIT {
1256 counter += 1;
1257 let msg = create_smsg(
1258 &address_2,
1259 &address_1,
1260 &mut wallet_1,
1261 i as u64,
1262 300_000,
1263 100,
1264 );
1265 mpool.add(msg).unwrap();
1266 let msg = create_smsg(
1269 &address_1,
1270 &address_2,
1271 &mut wallet_2,
1272 i as u64,
1273 300_000,
1274 100,
1275 );
1276 mpool.add(msg).unwrap();
1277 }
1278
1279 let msg = create_smsg(
1281 &address_2,
1282 &address_1,
1283 &mut wallet_1,
1284 counter as u64,
1285 300_000,
1286 1000,
1287 );
1288 mpool.add(msg).unwrap();
1289
1290 let msg = create_smsg(
1291 &address_1,
1292 &address_2,
1293 &mut wallet_2,
1294 counter as u64,
1295 300_000,
1296 100,
1297 );
1298 mpool.add(msg).unwrap();
1299
1300 counter += 1;
1301
1302 let msg = create_smsg(
1304 &address_2,
1305 &address_1,
1306 &mut wallet_1,
1307 counter as u64,
1308 400_000,
1309 1_000_000,
1310 );
1311 mpool.add(msg).unwrap();
1312
1313 let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1314 let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1316 assert!(
1317 m_gas_limit <= BLOCK_GAS_LIMIT,
1318 "Selected messages gas limit {m_gas_limit} exceeds block gas limit {BLOCK_GAS_LIMIT}",
1319 );
1320 let secps_len = msgs.iter().filter(|m| m.is_secp256k1()).count();
1322 assert_eq!(
1323 CBOR_GEN_LIMIT, secps_len,
1324 "Expected {CBOR_GEN_LIMIT} secp messages, got {secps_len}."
1325 );
1326 assert_eq!(
1328 msgs.len(),
1329 BLOCK_MESSAGE_LIMIT,
1330 "Expected {BLOCK_MESSAGE_LIMIT} messages, got {}",
1331 msgs.len()
1332 );
1333 }
1334
1335 #[tokio::test]
1336 async fn message_selection_priority() {
1337 let db = MemoryDB::default();
1338
1339 let mut joinset = JoinSet::new();
1340 let mut mpool = make_test_mpool(&mut joinset);
1341 let ts = mock_tipset(&mut mpool).await;
1342 let api = mpool.api.clone();
1343
1344 let ks1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1345 let mut w1 = Wallet::new(ks1);
1346 let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1347
1348 let ks2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1349 let mut w2 = Wallet::new(ks2);
1350 let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1351
1352 let mut mpool_cfg = mpool.get_config().clone();
1354 mpool_cfg.priority_addrs.push(a1);
1355 mpool.set_config(&db, mpool_cfg).unwrap();
1356
1357 api.set_state_balance_raw(&a1, TokenAmount::from_whole(1));
1359 api.set_state_balance_raw(&a2, TokenAmount::from_whole(1));
1360
1361 let nmsgs = 10;
1362
1363 for i in 0..nmsgs {
1365 let bias = (nmsgs - i) / 3;
1366 let m = create_smsg(
1367 &a2,
1368 &a1,
1369 &mut w1,
1370 i as u64,
1371 TEST_GAS_LIMIT,
1372 (1 + i % 3 + bias) as u64,
1373 );
1374 mpool.add(m).unwrap();
1375 let m = create_smsg(
1376 &a1,
1377 &a2,
1378 &mut w2,
1379 i as u64,
1380 TEST_GAS_LIMIT,
1381 (1 + i % 3 + bias) as u64,
1382 );
1383 mpool.add(m).unwrap();
1384 }
1385
1386 let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1387
1388 assert_eq!(msgs.len(), 20);
1389
1390 let mut next_nonce = 0;
1391 for msg in msgs.iter().take(10) {
1392 assert_eq!(
1393 msg.from(),
1394 a1,
1395 "first 10 returned messages should be from actor a1"
1396 );
1397 assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1398 next_nonce += 1;
1399 }
1400 next_nonce = 0;
1401 for msg in msgs.iter().take(20).skip(10) {
1402 assert_eq!(
1403 msg.from(),
1404 a2,
1405 "next 10 returned messages should be from actor a2"
1406 );
1407 assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1408 next_nonce += 1;
1409 }
1410 }
1411
1412 #[tokio::test]
1413 async fn test_optimal_msg_selection1() {
1414 let mut joinset = JoinSet::new();
1418 let mut mpool = make_test_mpool(&mut joinset);
1419 let ts = mock_tipset(&mut mpool).await;
1420 let api = mpool.api.clone();
1421
1422 let mut w1 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1424 let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1425 let mut w2 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1426 let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1427
1428 api.set_state_balance_raw(&a1, TokenAmount::from_whole(1));
1429 api.set_state_balance_raw(&a2, TokenAmount::from_whole(1));
1430
1431 let n_msgs = 10 * crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1432
1433 for i in 0..(n_msgs as usize) {
1437 let bias = (n_msgs as usize - i) / 3;
1438 let m = create_fake_smsg(
1439 &mpool,
1440 &a2,
1441 &a1,
1442 i as u64,
1443 TEST_GAS_LIMIT,
1444 (1 + i % 3 + bias) as u64,
1445 );
1446 mpool.add(m).unwrap();
1447 }
1448
1449 let msgs = mpool.select_messages(&ts, 0.25).unwrap();
1450
1451 let expected_msgs = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1452
1453 assert_eq!(msgs.len(), expected_msgs as usize);
1454
1455 for (next_nonce, m) in msgs.into_iter().enumerate() {
1456 assert_eq!(m.from(), a1, "Expected message from a1");
1457 assert_eq!(
1458 m.message().sequence,
1459 next_nonce as u64,
1460 "expected nonce {} but got {}",
1461 next_nonce,
1462 m.message().sequence
1463 );
1464 }
1465 }
1466
1467 #[tokio::test]
1468 async fn test_optimal_msg_selection2() {
1469 let mut joinset = JoinSet::new();
1470 let mut mpool = make_test_mpool(&mut joinset);
1475 let ts = mock_tipset(&mut mpool).await;
1476 let api = mpool.api.clone();
1477
1478 let mut w1 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1480 let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1481 let mut w2 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1482 let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1483
1484 api.set_state_balance_raw(&a1, TokenAmount::from_whole(1)); api.set_state_balance_raw(&a2, TokenAmount::from_whole(1)); let n_msgs = 5 * crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1488 for i in 0..n_msgs as usize {
1489 let bias = (n_msgs as usize - i) / 3;
1490 let m = create_fake_smsg(
1491 &mpool,
1492 &a2,
1493 &a1,
1494 i as u64,
1495 TEST_GAS_LIMIT,
1496 (200000 + i % 3 + bias) as u64,
1497 );
1498 mpool.add(m).unwrap();
1499 let m = create_fake_smsg(
1500 &mpool,
1501 &a1,
1502 &a2,
1503 i as u64,
1504 TEST_GAS_LIMIT,
1505 (190000 + i % 3 + bias) as u64,
1506 );
1507 mpool.add(m).unwrap();
1508 }
1509
1510 let msgs = mpool.select_messages(&ts, 0.1).unwrap();
1511
1512 let expected_msgs = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1513 assert_eq!(
1514 msgs.len(),
1515 expected_msgs as usize,
1516 "Expected {} messages, but got {}",
1517 expected_msgs,
1518 msgs.len()
1519 );
1520
1521 let mut n_from1 = 0;
1522 let mut n_from2 = 0;
1523 let mut next_nonce1 = 0;
1524 let mut next_nonce2 = 0;
1525
1526 for m in msgs {
1527 if m.from() == a1 {
1528 if m.message.sequence != next_nonce1 {
1529 panic!(
1530 "Expected nonce {}, but got {}",
1531 next_nonce1, m.message.sequence
1532 );
1533 }
1534 next_nonce1 += 1;
1535 n_from1 += 1;
1536 } else {
1537 if m.message.sequence != next_nonce2 {
1538 panic!(
1539 "Expected nonce {}, but got {}",
1540 next_nonce2, m.message.sequence
1541 );
1542 }
1543 next_nonce2 += 1;
1544 n_from2 += 1;
1545 }
1546 }
1547
1548 if n_from1 > n_from2 {
1549 panic!("Expected more msgs from a2 than a1");
1550 }
1551 }
1552
1553 #[tokio::test]
1554 async fn test_optimal_msg_selection3() {
1555 let mut joinset = JoinSet::new();
1556 let mut mpool = make_test_mpool(&mut joinset);
1562 let ts = mock_tipset(&mut mpool).await;
1563 let api = mpool.api.clone();
1564
1565 let n_actors = 10;
1566
1567 let mut actors = vec![];
1568 let mut wallets = vec![];
1569
1570 for _ in 0..n_actors {
1571 let mut wallet = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1572 let actor = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
1573
1574 actors.push(actor);
1575 wallets.push(wallet);
1576 }
1577
1578 for a in &mut actors {
1579 api.set_state_balance_raw(a, TokenAmount::from_whole(1));
1580 }
1581
1582 let n_msgs = 1 + crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1583 for i in 0..n_msgs {
1584 for j in 0..n_actors {
1585 let premium =
1586 500000 + 10000 * (n_actors - j) + (n_msgs + 2 - i) / (30 * n_actors) + i % 3;
1587 let m = create_fake_smsg(
1588 &mpool,
1589 &actors[j as usize],
1590 &actors[j as usize],
1591 i as u64,
1592 TEST_GAS_LIMIT,
1593 premium as u64,
1594 );
1595 mpool.add(m).unwrap();
1596 }
1597 }
1598
1599 let msgs = mpool.select_messages(&ts, 0.1).unwrap();
1600 let expected_msgs = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1601
1602 assert_eq!(
1603 msgs.len(),
1604 expected_msgs as usize,
1605 "Expected {} messages, but got {}",
1606 expected_msgs,
1607 msgs.len()
1608 );
1609
1610 let who_is = |addr| -> usize {
1611 for (i, a) in actors.iter().enumerate() {
1612 if a == &addr {
1613 return i;
1614 }
1615 }
1616 9999999
1619 };
1620
1621 let mut nonces = vec![0; n_actors as usize];
1622 for m in &msgs {
1623 let who = who_is(m.from());
1624 if who < 3 {
1625 panic!("got message from {who}th actor",);
1626 }
1627
1628 let next_nonce: u64 = nonces[who];
1629 if m.message.sequence != next_nonce {
1630 panic!(
1631 "expected nonce {} but got {}",
1632 next_nonce, m.message.sequence
1633 );
1634 }
1635 nonces[who] += 1;
1636 }
1637 }
1638}