Skip to main content

forest/message_pool/msgpool/
selection.rs

// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

//! Contains routines for message selection APIs.
//! Whenever a miner is ready to create a block for a tipset, it invokes the
//! `select_messages` API which selects an appropriate set of messages such that
//! it optimizes miner reward and chain capacity. See <https://docs.filecoin.io/mine/lotus/message-pool/#message-selection> for more details.
8
9use std::cmp::Ordering;
10
11use crate::blocks::{BLOCK_MESSAGE_LIMIT, Tipset};
12use crate::message::{MessageRead as _, SignedMessage};
13use crate::message_pool::msg_chain::MsgChainNode;
14use crate::shim::crypto::SignatureType;
15use crate::shim::{address::Address, econ::TokenAmount};
16use crate::state_manager::IdToAddressCache;
17use ahash::{HashMap, HashMapExt};
18use anyhow::{Context, bail, ensure};
19use parking_lot::RwLock;
20use rand::prelude::SliceRandom;
21use tracing::{debug, error, warn};
22
23use crate::shim::crypto::Signature;
24use crate::utils::cache::SizeTrackingLruCache;
25use crate::utils::get_size::CidWrapper;
26
27use super::{MpoolCtx, msg_pool::MessagePool, provider::Provider, utils::recover_sig};
28use crate::message_pool::{
29    Error, add_to_selected_msgs,
30    msg_chain::{Chains, NodeKey, create_message_chains},
31    msg_pool::MsgSet,
32    msgpool::MIN_GAS,
33};
34
35type Pending = HashMap<Address, HashMap<u64, SignedMessage>>;
36
/// A cap on the maximum number of messages to include in a block.
const MAX_BLOCK_MSGS: usize = 16_000;
/// Number of partitions ("blocks") the sorted chains are split into during optimal selection.
const MAX_BLOCKS: usize = 15;
/// Per-signature-type message cap. Same limits as in
/// [cbor-gen](https://github.com/whyrusleeping/cbor-gen/blob/cba3eeea9ae8ec4db1b7283e3654d8c18979affe/gen.go#L32), which Lotus uses.
const CBOR_GEN_LIMIT: usize = 8_192;
42
43/// A structure that holds the selected messages for a block.
44/// It tracks the gas limit and the limits for different signature types
45/// to ensure that the block does not exceed the limits set by the protocol.
46struct SelectedMessages {
47    /// The messages selected for inclusion in the block.
48    msgs: Vec<SignedMessage>,
49    /// The remaining gas limit for the block.
50    gas_limit: u64,
51    /// The remaining limit for `secp256k1` messages in the block.
52    secp_limit: u64,
53    /// The remaining limit for `bls` messages in the block.
54    bls_limit: u64,
55}
56
57impl Default for SelectedMessages {
58    fn default() -> Self {
59        SelectedMessages {
60            msgs: Vec::new(),
61            gas_limit: crate::shim::econ::BLOCK_GAS_LIMIT,
62            secp_limit: CBOR_GEN_LIMIT as u64,
63            bls_limit: CBOR_GEN_LIMIT as u64,
64        }
65    }
66}
67
68impl SelectedMessages {
69    fn new(msgs: Vec<SignedMessage>, gas_limit: u64) -> Self {
70        SelectedMessages {
71            msgs,
72            gas_limit,
73            ..Default::default()
74        }
75    }
76
77    /// Returns the number of messages selected for inclusion in the block.
78    fn len(&self) -> usize {
79        self.msgs.len()
80    }
81
82    /// Truncates the selected messages to the specified length.
83    fn truncate(&mut self, len: usize) {
84        self.msgs.truncate(len);
85    }
86
87    /// Reduces the gas limit by the specified amount. It ensures that the gas limit does not
88    /// go below zero (which would cause a panic).
89    fn reduce_gas_limit(&mut self, gas: u64) {
90        self.gas_limit = self.gas_limit.saturating_sub(gas);
91    }
92
93    /// Reduces the BLS limit by the specified amount. It ensures that the BLS message limit does not
94    /// go below zero (which would cause a panic).
95    fn reduce_bls_limit(&mut self, bls: u64) {
96        self.bls_limit = self.bls_limit.saturating_sub(bls);
97    }
98
99    /// Reduces the `Secp256k1` limit by the specified amount. It ensures that the `Secp256k1` message limit
100    /// does not go below zero (which would cause a panic).
101    fn reduce_secp_limit(&mut self, secp: u64) {
102        self.secp_limit = self.secp_limit.saturating_sub(secp);
103    }
104
105    /// Extends the selected messages with the given messages.
106    fn extend(&mut self, msgs: Vec<SignedMessage>) {
107        self.msgs.extend(msgs);
108    }
109
110    /// Tries to add a message chain to the selected messages. Returns an error if the chain can't
111    /// be added due to block constraints.
112    fn try_to_add(&mut self, message_chain: MsgChainNode) -> anyhow::Result<()> {
113        let msg_chain_len = message_chain.msgs.len();
114        ensure!(
115            BLOCK_MESSAGE_LIMIT >= msg_chain_len + self.len(),
116            "Message chain is too long to fit in the block: {} messages, limit is {BLOCK_MESSAGE_LIMIT}",
117            msg_chain_len + self.len(),
118        );
119        ensure!(
120            self.gas_limit >= message_chain.gas_limit,
121            "Message chain gas limit is too high: {} gas, limit is {}",
122            message_chain.gas_limit,
123            self.gas_limit
124        );
125
126        match message_chain.sig_type {
127            Some(SignatureType::Bls) => {
128                ensure!(
129                    self.bls_limit >= msg_chain_len as u64,
130                    "BLS limit is too low: {msg_chain_len} messages, limit is {}",
131                    self.bls_limit
132                );
133
134                self.extend(message_chain.msgs);
135                self.reduce_bls_limit(msg_chain_len as u64);
136                self.reduce_gas_limit(message_chain.gas_limit);
137            }
138            Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
139                ensure!(
140                    self.secp_limit >= msg_chain_len as u64,
141                    "Secp256k1 limit is too low: {msg_chain_len} messages, limit is {}",
142                    self.secp_limit
143                );
144
145                self.extend(message_chain.msgs);
146                self.reduce_secp_limit(msg_chain_len as u64);
147                self.reduce_gas_limit(message_chain.gas_limit);
148            }
149            None => {
150                // This is a message with no signature type, which is not allowed in the current
151                // implementation. This _should_ never happen, but we handle it gracefully.
152                warn!("Tried to add a message chain with no signature type");
153            }
154        }
155        Ok(())
156    }
157
158    /// Tries to add a message chain with dependencies to the selected messages.
159    /// It will trim or invalidate if appropriate.
160    fn try_to_add_with_deps(
161        &mut self,
162        idx: usize,
163        chains: &mut Chains,
164        base_fee: &TokenAmount,
165    ) -> anyhow::Result<()> {
166        let message_chain = chains
167            .get_mut_at(idx)
168            .context("Couldn't find required message chain")?;
169        // compute the dependencies that must be merged and the gas limit including deps
170        let mut chain_gas_limit = message_chain.gas_limit;
171        let mut chain_msg_limit = message_chain.msgs.len();
172        let mut dep_gas_limit = 0;
173        let mut dep_message_limit = 0;
174        let selected_messages_limit = match message_chain.sig_type {
175            Some(SignatureType::Bls) => self.bls_limit as usize,
176            Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
177                self.secp_limit as usize
178            }
179            None => {
180                // This is a message with no signature type, which is not allowed in the current
181                // implementation. This _should_ never happen, but we handle it gracefully.
182                bail!("Tried to add a message chain with no signature type");
183            }
184        };
185        let selected_messages_limit =
186            selected_messages_limit.min(BLOCK_MESSAGE_LIMIT.saturating_sub(self.len()));
187
188        let mut chain_deps = vec![];
189        let mut cur_chain = message_chain.prev;
190        let _ = message_chain; // drop the mutable borrow to avoid conflicts
191        while let Some(cur_chn) = cur_chain {
192            let node = chains.get(cur_chn).unwrap();
193            if !node.merged {
194                chain_deps.push(cur_chn);
195                chain_gas_limit += node.gas_limit;
196                chain_msg_limit += node.msgs.len();
197                dep_gas_limit += node.gas_limit;
198                dep_message_limit += node.msgs.len();
199                cur_chain = node.prev;
200            } else {
201                break;
202            }
203        }
204
205        // the chain doesn't fit as-is, so trim / invalidate it and return false
206        if chain_gas_limit > self.gas_limit || chain_msg_limit > selected_messages_limit {
207            // it doesn't all fit; now we have to take into account the dependent chains before
208            // making a decision about trimming or invalidating.
209            // if the dependencies exceed the block limits, then we must invalidate the chain
210            // as it can never be included.
211            // Otherwise we can just trim and continue
212            if dep_gas_limit > self.gas_limit || dep_message_limit >= selected_messages_limit {
213                chains.invalidate(chains.get_key_at(idx));
214            } else {
215                // dependencies fit, just trim it
216                chains.trim_msgs_at(
217                    idx,
218                    self.gas_limit.saturating_sub(dep_gas_limit),
219                    selected_messages_limit.saturating_sub(dep_message_limit),
220                    base_fee,
221                );
222            }
223
224            bail!("Chain doesn't fit in the block");
225        }
226        for dep in chain_deps.iter().rev() {
227            let cur_chain = chains.get_mut(*dep);
228            if let Some(node) = cur_chain {
229                node.merged = true;
230                self.extend(node.msgs.clone());
231            } else {
232                bail!("Couldn't find required dependent message chain");
233            }
234        }
235
236        let message_chain = chains
237            .get_mut_at(idx)
238            .context("Couldn't find required message chain")?;
239        message_chain.merged = true;
240        self.extend(message_chain.msgs.clone());
241        self.reduce_gas_limit(chain_gas_limit);
242
243        match message_chain.sig_type {
244            Some(SignatureType::Bls) => {
245                self.reduce_bls_limit(chain_msg_limit as u64);
246            }
247            Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
248                self.reduce_secp_limit(chain_msg_limit as u64);
249            }
250            None => {
251                // This is a message with no signature type, which is not allowed in the current
252                // implementation. This _should_ never happen, but we handle it gracefully.
253                bail!("Tried to add a message chain with no signature type");
254            }
255        }
256
257        Ok(())
258    }
259
260    fn trim_chain_at(&mut self, chains: &mut Chains, idx: usize, base_fee: &TokenAmount) {
261        let message_chain = match chains.get_at(idx) {
262            Some(message_chain) => message_chain,
263            None => {
264                error!("Tried to trim a message chain that doesn't exist");
265                return;
266            }
267        };
268        let msg_limit = BLOCK_MESSAGE_LIMIT.saturating_sub(self.len());
269        let msg_limit = match message_chain.sig_type {
270            Some(SignatureType::Bls) => std::cmp::min(self.bls_limit, msg_limit as u64),
271            Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
272                std::cmp::min(self.secp_limit, msg_limit as u64)
273            }
274            _ => {
275                // This is a message with no signature type, which is not allowed in the current
276                // implementation. This _should_ never happen, but we handle it gracefully.
277                error!("Tried to trim a message chain with no signature type");
278                return;
279            }
280        };
281
282        if message_chain.gas_limit > self.gas_limit || message_chain.msgs.len() > msg_limit as usize
283        {
284            chains.trim_msgs_at(idx, self.gas_limit, msg_limit as usize, base_fee);
285        }
286    }
287}
288
289impl<T> MessagePool<T>
290where
291    T: Provider,
292{
293    /// Forest employs a sophisticated algorithm for selecting messages
294    /// for inclusion from the pool, given the ticket quality of a miner.
295    /// This method selects messages for including in a block.
296    pub fn select_messages(&self, ts: &Tipset, tq: f64) -> Result<Vec<SignedMessage>, Error> {
297        let cur_ts = self.current_tipset();
298        // if the ticket quality is high enough that the first block has higher
299        // probability than any other block, then we don't bother with optimal
300        // selection because the first block will always have higher effective
301        // performance. Otherwise we select message optimally based on effective
302        // performance of chains.
303        let mut msgs = if tq > 0.84 {
304            self.select_messages_greedy(&cur_ts, ts)
305        } else {
306            self.select_messages_optimal(&cur_ts, ts, tq)
307        }?;
308
309        if msgs.len() > MAX_BLOCK_MSGS {
310            warn!(
311                "Message selection chose too many messages: {} > {MAX_BLOCK_MSGS}",
312                msgs.len(),
313            );
314            msgs.truncate(MAX_BLOCK_MSGS)
315        }
316
317        Ok(msgs.msgs)
318    }
319
320    fn select_messages_greedy(
321        &self,
322        cur_ts: &Tipset,
323        ts: &Tipset,
324    ) -> Result<SelectedMessages, Error> {
325        let base_fee = self.api.chain_compute_base_fee(ts)?;
326
327        // 0. Load messages from the target tipset; if it is the same as the current
328        // tipset in    the mpool, then this is just the pending messages
329        let mut pending = self.get_pending_messages(cur_ts, ts)?;
330
331        if pending.is_empty() {
332            return Ok(SelectedMessages::default());
333        }
334
335        // 0b. Select all priority messages that fit in the block
336        let selected_msgs = self.select_priority_messages(&mut pending, &base_fee, ts)?;
337
338        // check if block has been filled
339        if selected_msgs.gas_limit < MIN_GAS || selected_msgs.len() >= BLOCK_MESSAGE_LIMIT {
340            return Ok(selected_msgs);
341        }
342
343        // 1. Create a list of dependent message chains with maximal gas reward per
344        // limit consumed
345        let mut chains = Chains::new();
346        for (actor, mset) in pending.into_iter() {
347            create_message_chains(
348                self.api.as_ref(),
349                &actor,
350                &mset,
351                &base_fee,
352                ts,
353                &mut chains,
354                &self.chain_config,
355            )?;
356        }
357
358        Ok(merge_and_trim(
359            &mut chains,
360            selected_msgs,
361            &base_fee,
362            MIN_GAS,
363        ))
364    }
365
366    #[allow(clippy::indexing_slicing)]
367    fn select_messages_optimal(
368        &self,
369        cur_ts: &Tipset,
370        target_tipset: &Tipset,
371        ticket_quality: f64,
372    ) -> Result<SelectedMessages, Error> {
373        let base_fee = self.api.chain_compute_base_fee(target_tipset)?;
374
375        // 0. Load messages from the target tipset; if it is the same as the current
376        // tipset in    the mpool, then this is just the pending messages
377        let mut pending = self.get_pending_messages(cur_ts, target_tipset)?;
378
379        if pending.is_empty() {
380            return Ok(SelectedMessages::default());
381        }
382
383        // 0b. Select all priority messages that fit in the block
384        let mut selected_msgs =
385            self.select_priority_messages(&mut pending, &base_fee, target_tipset)?;
386
387        // check if block has been filled
388        if selected_msgs.gas_limit < MIN_GAS || selected_msgs.len() >= BLOCK_MESSAGE_LIMIT {
389            return Ok(selected_msgs);
390        }
391
392        // 1. Create a list of dependent message chains with maximal gas reward per
393        // limit consumed
394        let mut chains = Chains::new();
395        for (actor, mset) in pending.into_iter() {
396            create_message_chains(
397                self.api.as_ref(),
398                &actor,
399                &mset,
400                &base_fee,
401                target_tipset,
402                &mut chains,
403                &self.chain_config,
404            )?;
405        }
406
407        // 2. Sort the chains
408        chains.sort(false);
409
410        if chains.get_at(0).is_some_and(|it| it.gas_perf < 0.0) {
411            tracing::warn!(
412                "all messages in mpool have non-positive gas performance {}",
413                chains[0].gas_perf
414            );
415            return Ok(selected_msgs);
416        }
417
418        // 3. Partition chains into blocks (without trimming)
419        //    we use the full block_gas_limit (as opposed to the residual `gas_limit`
420        //    from the priority message selection) as we have to account for
421        //    what other block providers are doing
422        let mut next_chain = 0;
423        let mut partitions: Vec<Vec<NodeKey>> = vec![vec![]; MAX_BLOCKS];
424        let mut i = 0;
425        while i < MAX_BLOCKS && next_chain < chains.len() {
426            let mut gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT;
427            let mut msg_limit = BLOCK_MESSAGE_LIMIT;
428            while next_chain < chains.len() {
429                let chain_key = chains.key_vec[next_chain];
430                next_chain += 1;
431                partitions[i].push(chain_key);
432                let chain = chains.get(chain_key).unwrap();
433                let chain_gas_limit = chain.gas_limit;
434                if gas_limit < chain_gas_limit {
435                    break;
436                }
437                gas_limit = gas_limit.saturating_sub(chain_gas_limit);
438                msg_limit = msg_limit.saturating_sub(chain.msgs.len());
439                if gas_limit < MIN_GAS || msg_limit == 0 {
440                    break;
441                }
442            }
443            i += 1;
444        }
445
446        // 4. Compute effective performance for each chain, based on the partition they
447        // fall into    The effective performance is the gas_perf of the chain *
448        // block probability
449        let block_prob = crate::message_pool::block_probabilities(ticket_quality);
450        let mut eff_chains = 0;
451        for i in 0..MAX_BLOCKS {
452            for k in &partitions[i] {
453                if let Some(node) = chains.get_mut(*k) {
454                    node.eff_perf = node.gas_perf * block_prob[i];
455                }
456            }
457            eff_chains += partitions[i].len();
458        }
459
460        // nullify the effective performance of chains that don't fit in any partition
461        for i in eff_chains..chains.len() {
462            if let Some(node) = chains.get_mut_at(i) {
463                node.set_null_effective_perf();
464            }
465        }
466
467        // 5. Re-sort the chains based on effective performance
468        chains.sort_effective();
469
470        // 6. Merge the head chains to produce the list of messages selected for
471        //    inclusion subject to the residual block limits
472        //    When a chain is merged in, all its previous dependent chains *must* also
473        //    be merged in or we'll have a broken block
474        let mut last = chains.len();
475        for i in 0..chains.len() {
476            // did we run out of performing chains?
477            if chains[i].gas_perf < 0.0 {
478                break;
479            }
480
481            // has it already been merged?
482            if chains[i].merged {
483                continue;
484            }
485
486            match selected_msgs.try_to_add_with_deps(i, &mut chains, &base_fee) {
487                Ok(_) => {
488                    // adjust the effective performance for all subsequent chains
489                    if let Some(next_key) = chains[i].next {
490                        let next_node = chains.get_mut(next_key).unwrap();
491                        if next_node.eff_perf > 0.0 {
492                            next_node.eff_perf += next_node.parent_offset;
493                            let mut next_next_key = next_node.next;
494                            while let Some(nnk) = next_next_key {
495                                let (nn_node, prev_perfs) = chains.get_mut_with_prev_eff(nnk);
496                                if let Some(nn_node) = nn_node {
497                                    if nn_node.eff_perf > 0.0 {
498                                        nn_node.set_eff_perf(prev_perfs);
499                                        next_next_key = nn_node.next;
500                                    } else {
501                                        break;
502                                    }
503                                } else {
504                                    break;
505                                }
506                            }
507                        }
508                    }
509
510                    // re-sort to account for already merged chains and effective performance
511                    // adjustments the sort *must* be stable or we end up getting
512                    // negative gasPerfs pushed up.
513                    chains.sort_range_effective(i + 1..);
514
515                    continue;
516                }
517                Err(e) => {
518                    debug!("Failed to add message chain with dependencies: {e:#}");
519                }
520            }
521
522            // we can't fit this chain and its dependencies because of block gasLimit -- we
523            // are at the edge
524            last = i;
525            break;
526        }
527
528        // 7. We have reached the edge of what can fit wholesale; if we still hae
529        // available    gasLimit to pack some more chains, then trim the last
530        // chain and push it down.
531        //
532        // Trimming invalidates subsequent dependent chains so that they can't be selected
533        // as their dependency cannot be (fully) included. We do this in a loop because the blocker
534        // might have been inordinately large and we might have to do it
535        // multiple times to satisfy tail packing.
536        'tail_loop: while selected_msgs.gas_limit >= MIN_GAS && last < chains.len() {
537            if !chains[last].valid {
538                // the chain has been invalidated, we can't use it
539                last += 1;
540                continue;
541            }
542
543            // trim if necessary
544            selected_msgs.trim_chain_at(&mut chains, last, &base_fee);
545
546            // push down if it hasn't been invalidated
547            if chains[last].valid {
548                for i in last..chains.len() - 1 {
549                    if chains[i].cmp_effective(&chains[i + 1]) == Ordering::Greater {
550                        break;
551                    }
552                    chains.key_vec.swap(i, i + 1);
553                }
554            }
555
556            // select the next (valid and fitting) chain and its dependencies for inclusion
557            let lst = last; // to make clippy happy, see: https://rust-lang.github.io/rust-clippy/master/index.html#mut_range_bound
558            for i in lst..chains.len() {
559                let chain = &mut chains[i];
560                // has the chain been invalidated
561                if !chain.valid {
562                    continue;
563                }
564
565                // has it already been merged?
566                if chain.merged {
567                    continue;
568                }
569
570                // if gasPerf < 0 we have no more profitable chains
571                if chain.gas_perf < 0.0 {
572                    break 'tail_loop;
573                }
574
575                match selected_msgs.try_to_add_with_deps(i, &mut chains, &base_fee) {
576                    Ok(_) => continue,
577                    Err(e) => debug!("Failed to add message chain with dependencies: {e:#}"),
578                }
579
580                continue 'tail_loop;
581            }
582
583            // the merge loop ended after processing all the chains and we we probably have
584            // still gas to spare; end the loop.
585            break;
586        }
587
588        // if we have room to spare, pick some random (non-negative) chains to fill
589        // the block
590        // we pick randomly so that we minimize the probability of
591        // duplication among all block producers
592        if selected_msgs.gas_limit >= MIN_GAS && selected_msgs.msgs.len() <= BLOCK_MESSAGE_LIMIT {
593            let pre_random_length = selected_msgs.len();
594
595            chains
596                .key_vec
597                .shuffle(&mut crate::utils::rand::forest_rng());
598
599            for i in 0..chains.len() {
600                if selected_msgs.gas_limit < MIN_GAS || selected_msgs.len() >= BLOCK_MESSAGE_LIMIT {
601                    break;
602                }
603
604                // has it been merged or invalidated?
605                if chains[i].merged || !chains[i].valid {
606                    continue;
607                }
608
609                // is it negative?
610                if chains[i].gas_perf < 0.0 {
611                    continue;
612                }
613
614                if selected_msgs
615                    .try_to_add_with_deps(i, &mut chains, &base_fee)
616                    .is_ok()
617                {
618                    // we added it, continue
619                    continue;
620                }
621
622                if chains[i].valid {
623                    // chain got trimmer on the previous call to `try_to_add_with_deps`, so it can
624                    // now be included.
625                    selected_msgs
626                        .try_to_add_with_deps(i, &mut chains, &base_fee)
627                        .context("Failed to add message chain with dependencies")?;
628                    continue;
629                }
630            }
631
632            if selected_msgs.len() != pre_random_length {
633                tracing::warn!(
634                    "optimal selection failed to pack a block; picked {} messages with random selection",
635                    selected_msgs.len() - pre_random_length
636                );
637            }
638        }
639
640        Ok(selected_msgs)
641    }
642
643    fn get_pending_messages(&self, cur_ts: &Tipset, ts: &Tipset) -> Result<Pending, Error> {
644        let mut result: Pending = HashMap::new();
645        let mut in_sync = false;
646        if cur_ts.epoch() == ts.epoch() && cur_ts == ts {
647            in_sync = true;
648        }
649
650        for (a, mset) in self.pending.read().iter() {
651            if in_sync {
652                result.insert(*a, mset.msgs.clone());
653            } else {
654                let mut mset_copy = HashMap::new();
655                for (nonce, m) in mset.msgs.iter() {
656                    mset_copy.insert(*nonce, m.clone());
657                }
658                result.insert(*a, mset_copy);
659            }
660        }
661
662        if in_sync {
663            return Ok(result);
664        }
665
666        // Run head change to do reorg detection
667        run_head_change(
668            self.api.as_ref(),
669            &self.bls_sig_cache,
670            &self.pending,
671            &self.key_cache,
672            cur_ts.clone(),
673            ts.clone(),
674            &mut result,
675        )?;
676
677        Ok(result)
678    }
679
680    fn select_priority_messages(
681        &self,
682        pending: &mut Pending,
683        base_fee: &TokenAmount,
684        ts: &Tipset,
685    ) -> Result<SelectedMessages, Error> {
686        let result = Vec::with_capacity(self.config.size_limit_low() as usize);
687        let gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT;
688        let min_gas = 1298450;
689
690        // 1. Get priority actor chains
691        let priority = self.config.priority_addrs();
692        let mut chains = Chains::new();
693        for actor in priority.iter() {
694            // remove actor from pending set as we are processing these messages.
695            if let Some(mset) = pending.remove(actor) {
696                // create chains for the priority actor
697                create_message_chains(
698                    self.api.as_ref(),
699                    actor,
700                    &mset,
701                    base_fee,
702                    ts,
703                    &mut chains,
704                    &self.chain_config,
705                )?;
706            }
707        }
708
709        if chains.is_empty() {
710            return Ok(SelectedMessages::new(Vec::new(), gas_limit));
711        }
712
713        Ok(merge_and_trim(
714            &mut chains,
715            SelectedMessages::new(result, gas_limit),
716            base_fee,
717            min_gas,
718        ))
719    }
720}
721
722/// Returns merged and trimmed messages with the gas limit
723#[allow(clippy::indexing_slicing)]
724fn merge_and_trim(
725    chains: &mut Chains,
726    mut selected_msgs: SelectedMessages,
727    base_fee: &TokenAmount,
728    min_gas: u64,
729) -> SelectedMessages {
730    if chains.is_empty() {
731        return selected_msgs;
732    }
733
734    // 2. Sort the chains
735    chains.sort(true);
736
737    let first_chain_gas_perf = chains[0].gas_perf;
738
739    if !chains.is_empty() && first_chain_gas_perf < 0.0 {
740        warn!(
741            "all priority messages in mpool have negative gas performance bestGasPerf: {}",
742            first_chain_gas_perf
743        );
744        return selected_msgs;
745    }
746
747    // 3. Merge chains until the block limit, as long as they have non-negative gas
748    // performance
749    let mut last = chains.len();
750    for i in 0..chains.len() {
751        let node = &chains[i];
752
753        if node.gas_perf < 0.0 {
754            break;
755        }
756
757        if selected_msgs.try_to_add(node.clone()).is_ok() {
758            // there was room, we added the chain, keep going
759            continue;
760        }
761
762        // we can't fit this chain because of block gas limit -- we are at the edge
763        last = i;
764        break;
765    }
766
767    'tail_loop: while selected_msgs.gas_limit >= min_gas && last < chains.len() {
768        // trim, discard negative performing messages
769        selected_msgs.trim_chain_at(chains, last, base_fee);
770
771        // push down if it hasn't been invalidated
772        let node = &chains[last];
773        if node.valid {
774            for i in last..chains.len() - 1 {
775                // slot_chains
776                let cur_node = &chains[i];
777                let next_node = &chains[i + 1];
778                if cur_node.compare(next_node) == Ordering::Greater {
779                    break;
780                }
781
782                chains.key_vec.swap(i, i + 1);
783            }
784        }
785
786        // select the next (valid and fitting) chain for inclusion
787        let lst = last; // to make clippy happy, see: https://rust-lang.github.io/rust-clippy/master/index.html#mut_range_bound
788        for i in lst..chains.len() {
789            let chain = chains[i].clone();
790            if !chain.valid {
791                continue;
792            }
793
794            // if gas_perf < 0 then we have no more profitable chains
795            if chain.gas_perf < 0.0 {
796                break 'tail_loop;
797            }
798
799            // does it fit in the block?
800            if selected_msgs.try_to_add(chain).is_ok() {
801                // there was room, we added the chain, keep going
802                continue;
803            }
804
805            // this chain needs to be trimmed
806            last += i;
807            continue 'tail_loop;
808        }
809
810        break;
811    }
812
813    selected_msgs
814}
815
816/// Like `head_change`, except it doesn't change the state of the `MessagePool`.
817/// It simulates a head change call.
818// This logic should probably be implemented in the ChainStore. It handles
819// reorgs.
820pub(in crate::message_pool) fn run_head_change<T>(
821    api: &T,
822    bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
823    pending: &RwLock<HashMap<Address, MsgSet>>,
824    key_cache: &IdToAddressCache,
825    from: Tipset,
826    to: Tipset,
827    rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
828) -> Result<(), Error>
829where
830    T: Provider,
831{
832    let mut left = from;
833    let mut right = to;
834    let mut left_chain = Vec::new();
835    let mut right_chain = Vec::new();
836    while left != right {
837        if left.epoch() > right.epoch() {
838            left_chain.push(left.clone());
839            let par = api.load_tipset(left.parents())?;
840            left = par;
841        } else {
842            right_chain.push(right.clone());
843            let par = api.load_tipset(right.parents())?;
844            right = par;
845        }
846    }
847    for ts in left_chain {
848        let mut msgs: Vec<SignedMessage> = Vec::new();
849        for block in ts.block_headers() {
850            let (umsg, smsgs) = api.messages_for_block(block)?;
851            msgs.extend(smsgs);
852            for msg in umsg {
853                let msg_cid = msg.cid();
854                if let Ok(smsg) = recover_sig(bls_sig_cache, msg) {
855                    msgs.push(smsg);
856                } else {
857                    tracing::debug!("could not recover signature for bls message {msg_cid}");
858                }
859            }
860        }
861        for msg in msgs {
862            add_to_selected_msgs(msg, rmsgs);
863        }
864    }
865
866    for ts in right_chain {
867        let mpool_ctx = MpoolCtx {
868            api,
869            key_cache,
870            pending,
871            ts: &ts,
872        };
873        for b in ts.block_headers() {
874            let (msgs, smsgs) = api.messages_for_block(b)?;
875
876            for msg in smsgs {
877                mpool_ctx.remove_from_selected_msgs(&msg.from(), msg.sequence(), rmsgs)?;
878            }
879            for msg in msgs {
880                mpool_ctx.remove_from_selected_msgs(&msg.from, msg.sequence, rmsgs)?;
881            }
882        }
883    }
884    Ok(())
885}
886
887#[cfg(test)]
888mod test_selection {
889    use std::sync::Arc;
890
891    use super::*;
892    use crate::db::MemoryDB;
893    use crate::key_management::{KeyStore, KeyStoreConfig, Wallet};
894    use crate::message_pool::msgpool::{
895        test_provider::{TestApi, mock_block},
896        tests::{create_fake_smsg, create_smsg},
897    };
898    use crate::shim::crypto::SignatureType;
899    use crate::shim::econ::BLOCK_GAS_LIMIT;
900    use tokio::task::JoinSet;
901
902    const TEST_GAS_LIMIT: i64 = 6955002;
903
904    fn make_test_mpool(joinset: &mut JoinSet<anyhow::Result<()>>) -> MessagePool<TestApi> {
905        let tma = TestApi::default();
906        let (tx, _rx) = flume::bounded(50);
907        MessagePool::new(tma, tx, Default::default(), Arc::default(), joinset).unwrap()
908    }
909
910    /// Creates a tipset with a mocked block and performs a head change to setup the
911    /// [`MessagePool`] for testing.
912    async fn mock_tipset(mpool: &MessagePool<TestApi>) -> Tipset {
913        let b1 = mock_block(1, 1);
914        let ts = Tipset::from(&b1);
915        mpool
916            .apply_head_change(Vec::new(), vec![Tipset::from(b1)])
917            .await
918            .unwrap();
919        ts
920    }
921
922    #[tokio::test]
923    async fn basic_message_selection() {
924        let mut joinset = JoinSet::new();
925        let mpool = make_test_mpool(&mut joinset);
926
927        let ks1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
928        let mut w1 = Wallet::new(ks1);
929        let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
930
931        let ks2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
932        let mut w2 = Wallet::new(ks2);
933        let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
934
935        let ts = mock_tipset(&mpool).await;
936
937        mpool
938            .api
939            .set_state_balance_raw(&a1, TokenAmount::from_whole(1));
940        mpool
941            .api
942            .set_state_balance_raw(&a2, TokenAmount::from_whole(1));
943
944        // we create 10 messages from each actor to another, with the first actor paying
945        // higher gas prices than the second; we expect message selection to
946        // order his messages first
947        for i in 0..10 {
948            let m = create_smsg(&a2, &a1, &mut w1, i, TEST_GAS_LIMIT, 2 * i + 1);
949            mpool.add(m).unwrap();
950        }
951        for i in 0..10 {
952            let m = create_smsg(&a1, &a2, &mut w2, i, TEST_GAS_LIMIT, i + 1);
953            mpool.add(m).unwrap();
954        }
955
956        let msgs = mpool.select_messages(&ts, 1.0).unwrap();
957
958        assert_eq!(msgs.len(), 20, "Expected 20 messages, got {}", msgs.len());
959
960        let mut next_nonce = 0;
961        for (i, msg) in msgs.iter().enumerate().take(10) {
962            assert_eq!(
963                msg.from(),
964                a1,
965                "first 10 returned messages should be from actor a1 {i}",
966            );
967            assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
968            next_nonce += 1;
969        }
970
971        next_nonce = 0;
972        for (i, msg) in msgs.iter().enumerate().take(20).skip(10) {
973            assert_eq!(
974                msg.from(),
975                a2,
976                "next 10 returned messages should be from actor a2 {i}",
977            );
978            assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
979            next_nonce += 1;
980        }
981
982        // now we make a block with all the messages and advance the chain
983        let b2 = mpool.api.next_block();
984        mpool.api.set_block_messages(&b2, msgs);
985        mpool
986            .apply_head_change(Vec::new(), vec![Tipset::from(b2)])
987            .await
988            .unwrap();
989
990        // we should now have no pending messages in the MessagePool
991        // let pending = mpool.pending.read().await;
992        assert!(
993            mpool.pending.read().is_empty(),
994            "Expected no pending messages, but got {}",
995            mpool.pending.read().len()
996        );
997
998        // create a block and advance the chain without applying to the mpool
999        let mut msgs = Vec::with_capacity(20);
1000        for i in 10..20 {
1001            msgs.push(create_smsg(&a2, &a1, &mut w1, i, TEST_GAS_LIMIT, 2 * i + 1));
1002            msgs.push(create_smsg(&a1, &a2, &mut w2, i, TEST_GAS_LIMIT, i + 1));
1003        }
1004        let b3 = mpool.api.next_block();
1005        let ts3 = Tipset::from(&b3);
1006        mpool.api.set_block_messages(&b3, msgs);
1007
1008        // Set state sequence to 20 so that nonces 20..30 don't exceed the nonce gap.
1009        mpool.api.set_state_sequence(&a1, 20);
1010        mpool.api.set_state_sequence(&a2, 20);
1011
1012        // now create another set of messages and add them to the mpool
1013        for i in 20..30 {
1014            mpool
1015                .add(create_smsg(
1016                    &a2,
1017                    &a1,
1018                    &mut w1,
1019                    i,
1020                    TEST_GAS_LIMIT,
1021                    2 * i + 200,
1022                ))
1023                .unwrap();
1024            mpool
1025                .add(create_smsg(&a1, &a2, &mut w2, i, TEST_GAS_LIMIT, i + 1))
1026                .unwrap();
1027        }
1028        // select messages in the last tipset; this should include the missed messages
1029        // as well as the last messages we added, with the first actor's
1030        // messages first first we need to update the nonce on the api
1031        mpool.api.set_state_sequence(&a1, 10);
1032        mpool.api.set_state_sequence(&a2, 10);
1033        let msgs = mpool.select_messages(&ts3, 1.0).unwrap();
1034
1035        assert_eq!(
1036            msgs.len(),
1037            20,
1038            "Expected 20 messages, but got {}",
1039            msgs.len()
1040        );
1041
1042        let mut next_nonce = 20;
1043        for msg in msgs.iter().take(10) {
1044            assert_eq!(
1045                msg.from(),
1046                a1,
1047                "first 10 returned messages should be from actor a1"
1048            );
1049            assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1050            next_nonce += 1;
1051        }
1052        next_nonce = 20;
1053        for msg in msgs.iter().take(20).skip(10) {
1054            assert_eq!(
1055                msg.from(),
1056                a2,
1057                "next 10 returned messages should be from actor a2"
1058            );
1059            assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1060            next_nonce += 1;
1061        }
1062    }
1063
1064    #[tokio::test]
1065    async fn message_selection_trimming_gas() {
1066        let mut joinset = JoinSet::new();
1067        let mpool = make_test_mpool(&mut joinset);
1068        let ts = mock_tipset(&mpool).await;
1069        let api = mpool.api.clone();
1070
1071        let ks1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1072        let mut w1 = Wallet::new(ks1);
1073        let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1074
1075        let ks2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1076        let mut w2 = Wallet::new(ks2);
1077        let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1078
1079        api.set_state_balance_raw(&a1, TokenAmount::from_whole(1));
1080        api.set_state_balance_raw(&a2, TokenAmount::from_whole(1));
1081
1082        let nmsgs = (crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT) + 1;
1083
1084        // make many small chains for the two actors
1085        for i in 0..nmsgs {
1086            let bias = (nmsgs - i) / 3;
1087            let m = create_fake_smsg(
1088                &mpool,
1089                &a2,
1090                &a1,
1091                i as u64,
1092                TEST_GAS_LIMIT,
1093                (1 + i % 3 + bias) as u64,
1094            );
1095            mpool.add(m).unwrap();
1096            let m = create_fake_smsg(
1097                &mpool,
1098                &a1,
1099                &a2,
1100                i as u64,
1101                TEST_GAS_LIMIT,
1102                (1 + i % 3 + bias) as u64,
1103            );
1104            mpool.add(m).unwrap();
1105        }
1106
1107        let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1108
1109        let expected = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1110        assert_eq!(msgs.len(), expected as usize);
1111        let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1112        assert!(m_gas_limit <= crate::shim::econ::BLOCK_GAS_LIMIT);
1113    }
1114
1115    #[tokio::test]
1116    async fn message_selection_trimming_msgs_basic() {
1117        let mut joinset = JoinSet::new();
1118        let mpool = make_test_mpool(&mut joinset);
1119        let ts = mock_tipset(&mpool).await;
1120        let api = mpool.api.clone();
1121
1122        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1123        let mut wallet = Wallet::new(keystore);
1124        let address = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
1125
1126        api.set_state_balance_raw(&address, TokenAmount::from_whole(1));
1127
1128        // create a larger than selectable chain
1129        for i in 0..BLOCK_MESSAGE_LIMIT {
1130            let msg = create_fake_smsg(&mpool, &address, &address, i as u64, 200_000, 100);
1131            mpool.add(msg).unwrap();
1132        }
1133
1134        let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1135        assert_eq!(
1136            msgs.len(),
1137            CBOR_GEN_LIMIT,
1138            "Expected {CBOR_GEN_LIMIT} messages, got {}",
1139            msgs.len()
1140        );
1141
1142        // check that the gas limit is not exceeded
1143        let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1144        assert!(
1145            m_gas_limit <= BLOCK_GAS_LIMIT,
1146            "Selected messages gas limit {m_gas_limit} exceeds block gas limit {BLOCK_GAS_LIMIT}",
1147        );
1148    }
1149
1150    #[tokio::test]
1151    async fn message_selection_trimming_msgs_two_senders() {
1152        let mut joinset = JoinSet::new();
1153        let mpool = make_test_mpool(&mut joinset);
1154        let ts = mock_tipset(&mpool).await;
1155        let api = mpool.api.clone();
1156
1157        let keystore_1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1158        let mut wallet_1 = Wallet::new(keystore_1);
1159        let address_1 = wallet_1.generate_addr(SignatureType::Secp256k1).unwrap();
1160
1161        let keystore_2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1162        let mut wallet_2 = Wallet::new(keystore_2);
1163        let address_2 = wallet_2.generate_addr(SignatureType::Bls).unwrap();
1164
1165        api.set_state_balance_raw(&address_1, TokenAmount::from_whole(1));
1166        api.set_state_balance_raw(&address_2, TokenAmount::from_whole(1));
1167
1168        // create 2 larger than selectable chains
1169        for i in 0..BLOCK_MESSAGE_LIMIT {
1170            let msg = create_smsg(
1171                &address_2,
1172                &address_1,
1173                &mut wallet_1,
1174                i as u64,
1175                300_000,
1176                100,
1177            );
1178            mpool.add(msg).unwrap();
1179            // higher has price, those should be preferred and fill the block up to
1180            // the [`CBOR_GEN_LIMIT`] messages.
1181            let msg = create_smsg(
1182                &address_1,
1183                &address_2,
1184                &mut wallet_2,
1185                i as u64,
1186                300_000,
1187                1000,
1188            );
1189            mpool.add(msg).unwrap();
1190        }
1191        let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1192        // check that the gas limit is not exceeded
1193        let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1194        assert!(
1195            m_gas_limit <= BLOCK_GAS_LIMIT,
1196            "Selected messages gas limit {m_gas_limit} exceeds block gas limit {BLOCK_GAS_LIMIT}",
1197        );
1198        let bls_msgs = msgs.iter().filter(|m| m.is_bls()).count();
1199        assert_eq!(
1200            CBOR_GEN_LIMIT, bls_msgs,
1201            "Expected {CBOR_GEN_LIMIT} bls messages, got {bls_msgs}."
1202        );
1203        assert_eq!(
1204            msgs.len(),
1205            BLOCK_MESSAGE_LIMIT,
1206            "Expected {BLOCK_MESSAGE_LIMIT} messages, got {}",
1207            msgs.len()
1208        );
1209    }
1210
1211    #[tokio::test]
1212    async fn message_selection_trimming_msgs_two_senders_complex() {
1213        let mut joinset = JoinSet::new();
1214        let mpool = make_test_mpool(&mut joinset);
1215        let ts = mock_tipset(&mpool).await;
1216        let api = mpool.api.clone();
1217
1218        let keystore_1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1219        let mut wallet_1 = Wallet::new(keystore_1);
1220        let address_1 = wallet_1.generate_addr(SignatureType::Secp256k1).unwrap();
1221
1222        let keystore_2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1223        let mut wallet_2 = Wallet::new(keystore_2);
1224        let address_2 = wallet_2.generate_addr(SignatureType::Bls).unwrap();
1225
1226        api.set_state_balance_raw(&address_1, TokenAmount::from_whole(1));
1227        api.set_state_balance_raw(&address_2, TokenAmount::from_whole(1));
1228
1229        // create two almost max-length chains of equal value
1230        let mut counter = 0;
1231        for i in 0..CBOR_GEN_LIMIT {
1232            counter += 1;
1233            let msg = create_smsg(
1234                &address_2,
1235                &address_1,
1236                &mut wallet_1,
1237                i as u64,
1238                300_000,
1239                100,
1240            );
1241            mpool.add(msg).unwrap();
1242            // higher has price, those should be preferred and fill the block up to
1243            // the [`CBOR_GEN_LIMIT`] messages.
1244            let msg = create_smsg(
1245                &address_1,
1246                &address_2,
1247                &mut wallet_2,
1248                i as u64,
1249                300_000,
1250                100,
1251            );
1252            mpool.add(msg).unwrap();
1253        }
1254
1255        // address_1 8192th message is worth more than address_2 8192th message
1256        let msg = create_smsg(
1257            &address_2,
1258            &address_1,
1259            &mut wallet_1,
1260            counter as u64,
1261            300_000,
1262            1000,
1263        );
1264        mpool.add(msg).unwrap();
1265
1266        let msg = create_smsg(
1267            &address_1,
1268            &address_2,
1269            &mut wallet_2,
1270            counter as u64,
1271            300_000,
1272            100,
1273        );
1274        mpool.add(msg).unwrap();
1275
1276        counter += 1;
1277
1278        // address 2 (uneselectable) message is worth so much!
1279        let msg = create_smsg(
1280            &address_2,
1281            &address_1,
1282            &mut wallet_1,
1283            counter as u64,
1284            400_000,
1285            1_000_000,
1286        );
1287        mpool.add(msg).unwrap();
1288
1289        let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1290        // check that the gas limit is not exceeded
1291        let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1292        assert!(
1293            m_gas_limit <= BLOCK_GAS_LIMIT,
1294            "Selected messages gas limit {m_gas_limit} exceeds block gas limit {BLOCK_GAS_LIMIT}",
1295        );
1296        // We should have taken the SECP chain from address_1.
1297        let secps_len = msgs.iter().filter(|m| m.is_secp256k1()).count();
1298        assert_eq!(
1299            CBOR_GEN_LIMIT, secps_len,
1300            "Expected {CBOR_GEN_LIMIT} secp messages, got {secps_len}."
1301        );
1302        // The remaining messages should be BLS messages.
1303        assert_eq!(
1304            msgs.len(),
1305            BLOCK_MESSAGE_LIMIT,
1306            "Expected {BLOCK_MESSAGE_LIMIT} messages, got {}",
1307            msgs.len()
1308        );
1309    }
1310
1311    #[tokio::test]
1312    async fn message_selection_priority() {
1313        let db = MemoryDB::default();
1314
1315        let mut joinset = JoinSet::new();
1316        let mut mpool = make_test_mpool(&mut joinset);
1317        let ts = mock_tipset(&mpool).await;
1318        let api = mpool.api.clone();
1319
1320        let ks1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1321        let mut w1 = Wallet::new(ks1);
1322        let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1323
1324        let ks2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1325        let mut w2 = Wallet::new(ks2);
1326        let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1327
1328        // set priority addrs to a1
1329        let mut mpool_cfg = mpool.get_config().clone();
1330        mpool_cfg.priority_addrs.push(a1);
1331        mpool.set_config(&db, mpool_cfg).unwrap();
1332
1333        // let gas_limit = 6955002;
1334        api.set_state_balance_raw(&a1, TokenAmount::from_whole(1));
1335        api.set_state_balance_raw(&a2, TokenAmount::from_whole(1));
1336
1337        let nmsgs = 10;
1338
1339        // make many small chains for the two actors
1340        for i in 0..nmsgs {
1341            let bias = (nmsgs - i) / 3;
1342            let m = create_smsg(
1343                &a2,
1344                &a1,
1345                &mut w1,
1346                i as u64,
1347                TEST_GAS_LIMIT,
1348                (1 + i % 3 + bias) as u64,
1349            );
1350            mpool.add(m).unwrap();
1351            let m = create_smsg(
1352                &a1,
1353                &a2,
1354                &mut w2,
1355                i as u64,
1356                TEST_GAS_LIMIT,
1357                (1 + i % 3 + bias) as u64,
1358            );
1359            mpool.add(m).unwrap();
1360        }
1361
1362        let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1363
1364        assert_eq!(msgs.len(), 20);
1365
1366        let mut next_nonce = 0;
1367        for msg in msgs.iter().take(10) {
1368            assert_eq!(
1369                msg.from(),
1370                a1,
1371                "first 10 returned messages should be from actor a1"
1372            );
1373            assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1374            next_nonce += 1;
1375        }
1376        next_nonce = 0;
1377        for msg in msgs.iter().take(20).skip(10) {
1378            assert_eq!(
1379                msg.from(),
1380                a2,
1381                "next 10 returned messages should be from actor a2"
1382            );
1383            assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1384            next_nonce += 1;
1385        }
1386    }
1387
1388    #[tokio::test]
1389    async fn test_optimal_msg_selection1() {
1390        // this test uses just a single actor sending messages with a low tq
1391        // the chain dependent merging algorithm should pick messages from the actor
1392        // from the start
1393        let mut joinset = JoinSet::new();
1394        let mpool = make_test_mpool(&mut joinset);
1395        let ts = mock_tipset(&mpool).await;
1396        let api = mpool.api.clone();
1397
1398        // create two actors
1399        let mut w1 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1400        let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1401        let mut w2 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1402        let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1403
1404        api.set_state_balance_raw(&a1, TokenAmount::from_whole(1));
1405        api.set_state_balance_raw(&a2, TokenAmount::from_whole(1));
1406
1407        let n_msgs = 10 * crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1408
1409        // we create n_msgs messages from each actor to another, with the first actor paying
1410        // higher gas prices than the second; we expect message selection to
1411        // order his messages first
1412        for i in 0..(n_msgs as usize) {
1413            let bias = (n_msgs as usize - i) / 3;
1414            let m = create_fake_smsg(
1415                &mpool,
1416                &a2,
1417                &a1,
1418                i as u64,
1419                TEST_GAS_LIMIT,
1420                (1 + i % 3 + bias) as u64,
1421            );
1422            mpool.add(m).unwrap();
1423        }
1424
1425        let msgs = mpool.select_messages(&ts, 0.25).unwrap();
1426
1427        let expected_msgs = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1428
1429        assert_eq!(msgs.len(), expected_msgs as usize);
1430
1431        for (next_nonce, m) in msgs.into_iter().enumerate() {
1432            assert_eq!(m.from(), a1, "Expected message from a1");
1433            assert_eq!(
1434                m.message().sequence,
1435                next_nonce as u64,
1436                "expected nonce {} but got {}",
1437                next_nonce,
1438                m.message().sequence
1439            );
1440        }
1441    }
1442
1443    #[tokio::test]
1444    async fn test_optimal_msg_selection2() {
1445        let mut joinset = JoinSet::new();
1446        // this test uses two actors sending messages to each other, with the first
1447        // actor paying (much) higher gas premium than the second.
1448        // We select with a low ticket quality; the chain dependent merging algorithm
1449        // should pick messages from the second actor from the start
1450        let mpool = make_test_mpool(&mut joinset);
1451        let ts = mock_tipset(&mpool).await;
1452        let api = mpool.api.clone();
1453
1454        // create two actors
1455        let mut w1 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1456        let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1457        let mut w2 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1458        let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1459
1460        api.set_state_balance_raw(&a1, TokenAmount::from_whole(1)); // in FIL
1461        api.set_state_balance_raw(&a2, TokenAmount::from_whole(1)); // in FIL
1462
1463        let n_msgs = 5 * crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1464        for i in 0..n_msgs as usize {
1465            let bias = (n_msgs as usize - i) / 3;
1466            let m = create_fake_smsg(
1467                &mpool,
1468                &a2,
1469                &a1,
1470                i as u64,
1471                TEST_GAS_LIMIT,
1472                (200000 + i % 3 + bias) as u64,
1473            );
1474            mpool.add(m).unwrap();
1475            let m = create_fake_smsg(
1476                &mpool,
1477                &a1,
1478                &a2,
1479                i as u64,
1480                TEST_GAS_LIMIT,
1481                (190000 + i % 3 + bias) as u64,
1482            );
1483            mpool.add(m).unwrap();
1484        }
1485
1486        let msgs = mpool.select_messages(&ts, 0.1).unwrap();
1487
1488        let expected_msgs = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1489        assert_eq!(
1490            msgs.len(),
1491            expected_msgs as usize,
1492            "Expected {} messages, but got {}",
1493            expected_msgs,
1494            msgs.len()
1495        );
1496
1497        let mut n_from1 = 0;
1498        let mut n_from2 = 0;
1499        let mut next_nonce1 = 0;
1500        let mut next_nonce2 = 0;
1501
1502        for m in msgs {
1503            if m.from() == a1 {
1504                if m.message.sequence != next_nonce1 {
1505                    panic!(
1506                        "Expected nonce {}, but got {}",
1507                        next_nonce1, m.message.sequence
1508                    );
1509                }
1510                next_nonce1 += 1;
1511                n_from1 += 1;
1512            } else {
1513                if m.message.sequence != next_nonce2 {
1514                    panic!(
1515                        "Expected nonce {}, but got {}",
1516                        next_nonce2, m.message.sequence
1517                    );
1518                }
1519                next_nonce2 += 1;
1520                n_from2 += 1;
1521            }
1522        }
1523
1524        if n_from1 > n_from2 {
1525            panic!("Expected more msgs from a2 than a1");
1526        }
1527    }
1528
1529    #[tokio::test]
1530    async fn test_optimal_msg_selection3() {
1531        let mut joinset = JoinSet::new();
1532        // this test uses 10 actors sending a block of messages to each other, with the
1533        // the first actors paying higher gas premium than the subsequent
1534        // actors. We select with a low ticket quality; the chain dependent
1535        // merging algorithm should pick messages from the median actor from the
1536        // start
1537        let mpool = make_test_mpool(&mut joinset);
1538        let ts = mock_tipset(&mpool).await;
1539        let api = mpool.api.clone();
1540
1541        let n_actors = 10;
1542
1543        let mut actors = vec![];
1544        let mut wallets = vec![];
1545
1546        for _ in 0..n_actors {
1547            let mut wallet = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1548            let actor = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
1549
1550            actors.push(actor);
1551            wallets.push(wallet);
1552        }
1553
1554        for a in &mut actors {
1555            api.set_state_balance_raw(a, TokenAmount::from_whole(1));
1556        }
1557
1558        let n_msgs = 1 + crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1559        for i in 0..n_msgs {
1560            for j in 0..n_actors {
1561                let premium =
1562                    500000 + 10000 * (n_actors - j) + (n_msgs + 2 - i) / (30 * n_actors) + i % 3;
1563                let m = create_fake_smsg(
1564                    &mpool,
1565                    &actors[j as usize],
1566                    &actors[j as usize],
1567                    i as u64,
1568                    TEST_GAS_LIMIT,
1569                    premium as u64,
1570                );
1571                mpool.add(m).unwrap();
1572            }
1573        }
1574
1575        let msgs = mpool.select_messages(&ts, 0.1).unwrap();
1576        let expected_msgs = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1577
1578        assert_eq!(
1579            msgs.len(),
1580            expected_msgs as usize,
1581            "Expected {} messages, but got {}",
1582            expected_msgs,
1583            msgs.len()
1584        );
1585
1586        let who_is = |addr| -> usize {
1587            for (i, a) in actors.iter().enumerate() {
1588                if a == &addr {
1589                    return i;
1590                }
1591            }
1592            // Lotus has -1, but since we don't have -ve indexes, set it some unrealistic
1593            // number
1594            9999999
1595        };
1596
1597        let mut nonces = vec![0; n_actors as usize];
1598        for m in &msgs {
1599            let who = who_is(m.from());
1600            if who < 3 {
1601                panic!("got message from {who}th actor",);
1602            }
1603
1604            let next_nonce: u64 = nonces[who];
1605            if m.message.sequence != next_nonce {
1606                panic!(
1607                    "expected nonce {} but got {}",
1608                    next_nonce, m.message.sequence
1609                );
1610            }
1611            nonces[who] += 1;
1612        }
1613    }
1614}